dmarc-analyzer/analyze.py

301 lines
10 KiB
Python
Executable File

#!/usr/bin/env python
from argparse import ArgumentParser, Namespace
from collections import namedtuple
from datetime import datetime
from email import message_from_bytes
from email.header import decode_header, make_header
from email.message import Message
from imaplib import IMAP4_SSL
from io import BytesIO
from pathlib import Path
from sqlite3 import Connection, Cursor, connect
from time import time
from typing import List, Tuple
from xml.etree.ElementTree import XML, Element
from zipfile import ZipFile, is_zipfile

from yaml import CLoader as Loader
from yaml import load
_INSERTED_FIELDS = (
'checked_at, org_name, domain, report_id, unixtime_start, unixtime_end, '
'offending_ip, count, failed_types')
_FETCHED_FIELDS = f'rowid, {_INSERTED_FIELDS}'
LogRow = namedtuple(typename='LogRow', field_names=_FETCHED_FIELDS)
_CFG_TEMPLATE = """\
sqlite_path: dmarc-analyzer.db
# Imap account where the aggregate reports are stored in
imap:
host: mail.example.com
username: imap-username
password: my-super-secret-password
folder_path: INBOX
# Host settings for sending email reports
smtp:
host: mail.example.com
port: 587
login: sender-login
password: password
"""
_DESCRIPTION = (
'DMARC Report analyzer and reporter. '
'The default mode is to analyze what\'s in the IMAP folder.')
_STARTTIME = time()
class DmarcReporterBase(Exception):
    """Root of every exception raised by the DMARC report analyzer."""


class ServerException(DmarcReporterBase):
    """The IMAP server answered a command with a non-OK status."""


class NotAReport(DmarcReporterBase):
    """The given email or attachment is not a DMARC aggregate report."""
def _get_sql_connection(config: dict) -> Connection:
'Return an sqlite `Connection` object from the settings.'
return connect(database=Path(config['sqlite_path']).absolute())
def _get_loaded_cfg(parsed_args: Namespace) -> dict:
    """Load and return the YAML config file given via ``-c/--config``.

    :raises ValueError: if the file does not contain a YAML mapping (e.g. it
        is empty). Without this check the failure would only show up later
        as a ``TypeError`` on the first ``config[...]`` lookup.
    """
    cfg_path = Path(parsed_args.cfg_file).absolute()
    # NOTE(review): `Loader` is libyaml's CLoader, the counterpart of the full
    # (unsafe) `yaml.Loader`, which can construct arbitrary Python objects.
    # That is acceptable only because the config file is operator-owned;
    # never feed untrusted YAML through here.
    with cfg_path.open('r', encoding='utf-8') as fd:
        data = load(stream=fd, Loader=Loader)
    if not isinstance(data, dict):
        raise ValueError(
            f'{cfg_path} does not contain a YAML mapping; '
            'run with --recreate to write a template.')
    return data
def _init_cfg(parsed_args: Namespace):
    """Write the commented config template to the ``--config`` path.

    Any existing file at that path is overwritten.
    """
    cfg_path = Path(parsed_args.cfg_file).absolute()
    cfg_path.write_text(_CFG_TEMPLATE)
    notice = (
        f'Config file template written to {cfg_path}. Please edit it '
        'before running this program.')
    print(notice)
def _init_db(parsed_args: Namespace, config: dict):
    """Drop and recreate the ``reports`` table, discarding all stored rows.

    :param parsed_args: parsed command line (currently unused here).
    :param config: loaded configuration; ``config['sqlite_path']`` selects
        the database file.
    """
    # Reuse the shared helper so the DB path is resolved in exactly one place.
    conn = _get_sql_connection(config=config)
    try:
        cursor = conn.cursor()
        cursor.execute('DROP TABLE IF EXISTS reports')
        cursor.execute(
            'CREATE TABLE reports (checked_at INTEGER, org_name TEXT, '
            'domain TEXT, report_id TEXT, unixtime_start INTEGER, '
            'unixtime_end INTEGER, offending_ip TEXT, count INTEGER, '
            'failed_types TEXT)')
        conn.commit()
    finally:
        # Release the connection even if one of the statements fails.
        conn.close()
    print(
        'Database initialized. You can now run the program without --init-db')
