scheduler.py 2.29 KB
Newer Older
kaiyou's avatar
kaiyou committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
import celery
import redis
import json

from amonit import util


class Scheduler(object):
    """ Manages celery schedules and handles check responses
    """

    def __init__(self, config):
        self.config = config
        self.storage = redis.Redis.from_url(config["general"]["storage"])

    def schedule(self, app):
        """ Called by celery upon configuration so we can add beat tasks
        """
        for name, check in app.config["checks"].items():
            app.add_periodic_task(
                check["schedule"],
                check_run.s(
                    name, check["function"],
                    check.get("context", {}), check.get("args", {})
                )
            )

    def update(self, name, context, status, result):
        """ Handle a status update for a given check
        """
        value = self.storage.get(name)
        state = json.loads(value) if value else {"status": True, "count": 0}
        state.update(
            count=state["count"] + 1 if status == state["status"] else 0,
            status=status, result=result
        )
        self.storage.set(name, json.dumps(state))
        state.update(**context)
        self.notify(name, state)

    def notify(self, name, state):
        """ Dispatch notifications for a handled status update
        """
        for name, notifier in self.config["notifiers"].items():
            for field, value in notifier.get("filter", {}).items():
                if state.get(field, object()) != value:
                    break
            else:
                notify_run.s(
                    name, notifier["function"], state,
                    notifier.get("args", {})
                )()


@celery.current_app.task
def check_run(name, function, context, args):
    """ Celery task that runs a single check
    """
    print("Running check {}".format(name))
    try:
        result = util.resolve(function)(**args)
        status = True
    except Exception as error:
        result = str(error)
        status = False
    finally:
        celery.current_app.scheduler.update(name, context, status, result)


@celery.current_app.task
def notify_run(name, function, state, args):
    """ Celery task that runs a single notifier
    """
    print("Running notifier {}".format(function))
    util.resolve(function)(name, state, **args)