Commit f308b2e2 authored by kaiyou's avatar kaiyou

Implement a hook system, resolve items from config and clean the code

parent 5f38718d
import celery import celery
import redis
import json
from amonit import util, config from amonit import util, config, state
class State(object):
""" Stores the check state in a redis backend
"""
DEFAULT_STATE = {"up": True, "recurrence": 0}
def __init__(self, redis_url):
self.storage = redis.Redis.from_url(redis_url)
def __getitem__(self, key):
value = self.storage.get(key)
return json.loads(value) if value else State.DEFAULT_STATE.copy()
def __setitem__(self, key, value):
self.storage.set(key, json.dumps(value))
class Scheduler(object): class Scheduler(object):
""" Manages celery schedules and handles check responses """ Manages celery schedules and handles check responses
""" """
def __init__(self, config=None): def __init__(self, config):
self.config = config self.config = config
self.state = State(config["general"]["storage"]) self.state = state.State(config["general"]["storage"])
@classmethod @classmethod
def get_app(cls, conf=None): def get_app(cls, conf=None):
...@@ -43,38 +25,27 @@ class Scheduler(object): ...@@ -43,38 +25,27 @@ class Scheduler(object):
for checkid, check in self.config["checks"].items(): for checkid, check in self.config["checks"].items():
app.add_periodic_task( app.add_periodic_task(
check["schedule"], check["schedule"],
check_run.s( check_run.s(checkid, check["function"],
checkid, check["function"], check.get("context", {}), check.get("args", {})))
check.get("context", {}), check.get("args", {})
)
)
def update(self, checkid, context, result):
""" Handle a status update for a given check
"""
context.update(
recurrence=(context["recurrence"] + 1
if result["up"] == context["up"]
else 0),
**result
)
self.state[checkid] = context
self.notify(checkid, context)
def notify(self, checkid, context): def notify(self, checkid, context):
""" Dispatch notifications for a handled status update """ Dispatch notifications for a handled status update
""" """
for notifierid, notifier in self.config["notifiers"].items(): for notifierid, notifier in self.config["notifiers"].items():
for criteria in notifier.get("filters", []): if util.filter_match(context, notifier.get("filters", [])):
for field, value in criteria.items(): args = notifier.get("args", {})
if context.get(field, None) != value: context, args = self.hook("prenotify", context, args)
break notify_run.s(notifierid, notifier["function"], context, args)()
else:
notify_run.s( def hook(self, trigger, context, io):
notifierid, notifier["function"], """ Handle hooks, io is either the function args or result,
context, notifier.get("args", {}) depending if the hook is run pre or post function
)() """
break for hookid, hook in self.config.get("hooks", {}).items():
if trigger in hook["triggers"]:
function, hook_args = hook["function"], hook.get("args", {})
context, io = util.resolve(function)(context, io, **hook_args)
return context, io
@celery.current_app.task @celery.current_app.task
...@@ -82,12 +53,14 @@ def check_run(checkid, function, context, args): ...@@ -82,12 +53,14 @@ def check_run(checkid, function, context, args):
""" Celery task that runs a single check """ Celery task that runs a single check
""" """
print("Running check {}".format(checkid)) print("Running check {}".format(checkid))
stored = celery.current_app.scheduler.state[checkid] scheduler = celery.current_app.scheduler
stored.update(context) context, args = scheduler.hook("precheck", context, args)
stored.update(checkid=checkid, function=function) context = dict(scheduler.state[checkid], **context)
result = util.resolve(function)(stored, **args) context.update(checkid=checkid, function=function)
celery.current_app.scheduler.update(checkid, stored, result) result = util.resolve(function)(context, **args)
return result context, result = scheduler.hook("postcheck", context, result)
scheduler.state[checkid] = context
scheduler.notify(checkid, context)
@celery.current_app.task @celery.current_app.task
......
...@@ -12,17 +12,38 @@ def resolve(function, cache={}): ...@@ -12,17 +12,38 @@ def resolve(function, cache={}):
def render(template, data): def render(template, data):
""" Quickly render a jinja template with provided data dict
"""
return jinja2.Template(template).render(**data) return jinja2.Template(template).render(**data)
def filter_match(candidate, filters):
""" Apply a list of filters to a candidate dictionary
"""
if type(filters) is list:
return any(
filter_match(candidate, subfilters)
for subfilters in filters
)
elif type(filters) is dict:
return all(
filter_match(candidate.get(key, None), value)
for key, value in filters.items()
)
else:
return candidate == filters
def wrap(function): def wrap(function):
""" Wrap a check function to handle simple results and exceptions
"""
def replacement(*args, **kwargs): def replacement(*args, **kwargs):
try: try:
result = str(function(*args, **kwargs)) result = function(*args, **kwargs)
if type(result) is not dict or "up" not in result: if type(result) is not dict or "up" not in result:
result = { result = {
"up": True, "up": True,
"message": result "message": str(result)
} }
except Exception as error: except Exception as error:
result = { result = {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment