Code cleanup, tests follow

This commit is contained in:
László Károlyi 2021-02-28 15:01:37 +01:00
parent ef15fa994a
commit 903925afc9
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
4 changed files with 101 additions and 100 deletions

View File

@ -45,11 +45,7 @@ Basic usage::
:code:`skip_smtp`: (default :code:`False`) skip the SMTP conversation with the server, after MX checks. Will automatically be set to :code:`True` when :code:`check_mx` is :code:`False`!
:code:`raise_communication_errors`: Affects the SMTP verification step. If set to :code:`True`, any connection error or SMTP error message from the server will lead to a negative verification result, otherwise it will be regarded as an ambiguous result. Defaults to :code:`False`. This option is mainly used in connection with :code:`validate_email_or_fail()`, where the exception raised can be analyzed to find out the reason for the otherwise ambiguous result.
:code:`raise_temporary_errors`: Affects the SMTP verification step. If set to :code:`True`, a temporary error reply of the SMTP server to the :code:`RCPT TO` command (as used, for example, with greylisting) will lead to a negative verification result, otherwise it will be regarded as an ambiguous result. Defaults to :code:`False`. This option is mainly used in connection with :code:`validate_email_or_fail()`, where the exception raised can be analyzed to find out the reason for the otherwise ambiguous result.
The function :code:`validate_email_or_fail()` works exactly like :code:`validate_email`, except that it raises an exception in the case of validation failure instead of returning :code:`False`.
The function :code:`validate_email_or_fail()` works exactly like :code:`validate_email`, except that it raises an exception in the case of validation failure and ambiguous result instead of returning :code:`False` or :code:`None`, respectively.
The module will try to negotiate a TLS connection with STARTTLS, and silently fall back to an unencrypted SMTP connection if the server doesn't support it.

View File

@ -1,4 +1,8 @@
from typing import Dict, Tuple
from collections import namedtuple
from typing import Dict
SMTPMessage = namedtuple(
typename='SmtpErrorMessage', field_names=['command', 'code', 'text'])
class Error(Exception):
@ -13,7 +17,6 @@ class ParameterError(Error):
"""
Base class for all exceptions indicating a wrong function parameter.
"""
pass
class FromAddressFormatError(ParameterError):
@ -26,7 +29,6 @@ class FromAddressFormatError(ParameterError):
class EmailValidationError(Error):
'Base class for all exceptions indicating validation failure.'
pass
class AddressFormatError(EmailValidationError):
@ -47,7 +49,6 @@ class MXError(EmailValidationError):
Base class of all exceptions that indicate failure to determine a
valid MX for the domain of email address.
"""
pass
class DomainNotFoundError(MXError):
@ -90,19 +91,20 @@ class SMTPError(EmailValidationError):
Base class for exceptions raised from unsuccessful SMTP
communication.
`error_messages` is a dictionary with an entry per MX record, where
the hostname is the key and a tuple of command, error code, and
error message is the value.
`error_messages` is a dictionary with a `SMTPMessage` per MX record,
where the hostname is the key and a tuple of command, error code,
and error message is the value.
"""
def __init__(self, error_messages: Dict[str, Tuple[str, int, str]]):
def __init__(self, error_messages: Dict[str, SMTPMessage]):
self.error_messages = error_messages
def __str__(self) -> str:
return '\n'.join(
[self.message] +
[f'{k}: {v[1]} {v[2]} (in reply to {v[0]})'
for k, v in self.error_messages.items()]
)
return '\n'.join([self.message] + [
f'{host}: {message.code} {message.text} '
f'(in reply to {message.command})'
for host, message in self.error_messages.items()
])
class AddressNotDeliverableError(SMTPError):

View File

@ -1,7 +1,7 @@
from logging import getLogger
from smtplib import (
SMTP, SMTPNotSupportedError, SMTPResponseException, SMTPServerDisconnected)
from typing import List, Optional
from typing import List, Optional, Tuple
from dns.exception import Timeout
from dns.rdatatype import MX as rdtype_mx
@ -14,7 +14,7 @@ from .email_address import EmailAddress
from .exceptions import (
AddressNotDeliverableError, DNSConfigurationError, DNSTimeoutError,
DomainNotFoundError, NoMXError, NoNameserverError, NoValidMXError,
SMTPCommunicationError, SMTPTemporaryError)
SMTPCommunicationError, SMTPMessage, SMTPTemporaryError)
LOGGER = getLogger(name=__name__)
@ -71,11 +71,10 @@ class _SMTPChecker(SMTP):
Also, a new method `check` is added to run the check for a given
list of SMTP servers.
"""
def __init__(
self, local_hostname: str, timeout: float, debug: bool,
raise_communication_errors: bool,
raise_temporary_errors: bool,
sender: str, recip: str):
sender: EmailAddress, recip: EmailAddress):
"""
Initialize the object with all the parameters which remain
constant during the check of one email address on all the SMTP
@ -83,8 +82,6 @@ class _SMTPChecker(SMTP):
"""
super().__init__(local_hostname=local_hostname, timeout=timeout)
self.set_debuglevel(debuglevel=2 if debug else False)
self.__raise_communication_errors = raise_communication_errors
self.__raise_temporary_errors = raise_temporary_errors
self.__sender = sender
self.__recip = recip
self.__communication_errors = {}
@ -92,7 +89,7 @@ class _SMTPChecker(SMTP):
# Avoid error on close() after unsuccessful connect
self.sock = None
def putcmd(self, cmd, args=""):
def putcmd(self, cmd: str, args: str = ''):
"""
Like `smtplib.SMTP.putcmd`, but remember the command for later
use in error messages.
@ -101,21 +98,25 @@ class _SMTPChecker(SMTP):
self.__command = f'{cmd} {args}'
else:
self.__command = cmd
super().putcmd(cmd, args)
super().putcmd(cmd=cmd, args=args)
def connect(self, host, *args, **kwargs):
def connect(
self, host: str = 'localhost', port: int = 0,
source_address: str = None) -> Tuple[int, str]:
"""
Like `smtplib.SMTP.connect`, but raise appropriate exceptions on
connection failure or negative SMTP server response.
A code > 400 is an error here.
"""
self.__command = 'connect' # Used for error messages.
self._host = host # Missing in standard smtplib!
self._host = host # Workaround: Missing in standard smtplib!
try:
code, message = super().connect(host, *args, **kwargs)
code, message = super().connect(
host=host, port=port, source_address=source_address)
except OSError as error:
raise SMTPServerDisconnected(str(error))
if code >= 400:
raise SMTPResponseException(code, message)
raise SMTPResponseException(code=code, msg=message)
return code, message
def starttls(self, *args, **kwargs):
@ -132,30 +133,34 @@ class _SMTPChecker(SMTP):
# SSL/TLS support is not available to your Python interpreter
pass
def mail(self, *args, **kwargs):
def mail(self, sender: str, options: tuple = ()):
"""
Like `smtplib.SMTP.mail`, but raise an appropriate exception on
negative SMTP server response.
A code > 400 is an error here.
"""
code, message = super().mail(*args, **kwargs)
code, message = super().mail(sender=sender, options=options)
if code >= 400:
raise SMTPResponseException(code, message)
return code, message
def rcpt(self, *args, **kwargs):
def rcpt(self, recip: str, options: tuple = ()):
"""
Like `smtplib.SMTP.rcpt`, but handle negative SMTP server
responses directly.
"""
code, message = super().rcpt(*args, **kwargs)
code, message = super().rcpt(recip=recip, options=options)
if code >= 500:
# Address clearly invalid: issue negative result
raise AddressNotDeliverableError({self._host: (
'RCPT TO', code, message.decode(errors='ignore'))})
raise AddressNotDeliverableError({
self._host: SMTPMessage(
command='RCPT TO', code=code,
text=message.decode(errors='ignore'))})
elif code >= 400:
# Temporary error on this host: collect message
self.__temporary_errors[self._host] = (
'RCPT TO', code, message.decode(errors='ignore'))
self.__temporary_errors[self._host] = SMTPMessage(
command='RCPT TO', code=code,
text=message.decode(errors='ignore'))
return code, message
def quit(self):
@ -173,55 +178,50 @@ class _SMTPChecker(SMTP):
def _check_one(self, host: str) -> bool:
"""
Run the check for one SMTP server. On positive result, return
`True`. On negative result, raise `AddressNotDeliverableError`.
On ambiguous result (4xx response to `RCPT TO`) or any
communication issue before even reaching `RCPT TO` in the
protocol, collect error message for later use and return
`False`.
Run the check for one SMTP server.
Return `True` on positive result.
Return `False` on ambiguous result (4xx response to `RCPT TO`),
while collecting the error message for later use.
Raise `AddressNotDeliverableError`. on negative result.
"""
try:
self.connect(host)
self.connect(host=host)
self.starttls()
self.ehlo_or_helo_if_needed()
self.mail(self.__sender)
code, message = self.rcpt(self.__recip)
self.mail(sender=self.__sender.ace)
code, message = self.rcpt(recip=self.__recip.ace)
except SMTPServerDisconnected as e:
self.__communication_errors[self._host] = (
self.__command, 0, str(e))
self.__communication_errors[self._host] = SMTPMessage(
command=self.__command, code=0, text=str(e))
return False
except SMTPResponseException as e:
self.__communication_errors[self._host] = (
self.__command, e.smtp_code,
e.smtp_error.decode(errors='ignore'))
self.__communication_errors[self._host] = SMTPMessage(
command=self.__command, code=e.smtp_code,
text=e.smtp_error.decode(errors='ignore'))
return False
finally:
self.quit()
return (code < 400)
return code < 400
def check(self, hosts: List[str]) -> Optional[bool]:
"""
Run the check for all given SMTP servers. On positive result,
return `True`. On negative result, raise
`AddressNotDeliverableError`. On ambiguous result (4xx
response(s) to `RCPT TO`) or any communication issue(s) before
even reaching `RCPT TO` in the protocol, either raise an
exception or return `None` depending on the parameters.
return `True`, else raise exceptions described in `mx_check`.
"""
for host in hosts:
if self.debuglevel > 0:
LOGGER.debug(msg=f'Trying {host} ...')
if self._check_one(host):
if self._check_one(host=host):
return True
# Raise exceptions on ambiguous results if desired. If in doubt, raise
# the CommunicationError because that one might point to local
# configuration or blacklisting issues.
if self.__communication_errors and self.__raise_communication_errors:
# Raise appropriate exceptions when necessary
if self.__communication_errors:
raise SMTPCommunicationError(self.__communication_errors)
if self.__temporary_errors and self.__raise_temporary_errors:
elif self.__temporary_errors:
raise SMTPTemporaryError(self.__temporary_errors)
# Can't verify whether or not email address exists.
return None
# Can't verify whether or not email address exists, return None
def mx_check(
@ -229,32 +229,22 @@ def mx_check(
from_address: Optional[EmailAddress] = None,
helo_host: Optional[str] = None, smtp_timeout: int = 10,
dns_timeout: int = 10, skip_smtp: bool = False,
raise_communication_errors: bool = False,
raise_temporary_errors: bool = False
) -> Optional[bool]:
) -> Optional[bool]:
"""
Verify the given email address by determining the SMTP servers
responsible for the domain and then asking them to deliver an
email to the address. Before the actual message is sent, the
process is interrupted.
Returns `True` as soon as the any server accepts the recipient
address.
Raises a `AddressNotDeliverableError` if any server unambiguously
Raise an `AddressNotDeliverableError` if any server unambiguously
and permanently refuses to accept the recipient address.
If the server answers with a temporary error code, the validity of
the email address can not be determined. In that case, the function
returns `None`, or an `SMTPTemporaryError` is raised, dependent on
the value of `raise_temporary_errors`. Greylisting is a frequent
cause of this.
Raise `SMTPTemporaryError` if the server answers with a temporary
error code when validity of the email address can not be
determined. Greylisting is a frequent cause of this.
If the SMTP server(s) reply with an error message to any of the
communication steps before the recipient address is checked, the
validity of the email address can not be determined either. In that
case, the function returns `None`, or an `SMTPCommunicationError` is
raised, dependent on the value of `raise_communication_errors`.
Raise `SMTPCommunicationError` if the SMTP server(s) reply with an
error message to any of the communication steps before the recipient
address is checked, and the validity of the email address can not be
determined either.
In case no responsible SMTP servers can be determined, a variety of
exceptions is raised depending on the exact issue, all derived from
@ -269,8 +259,6 @@ def mx_check(
if skip_smtp:
return True
smtp_checker = _SMTPChecker(
local_hostname=helo_host, timeout=smtp_timeout, debug=debug,
raise_communication_errors=raise_communication_errors,
raise_temporary_errors=raise_temporary_errors,
sender=from_address.ace, recip=email_address.ace)
return smtp_checker.check(mx_records)
local_hostname=helo_host, timeout=smtp_timeout, debug=debug,
sender=from_address, recip=email_address)
return smtp_checker.check(hosts=mx_records)

View File

@ -4,24 +4,36 @@ from typing import Optional
from .domainlist_check import domainlist_check
from .email_address import EmailAddress
from .exceptions import (
AddressFormatError, EmailValidationError, FromAddressFormatError)
AddressFormatError, EmailValidationError, FromAddressFormatError,
SMTPTemporaryError)
from .mx_check import mx_check
from .regex_check import regex_check
LOGGER = getLogger(name=__name__)
__doc__ = """\
Verify the given email address by determining the SMTP servers
responsible for the domain and then asking them to deliver an email to
the address. Before the actual message is sent, the process is
interrupted.
PLEASE NOTE: Some email providers only tell the actual delivery failure
AFTER having delivered the body which this module doesn't, while others
simply accept everything and send a bounce notification later. Hence, a
100% proper response is not guaranteed.
"""
def validate_email_or_fail(
email_address: str, check_regex: bool = True, check_mx: bool = True,
from_address: Optional[str] = None, helo_host: Optional[str] = None,
smtp_timeout: int = 10, dns_timeout: int = 10,
use_blacklist: bool = True, debug: bool = False,
skip_smtp: bool = False, raise_communication_errors: bool = False,
raise_temporary_errors: bool = False) -> Optional[bool]:
skip_smtp: bool = False) -> Optional[bool]:
"""
Return `True` if the email address validation is successful, `None` if the
validation result is ambigious, and raise an exception if the validation
fails.
Return `True` if the email address validation is successful, `None`
if the validation result is ambigious, and raise an exception if the
validation fails.
"""
email_address = EmailAddress(email_address)
if from_address is not None:
@ -39,9 +51,7 @@ def validate_email_or_fail(
return mx_check(
email_address=email_address, from_address=from_address,
helo_host=helo_host, smtp_timeout=smtp_timeout,
dns_timeout=dns_timeout, skip_smtp=skip_smtp, debug=debug,
raise_communication_errors=raise_communication_errors,
raise_temporary_errors=raise_temporary_errors)
dns_timeout=dns_timeout, skip_smtp=skip_smtp, debug=debug)
def validate_email(email_address: str, *args, **kwargs):
@ -53,6 +63,11 @@ def validate_email(email_address: str, *args, **kwargs):
"""
try:
return validate_email_or_fail(email_address, *args, **kwargs)
except SMTPTemporaryError as error:
message = f'Validation for {email_address!r} ambigious: {error}'
if kwargs.get('debug'):
LOGGER.warning(msg=message)
return
except EmailValidationError as error:
message = f'Validation for {email_address!r} failed: {error}'
if kwargs.get('debug'):