Initial commit

This commit is contained in:
László Károlyi 2021-11-02 22:23:57 +01:00
commit 94d8748fd8
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
3 changed files with 255 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*.db
*.yaml
venv/

250
analyze.py Executable file
View File

@ -0,0 +1,250 @@
#!/usr/bin/env python
from argparse import ArgumentParser, Namespace
from datetime import datetime
from email import message_from_bytes
from email.header import decode_header
from email.message import Message
from imaplib import IMAP4_SSL
from io import BytesIO
from pathlib import Path
from sqlite3 import Connection, Cursor, connect
from time import time
from typing import List, Tuple
from xml.etree.ElementTree import XML, Element
from zipfile import ZipFile, is_zipfile
from yaml import CLoader as Loader
from yaml import load
_CFG_TEMPLATE = """\
sqlite_path: dmarc-analyzer.db
imap:
ssl_host: mail.example.com
username: imap-username
password: my-super-secret-password
folder_path: INBOX
"""
_STARTTIME = time()
class DmarcReporterBase(Exception):
'Base exception for the report analyser.'
class ServerException(DmarcReporterBase):
'Raised when the IMAP server says something is wrong.'
class NotAReport(DmarcReporterBase):
'Raised when the passed email is not a DMARC report.'
class XmlParser(object):
'Parse one report here.'
def __init__(self, content: bytes, config: dict, sql_conn: Connection):
self._content = content
self._sql_conn = sql_conn
self._cursor = sql_conn.cursor()
self._time = datetime
def _parse_header(self):
'Parse data headers.'
date_range = self._root.find(path='./report_metadata/date_range')
self._datetime_start = datetime.utcfromtimestamp(int(
date_range.find(path='begin').text))
self._datetime_end = datetime.utcfromtimestamp(int(
date_range.find(path='end').text))
self._org_name = self._root.find(
path='./report_metadata/org_name').text
self._domain = self._root.find(path='./policy_published/domain').text
self._report_id = self._root.find(
path='./report_metadata/report_id').text
def _note_failed_records(self, ip: str, failed: list, count: int):
'Add the failed records to the sqlite DB.'
self._cursor.execute(
'INSERT INTO reports(checked_at, org_name, domain, report_id, '
'datetime_start, datetime_end, offending_ip, count, failed_types) '
'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)', (
_STARTTIME, self._org_name, self._domain, self._report_id,
self._datetime_start, self._datetime_end, ip, count,
', '.join(failed)))
self._sql_conn.commit()
def _parse_record(self, record: Element):
'Parse one record.'
ip = record.find(path='row/source_ip').text
count = int(record.find(path='row/count').text)
policy = record.find(path='row/policy_evaluated')
failed = [x.tag for x in policy.findall(path='.//*[.="fail"]')]
if not failed:
return
self._note_failed_records(ip=ip, failed=failed, count=count)
def _parse_records(self):
'Parse report result records.'
for record in self._root.findall(path='record'):
self._parse_record(record=record)
def start(self):
'Start parsing.'
self._root = XML(text=self._content)
self._parse_header()
print('\n', self._datetime_start.strftime(format='%c'))
print(self._datetime_end.strftime(format='%c'))
print(self._org_name, self._domain, self._report_id)
self._parse_records()
class ImapHandler(object):
'Handling the IMAP connection'
_conn: IMAP4_SSL
def __init__(self, config: dict):
self._config = config
self._sql_conn = connect(
database=Path(config['sqlite_path']).absolute())
def _get_subject(self, email: Message) -> str:
'Extract and return the subject.'
subject = decode_header(header=email['subject'])
for text, encoding in subject:
if encoding is None:
return text
return text.decode(encoding)
return ''
def _get_extracted_zip_content(self, message: Message) -> Tuple[bytes]:
"""
Load and return the extracted XML content of the zip file in the
message.
"""
result = tuple()
fd = BytesIO(initial_bytes=message.get_payload(decode=True))
if not is_zipfile(filename=fd):
return result
with ZipFile(file=fd) as zip_attachment:
for zipped_path in zip_attachment.infolist():
if not zipped_path.filename.endswith('.xml'):
continue
with zip_attachment.open(name=zipped_path) as zip_fd:
result += (zip_fd.read(),)
return result
def _walk_content(self, message: Message) -> Tuple[bytes]:
'Walk the content of the message recursively.'
result = tuple()
if message.is_multipart():
for part in message.walk():
if part == message:
continue
result += self._walk_content(message=part)
else:
if message.get_content_disposition() is None \
or message.get_filename() is None:
return result
if message.get_content_type() == 'application/zip':
result += self._get_extracted_zip_content(message=message)
return result
def _parse_message(self, num: bytes) -> Tuple[bytes]:
'Return the parsed XML content from the parsed message.'
response, msg = self._conn.fetch(
message_set=num, message_parts='(RFC822)')
message = message_from_bytes(s=msg[0][1])
subject = self._get_subject(email=message)
extracted_content = self._walk_content(message=message)
# print(subject, extracted_content)
return extracted_content
def _login_and_run(self):
'Start the IMAP conversation.'
response, result = self._conn.login(
user=self._config['imap']['username'],
password=self._config['imap']['password'])
response, result = self._conn.select(
mailbox=self._config['imap']['folder_path'])
if response != 'OK':
raise ServerException()
self._no_messages = int(result[0])
response, self._all_messages = self._conn.search(
None, 'ALL') # type: Tuple[str, List[bytes]]
if response != 'OK':
raise ServerException()
to_be_deleted = []
for num in self._all_messages[0].split(): # type: bytes
extracted_content = self._parse_message(num=num)
if not extracted_content:
continue
to_be_deleted.append(num)
for content_item in extracted_content:
parser = XmlParser(
content=content_item, config=self._config,
sql_conn=self._sql_conn)
parser.start()
def start(self):
'Start processing.'
with IMAP4_SSL(host=self._config['imap']['ssl_host']) as self._conn:
self._login_and_run()
self._sql_conn.close()
def _get_loaded_cfg(parsed_args: Namespace):
with Path(parsed_args.cfg_file).absolute().open('r') as fd:
data = load(stream=fd, Loader=Loader)
return data
def _init_cfg(parsed_args: Namespace):
'Initialize a config file template.'
path_cfg = Path(parsed_args.cfg_file).absolute()
with path_cfg.open('w') as fd:
fd.write(_CFG_TEMPLATE)
print((
f'Config file template written to {path_cfg}. Please edit it '
'before running this program.'))
def _init_db(parsed_args: Namespace, config: dict):
'Initialize a blank DB.'
conn = connect(database=Path(config['sqlite_path']).absolute())
cursor = conn.cursor()
cursor.execute('DROP TABLE IF EXISTS reports')
cursor.execute(
'CREATE TABLE reports (checked_at INTEGER, org_name TEXT, '
'domain TEXT, report_id TEXT, datetime_start INTEGER, '
'datetime_end INTEGER, offending_ip TEXT, count INTEGER, '
'failed_types TEXT)')
conn.commit()
conn.close()
print(
'Database initialized. You can now run the program without --init-db')
def main():
'Startup of program.'
parser = ArgumentParser(description='DMARC Report analyzer')
parser.add_argument(
'-c', '--config', required=True, help='Configuration file path',
dest='cfg_file')
parser.add_argument(
'--init-db', action='store_true', required=False,
help='Recreate sql database')
parser.add_argument(
'--recreate', action='store_true', required=False,
help='Recreate the config file')
parsed_args = parser.parse_args()
if parsed_args.recreate:
return _init_cfg(parsed_args=parsed_args)
config = _get_loaded_cfg(parsed_args=parsed_args)
if parsed_args.init_db:
return _init_db(parsed_args=parsed_args, config=config)
handler = ImapHandler(config=config)
handler.start()
if __name__ == '__main__':
main()

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
dnspython==2.1.0
PyYAML==6.0