Commit 0a3be672 authored by kaiyou's avatar kaiyou

Add a federation check for Matrix

parent be80f840
Pipeline #159 passed with stage
in 1 minute and 6 seconds
import os
import json
from amonit import util
from matrix_client import client as matrix_client
from matrix_client import room as matrix_room
def send_then_receive(context, account_from, account_to, roomid):
""" Send a message using Matrix, then check that it is received.
This is useful either on a single server or across homeservers to
check that federation work properly.
The canary is first sent, then looked for in the next execution.
"""
result = {"up": False, "message": ""}
if "canary" in context:
canary = context["canary"]
try:
client_from = matrix_client.MatrixHttpApi(
account_from["hs"], token=account_from["token"]
)
filter = json.dumps(
{"room":{"rooms":[roomid]}})
if "next_batch" in context:
since = context["next_batch"]
sync = client_from.sync(since=since, filter=filter)
else:
sync = client_from.sync(filter=filter)
result["next_batch"] = sync["next_batch"]
events = sync["rooms"]["join"][roomid]["timeline"]["events"]
for event in events:
if event["content"].get("body", "") == canary:
result["up"] = True
except Exception as error:
result = {"up": False, "message": str(error)}
# Then drop a new canary
canary = os.urandom(8).hex()
client_to = matrix_client.MatrixHttpApi(
account_to["hs"], token=account_to["token"]
)
client_to.send_message(roomid, canary)
result["canary"] = canary
return result
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment