Commit 7c5c99a9 authored by kaiyou's avatar kaiyou

Basic monitoring engine

parent 94409a58
import celery
import yaml
import os
from amonit import scheduler
def schedule(sender, **kwargs):
def main(config=None):
# Load the configuration
if not config:
with open(os.environ.get("AMONIT_CONFIG", "amonit.yaml")) as handle:
config = yaml.load(handle)
# Create and initialize the celery app
app = celery.Celery('amonit', broker=config["general"]["broker"])
app.config = config
app.scheduler = scheduler.Scheduler(config)
return app
import celery
import redis
import json
from amonit import util
class Scheduler(object):
""" Manages celery schedules and handles check responses
def __init__(self, config):
self.config = config = redis.Redis.from_url(config["general"]["storage"])
def schedule(self, app):
""" Called by celery upon configuration so we can add beat tasks
for name, check in app.config["checks"].items():
name, check["function"],
check.get("context", {}), check.get("args", {})
def update(self, name, context, status, result):
""" Handle a status update for a given check
value =
state = json.loads(value) if value else {"status": True, "count": 0}
count=state["count"] + 1 if status == state["status"] else 0,
status=status, result=result
), json.dumps(state))
self.notify(name, state)
def notify(self, name, state):
""" Dispatch notifications for a handled status update
for name, notifier in self.config["notifiers"].items():
for field, value in notifier.get("filter", {}).items():
if state.get(field, object()) != value:
name, notifier["function"], state,
notifier.get("args", {})
def check_run(name, function, context, args):
""" Celery task that runs a single check
print("Running check {}".format(name))
result = util.resolve(function)(**args)
status = True
except Exception as error:
result = str(error)
status = False
celery.current_app.scheduler.update(name, context, status, result)
def notify_run(name, function, state, args):
""" Celery task that runs a single notifier
print("Running notifier {}".format(function))
util.resolve(function)(name, state, **args)
import importlib
def resolve(function, cache={}):
""" Utility function that resolves a fully qualified function name
if function not in cache:
module, name = function.rsplit(".", 1)
cache[function] = getattr(importlib.import_module(module), name)
return cache[function]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment