Fix XML parsing error when the XML is invalid

This commit is contained in:
László Károlyi 2023-05-05 15:58:35 +02:00
parent d9dee7e239
commit 60fec85afd
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
3 changed files with 46 additions and 24 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
*.yaml
venv/
__pycache__
pyrightconfig.json

View File

@ -15,13 +15,14 @@ from pathlib import Path
from smtplib import SMTP
from sqlite3 import Connection, connect
from time import time
from typing import List, Tuple, Union
from xml.etree.ElementTree import XML, Element
from typing import Union
from xml.etree.ElementTree import XML, Element, ParseError
from zipfile import ZipFile, is_zipfile
from dns.rdatatype import PTR
from dns.resolver import NXDOMAIN, NoAnswer, Timeout, resolve
from dns.reversename import from_address
from magic import from_buffer
try:
from yaml import CLoader as Loader
@ -64,7 +65,7 @@ _DESCRIPTION = (
'DMARC Report analyzer and reporter. ' +
'The default mode is to analyze what\'s in the IMAP folder.')
_STARTTIME = time()
ListOfLogRow = List[LogRow]
ListOfLogRow = list[LogRow]
_EMAIL_HEADER = """\
Since the last log report, you had the following failures in the DMARC \
aggregate reports:
@ -217,9 +218,19 @@ class XmlParser(object):
for record in self._root.findall(path='record'):
self._parse_record(record=record)
def _get_root_or_fail(self) -> bool:
    """
    Parse `self._content` as XML and keep the root in `self._root`.

    Return `True` when the content parsed, `False` when the parser
    raised `ParseError`. On failure `self._root` is not assigned.
    """
    try:
        parsed_root = XML(text=self._content)
    except ParseError:
        # Malformed content is skipped on purpose, without any noise.
        return False
    self._root = parsed_root
    return True
def process(self):
'Start processing.'
self._root = XML(text=self._content)
if not self._get_root_or_fail():
return
self._parse_header()
self._parse_records()
@ -227,7 +238,7 @@ class XmlParser(object):
class ImapHandler(object):
'Handling the IMAP connection'
_conn: IMAP4_SSL
_all_messages: List[bytes]
_all_messages: list[bytes]
def __init__(self, config: dict):
self._config = config
@ -243,16 +254,13 @@ class ImapHandler(object):
if encoding else text.decode()
return ''
def _get_extracted_gzip_content(
self, message: Message) -> Tuple[bytes, ...]:
def _get_extracted_gzip_content(self, content: bytes) -> tuple[bytes, ...]:
"""
Load and return the extracted XML content of the gzip file in the
message.
"""
result = tuple()
fd = BytesIO(initial_bytes=message.get_payload(decode=True))
if not message.get_filename().endswith('xml.gz'):
return result
fd = BytesIO(initial_bytes=content)
try:
with gzip_open(filename=fd) as gzip_attachment:
result += (gzip_attachment.read(),)
@ -261,14 +269,13 @@ class ImapHandler(object):
# A finally statement would do here but whatever
return result
def _get_extracted_zip_content(
self, message: Message) -> Tuple[bytes, ...]:
def _get_extracted_zip_content(self, content: bytes) -> tuple[bytes, ...]:
"""
Load and return the extracted XML content of the zip file in the
message.
"""
result = tuple()
fd = BytesIO(initial_bytes=message.get_payload(decode=True))
fd = BytesIO(initial_bytes=content)
if not is_zipfile(filename=fd):
return result
with ZipFile(file=fd) as zip_attachment:
@ -279,7 +286,15 @@ class ImapHandler(object):
result += (zip_fd.read(),)
return result
def _walk_content(self, message: Message) -> Tuple[bytes, ...]:
def _get_message_content_type_and_content(
        self, message: Message) -> tuple[str, bytes]:
    """
    Return the MIME type and the decoded payload of a `Message`.

    The MIME type is what `from_buffer` reports for the payload
    bytes, requested with `mime=True`.
    """
    payload = message.get_payload(decode=True)
    # Pass the payload through `BytesIO` to obtain the content bytes.
    with BytesIO(initial_bytes=payload) as payload_buffer:
        raw_bytes = payload_buffer.getvalue()
    return from_buffer(buffer=raw_bytes, mime=True), raw_bytes
def _walk_content(self, message: Message) -> tuple[bytes, ...]:
'Walk the content of the message recursively.'
result = tuple()
if message.is_multipart():
@ -291,26 +306,26 @@ class ImapHandler(object):
if message.get_content_disposition() is None \
or message.get_filename() is None:
return result
content_type = message.get_content_type()
content_type, content = \
self._get_message_content_type_and_content(message=message)
if content_type == 'application/zip':
result += self._get_extracted_zip_content(message=message)
result += self._get_extracted_zip_content(content=content)
elif content_type == 'application/gzip':
result += self._get_extracted_gzip_content(message=message)
result += self._get_extracted_gzip_content(content=content)
return result
def _parse_message(self, num: str) -> Union[Tuple[bytes, ...], None]:
def _parse_message(self, num: bytes) -> Union[tuple[bytes, ...], None]:
'Return the parsed XML content from the parsed message.'
response, msg = self._conn.fetch(
message_set=num, message_parts='(RFC822)')
message_set=num.decode(encoding='utf-8'), message_parts='(RFC822)')
if response != 'OK' or msg[0] is None or type(msg[0]) is not tuple:
return
message = message_from_bytes(s=msg[0][1])
# subject = self._get_subject(email=message)
extracted_content = self._walk_content(message=message)
# print(subject, extracted_content)
return extracted_content
def _move_processed_messages(self, to_be_moved: List[bytes]):
def _move_processed_messages(self, to_be_moved: list[bytes]):
'Move processed messages to the designated `Trash`.'
message_set = b','.join(to_be_moved).decode()
self._conn.copy(
@ -334,9 +349,14 @@ class ImapHandler(object):
if response != 'OK':
raise ServerException()
to_be_moved = []
for num in self._all_messages[0].split(): # type: bytes
if not len(self._all_messages):
return
response_firstitem = self._all_messages[0]
if not isinstance(response_firstitem, bytes):
return
for num in response_firstitem.split():
extracted_content = self._parse_message(num=num)
if not extracted_content or type(num) is not bytes:
if not extracted_content:
continue
to_be_moved.append(num)
for content_item in extracted_content:

View File

@ -1,2 +1,3 @@
dnspython==2.1.0
dnspython==2.3.0
python-magic==0.4.27
PyYAML==6.0