Commit ccb77521 authored by kaiyou's avatar kaiyou

Improve upon the concept of context

- Merge the concept of context and state
- Allow for multiple isntances anywhere in the conf
- Introduce up and recurrence in the context
parent 9a8fb231
Pipeline #131 passed with stage
in 54 seconds
import celery
import yaml
import os
from amonit import scheduler
from amonit import config, scheduler
def schedule(sender, **kwargs):
sender.scheduler.schedule(sender)
def main(config=None):
def main(conf=None):
# Load the configuration
if not config:
with open(os.environ.get("AMONIT_CONFIG", "amonit.yaml")) as handle:
config = yaml.load(handle)
if not conf:
conf = config.load()
# Create and initialize the celery app
app = celery.Celery('amonit', broker=config["general"]["broker"])
app.config = config
app.scheduler = scheduler.Scheduler(config)
app = celery.Celery('amonit', broker=conf["general"]["broker"])
app.scheduler = scheduler.Scheduler(conf)
app.on_after_configure.connect(schedule)
return app
......
import requests
def simple(url, expected_code=None, expected_string=None):
def simple(context, url, expected_code=None):
""" Run a GET http request and test the HTTP response code
"""
req = requests.get(url)
if expected_code and req.status_code != expected_code:
raise RuntimeError("HTTP code should be {}".format(expected_code))
if expected_string and string not in req.text:
raise RuntimeError("Result did not contain expected string")
return {
"up": req.status_code != (expected_code or 200),
"code": req.status_code,
"message": "the page returned code {}".format(req.status_code)
}
import yaml
import os
def expand(conf):
""" Expand a configuration object according to the 'instances' field
"""
for key, value in conf.copy().items():
if type(value) is dict:
if "instances" in value:
instances = value["instances"]
del conf[key]["instances"]
conf.update(expand({
key + "_" + instance_key: {**value, **instance}
for instance_key, instance in instances.items()
}))
del conf[key]
else:
expand(conf[key])
return conf
def load():
""" Load the configuration from a yaml file
"""
with open(os.environ.get("AMONIT_CONFIG", "amonit.yaml")) as handle:
conf = yaml.load(handle)
return expand(conf)
......@@ -3,11 +3,9 @@ from matrix_client import client as matrix_client
from amonit import util
def mail(name, state, title, body):
print(name, state, title, body)
def matrix_room(name, state, hs, token, roomid, message):
def matrix_room(context, hs, token, roomid, message):
""" Send a message to a given Matrix room
"""
matrix = matrix_client.MatrixHttpApi(hs, token=token)
message = util.render(message, state)
message = util.render(message, context)
matrix.send_message(roomid, message)
......@@ -5,82 +5,85 @@ import json
from amonit import util
class State(object):
""" Stores the check state in a redis backend
"""
DEFAULT_STATE = {"up": True, "recurrence": 0}
def __init__(self, redis_url):
self.storage = redis.Redis.from_url(redis_url)
def __getitem__(self, key):
value = self.storage.get(key)
return json.loads(value) if value else State.DEFAULT_STATE.copy()
def __setitem__(self, key, value):
self.storage.set(key, json.dumps(value))
class Scheduler(object):
""" Manages celery schedules and handles check responses
"""
def __init__(self, config):
def __init__(self, config=None):
self.config = config
self.storage = redis.Redis.from_url(config["general"]["storage"])
self.state = State(config["general"]["storage"])
def schedule(self, app):
""" Called by celery upon configuration so we can add beat tasks
"""
for name, check in app.config["checks"].items():
instances = check.get("instances", {None: {}})
context = check.get("context", {})
args = check.get("args", {})
for instance_name, instance in instances.items():
instance_context = context.copy()
instance_context.update(instance.get("context", {}))
instance_args = args.copy()
instance_args.update(instance.get("args", {}))
app.add_periodic_task(
instance.get("schedule", check["schedule"]),
check_run.s(
"{}[{}]".format(name, instance_name)
if instance_name else name,
instance.get("function", check["function"]),
instance_context, instance_args
)
for checkid, check in self.config["checks"].items():
app.add_periodic_task(
check["schedule"],
check_run.s(
checkid, check["function"],
check.get("context", {}), check.get("args", {})
)
)
def update(self, name, context, status, result):
def update(self, checkid, context, result):
""" Handle a status update for a given check
"""
value = self.storage.get(name)
state = json.loads(value) if value else {"status": True, "count": 0}
state.update(
count=state["count"] + 1 if status == state["status"] else 0,
name=name, status=status, result=result
context.update(
recurrence=(context["recurrence"] + 1
if result["up"] == context["up"]
else 0),
**result
)
self.storage.set(name, json.dumps(state))
state.update(**context)
self.notify(name, state)
self.state[checkid] = context
self.notify(checkid, context)
def notify(self, name, state):
def notify(self, checkid, context):
""" Dispatch notifications for a handled status update
"""
for name, notifier in self.config["notifiers"].items():
for field, value in notifier.get("filter", {}).items():
if state.get(field, object()) != value:
for notifierid, notifier in self.config["notifiers"].items():
for criteria in notifier.get("filters", []):
for field, value in criteria.items():
if context.get(field, None) != value:
break
else:
notify_run.s(
notifierid, notifier["function"],
context, notifier.get("args", {})
)()
break
else:
notify_run.s(
name, notifier["function"], state,
notifier.get("args", {})
)()
@celery.current_app.task
def check_run(name, function, context, args):
def check_run(checkid, function, context, args):
""" Celery task that runs a single check
"""
print("Running check {}".format(name))
try:
result = util.resolve(function)(**args)
status = True
except Exception as error:
result = str(error)
status = False
finally:
celery.current_app.scheduler.update(name, context, status, result)
print("Running check {}".format(checkid))
context.update(celery.current_app.scheduler.state[checkid])
context.update(checkid=checkid, function=function)
result = util.resolve(function)(context, **args)
celery.current_app.scheduler.update(checkid, context, result)
@celery.current_app.task
def notify_run(name, function, state, args):
def notify_run(notifierid, function, context, args):
""" Celery task that runs a single notifier
"""
print("Running notifier {}".format(function))
util.resolve(function)(name, state, **args)
print("Running notifier {}".format(notifierid))
util.resolve(function)(context, **args)
......@@ -13,3 +13,18 @@ def resolve(function, cache={}):
def render(template, data):
return jinja2.Template(template).render(**data)
def wrap(function):
def replacement(context, *args, **kwargs):
try:
message = str(function(*args, **kwargs))
up = True
except Exception as error:
message = str(error)
up = False
return {
"up": up,
"message": message
}
return replacement
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment