#!/usr/bin/env python
from argparse import ArgumentParser, Namespace
from collections import namedtuple
from datetime import datetime
from email import message_from_bytes
from email.header import decode_header
from email.message import EmailMessage, Message
from gzip import BadGzipFile
from gzip import open as gzip_open
from imaplib import IMAP4_SSL
from io import BytesIO
from operator import attrgetter
from pathlib import Path
from smtplib import SMTP
from sqlite3 import Connection, connect
from time import time
from typing import Union
from xml.etree.ElementTree import XML, Element, ParseError
from zipfile import ZipFile, is_zipfile
from dns.rdatatype import PTR
from dns.resolver import NXDOMAIN, NoAnswer, Timeout, resolve
from dns.reversename import from_address
from magic import from_buffer
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
from yaml import load
# Extracts LogRow.rowid; used to build DELETE parameter tuples.
_rowid_getter = attrgetter('rowid')
# Column list for INSERT statements (everything except rowid).
_INSERTED_FIELDS = (
    'checked_at, org_name, domain, header_froms, report_id, ' +
    'unixtime_start, unixtime_end, offending_ip, count, failed_types')
# Column list for SELECT statements; rowid is needed for later deletion.
_FETCHED_FIELDS = f'rowid, {_INSERTED_FIELDS}'
LogRow = namedtuple(typename='LogRow', field_names=_FETCHED_FIELDS)
# NOTE: the nested keys below are indented so the written template is valid
# nested YAML (a flat mapping would put host/username/... at the top level).
_CFG_TEMPLATE = """\
sqlite_path: dmarc-analyzer.db
# Imap account where the aggregate reports are stored in
imap:
  host: mail.example.com
  username: imap-username
  password: my-super-secret-password
  folder_path: INBOX
  trash_path: Trash
# Host settings for sending email reports
smtp:
  host: mail.example.com
  port: 587
  login: sender-login
  password: password
# Settings for the report
report:
  from_address: reportaddress@example.com
  to_address: reportaddress@example.com
  subject: DMARC Report analysis output
"""
_DESCRIPTION = (
    'DMARC Report analyzer and reporter. ' +
    'The default mode is to analyze what\'s in the IMAP folder.')
# Timestamp of program start; stored with every inserted row as checked_at.
_STARTTIME = time()
ListOfLogRow = list[LogRow]
_EMAIL_HEADER = """\
Since the last log report, you had the following failures in the DMARC \
aggregate reports:
"""
# Per-row template for the report email body; filled via str.format.
_EMAIL_ITEMS = """\
Org name: {org_name}
Checked date begin: {datetime_start}
Checked date end: {datetime_end}
Checked domain rule: {domain}
Sender header(s): {header_froms}
Offending IP: {offending_ip} ({hostnames})
Count: {count}
Failed types: {failed_types}
"""
class DmarcReporterBase(Exception):
    """Base exception for the report analyser."""


class ServerException(DmarcReporterBase):
    """Raised when the IMAP server says something is wrong."""


class ElementNotFound(DmarcReporterBase):
    """Raised when an XML `Element` is expected to be not `None`."""


class ElementTextNotFound(DmarcReporterBase):
    """
    Raised when a path within an XML `Element` is expected to have a text
    property.
    """
def _get_sql_connection(config: dict) -> Connection:
'Return an sqlite `Connection` object from the settings.'
return connect(database=Path(config['sqlite_path']).absolute())
def _get_loaded_cfg(parsed_args: Namespace) -> dict:
    """Load and return the YAML configuration named on the command line."""
    cfg_path = Path(parsed_args.cfg_file).absolute()
    with cfg_path.open('r') as fd:
        return load(stream=fd, Loader=Loader)
def _init_cfg(parsed_args: Namespace):
    """Write a fresh config file template to the path given on the CLI."""
    path_cfg = Path(parsed_args.cfg_file).absolute()
    path_cfg.write_text(_CFG_TEMPLATE)
    print(
        f'Config file template written to {path_cfg}. Please edit it ' +
        'before running this program.')
def _init_db(config: dict):
'Initialize a blank DB.'
conn = connect(database=Path(config['sqlite_path']).absolute())
cursor = conn.cursor()
cursor.execute('DROP TABLE IF EXISTS reports')
cursor.execute(
'CREATE TABLE reports (checked_at INTEGER, org_name TEXT, ' +
'domain TEXT, header_froms TEXT, report_id TEXT, ' +
'unixtime_start INTEGER, unixtime_end INTEGER, offending_ip TEXT, ' +
'count INTEGER, failed_types TEXT)')
conn.commit()
conn.close()
print(
'Database initialized. You can now run the program without --init-db')
class XmlParser(object):
    """Parse a single DMARC aggregate report (XML) into the sqlite DB."""

    def __init__(self, content: bytes, sql_conn: Connection):
        """
        `content` is the raw XML bytes of one report; `sql_conn` is an open
        sqlite connection whose `reports` table receives the failed records.
        """
        self._content = content
        self._sql_conn = sql_conn

    def _get_element_or_fail(self, element: Element, path: str) -> Element:
        """
        Return the sub-element at `path`; raise `ElementNotFound` if it is
        missing.
        """
        result = element.find(path=path)
        if result is None:
            raise ElementNotFound(f'{path} not found within {element}')
        return result

    def _get_text_or_fail(self, element: Element, path: str) -> str:
        """
        Return the text of the sub-element at `path`; raise
        `ElementTextNotFound` if that element has no text.
        """
        element = self._get_element_or_fail(element=element, path=path)
        text = element.text
        if text is None:
            raise ElementTextNotFound(f'{path} within {element} has no text')
        return text

    def _parse_header(self):
        """Extract the report metadata: date range, org, domain, report id."""
        date_range = self._get_element_or_fail(
            element=self._root, path='./report_metadata/date_range')
        text_begin = self._get_text_or_fail(element=date_range, path='begin')
        self._unixtime_start = int(text_begin)
        text_end = self._get_text_or_fail(element=date_range, path='end')
        self._unixtime_end = int(text_end)
        self._org_name = self._get_text_or_fail(
            element=self._root, path='./report_metadata/org_name')
        self._domain = self._get_text_or_fail(
            element=self._root, path='./policy_published/domain')
        self._report_id = self._get_text_or_fail(
            element=self._root, path='./report_metadata/report_id')

    def _note_failed_records(
            self, ip: str, failed: list, count: int, header_froms: list):
        """Insert one failed record into the `reports` table and commit."""
        _header_froms = ', '.join(header_froms)
        _failed = ', '.join(failed)
        # The connection context manager commits on successful exit.
        with self._sql_conn as conn:
            params = (
                _STARTTIME, self._org_name, self._domain, _header_froms,
                self._report_id, self._unixtime_start, self._unixtime_end, ip,
                count, _failed)
            conn.execute(
                f'INSERT INTO reports({_INSERTED_FIELDS}) VALUES ' +
                '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', params)

    def _parse_record(self, record: Element):
        """Store a record if any of its policy evaluations reads "fail"."""
        policy = self._get_element_or_fail(
            element=record, path='row/policy_evaluated')
        failed = [x.tag for x in policy.findall(path='.//*[.="fail"]')]
        if not failed:
            return
        ip = self._get_text_or_fail(element=record, path='row/source_ip')
        count = int(self._get_text_or_fail(element=record, path='row/count'))
        # A header_from element may be empty; substitute '' so the join in
        # _note_failed_records cannot fail on None.
        header_froms = [
            x.text or '' for x in
            record.findall(path='identifiers/header_from')]
        self._note_failed_records(
            ip=ip, failed=failed, count=count, header_froms=header_froms)

    def _parse_records(self):
        """Parse every <record> element of the report."""
        for record in self._root.findall(path='record'):
            self._parse_record(record=record)

    def _get_root_or_fail(self) -> bool:
        """Parse the XML content; return False (give up) if it is invalid."""
        try:
            self._root = XML(text=self._content)
            return True
        except ParseError:
            # Silently fail, we don't care
            return False

    def process(self):
        """Parse the stored report and record all failed records."""
        if not self._get_root_or_fail():
            return
        self._parse_header()
        self._parse_records()
