Improve emulation of true SMTP process

* Check the SMTP servers in order of priority instead of random order.
* Handle SMTPServerDisconnected like a 451 status as recommended by RFC
  5321.
* Exit early by directly raising CommunicationError on the first 5xx
  SMTPResponseException.

See also the discussion at
https://github.com/karolyi/py3-validate-email/discussions/61
This commit is contained in:
Reinhard Müller 2021-03-11 16:09:09 +01:00
parent 1d5e7810ae
commit 1b9b0682cd
4 changed files with 43 additions and 34 deletions

View File

@ -19,6 +19,21 @@ class DnsNameStub(object):
return self.value
class DnsRRsetStub(object):
'Stub for `dns.rrset.RRset`.'
def __init__(self, hostnames: list):
self.names = [
SimpleNamespace(exchange=DnsNameStub(value=x)) for x in hostnames]
def processing_order(self):
return self.names
def _answer(hostnames: list):
return SimpleNamespace(rrset=DnsRRsetStub(hostnames=hostnames))
TEST_QUERY = Mock()
@ -28,8 +43,7 @@ class GetMxRecordsTestCase(TestCase):
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_fails_with_invalid_hostnames(self):
'Fails when an MX hostname is "."'
TEST_QUERY.return_value = [
SimpleNamespace(exchange=DnsNameStub(value='.'))]
TEST_QUERY.return_value = _answer(hostnames=['.'])
with self.assertRaises(NoValidMXError) as exc:
_get_cleaned_mx_records(domain='testdomain1', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@ -37,8 +51,7 @@ class GetMxRecordsTestCase(TestCase):
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_fails_with_null_hostnames(self):
'Fails when an MX hostname is invalid.'
TEST_QUERY.return_value = [
SimpleNamespace(exchange=DnsNameStub(value='asdqwe'))]
TEST_QUERY.return_value = _answer(hostnames=['asdqwe'])
with self.assertRaises(NoValidMXError) as exc:
_get_cleaned_mx_records(domain='testdomain2', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@ -46,14 +59,13 @@ class GetMxRecordsTestCase(TestCase):
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_filters_out_invalid_hostnames(self):
'Returns only the valid hostnames.'
TEST_QUERY.return_value = [
SimpleNamespace(exchange=DnsNameStub(value='asdqwe.')),
SimpleNamespace(exchange=DnsNameStub(value='.')),
SimpleNamespace(exchange=DnsNameStub(value='valid.host.')),
# This is an intentional duplicate.
SimpleNamespace(exchange=DnsNameStub(value='valid.host.')),
SimpleNamespace(exchange=DnsNameStub(value='valid2.host.')),
]
TEST_QUERY.return_value = _answer(hostnames=[
'asdqwe.',
'.',
'valid.host.',
'valid.host.', # This is an intentional duplicate.
'valid2.host.',
])
result = _get_cleaned_mx_records(domain='testdomain3', timeout=10)
self.assertListEqual(result, ['valid.host', 'valid2.host'])

View File

@ -2,8 +2,7 @@ from smtplib import SMTP
from unittest.case import TestCase
from unittest.mock import patch
from validate_email.exceptions import (
SMTPCommunicationError, SMTPMessage, SMTPTemporaryError)
from validate_email.exceptions import SMTPMessage, SMTPTemporaryError
from validate_email.smtp_check import _SMTPChecker
@ -12,16 +11,16 @@ class SMTPCheckerTest(TestCase):
@patch.object(target=SMTP, attribute='connect')
def test_connect_raises_serverdisconnected(self, mock_connect):
'Connect raises `SMTPServerDisconnected`.'
'Connect raises `SMTPTemporaryError`.'
mock_connect.side_effect = OSError('test message')
checker = _SMTPChecker(
local_hostname='localhost', timeout=5, debug=False,
sender='test@example.com', recip='test@example.com')
with self.assertRaises(SMTPCommunicationError) as exc:
with self.assertRaises(SMTPTemporaryError) as exc:
checker.check(hosts=['testhost'])
self.assertDictEqual(exc.exception.error_messages, {
'testhost': SMTPMessage(
command='connect', code=0, text='test message')
command='connect', code=451, text='test message')
})
@patch.object(target=SMTP, attribute='connect')

View File

@ -11,12 +11,12 @@ from .exceptions import (
NoNameserverError, NoValidMXError)
def _get_mx_records(domain: str, timeout: int) -> list:
def _get_mx_records(domain: str, timeout: int) -> Answer:
'Return the DNS response for checking, optionally raise exceptions.'
try:
return resolve(
qname=domain, rdtype=rdtype_mx, lifetime=timeout,
search=True) # type: Answer
search=True)
except NXDOMAIN:
raise DomainNotFoundError
except NoNameservers:
@ -34,10 +34,10 @@ def _get_cleaned_mx_records(domain: str, timeout: int) -> list:
Return a list of hostnames in the MX record, raise an exception on
any issues.
"""
records = _get_mx_records(domain=domain, timeout=timeout)
answer = _get_mx_records(domain=domain, timeout=timeout)
to_check = list()
host_set = set()
for record in records: # type: MX
for record in answer.rrset.processing_order(): # type: MX
dns_str = record.exchange.to_text().rstrip('.') # type: str
if dns_str in host_set:
continue

View File

@ -38,7 +38,6 @@ class _SMTPChecker(SMTP):
self.set_debuglevel(debuglevel=2 if debug else False)
self.__sender = sender
self.__recip = recip
self.__communication_errors = {}
self.__temporary_errors = {}
# Avoid error on close() after unsuccessful connect
self.sock = None
@ -144,15 +143,16 @@ class _SMTPChecker(SMTP):
self.mail(sender=self.__sender.ace)
code, message = self.rcpt(recip=self.__recip.ace)
except SMTPServerDisconnected as e:
self.__communication_errors[self._host] = SMTPMessage(
command=self.__command, code=0, text=str(e))
self.__temporary_errors[self._host] = SMTPMessage(
command=self.__command, code=451, text=str(e))
return False
except SMTPResponseException as e:
smtp_message = SMTPMessage(
command=self.__command, code=e.smtp_code,
text=e.smtp_error.decode(errors='ignore'))
if e.smtp_code >= 500:
self.__communication_errors[self._host] = smtp_message
raise SMTPCommunicationError(
error_messages={self._host: smtp_message})
else:
self.__temporary_errors[self._host] = smtp_message
return False
@ -169,11 +169,8 @@ class _SMTPChecker(SMTP):
LOGGER.debug(msg=f'Trying {host} ...')
if self._check_one(host=host):
return True
# Raise appropriate exceptions when necessary
if self.__communication_errors:
raise SMTPCommunicationError(
error_messages=self.__communication_errors)
elif self.__temporary_errors:
# Raise exception for collected temporary errors
if self.__temporary_errors:
raise SMTPTemporaryError(error_messages=self.__temporary_errors)
@ -188,11 +185,12 @@ def smtp_check(
Raise an `AddressNotDeliverableError` if any server unambiguously
and permanently refuses to accept the recipient address.
Raise `SMTPTemporaryError` if the server answers with a temporary
error code when validity of the email address can not be determined.
Greylisting or server delivery issues can be a cause for this.
Raise `SMTPTemporaryError` if all the servers answer with a
temporary error code during the SMTP communication. This means that
the validity of the email address can not be determined. Greylisting
or server delivery issues can be a cause for this.
Raise `SMTPCommunicationError` if the SMTP server(s) reply with an
Raise `SMTPCommunicationError` if any SMTP server replies with an
error message to any of the communication steps before the recipient
address is checked, and the validity of the email address can not be
determined either.