Restructure SMTP check code

This commit is contained in:
Reinhard Müller 2021-02-23 21:28:13 +01:00
parent 59172783ce
commit ef15fa994a
1 changed files with 168 additions and 112 deletions

View File

@ -1,8 +1,7 @@
from logging import getLogger
from smtplib import SMTP, SMTPNotSupportedError, SMTPServerDisconnected
from socket import error as SocketError
from socket import gethostname
from typing import Optional, Tuple
from smtplib import (
SMTP, SMTPNotSupportedError, SMTPResponseException, SMTPServerDisconnected)
from typing import List, Optional
from dns.exception import Timeout
from dns.rdatatype import MX as rdtype_mx
@ -20,21 +19,6 @@ from .exceptions import (
LOGGER = getLogger(name=__name__)
class _ProtocolError(Exception):
"""
Raised when there is an error during the SMTP conversation.
Used only internally.
"""
def __init__(self, command: str, code: int, message: bytes):
self.command = command
self.code = code
self.message = message.decode(errors='ignore')
def __str__(self):
return f'{self.code} {self.message} (in reply to {self.command})'
def _get_mx_records(domain: str, timeout: int) -> list:
'Return the DNS response for checking, optionally raise exceptions.'
try:
@ -73,99 +57,171 @@ def _get_cleaned_mx_records(domain: str, timeout: int) -> list:
return result
def _smtp_ehlo_tls(smtp: SMTP, helo_host: str):
class _SMTPChecker(SMTP):
"""
Try and start the TLS session, fall back to unencrypted when
unavailable.
A specialized variant of `smtplib.SMTP` for checking the validity of
email addresses.
All the commands used in the check process are modified to raise
appropriate exceptions: `SMTPServerDisconnected` on connection
issues and `SMTPResponseException` on negative SMTP server
responses. Note that the methods of `smtplib.SMTP` already raise
these exceptions on some conditions.
Also, a new method `check` is added to run the check for a given
list of SMTP servers.
"""
code, message = smtp.ehlo(name=helo_host)
if code >= 400:
# EHLO bails out, no further SMTP commands are acceptable
raise _ProtocolError('EHLO', code, message)
try:
smtp.starttls()
code, message = smtp.ehlo(name=helo_host)
except SMTPNotSupportedError:
# The server does not support the STARTTLS extension
pass
except RuntimeError:
# SSL/TLS support is not available to your Python interpreter
pass
def __init__(
self, local_hostname: str, timeout: float, debug: bool,
raise_communication_errors: bool,
raise_temporary_errors: bool,
sender: str, recip: str):
"""
Initialize the object with all the parameters which remain
constant during the check of one email address on all the SMTP
servers.
"""
super().__init__(local_hostname=local_hostname, timeout=timeout)
self.set_debuglevel(debuglevel=2 if debug else False)
self.__raise_communication_errors = raise_communication_errors
self.__raise_temporary_errors = raise_temporary_errors
self.__sender = sender
self.__recip = recip
self.__communication_errors = {}
self.__temporary_errors = {}
# Avoid error on close() after unsuccessful connect
self.sock = None
def putcmd(self, cmd, args=""):
"""
Like `smtplib.SMTP.putcmd`, but remember the command for later
use in error messages.
"""
if args:
self.__command = f'{cmd} {args}'
else:
self.__command = cmd
super().putcmd(cmd, args)
def _smtp_mail(smtp: SMTP, from_address: EmailAddress):
'Send and evaluate the `MAIL FROM` command.'
code, message = smtp.mail(sender=from_address.ace)
if code >= 400:
# MAIL FROM bails out, no further SMTP commands are acceptable
raise _ProtocolError('MAIL FROM', code, message)
def _smtp_converse(
mx_record: str, smtp_timeout: int, debug: bool, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress
) -> Tuple[int, str]:
"""
Do the `SMTP` conversation with one MX, and return code and message
of the reply to the `RCPT TO:` command.
If the conversation fails before the `RCPT TO:` command can be
issued, a `_ProtocolError` is raised.
"""
if debug:
LOGGER.debug(msg=f'Trying {mx_record} ...')
with SMTP(timeout=smtp_timeout) as smtp:
smtp._host = mx_record # Workaround for bug in smtplib
smtp.set_debuglevel(debuglevel=2 if debug else False)
code, message = smtp.connect(host=mx_record)
if code >= 400:
raise _ProtocolError('connect', code, message)
_smtp_ehlo_tls(smtp=smtp, helo_host=helo_host)
_smtp_mail(smtp=smtp, from_address=from_address)
return smtp.rcpt(recip=email_address.ace)
def _check_mx_records(
mx_records: list, smtp_timeout: int, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress,
debug: bool, raise_communication_errors: bool,
raise_temporary_errors: bool) -> Optional[bool]:
'Check the mx records for a given email address.'
communication_errors = {}
temporary_errors = {}
for mx_record in mx_records:
def connect(self, host, *args, **kwargs):
"""
Like `smtplib.SMTP.connect`, but raise appropriate exceptions on
connection failure or negative SMTP server response.
"""
self.__command = 'connect' # Used for error messages.
self._host = host # Missing in standard smtplib!
try:
code, message = _smtp_converse(
mx_record=mx_record, smtp_timeout=smtp_timeout, debug=debug,
helo_host=helo_host, from_address=from_address,
email_address=email_address)
if code >= 500:
# Address clearly invalid: exit early.
raise AddressNotDeliverableError({mx_record: (
'RCPT TO', code, message.decode(errors='ignore'))})
elif code >= 400:
# Temporary error on this MX: collect message and continue.
temporary_errors[mx_record] = (
'RCPT TO', code, message.decode(errors='ignore'))
else:
# Address clearly valid: exit early.
code, message = super().connect(host, *args, **kwargs)
except OSError as error:
raise SMTPServerDisconnected(str(error))
if code >= 400:
raise SMTPResponseException(code, message)
return code, message
def starttls(self, *args, **kwargs):
"""
Like `smtplib.SMTP.starttls`, but continue without TLS in case
either end of the connection does not support it.
"""
try:
super().starttls(*args, **kwargs)
except SMTPNotSupportedError:
# The server does not support the STARTTLS extension
pass
except RuntimeError:
# SSL/TLS support is not available to your Python interpreter
pass
def mail(self, *args, **kwargs):
"""
Like `smtplib.SMTP.mail`, but raise an appropriate exception on
negative SMTP server response.
"""
code, message = super().mail(*args, **kwargs)
if code >= 400:
raise SMTPResponseException(code, message)
return code, message
def rcpt(self, *args, **kwargs):
"""
Like `smtplib.SMTP.rcpt`, but handle negative SMTP server
responses directly.
"""
code, message = super().rcpt(*args, **kwargs)
if code >= 500:
# Address clearly invalid: issue negative result
raise AddressNotDeliverableError({self._host: (
'RCPT TO', code, message.decode(errors='ignore'))})
elif code >= 400:
# Temporary error on this host: collect message
self.__temporary_errors[self._host] = (
'RCPT TO', code, message.decode(errors='ignore'))
return code, message
def quit(self):
"""
Like `smtplib.SMTP.quit`, but make sure that everything is
cleaned up properly even if the connection has been lost before.
"""
try:
return super().quit()
except SMTPServerDisconnected:
self.ehlo_resp = self.helo_resp = None
self.esmtp_features = {}
self.does_esmtp = False
self.close()
def _check_one(self, host: str) -> bool:
"""
Run the check for one SMTP server. On positive result, return
`True`. On negative result, raise `AddressNotDeliverableError`.
On ambiguous result (4xx response to `RCPT TO`) or any
communication issue before even reaching `RCPT TO` in the
protocol, collect error message for later use and return
`False`.
"""
try:
self.connect(host)
self.starttls()
self.ehlo_or_helo_if_needed()
self.mail(self.__sender)
code, message = self.rcpt(self.__recip)
except SMTPServerDisconnected as e:
self.__communication_errors[self._host] = (
self.__command, 0, str(e))
return False
except SMTPResponseException as e:
self.__communication_errors[self._host] = (
self.__command, e.smtp_code,
e.smtp_error.decode(errors='ignore'))
return False
finally:
self.quit()
return (code < 400)
def check(self, hosts: List[str]) -> Optional[bool]:
"""
Run the check for all given SMTP servers. On positive result,
return `True`. On negative result, raise
`AddressNotDeliverableError`. On ambiguous result (4xx
response(s) to `RCPT TO`) or any communication issue(s) before
even reaching `RCPT TO` in the protocol, either raise an
exception or return `None` depending on the parameters.
"""
for host in hosts:
if self.debuglevel > 0:
LOGGER.debug(msg=f'Trying {host} ...')
if self._check_one(host):
return True
except (SocketError, SMTPServerDisconnected) as error:
# Connection problem: collect message and continue.
communication_errors[mx_record] = ('connect', 0, error)
except _ProtocolError as error:
# SMTP communication error: collect message and continue.
communication_errors[mx_record] = (
error.command, error.code, error.message)
# Raise exceptions on ambiguous results if desired. If in doubt, raise the
# CommunicationError because that one might point to local configuration or
# blacklisting issues.
if communication_errors and raise_communication_errors:
raise SMTPCommunicationError(communication_errors)
if temporary_errors and raise_temporary_errors:
raise SMTPTemporaryError(temporary_errors)
# Can't verify whether or not email address exists.
return None
# Raise exceptions on ambiguous results if desired. If in doubt, raise
# the CommunicationError because that one might point to local
# configuration or blacklisting issues.
if self.__communication_errors and self.__raise_communication_errors:
raise SMTPCommunicationError(self.__communication_errors)
if self.__temporary_errors and self.__raise_temporary_errors:
raise SMTPTemporaryError(self.__temporary_errors)
# Can't verify whether or not email address exists.
return None
def mx_check(
@ -204,7 +260,6 @@ def mx_check(
exceptions is raised depending on the exact issue, all derived from
`MXError`.
"""
host = helo_host or gethostname()
from_address = from_address or email_address
if email_address.domain_literal_ip:
mx_records = [email_address.domain_literal_ip]
@ -213,8 +268,9 @@ def mx_check(
domain=email_address.domain, timeout=dns_timeout)
if skip_smtp:
return True
return _check_mx_records(
mx_records=mx_records, smtp_timeout=smtp_timeout, helo_host=host,
from_address=from_address, email_address=email_address, debug=debug,
raise_communication_errors=raise_communication_errors,
raise_temporary_errors=raise_temporary_errors)
smtp_checker = _SMTPChecker(
local_hostname=helo_host, timeout=smtp_timeout, debug=debug,
raise_communication_errors=raise_communication_errors,
raise_temporary_errors=raise_temporary_errors,
sender=from_address.ace, recip=email_address.ace)
return smtp_checker.check(mx_records)