class ImapHandler(object):
    'Handling the IMAP connection'
    # Live IMAP connection; assigned in `process` via its context manager.
    _conn: IMAP4_SSL
    # Raw payload of the IMAP SEARCH response (a list with one bytes item).
    _all_messages: list[bytes]

    def __init__(self, config: dict):
        # The sqlite connection is shared with every XmlParser spawned below.
        self._config = config
        self._sql_conn = _get_sql_connection(config=config)

    def _get_subject(self, email: Message) -> str:
        'Extract and return the subject.'
        subject = decode_header(header=email['subject'])
        # Only the first decoded chunk is used: both branches return on the
        # loop's first iteration. '' is returned for an empty header.
        for text, encoding in subject:
            if type(text) is str:
                return text
            return text.decode(encoding=encoding) \
                if encoding else text.decode()
        return ''

    def _get_extracted_gzip_content(self, content: bytes) -> tuple[bytes, ...]:
        """
        Load and return the extracted XML content of the gzip attachment in
        the message. Returns an empty tuple for non-gzip payloads.
        """
        result = tuple()
        fd = BytesIO(initial_bytes=content)
        try:
            with gzip_open(filename=fd) as gzip_attachment:
                result += (gzip_attachment.read(),)
        except BadGzipFile:
            return result
        # A finally statement would do here but whatever
        return result

    def _get_extracted_zip_content(self, content: bytes) -> tuple[bytes, ...]:
        """
        Load and return the extracted XML content of the zip file in the
        message. Only members with an .xml suffix are extracted.
        """
        result = tuple()
        fd = BytesIO(initial_bytes=content)
        if not is_zipfile(filename=fd):
            return result
        with ZipFile(file=fd) as zip_attachment:
            for zipped_path in zip_attachment.infolist():
                if not zipped_path.filename.endswith('.xml'):
                    continue
                with zip_attachment.open(name=zipped_path) as zip_fd:
                    result += (zip_fd.read(),)
        return result

    def _get_message_content_type_and_content(
            self, message: Message) -> tuple[str, bytes]:
        'Return a content type of a `Message` by using filemagic.'
        with BytesIO(initial_bytes=message.get_payload(decode=True)) as fd:
            content = fd.read()
            # Detect the real MIME type from the bytes, not the headers.
            mime_type = from_buffer(buffer=content, mime=True)
        return mime_type, content

    def _walk_content(self, message: Message) -> tuple[bytes, ...]:
        'Walk the content of the message recursively.'
        result = tuple()
        if message.is_multipart():
            for part in message.walk():
                # walk() yields the message itself first; skip it to avoid
                # infinite recursion.
                if part == message:
                    continue
                result += self._walk_content(message=part)
        else:
            # Only named attachments are of interest.
            if message.get_content_disposition() is None \
                    or message.get_filename() is None:
                return result
            content_type, content = \
                self._get_message_content_type_and_content(message=message)
            if content_type == 'application/zip':
                result += self._get_extracted_zip_content(content=content)
            elif content_type == 'application/gzip':
                result += self._get_extracted_gzip_content(content=content)
        return result

    def _parse_message(self, num: bytes) -> Union[tuple[bytes, ...], None]:
        'Return the parsed XML content from the parsed message.'
        response, msg = self._conn.fetch(
            message_set=num.decode(encoding='utf-8'), message_parts='(RFC822)')
        # None is returned (implicitly) for malformed FETCH responses.
        if response != 'OK' or msg[0] is None or type(msg[0]) is not tuple:
            return
        message = message_from_bytes(s=msg[0][1])
        # subject = self._get_subject(email=message)
        extracted_content = self._walk_content(message=message)
        return extracted_content

    def _move_processed_messages(self, to_be_moved: list[bytes]):
        'Move processed messages to the designated `Trash`.'
        message_set = b','.join(to_be_moved).decode()
        # Copy + \Deleted flag + expunge is the classic IMAP "move" sequence.
        self._conn.copy(
            message_set=message_set,
            new_mailbox=self._config['imap']['trash_path'])
        self._conn.store(
            message_set=message_set, command='+FLAGS', flags=r'\Deleted')
        self._conn.expunge()

    def _login_and_run(self):
        'Start the IMAP conversation.'
        response, result = self._conn.login(
            user=self._config['imap']['username'],
            password=self._config['imap']['password'])
        response, result = self._conn.select(
            mailbox=self._config['imap']['folder_path'])
        if response != 'OK' or result[0] is None:
            raise ServerException()
        self._no_messages = int(result[0])
        response, self._all_messages = self._conn.search(None, 'ALL')
        if response != 'OK':
            raise ServerException()
        to_be_moved = []
        if not len(self._all_messages):
            return
        # SEARCH returns one bytes item of space-separated sequence numbers.
        response_firstitem = self._all_messages[0]
        if not isinstance(response_firstitem, bytes):
            return
        for num in response_firstitem.split():
            extracted_content = self._parse_message(num=num)
            if not extracted_content:
                continue
            to_be_moved.append(num)
            for content_item in extracted_content:
                parser = XmlParser(
                    content=content_item, sql_conn=self._sql_conn)
                parser.process()
        if to_be_moved:
            self._move_processed_messages(to_be_moved=to_be_moved)

    def process(self):
        'Start processing.'
        with IMAP4_SSL(host=self._config['imap']['host']) as self._conn:
            self._login_and_run()
        self._sql_conn.close()