class XmlParser(object):
    """Parse one DMARC aggregate report and store its failing records.

    A ``<record>`` is stored when any element below its
    ``row/policy_evaluated`` has the text ``fail``.
    """

    def __init__(self, content: bytes, config: dict, sql_conn: Connection):
        """
        :param content: raw XML of one report.
        :param config: loaded configuration (currently unused here).
        :param sql_conn: open SQLite connection the rows are written to.
        """
        self._content = content
        self._sql_conn = sql_conn
        self._cursor = sql_conn.cursor()

    @staticmethod
    def _text_at(element: Element, path: str) -> str:
        """Return the stripped text of the element at ``path``.

        :raises NotAReport: if that element is missing or has no text, i.e.
            the XML does not look like a DMARC aggregate report.
        """
        found = element.find(path=path)
        if found is None or found.text is None:
            raise NotAReport(f'Missing or empty element {path!r} in report')
        return found.text.strip()

    def _parse_header(self):
        'Parse the report metadata and the published-policy domain.'
        self._unixtime_start = int(self._text_at(
            self._root, './report_metadata/date_range/begin'))
        self._unixtime_end = int(self._text_at(
            self._root, './report_metadata/date_range/end'))
        self._org_name = self._text_at(
            self._root, './report_metadata/org_name')
        self._domain = self._text_at(self._root, './policy_published/domain')
        self._report_id = self._text_at(
            self._root, './report_metadata/report_id')

    def _note_failed_records(self, ip: str, failed: list, count: int):
        """Queue one failing record for insertion into ``reports``.

        The insert is committed by `process` once the whole report has been
        parsed, instead of once per row.
        """
        self._cursor.execute(
            f'INSERT INTO reports({_INSERTED_FIELDS}) VALUES (?, ?, ?, ?, ?, '
            '?, ?, ?, ?)', (
                _STARTTIME, self._org_name, self._domain, self._report_id,
                self._unixtime_start, self._unixtime_end, ip, count,
                ', '.join(failed)))

    def _parse_record(self, record: Element):
        'Store `record` if any of its evaluated policy results is "fail".'
        policy = record.find(path='row/policy_evaluated')
        if policy is None:
            raise NotAReport("Record without 'row/policy_evaluated'")
        # Tag names (e.g. dkim, spf) of every descendant whose text is "fail".
        failed = [x.tag for x in policy.findall(path='.//*[.="fail"]')]
        if not failed:
            return
        self._note_failed_records(
            ip=self._text_at(record, 'row/source_ip'), failed=failed,
            count=int(self._text_at(record, 'row/count')))

    def _parse_records(self):
        'Parse every top-level <record> of the report.'
        for record in self._root.findall(path='record'):
            self._parse_record(record=record)

    def process(self):
        """Parse the report, print a summary and commit its failing records.

        :raises NotAReport: if required report elements are missing.
        """
        # NOTE(review): the XML comes from email attachments, i.e. untrusted
        # input; xml.etree is not hardened against every malicious-XML attack
        # (see "XML vulnerabilities" in the Python docs).
        self._root = XML(text=self._content)
        self._parse_header()
        print('\n', datetime.fromtimestamp(
            self._unixtime_start).strftime(format='%c'))
        print(datetime.fromtimestamp(self._unixtime_end).strftime(format='%c'))
        print(self._org_name, self._domain, self._report_id)
        self._parse_records()
        # One commit per report: nothing from a report that fails half-way
        # through parsing is committed here.
        self._sql_conn.commit()
