Commit b7ad2573 authored by kaiyou's avatar kaiyou
Browse files

Add a check to send, then receive mails

parent c5b7bc8f
Pipeline #157 passed with stage
in 1 minute and 3 seconds
import imaplib
import smtplib
import os
from email.mime import text, multipart
from amonit import util
def get_imap_client(host, port, encrypt):
if encrypt == "starttls":
client = imaplib.IMAP4(host=host, port=port)
elif encrypt == "tls":
client = imaplib.IMAP4_SSL(host=host, port=port)
client = imaplib.IMAP4(host=host, port=port)
return client
def get_smtp_client(host, port, encrypt):
if encrypt == "starttls":
client = smtplib.SMTP(host=host, port=port)
elif encrypt == "tls":
client = smtp.SMTP_SSL(host=host, port=port)
client = smtplib.SMTP(host=host, port=port)
return client
def imap_login(context, host, username, password, port=143, startls=True):
def imap_login(context, host, username, password, port=143, encrypt=None):
""" Check that IMAP login works properly and accesses the INBOX
client = imaplib.IMAP4(host=host, port=port)
client = get_imap_client(host, port, encrypt)
client.login(username, password)
def send_then_receive(context, account_from, account_to):
""" Send a mail using SMTP, then check that it is received using IMAP
This is useful either on a single server or across servers to ensure
SMTP works properly from server to server.
The canary is first sent, then looked for in the next execution.
# First check the canary if any
result = {"up": False, "message": ""}
if "canary" in context:
canary = context["canary"]
client = get_imap_client(
account_to["host"], account_to.get("port", 143),
account_to.get("encrypt", None)
client.login(account_to["username"], account_to["password"])["inbox"])
filter = f'(UNSEEN SUBJECT "{canary}")'
status, data = client.uid("search", None, filter)
if status == "OK":
for uid in data[0].split():
client.uid("store", uid, "+FLAGS", "(\\Seen)")
result["up"] = True
except Exception as error:
result = {"up": False, "message": str(error)}
# Then drop a new canary
canary = os.urandom(8).hex()
smtp = get_smtp_client(
account_from["host"], account_from.get("port", 587),
account_from.get("encrypt", None)
smtp.login(account_from["username"], account_from["password"])
message = multipart.MIMEMultipart("alternative")
message["Subject"] = f"Monitoring canary {canary}"
message["From"] = account_from["username"]
message["To"] = account_to["username"]
message.attach(text.MIMEText("Simple monitoring!", "text"))
result["canary"] = canary
return result
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment