# scheduler.py
import celery
import redis
import json

from amonit import util


class Scheduler(object):
    """ Manages celery schedules and handles check responses

    The state of every check instance is stored as a JSON document in the
    redis storage, using the instance name as the key.
    """

    def __init__(self, config):
        """ Keep the configuration and open the redis storage

        config is a mapping; this class reads config["general"]["storage"]
        (a redis URL) and config["notifiers"].
        """
        self.config = config
        self.storage = redis.Redis.from_url(config["general"]["storage"])

    @staticmethod
    def _setting(instance, check, key):
        """ Return the instance value for key, else the check-level value

        The check-level value is only looked up when the instance does not
        define the key, so a check may omit it if all its instances set it.
        """
        return instance[key] if key in instance else check[key]

    def schedule(self, app):
        """ Called by celery upon configuration so we can add beat tasks

        One periodic task is registered per check instance found in
        app.config["checks"]. Instance "context" and "args" are merged over
        the check-level ones; "schedule" and "function" may be set on either
        level, the instance taking precedence.

        Raises KeyError when "schedule" or "function" is defined on neither
        the instance nor the check.
        """
        for check_name, check in app.config["checks"].items():
            # A check without explicit instances is its own single instance
            instances = check.get("instances", {check_name: {}})
            context = check.get("context", {})
            args = check.get("args", {})
            for instance_name, instance in instances.items():
                # Copy before merging so instances do not leak into each other
                instance_context = context.copy()
                instance_context.update(instance.get("context", {}))
                instance_args = args.copy()
                instance_args.update(instance.get("args", {}))
                app.add_periodic_task(
                    self._setting(instance, check, "schedule"),
                    check_run.s(
                        instance_name,
                        self._setting(instance, check, "function"),
                        instance_context, instance_args
                    )
                )

    def update(self, name, context, status, result):
        """ Handle a status update for a given check

        The stored state for name is loaded (defaulting to a passing status
        with a zero count), then "count" is incremented when status equals
        the stored status and reset to 0 otherwise. The new state is written
        back to the storage, and finally handed to notify().
        """
        value = self.storage.get(name)
        state = json.loads(value) if value else {"status": True, "count": 0}
        state.update(
            count=state["count"] + 1 if status == state["status"] else 0,
            status=status, result=result
        )
        self.storage.set(name, json.dumps(state))
        # Context is merged after persisting: it is exposed to notifiers
        # (and their filters) but never written to the storage
        state.update(**context)
        self.notify(name, state)

    def notify(self, name, state):
        """ Dispatch notifications for a handled status update

        A notifier runs when every field of its optional "filter" mapping
        equals the matching field in state; a field missing from state never
        matches. name is the name of the check the update is about and is
        forwarded to the notifier.
        """
        # Iterate over values only: reusing "name" for the notifier key would
        # shadow the check name and forward the wrong value to the notifier
        for notifier in self.config["notifiers"].values():
            filters = notifier.get("filter", {})
            # A fresh object() stands for a missing field, it equals nothing
            mismatch = any(
                state.get(field, object()) != value
                for field, value in filters.items()
            )
            if mismatch:
                continue
            # Calling the signature directly, as the original code does
            notify_run.s(
                name, notifier["function"], state,
                notifier.get("args", {})
            )()


@celery.current_app.task
def check_run(name, function, context, args):
    """ Celery task that runs a single check

    function is resolved through util.resolve and called with args as
    keyword arguments. A normal return counts as success and its return
    value is the result; any Exception counts as failure and its message is
    the result. The outcome is then reported to the scheduler attached to
    the current celery app (presumably a Scheduler instance, set elsewhere).
    """
    print("Running check {}".format(name))
    try:
        result = util.resolve(function)(**args)
    except Exception as error:
        result = str(error)
        status = False
    else:
        status = True
    # Report after the try block rather than in a finally clause: on a
    # BaseException (SystemExit, KeyboardInterrupt...) status and result are
    # unbound, and a finally clause would mask it with an UnboundLocalError
    celery.current_app.scheduler.update(name, context, status, result)


@celery.current_app.task
def notify_run(name, function, state, args):
    """ Celery task that invokes one notifier function

    function is resolved through util.resolve, then called with name and
    state as positional arguments and args expanded as keyword arguments.
    """
    message = "Running notifier {}".format(function)
    print(message)
    handler = util.resolve(function)
    handler(name, state, **args)