class ReportSender(object):
    """Compose and email a report of the collected DMARC failures."""

    def __init__(self, config: dict):
        """Store the parsed config and open the sqlite connection."""
        self._config = config
        self._sql_conn = _get_sql_connection(config=config)

    def _get_records(self) -> ListOfLogRow:
        """Read and return every stored report row from the DB."""
        with self._sql_conn as conn:
            cursor = conn.execute(f'SELECT {_FETCHED_FIELDS} FROM reports')
            # fetchall + comprehension instead of a manual fetchone loop.
            return [LogRow(*row) for row in cursor.fetchall()]

    def _filter_and_sort_rows(self) -> ListOfLogRow:
        """Filter out duplicate rows and sort the remainder by start time."""
        filter_state = set()

        def filter_param(item: LogRow) -> bool:
            'Return True for rows not seen before, remembering them.'
            key = ','.join([item.org_name, item.report_id, item.offending_ip])
            if key in filter_state:
                return False
            filter_state.add(key)
            return True

        return sorted(
            filter(filter_param, self._rows),
            key=attrgetter('unixtime_start'))

    def _get_hostnames(self, ip: str) -> str:
        """Return a comma separated list of hostnames resolved from `ip`."""
        qname = from_address(text=ip)
        try:
            result = resolve(qname=qname, rdtype=PTR)
        except (NXDOMAIN, Timeout, NoAnswer):
            # Unresolvable IPs are rendered as a dash in the report.
            return '-'
        return ', '.join([x.to_text().rstrip('.') for x in result])

    def _get_email_body(self, filtered_rows: ListOfLogRow) -> str:
        """Compose the email body from the filtered rows."""
        text = _EMAIL_HEADER
        for row in filtered_rows:
            datetime_start = datetime.fromtimestamp(row.unixtime_start)
            datetime_end = datetime.fromtimestamp(row.unixtime_end)
            text += _EMAIL_ITEMS.format(
                **row._asdict(),
                datetime_start=datetime_start.strftime('%c'),
                datetime_end=datetime_end.strftime('%c'),
                hostnames=self._get_hostnames(ip=row.offending_ip))
        return text

    def _get_emailmessage(self, body: str) -> EmailMessage:
        """Return a composed email message ready for sending."""
        msg = EmailMessage()
        msg['From'] = self._config['report']['from_address']
        msg['To'] = self._config['report']['to_address']
        msg['Subject'] = self._config['report']['subject']
        msg.set_content(body)
        return msg

    def _send_email(self, msg: EmailMessage):
        """Connect to the configured SMTP host and send the message."""
        conf = self._config['smtp']
        with SMTP(host=conf['host'], port=conf['port']) as smtp:
            smtp.starttls()
            smtp.login(user=conf['login'], password=conf['password'])
            smtp.send_message(msg=msg)

    def _delete_rows(self):
        """Delete the already-reported rows from the DB."""
        placeholders = ', '.join(['?'] * len(self._rows))
        # The connection context manager commits the DELETE on success.
        with self._sql_conn as conn:
            conn.execute(
                f'DELETE FROM reports where rowid IN ({placeholders})',
                tuple(map(_rowid_getter, self._rows)))

    def process(self):
        """Fetch, filter, email and finally purge the collected rows."""
        self._rows = self._get_records()
        if not self._rows:
            return
        filtered_rows = self._filter_and_sort_rows()
        body = self._get_email_body(filtered_rows=filtered_rows)
        msg = self._get_emailmessage(body=body)
        self._send_email(msg=msg)
        self._delete_rows()
def main():
    """Entry point: parse CLI arguments and dispatch to the chosen action."""
    parser = ArgumentParser(description=_DESCRIPTION)
    parser.add_argument(
        '-c', '--config', required=True, help='Configuration file path',
        dest='cfg_file')
    parser.add_argument(
        '--init-db', action='store_true', required=False,
        help='Recreate sql database')
    parser.add_argument(
        '--recreate', action='store_true', required=False,
        help='Recreate the config file')
    parser.add_argument(
        '-r', '--report', action='store_true', required=False,
        help='Report from the collected logs')
    parsed_args = parser.parse_args()
    # --recreate must work without an existing config; handle it first.
    if parsed_args.recreate:
        return _init_cfg(parsed_args=parsed_args)
    config = _get_loaded_cfg(parsed_args=parsed_args)
    if parsed_args.init_db:
        return _init_db(config=config)
    if parsed_args.report:
        return ReportSender(config=config).process()
    # Default mode: pull and analyze the reports from the IMAP folder.
    ImapHandler(config=config).process()


if __name__ == '__main__':
    main()