class ImapHandler(object):
    'Handling the IMAP connection'
    _conn: IMAP4_SSL
    # MIME types under which zipped reports are commonly attached; the payload
    # is still verified with `is_zipfile` before anything is extracted.
    _ZIP_CONTENT_TYPES = frozenset((
        'application/zip', 'application/x-zip-compressed',
        'application/x-zip'))

    def __init__(self, config: dict):
        """
        :param config: loaded configuration; ``config['imap']`` holds the
            server settings and ``config['sqlite_path']`` the database path.
        """
        self._config = config
        self._sql_conn = _get_sql_connection(config=config)

    def _get_subject(self, email: Message) -> str:
        """Return the fully decoded ``Subject`` header, ``''`` if absent.

        All RFC 2047 encoded-words are decoded and joined, not only the first
        chunk.
        """
        raw = email['subject']
        if raw is None:
            return ''
        try:
            return str(make_header(decode_header(raw)))
        except (LookupError, UnicodeDecodeError):
            # Unknown or wrongly declared charset: best effort, return the
            # header undecoded.
            return str(raw)

    def _get_extracted_zip_content(self, message: Message) -> Tuple[bytes, ...]:
        """
        Return the contents of every ``*.xml`` member of the zip archive that
        is ``message``'s payload, or ``()`` if the payload is not a zip file.
        """
        payload = message.get_payload(decode=True)
        if not payload:
            return ()
        fd = BytesIO(initial_bytes=payload)
        if not is_zipfile(filename=fd):
            return ()
        extracted = []
        with ZipFile(file=fd) as zip_attachment:
            for zipped_path in zip_attachment.infolist():
                # Case-insensitive so members named like "REPORT.XML" count.
                if not zipped_path.filename.lower().endswith('.xml'):
                    continue
                with zip_attachment.open(name=zipped_path) as zip_fd:
                    extracted.append(zip_fd.read())
        return tuple(extracted)

    def _walk_content(self, message: Message) -> Tuple[bytes, ...]:
        """Return the XML extracted from all zip attachments of ``message``.

        `Message.walk` already visits every nested part, so each leaf part is
        inspected exactly once. Recursing into sub-multiparts as well would
        extract attachments of nested multiparts twice.
        """
        result = []
        for part in message.walk():
            if part.is_multipart():
                continue
            # Only named attachments are considered.
            if part.get_content_disposition() is None \
                    or part.get_filename() is None:
                continue
            if part.get_content_type() in self._ZIP_CONTENT_TYPES:
                result.extend(self._get_extracted_zip_content(message=part))
        return tuple(result)

    def _parse_message(self, num: bytes) -> Tuple[bytes, ...]:
        """Fetch message ``num`` and return the XML reports attached to it.

        :raises ServerException: if the server does not answer the FETCH
            with ``OK``.
        """
        response, msg = self._conn.fetch(
            message_set=num, message_parts='(RFC822)')
        if response != 'OK':
            raise ServerException()
        message = message_from_bytes(s=msg[0][1])
        return self._walk_content(message=message)

    def _login_and_run(self):
        """Log in, select the configured folder and process every message.

        :raises ServerException: if selecting or searching the folder fails.
            A rejected login already makes ``imaplib`` raise ``IMAP4.error``.
        """
        imap_cfg = self._config['imap']
        self._conn.login(
            user=imap_cfg['username'], password=imap_cfg['password'])
        response, result = self._conn.select(mailbox=imap_cfg['folder_path'])
        if response != 'OK':
            raise ServerException()
        self._no_messages = int(result[0])
        response, self._all_messages = self._conn.search(
            None, 'ALL')  # type: Tuple[str, List[bytes]]
        if response != 'OK':
            raise ServerException()
        for num in self._all_messages[0].split():  # type: bytes
            for content_item in self._parse_message(num=num):
                parser = XmlParser(
                    content=content_item, config=self._config,
                    sql_conn=self._sql_conn)
                parser.process()

    def process(self):
        'Connect, process the folder, and always close the SQLite connection.'
        try:
            with IMAP4_SSL(host=self._config['imap']['host']) as self._conn:
                self._login_and_run()
        finally:
            self._sql_conn.close()
class ReportSender(object):
    'Sending reports per email.'

    def __init__(self, config: dict):
        """
        :param config: loaded configuration; ``config['sqlite_path']`` selects
            the database the collected rows are read from.
        """
        self._config = config
        self._sql_conn = _get_sql_connection(config=config)
        self._cursor = self._sql_conn.cursor()

    def _read_records(self) -> list:
        'Return every row of the ``reports`` table as a `LogRow`.'
        self._cursor.execute(f'SELECT {_FETCHED_FIELDS} FROM reports')
        # Iterating the cursor yields the result rows one by one.
        return [LogRow(*row) for row in self._cursor]

    def process(self):
        'Read the collected rows, then close the SQLite connection.'
        try:
            rows = self._read_records()
            # TODO: filter/sort `rows` and email the report; presumably using
            # the `smtp` section of the config. Not implemented yet.
        finally:
            self._sql_conn.close()
def main():
    """Parse the command line and run the selected mode.

    Modes are checked in this order: ``--recreate`` writes a config template
    (before any config is loaded), ``--init-db`` resets the database,
    ``-r/--report`` reports from the collected rows; with none of them the
    configured IMAP folder is analyzed.
    """
    cli = ArgumentParser(description=_DESCRIPTION)
    cli.add_argument(
        '-c', '--config', dest='cfg_file', required=True,
        help='Configuration file path')
    cli.add_argument(
        '--init-db', required=False, action='store_true',
        help='Recreate sql database')
    cli.add_argument(
        '--recreate', required=False, action='store_true',
        help='Recreate the config file')
    cli.add_argument(
        '-r', '--report', required=False, action='store_true',
        help='Report from the collected logs')
    args = cli.parse_args()

    if args.recreate:
        # Must run before loading: the config file may not exist yet.
        return _init_cfg(parsed_args=args)

    config = _get_loaded_cfg(parsed_args=args)
    if args.init_db:
        return _init_db(parsed_args=args, config=config)
    if args.report:
        return ReportSender(config=config).process()
    ImapHandler(config=config).process()
if __name__ == '__main__':
    # Run the command-line interface only when executed as a script, not
    # when this module is imported.
    main()