Commit 3a332a74 authored by kaiyou's avatar kaiyou

Make mail and matrix checks more generic

parent b9ac6edd
Pipeline #209 passed with stage
in 1 minute and 19 seconds
import time
import os
def dummy(context):
""" Dummy check used for ticking
......@@ -6,3 +9,38 @@ def dummy(context):
"up": True,
"message": None
}
def send_then_receive(send_function, lookup_function):
""" Generate a canary, send it using the send function, then
look it up using the lookup function.
"""
def decorator(original_check):
def actual_check(context, account_from, account_to,
dest=None, timeout=120.0):
if dest is None:
dest = account_to["username"]
# Generate, store and send a canary
canary = os.urandom(8).hex()
result = {"canary": canary, **context}
send_function(result, account_from, dest, canary)
# Wait for the canary to be received or timeout
start_time = time.time()
delay = 0
found = False
while delay < timeout and not found:
found = lookup_function(result, account_to, dest, canary)
delay = int(time.time() - start_time)
time.sleep(10)
result["up"] = found
result["message"] = f"message delivered in {delay} seconds" \
if found else f"message not delivered"
print(result)
return result
actual_check.__name__ = original_check.__name__
return actual_check
return decorator
......@@ -5,6 +5,8 @@ import time
from email.mime import text, multipart
from amonit.check import base
def get_imap_client(host, port, encrypt):
if encrypt == "starttls":
......@@ -36,59 +38,43 @@ def imap_login(context, host, username, password, port=143, encrypt=None):
client.logout()
def send_then_receive(context, account_from, account_to, timeout=120):
""" Send a mail using SMTP, then check that it is received using IMAP
This is useful either on a single server or across servers to ensure
SMTP works properly from server to server.
def send_mail(context, account, to, message):
""" Send a message to the given account
"""
# Generate and store a canary
canary = os.urandom(8).hex()
result = {"canary": canary}
# First send an email
smtp = get_smtp_client(
account_from["host"], account_from.get("port", 587),
account_from.get("encrypt", None)
)
smtp.login(account_from["username"], account_from["password"])
message = multipart.MIMEMultipart("alternative")
message["Subject"] = f"Monitoring canary {canary}"
message["From"] = account_from["username"]
message["To"] = account_to["username"]
message.attach(text.MIMEText("Simple monitoring!", "text"))
smtp.sendmail(
account_from["username"],
account_to["username"],
message.as_string()
account["host"], account.get("port", 587),
account.get("encrypt", None)
)
smtp.login(account["username"], account["password"])
mail = multipart.MIMEMultipart("alternative")
mail["Subject"] = f"Monitoring canary {message}"
mail["From"] = account["username"]
mail["To"] = to
mail.attach(text.MIMEText("Simple monitoring!", "text"))
smtp.sendmail(account["username"], to, mail.as_string())
smtp.quit()
start_time = time.time()
# Then wait for the delivery
def lookup_mail(context, account, to, message):
""" Look for a specific message in a mailbox, then delete it
"""
client = get_imap_client(
account_to["host"], account_to.get("port", 143),
account_to.get("encrypt", None)
account["host"], account.get("port", 143),
account.get("encrypt", None)
)
client.login(account_to["username"], account_to["password"])
client.select(account_to["inbox"])
filter_string = f'UNSEEN SUBJECT {canary}'
delay = 0
while delay < timeout:
status, data = client.uid("search", None, filter_string)
delay = int(time.time() - start_time)
if status == "OK" and data[0]:
for uid in data[0].split():
client.uid("store", uid, "+FLAGS", "(\\Deleted)")
client.expunge()
result["up"] = True
result["delay"] = delay
result["message"] = f"message delivered in {delay} seconds"
break
time.sleep(10)
client.check()
if "up" not in result:
result["up"] = False
domain = account_to["username"].split("@")[1]
result["message"] = f"message not delivered to {domain} ({canary})"
client.login(account["username"], account["password"])
client.select(account["inbox"])
filter_string = f'UNSEEN SUBJECT {message}'
status, data = client.uid("search", None, filter_string)
if status == "OK" and data[0]:
for uid in data[0].split():
client.uid("store", uid, "+FLAGS", "(\\Deleted)")
client.expunge()
client.close()
client.logout()
# Return the result
return result
return bool(status == "OK" and data[0])
@base.send_then_receive(send_mail, lookup_mail)
def send_then_receive():
pass
......@@ -4,44 +4,38 @@ import time
from matrix_client import client as matrix_client
from amonit.check import base
def send_then_receive(context, account_from, account_to, roomid, timeout=120):
""" Send a message using Matrix, then check that it is received.
This is useful either on a single server or across homeservers to
check that federation work properly.
The canary is first sent, then looked for in the next execution.
def get_client(account):
""" Create a Matrix client
"""
# Generate and store a canary
canary = os.urandom(8).hex()
result = {"canary": canary}
# First drop the canary
client_to = matrix_client.MatrixHttpApi(
account_to["hs"], token=account_to["token"]
return matrix_client.MatrixHttpApi(
account["hs"], token=account["token"]
)
client_to.send_message(roomid, canary)
start_time = time.time()
# Then wait for the delivery
client_from = matrix_client.MatrixHttpApi(
account_from["hs"], token=account_from["token"]
)
filter_string = json.dumps({"room":{"rooms":[roomid]}})
delay = 0
next_batch = context.get("next_batch", None)
while delay < timeout and "up" not in result:
sync = client_from.sync(since=next_batch, filter=filter_string)
delay = int(time.time() - start_time)
next_batch = sync["next_batch"]
if roomid not in sync["rooms"]["join"]:
continue
for event in sync["rooms"]["join"][roomid]["timeline"]["events"]:
if event["content"].get("body", "") == canary:
result["up"] = True
result["message"] = f"message delivered in {delay} seconds"
if "up" not in result:
result["up"] = False
result["message"] = f"message not delivered to {account_to['hs']}"
# Store the next batch for next execution
result["next_batch"] = next_batch
return result
def send_to_matrix(context, account, roomid, message):
""" Send a message to a Matrix room
"""
client = get_client(account)
client.send_message(roomid, message)
def lookup_in_matrix(context, account, roomid, message):
""" Look for the message in recent messages
"""
client = get_client(account)
sync = client.sync(since=context.get("next_batch", None),
filter=json.dumps({"room":{"rooms":[roomid]}}))
context["next_batch"] = sync["next_batch"]
if roomid not in sync["rooms"]["join"]:
return False
for event in sync["rooms"]["join"][roomid]["timeline"]["events"]:
if event["content"].get("body", "") == message:
return True
@base.send_then_receive(send_to_matrix, lookup_in_matrix)
def send_then_receive():
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment