scheduler.py 2.9 KB
Newer Older
kaiyou's avatar
kaiyou committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
import celery
import redis
import json

from amonit import util


class Scheduler(object):
    """ Manages celery schedules and handles check responses
    """

    def __init__(self, config):
        self.config = config
        self.storage = redis.Redis.from_url(config["general"]["storage"])

    def schedule(self, app):
        """ Called by celery upon configuration so we can add beat tasks
        """
        for name, check in app.config["checks"].items():
kaiyou's avatar
kaiyou committed
20
            instances = check.get("instances", {None: {}})
21 22
            context = check.get("context", {})
            args = check.get("args", {})
kaiyou's avatar
kaiyou committed
23
            for instance_name, instance in instances.items():
24 25 26 27 28 29 30
                instance_context = context.copy()
                instance_context.update(instance.get("context", {}))
                instance_args = args.copy()
                instance_args.update(instance.get("args", {}))
                app.add_periodic_task(
                    instance.get("schedule", check["schedule"]),
                    check_run.s(
kaiyou's avatar
kaiyou committed
31 32 33
                        "{}[{}]".format(name, instance_name)
                          if instance_name else name,
                        instance.get("function", check["function"]),
34 35
                        instance_context, instance_args
                    )
kaiyou's avatar
kaiyou committed
36 37 38 39 40 41 42 43 44
                )

    def update(self, name, context, status, result):
        """ Handle a status update for a given check
        """
        value = self.storage.get(name)
        state = json.loads(value) if value else {"status": True, "count": 0}
        state.update(
            count=state["count"] + 1 if status == state["status"] else 0,
45
            name=name, status=status, result=result
kaiyou's avatar
kaiyou committed
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
        )
        self.storage.set(name, json.dumps(state))
        state.update(**context)
        self.notify(name, state)

    def notify(self, name, state):
        """ Dispatch notifications for a handled status update
        """
        for name, notifier in self.config["notifiers"].items():
            for field, value in notifier.get("filter", {}).items():
                if state.get(field, object()) != value:
                    break
            else:
                notify_run.s(
                    name, notifier["function"], state,
                    notifier.get("args", {})
                )()


@celery.current_app.task
def check_run(name, function, context, args):
    """ Celery task that runs a single check
    """
    print("Running check {}".format(name))
    try:
        result = util.resolve(function)(**args)
        status = True
    except Exception as error:
        result = str(error)
        status = False
    finally:
        celery.current_app.scheduler.update(name, context, status, result)


@celery.current_app.task
def notify_run(name, function, state, args):
    """ Celery task that runs a single notifier
    """
    print("Running notifier {}".format(function))
    util.resolve(function)(name, state, **args)