Commit ded1dc81 authored by kaiyou's avatar kaiyou

Make the mail check synchronous

parent c2aa22f3
Pipeline #195 passed with stage
in 1 minute and 20 seconds
import imaplib
import smtplib
import os
import time
from email.mime import text, multipart
......@@ -39,34 +40,11 @@ def send_then_receive(context, account_from, account_to):
""" Send a mail using SMTP, then check that it is received using IMAP
This is useful either on a single server or across servers to ensure
SMTP works properly from server to server.
The canary is first sent, then looked for in the next execution.
"""
# First check the canary if any
result = {"up": False, "message": ""}
if "canary" in context:
canary = context["canary"]
try:
client = get_imap_client(
account_to["host"], account_to.get("port", 143),
account_to.get("encrypt", None)
)
client.login(account_to["username"], account_to["password"])
client.select(account_to["inbox"])
filter = f'(UNSEEN SUBJECT "{canary}")'
status, data = client.uid("search", None, filter)
if status == "OK":
for uid in data[0].split():
client.uid("store", uid, "+FLAGS", "(\\Seen)")
result["up"] = True
if not result["up"]:
domain = account_to["username"].split("@")[1]
result["message"] = f"message not delivered to {domain}"
client.logout()
except Exception as error:
result = {"up": False, "message": str(error)}
# Then drop a new canary
# Generate and store a canary
canary = os.urandom(8).hex()
result = {"canary": canary}
# First send an email
smtp = get_smtp_client(
account_from["host"], account_from.get("port", 587),
account_from.get("encrypt", None)
......@@ -83,5 +61,30 @@ def send_then_receive(context, account_from, account_to):
message.as_string()
)
smtp.quit()
result["canary"] = canary
start_time = time.time()
# Then wait for the delivery
client = get_imap_client(
account_to["host"], account_to.get("port", 143),
account_to.get("encrypt", None)
)
client.login(account_to["username"], account_to["password"])
client.select(account_to["inbox"])
filter_string = f'(UNSEEN SUBJECT "{canary}")'
for _ in range(10):
status, data = client.uid("search", None, filter_string)
delay = int(time.time() - start_time)
print(status, data)
if status == "OK" and data[0]:
for uid in data[0].split():
client.uid("store", uid, "+FLAGS", "(\\Seen)")
result["up"] = True
result["delay"] = delay
result["message"] = f"message delivered in {delay} seconds"
break
time.sleep(10)
else:
result["up"] = False
domain = account_to["username"].split("@")[1]
result["message"] = f"message not delivered to {domain}"
# Return the result
return result
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment