Improve SMTP check error handling

* Improve exception class hierarchy
* Raise exception on malformed "from_address" parameter instead of
  yielding a negative verification result
* Add option to raise exceptions on ambiguous results
* Improve exception parameters to allow for a more structured analysis
  of the negative response
* Yield a negative verification result upon the first permanent error on
  RCPT TO, just like a mail server would also bounce after the first
  permanent error
This commit is contained in:
Reinhard Müller 2021-02-22 14:32:56 +01:00
parent 009182542e
commit 9d662a2f23
4 changed files with 182 additions and 83 deletions

View File

@ -31,9 +31,9 @@ Basic usage::
:code:`check_mx`: check the mx-records and check whether the email actually exists
:code:`from_address`: the email address the probe will be sent from,
:code:`from_address`: the email address the probe will be sent from
:code:`helo_host`: the host to use in SMTP HELO when checking for an email,
:code:`helo_host`: the host to use in SMTP HELO when checking for an email
:code:`smtp_timeout`: seconds until SMTP timeout
@ -45,6 +45,10 @@ Basic usage::
:code:`skip_smtp`: (default :code:`False`) skip the SMTP conversation with the server, after MX checks. Will automatically be set to :code:`True` when :code:`check_mx` is :code:`False`!
:code:`raise_communication_errors`: Affects the SMTP verification step. If set to :code:`True`, any connection error or SMTP error message from the server will lead to a negative verification result; otherwise, it will be regarded as an ambiguous result. Defaults to :code:`False`. This option is mainly used in connection with :code:`validate_email_or_fail()`, where the exception raised can be analyzed to find out the reason for the otherwise ambiguous result.
:code:`raise_temporary_errors`: Affects the SMTP verification step. If set to :code:`True`, a temporary error reply of the SMTP server to the :code:`RCPT TO` command (as used, for example, with greylisting) will lead to a negative verification result; otherwise, it will be regarded as an ambiguous result. Defaults to :code:`False`. This option is mainly used in connection with :code:`validate_email_or_fail()`, where the exception raised can be analyzed to find out the reason for the otherwise ambiguous result.
The function :code:`validate_email_or_fail()` works exactly like :code:`validate_email`, except that it raises an exception in the case of validation failure instead of returning :code:`False`.
The module will try to negotiate a TLS connection with STARTTLS, and silently fall back to an unencrypted SMTP connection if the server doesn't support it.

View File

@ -1,20 +1,22 @@
from typing import Iterable
from typing import Dict, Tuple
class EmailValidationError(Exception):
'Base class for all exceptions indicating validation failure.'
class Error(Exception):
'Base class for all exceptions of this module.'
message = 'Unknown error.'
def __str__(self):
return self.message
class AddressFormatError(EmailValidationError):
'Raised when the email address has an invalid format.'
message = 'Invalid email address.'
class ParameterError(Error):
"""
Base class for all exceptions indicating a wrong function parameter.
"""
pass
class FromAddressFormatError(EmailValidationError):
class FromAddressFormatError(ParameterError):
"""
Raised when the from email address used for the MX check has an
invalid format.
@ -22,6 +24,16 @@ class FromAddressFormatError(EmailValidationError):
message = 'Invalid "From:" email address.'
class EmailValidationError(Error):
'Base class for all exceptions indicating validation failure.'
pass
class AddressFormatError(EmailValidationError):
'Raised when the email address has an invalid format.'
message = 'Invalid email address.'
class DomainBlacklistedError(EmailValidationError):
"""
Raised when the domain of the email address is blacklisted on
@ -30,43 +42,101 @@ class DomainBlacklistedError(EmailValidationError):
message = 'Domain blacklisted.'
class DomainNotFoundError(EmailValidationError):
class MXError(EmailValidationError):
"""
Base class of all exceptions that indicate failure to determine a
valid MX for the domain of the email address.
"""
pass
class DomainNotFoundError(MXError):
'Raised when the domain is not found.'
message = 'Domain not found.'
class NoNameserverError(EmailValidationError):
class NoNameserverError(MXError):
'Raised when the domain does not resolve by nameservers in time.'
message = 'No nameserver found for domain.'
class DNSTimeoutError(EmailValidationError):
class DNSTimeoutError(MXError):
'Raised when the domain lookup times out.'
message = 'Domain lookup timed out.'
class DNSConfigurationError(EmailValidationError):
class DNSConfigurationError(MXError):
"""
Raised when the DNS entries for this domain are falsely configured.
"""
message = 'Misconfigurated DNS entries for domain.'
class NoMXError(EmailValidationError):
'Raised then the domain has no MX records configured.'
class NoMXError(MXError):
'Raised when the domain has no MX records configured.'
message = 'No MX record for domain found.'
class NoValidMXError(EmailValidationError):
class NoValidMXError(MXError):
"""
Raised when the domain has MX records configured, but none of them
has a valid format.
"""
message = 'No valid MX record for domain found.'
class AddressNotDeliverableError(EmailValidationError):
'Raised when a non-ambigious resulted lookup fails.'
message = 'Email address undeliverable:'
class SMTPError(EmailValidationError):
"""
Base class for exceptions raised from unsuccessful SMTP
communication.
def __init__(self, error_messages: Iterable):
`error_messages` is a dictionary with an entry per MX record, where
the hostname is the key and a tuple of command, error code, and
error message is the value.
"""
def __init__(self, error_messages: Dict[str, Tuple[str, int, str]]):
self.error_messages = error_messages
def __str__(self) -> str:
return '\n'.join([self.message] + self.error_messages)
return '\n'.join(
[self.message] +
[f'{k}: {v[1]} {v[2]} (in reply to {v[0]})'
for k, v in self.error_messages.items()]
)
class AddressNotDeliverableError(SMTPError):
"""
Raised when at least one of the MX sends an SMTP reply which
unambiguously indicates an invalid (nonexistent, blocked, expired...)
recipient email address.
This exception indicates that the email address is clearly invalid.
"""
message = 'Email address undeliverable:'
class SMTPCommunicationError(SMTPError):
"""
Raised when the SMTP communication with all MX was unsuccessful for
other reasons than an invalid recipient email address.
This exception indicates a configuration issue either on the host
where this program runs or on the MX. A possible reason is that the
local host is blacklisted on the MX.
"""
message = 'SMTP communication failure:'
class SMTPTemporaryError(SMTPError):
"""
Raised when the email address cannot be verified because none of the
MX gave a clear "yes" or "no" about the existence of the address,
but at least one gave a temporary error reply to the "RCPT TO:"
command.
This exception indicates that the validity of the email address
cannot be verified, either for reasons of MX configuration (like
greylisting) or due to temporary server issues on the MX.
"""
message = 'Temporary error in email address verification:'

View File

@ -2,7 +2,7 @@ from logging import getLogger
from smtplib import SMTP, SMTPNotSupportedError, SMTPServerDisconnected
from socket import error as SocketError
from socket import gethostname
from typing import Optional
from typing import Optional, Tuple
from dns.exception import Timeout
from dns.rdatatype import MX as rdtype_mx
@ -14,7 +14,8 @@ from .constants import HOST_REGEX
from .email_address import EmailAddress
from .exceptions import (
AddressNotDeliverableError, DNSConfigurationError, DNSTimeoutError,
DomainNotFoundError, NoMXError, NoNameserverError, NoValidMXError)
DomainNotFoundError, NoMXError, NoNameserverError, NoValidMXError,
SMTPCommunicationError, SMTPTemporaryError)
LOGGER = getLogger(name=__name__)
@ -78,7 +79,7 @@ def _smtp_ehlo_tls(smtp: SMTP, helo_host: str):
unavailable.
"""
code, message = smtp.ehlo(name=helo_host)
if code >= 300:
if code >= 400:
# EHLO bails out, no further SMTP commands are acceptable
raise _ProtocolError('EHLO', code, message)
try:
@ -95,19 +96,21 @@ def _smtp_ehlo_tls(smtp: SMTP, helo_host: str):
def _smtp_mail(smtp: SMTP, from_address: EmailAddress):
'Send and evaluate the `MAIL FROM` command.'
code, message = smtp.mail(sender=from_address.ace)
if code >= 300:
if code >= 400:
# MAIL FROM bails out, no further SMTP commands are acceptable
raise _ProtocolError('MAIL FROM', code, message)
def _smtp_converse(
mx_record: str, smtp_timeout: int, debug: bool, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress):
from_address: EmailAddress, email_address: EmailAddress
) -> Tuple[int, str]:
"""
Do the `SMTP` conversation, handle errors in the caller.
Do the `SMTP` conversation with one MX, and return code and message
of the reply to the `RCPT TO:` command.
Raise `_ProtocolError` on error, and `StopIteration` if the
conversation points out an existing email.
If the conversation fails before the `RCPT TO:` command can be
issued, a `_ProtocolError` is raised.
"""
if debug:
LOGGER.debug(msg=f'Trying {mx_record} ...')
@ -119,70 +122,87 @@ def _smtp_converse(
raise _ProtocolError('connect', code, message)
_smtp_ehlo_tls(smtp=smtp, helo_host=helo_host)
_smtp_mail(smtp=smtp, from_address=from_address)
code, message = smtp.rcpt(recip=email_address.ace)
if code == 250:
# Address valid, early exit
raise StopIteration
elif code >= 500:
raise _ProtocolError('RCPT TO', code, message)
def _check_one_mx(
error_messages: list, mx_record: str, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress,
smtp_timeout: int, debug: bool) -> bool:
"""
Check one MX server, return the `is_ambigious` boolean or raise
`StopIteration` if this MX accepts the email.
"""
try:
_smtp_converse(
mx_record=mx_record, smtp_timeout=smtp_timeout, debug=debug,
helo_host=helo_host, from_address=from_address,
email_address=email_address)
except SMTPServerDisconnected:
return True
except (SocketError, _ProtocolError) as error:
error_messages.append(f'{mx_record}: {error}')
return False
return True
return smtp.rcpt(recip=email_address.ace)
def _check_mx_records(
mx_records: list, smtp_timeout: int, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress,
debug: bool) -> Optional[bool]:
debug: bool, raise_communication_errors: bool,
raise_temporary_errors: bool) -> Optional[bool]:
'Check the mx records for a given email address.'
# TODO: Raise an ambigious exception, containing the messages? Will
# be a breaking change.
error_messages = []
found_ambigious = False
communication_errors = {}
temporary_errors = {}
for mx_record in mx_records:
try:
found_ambigious |= _check_one_mx(
error_messages=error_messages, mx_record=mx_record,
code, message = _smtp_converse(
mx_record=mx_record, smtp_timeout=smtp_timeout, debug=debug,
helo_host=helo_host, from_address=from_address,
email_address=email_address, smtp_timeout=smtp_timeout,
debug=debug)
except StopIteration:
# Address valid, early exit
return True
# If any of the mx servers behaved ambigious, return None, otherwise raise
# an exception containing the collected error messages.
if not found_ambigious:
raise AddressNotDeliverableError(error_messages=error_messages)
email_address=email_address)
if code >= 500:
# Address clearly invalid: exit early.
raise AddressNotDeliverableError({mx_record: (
'RCPT TO', code, message.decode(errors='ignore'))})
elif code >= 400:
# Temporary error on this MX: collect message and continue.
temporary_errors[mx_record] = (
'RCPT TO', code, message.decode(errors='ignore'))
else:
# Address clearly valid: exit early.
return True
except (SocketError, SMTPServerDisconnected) as error:
# Connection problem: collect message and continue.
communication_errors[mx_record] = ('connect', 0, error)
except _ProtocolError as error:
# SMTP communication error: collect message and continue.
communication_errors[mx_record] = (
error.command, error.code, error.message)
# Raise exceptions on ambiguous results if desired. If in doubt, raise the
# CommunicationError because that one might point to local configuration or
# blacklisting issues.
if communication_errors and raise_communication_errors:
raise SMTPCommunicationError(communication_errors)
if temporary_errors and raise_temporary_errors:
raise SMTPTemporaryError(temporary_errors)
# Can't verify whether or not email address exists.
return None
def mx_check(
email_address: EmailAddress, debug: bool,
from_address: Optional[EmailAddress] = None,
helo_host: Optional[str] = None, smtp_timeout: int = 10,
dns_timeout: int = 10, skip_smtp: bool = False
) -> Optional[bool]:
email_address: EmailAddress, debug: bool,
from_address: Optional[EmailAddress] = None,
helo_host: Optional[str] = None, smtp_timeout: int = 10,
dns_timeout: int = 10, skip_smtp: bool = False,
raise_communication_errors: bool = False,
raise_temporary_errors: bool = False
) -> Optional[bool]:
"""
Return `True` if the host responds with a deliverable response code,
`False` if not-deliverable. Also, return `None` if there if couldn't
provide a conclusive result (e.g. temporary errors or graylisting).
Verify the given email address by determining the SMTP servers
responsible for the domain and then asking them to deliver an
email to the address. Before the actual message is sent, the
process is interrupted.
Returns `True` as soon as any server accepts the recipient
address.
Raises an `AddressNotDeliverableError` if any server unambiguously
and permanently refuses to accept the recipient address.
If the server answers with a temporary error code, the validity of
the email address can not be determined. In that case, the function
returns `None`, or an `SMTPTemporaryError` is raised, dependent on
the value of `raise_temporary_errors`. Greylisting is a frequent
cause of this.
If the SMTP server(s) reply with an error message to any of the
communication steps before the recipient address is checked, the
validity of the email address can not be determined either. In that
case, the function returns `None`, or an `SMTPCommunicationError` is
raised, dependent on the value of `raise_communication_errors`.
In case no responsible SMTP servers can be determined, a variety of
exceptions is raised depending on the exact issue, all derived from
`MXError`.
"""
host = helo_host or gethostname()
from_address = from_address or email_address
@ -195,4 +215,6 @@ def mx_check(
return True
return _check_mx_records(
mx_records=mx_records, smtp_timeout=smtp_timeout, helo_host=host,
from_address=from_address, email_address=email_address, debug=debug)
from_address=from_address, email_address=email_address, debug=debug,
raise_communication_errors=raise_communication_errors,
raise_temporary_errors=raise_temporary_errors)

View File

@ -16,7 +16,8 @@ def validate_email_or_fail(
from_address: Optional[str] = None, helo_host: Optional[str] = None,
smtp_timeout: int = 10, dns_timeout: int = 10,
use_blacklist: bool = True, debug: bool = False,
skip_smtp: bool = False) -> Optional[bool]:
skip_smtp: bool = False, raise_communication_errors: bool = False,
raise_temporary_errors: bool = False) -> Optional[bool]:
"""
Return `True` if the email address validation is successful, `None` if the
validation result is ambiguous, and raise an exception if the validation
@ -38,7 +39,9 @@ def validate_email_or_fail(
return mx_check(
email_address=email_address, from_address=from_address,
helo_host=helo_host, smtp_timeout=smtp_timeout,
dns_timeout=dns_timeout, skip_smtp=skip_smtp, debug=debug)
dns_timeout=dns_timeout, skip_smtp=skip_smtp, debug=debug,
raise_communication_errors=raise_communication_errors,
raise_temporary_errors=raise_temporary_errors)
def validate_email(email_address: str, *args, **kwargs):