mail.py 2.31 KB
Newer Older
kaiyou's avatar
kaiyou committed
1
import imaplib
2
3
import smtplib
import os
kaiyou's avatar
kaiyou committed
4
import time
kaiyou's avatar
kaiyou committed
5

6
from email.mime import text, multipart
kaiyou's avatar
kaiyou committed
7

8
9
from amonit.check import base

kaiyou's avatar
kaiyou committed
10

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def get_imap_client(host, port, encrypt):
    if encrypt == "starttls":
        client = imaplib.IMAP4(host=host, port=port)
        client.starttls()
    elif encrypt == "tls":
        client = imaplib.IMAP4_SSL(host=host, port=port)
    else:
        client = imaplib.IMAP4(host=host, port=port)
    return client


def get_smtp_client(host, port, encrypt):
    if encrypt == "starttls":
        client = smtplib.SMTP(host=host, port=port)
        client.starttls()
    elif encrypt == "tls":
        client = smtp.SMTP_SSL(host=host, port=port)
    else:
        client = smtplib.SMTP(host=host, port=port)
    return client


def imap_login(context, host, username, password, port=143, encrypt=None):
kaiyou's avatar
kaiyou committed
34
35
    """ Check that IMAP login works properly and accesses the INBOX
    """
36
    client = get_imap_client(host, port, encrypt)
kaiyou's avatar
kaiyou committed
37
38
39
    client.login(username, password)
    client.logout()

40

41
42
def send_mail(context, account, to, message):
    """ Send a message to the given account
43
44
    """
    smtp = get_smtp_client(
45
46
        account["host"], account.get("port", 587),
        account.get("encrypt", None)
47
    )
48
49
50
51
52
53
54
    smtp.login(account["username"], account["password"])
    mail = multipart.MIMEMultipart("alternative")
    mail["Subject"] = f"Monitoring canary {message}"
    mail["From"] = account["username"]
    mail["To"] = to
    mail.attach(text.MIMEText("Simple monitoring!", "text"))
    smtp.sendmail(account["username"], to, mail.as_string())
55
    smtp.quit()
56
57
58
59
60


def lookup_mail(context, account, to, message):
    """ Look for a specific message in a mailbox, then delete it
    """
kaiyou's avatar
kaiyou committed
61
    client = get_imap_client(
62
63
        account["host"], account.get("port", 143),
        account.get("encrypt", None)
kaiyou's avatar
kaiyou committed
64
    )
65
66
67
68
69
70
71
72
    client.login(account["username"], account["password"])
    client.select(account["inbox"])
    filter_string = f'UNSEEN SUBJECT {message}'
    status, data = client.uid("search", None, filter_string)
    if status == "OK" and data[0]:
        for uid in data[0].split():
            client.uid("store", uid, "+FLAGS", "(\\Deleted)")
        client.expunge()
kaiyou's avatar
kaiyou committed
73
74
    client.close()
    client.logout()
75
76
77
78
79
80
    return bool(status == "OK" and data[0])


@base.send_then_receive(send_mail, lookup_mail)
def send_then_receive():
    pass