Next step towards 1.0.0 #63

Merged
reinhard-mueller merged 6 commits from master into version-1.0.0 2021-03-14 13:54:32 +01:00
12 changed files with 344 additions and 133 deletions

View File

@ -1,12 +1,21 @@
1.0.0: 1.0.0:
- New major release with breaking changes! They are: - New major release with breaking changes! They are:
- Parameter names for validate_email() and validate_email_or_fail() have changed:
- check_regex -> check_format
- use_blacklist -> check_blacklist
- check_mx -> check_dns
- skip_smtp -> check_smtp (with inverted logic)
- helo_host -> smtp_helo_host
- from_address -> smtp_from_address
- debug -> smtp_debug
- All parameters except for the first one (the email address to check) are now keyword-only.
- Ambiguous results and the possibility of more of them, to reflect a real world SMTP delivery process: - Ambiguous results and the possibility of more of them, to reflect a real world SMTP delivery process:
- The module will keep trying probing through all MX hosts for validation and emit errors in the end of the full probing procedure. - The module tries all MX hosts in order of priority.
- Any acceptance of the email delivery will be marked as valid, despite any other ambigious or negative result(s). - An acceptance of the email address will yield a positive verification result, no further MX hosts will be tried.
- The validate_email_or_fail() function will raise an SMTPCommunicationError() on a denied email address only in the end. - Any permanent SMTP error (5xx) will yield a negative verification result, no further MX hosts will be tried.
- The validate_email_or_fail() function will now raise an SMTPTemporaryError() on an ambiguous result. That is, greylisting or no servers providing a definitive negative or positive. - Any temporary SMTP error (4xx) or any connection issue will cause the next MX host to be tried. Only if all MX hosts yield these kinds of errors, the overall verification result will be ambiguous. That is, greylisting or no servers providing a definitive negative or positive.
- A server that bails out with a 4xx code at any part of the SMTP conversation, will be marked as ambiguous. - The validate_email_or_fail() function will now raise an SMTPTemporaryError() on an ambiguous result.
- Both of the aforementioned exceptions will contain the occurred communication results in their error_messages class variables. - All exceptions raised by the SMTP check will contain the occurred communication results in their error_messages class variables.
- Internal API changes (refactorings) - Internal API changes (refactorings)
- Check results are now logged with info level, instead of emitting warnings when debug is turned on. - Check results are now logged with info level, instead of emitting warnings when debug is turned on.
- Props to @reinhard-mueller for coming up with the new proposals and helping in refining the idea. - Props to @reinhard-mueller for coming up with the new proposals and helping in refining the idea.

39
FAQ.md
View File

@ -2,15 +2,23 @@
## The module provides false positives: ## The module provides false positives:
Some SMTP Servers (Yahoo's servers for example) are only rejecting The function of this module, and specifically of the SMTP check, relies
on the assumption that the mail server declared responsible for an email
domain will immediately reject any nonexistent address.
Some SMTP servers (Yahoo's servers for example) are only rejecting
nonexistent emails after the end of `DATA` command has been provided in nonexistent emails after the end of `DATA` command has been provided in
the conversation with the server. This module only goes until the the conversation with the server. This module only goes until the
`RCPT TO` and says it's valid if it doesn't get rejected there, since `RCPT TO` and says it's valid if it doesn't get rejected there, since
the `DATA` part of the email is the email body itself. There's not much the `DATA` part of the email is the email body itself.
one can do with it, you have to accept false positives in the case of
yahoo.com and some other providers. I'm not sure if rejecting emails Other SMTP servers accept emails even for nonexistent recipient
after the `DATA` command is a valid behavior based on the SMTP RFC, but addresses and forward them to a different server which will create a
I wouldn't wonder if not. bounce message in a second step. This is the case for many email domains
hosted at Microsoft.
In both cases, there's nothing we can do about it, as the mail server
we talk to seemingly accepts the email address.
## Everything gets rejected: ## Everything gets rejected:
@ -36,7 +44,7 @@ Run this code with the module installed (use your parameters within),
and see the output: and see the output:
```python ```python
python -c 'import logging, sys; logging.basicConfig(stream=sys.stderr, level=logging.DEBUG); from validate_email import validate_email; print(validate_email(\'your.email@address.com\', check_mx=True, debug=True))' python -c 'import logging, sys; logging.basicConfig(stream=sys.stderr, level=logging.DEBUG); from validate_email import validate_email; print(validate_email(\'your.email@address.com\', smtp_debug=True))'
``` ```
If you still don't understand why your code doesn't work as expected by If you still don't understand why your code doesn't work as expected by
@ -44,3 +52,20 @@ looking at the the logs, then (and only then) add an issue explaining
your problem with a REPRODUCIBLE example, and the output of your test your problem with a REPRODUCIBLE example, and the output of your test
run. run.
## How can I pass my email account's credentials? How can I use port 465 or 587 when my provider blocks port 25?
The credentials you got from your email provider, as well as the
instruction to use port 465 or 587, refer to *your provider's* server
for *outgoing* emails.
This module, however, directly talks to the *recipient's* server for
*incoming* emails, so neither your credentials nor the switch to port
465 or 587 is of any use here.
If your internet connection is within an IP pool (often the case for
private use) or it doesn't have a proper reverse DNS entry, the servers
for many email domains (depending on their configuration) will reject
connections from you. This can *not* be solved by using your provider's
mail server. Instead, you have to use the library on a machine with an
internet connection with static IP address and a proper reverse DNS
entry.

View File

@ -25,32 +25,135 @@ USAGE
Basic usage:: Basic usage::
from validate_email import validate_email from validate_email import validate_email
is_valid = validate_email(email_address='example@example.com', check_regex=True, check_mx=True, from_address='my@from.addr.ess', helo_host='my.host.name', smtp_timeout=10, dns_timeout=10, use_blacklist=True, debug=False) is_valid = validate_email(email_address='example@example.com', check_format=True, check_blacklist=True, check_dns=True, dns_timeout=10, check_smtp=True, smtp_timeout=10, smtp_helo_host='my.host.name', smtp_from_address='my@from.addr.ess', smtp_debug=False)
:code:`check_regex` will check will the email address has a valid structure and defaults to True Parameters
----------------------------
:code:`check_mx`: check the mx-records and check whether the email actually exists :code:`email_address`: the email address to check
:code:`from_address`: the email address the probe will be sent from :code:`check_format`: check whether the email address has a valid structure; defaults to :code:`True`
:code:`helo_host`: the host to use in SMTP HELO when checking for an email :code:`check_blacklist`: check the email against the blacklist of domains downloaded from https://github.com/martenson/disposable-email-domains; defaults to :code:`True`
:code:`smtp_timeout`: seconds until SMTP timeout :code:`check_dns`: check the DNS mx-records, defaults to :code:`True`
:code:`dns_timeout`: seconds until DNS timeout; defaults to 10 seconds
:code:`dns_timeout`: seconds until DNS timeout :code:`check_smtp`: check whether the email actually exists by initiating an SMTP conversation; defaults to :code:`True`
:code:`use_blacklist`: use the blacklist of domains downloaded from https://github.com/martenson/disposable-email-domains :code:`smtp_timeout`: seconds until SMTP timeout; defaults to 10 seconds
:code:`debug`: emit debug/warning messages while checking email :code:`smtp_helo_host`: the hostname to use in SMTP HELO/EHLO; if set to :code:`None` (the default), the fully qualified domain name of the local host is used
:code:`skip_smtp`: (default :code:`False`) skip the SMTP conversation with the server, after MX checks. Will automatically be set to :code:`True` when :code:`check_mx` is :code:`False`! :code:`smtp_from_address`: the email address used for the sender in the SMTP conversation; if set to :code:`None` (the default), the :code:`email_address` parameter is used as the sender as well
:code:`smtp_debug`: activate :code:`smtplib`'s debug output which always goes to stderr; defaults to :code:`False`
Result
----------------------------
The function :code:`validate_email()` returns the following results:
:code:`True`
All requested checks were successful for the given email address.
:code:`False`
At least one of the requested checks failed for the given email address.
:code:`None`
None of the requested checks failed, but at least one of them yielded an ambiguous result. Currently, the SMTP check is the only check which can actually yield an ambigous result.
Getting more information
----------------------------
The function :code:`validate_email_or_fail()` works exactly like :code:`validate_email`, except that it raises an exception in the case of validation failure and ambiguous result instead of returning :code:`False` or :code:`None`, respectively. The function :code:`validate_email_or_fail()` works exactly like :code:`validate_email`, except that it raises an exception in the case of validation failure and ambiguous result instead of returning :code:`False` or :code:`None`, respectively.
All these exceptions descend from :code:`EmailValidationError`. Please see below for the exact exceptions raised by the various checks. Note that all exception classes are defined in the module :code:`validate_email.exceptions`.
Please note that :code:`SMTPTemporaryError` indicates an ambigous check result rather than a check failure, so if you use :code:`validate_email_or_fail()`, you probably want to catch this exception.
The checks
============================
By default, all checks are enabled, but each of them can be disabled by one of the :code:`check_...` parameters. Note that, however, :code:`check_smtp` implies :code:`check_dns`.
:code:`check_format`
----------------------------
Check whether the given email address conforms to the general format requirements of valid email addresses.
:code:`validate_email_or_fail()` raises :code:`AddressFormatError` on any failure of this test.
:code:`check_blacklist`
----------------------------
Check whether the domain part of the given email address (the part behind the "@") is known as a disposable and temporary email address domain. These are often used to register dummy users in order to spam or abuse some services.
A list of such domains is maintained at https://github.com/martenson/disposable-email-domains, and this module uses that list.
:code:`validate_email_or_fail()` raises :code:`DomainBlacklistedError` if the email address belongs to a blacklisted domain.
:code:`check_dns`
----------------------------
Check whether there is a valid list of servers responsible for delivering emails to the given email address.
First, a DNS query is issued for the email address' domain to retrieve a list of all MX records. That list is then stripped of duplicates and malformatted entries. If at the end of this procedure, at least one valid MX record remains, the check is considered successful.
On failure of this check, :code:`validate_email_or_fail()` raises one of the following exceptions, all of which descend from :code:`DNSError`:
:code:`DomainNotFoundError`
The domain of the email address cannot be found at all.
:code:`NoNameserverError`
There is no nameserver for the domain.
:code:`DNSTimeoutError`
A timeout occured when querying the nameserver. Note that the timeout period can be changed with the :code:`dns_timeout` parameter.
:code:`DNSConfigurationError`
The nameserver is misconfigured.
:code:`NoMXError`
The nameserver does not list any MX records for the domain.
:code:`NoValidMXError`
The nameserver lists MX records for the domain, but none of them is valid.
:code:`check_smtp`
----------------------------
Check whether the given email address exists by simulating an actual email delivery.
A connection to the SMTP server identified through the domain's MX record is established, and an SMTP conversation is initiated up to the point where the server confirms the existence of the email address. After that, instead of actually sending an email, the conversation is cancelled.
The module will try to negotiate a TLS connection with STARTTLS, and silently fall back to an unencrypted SMTP connection if the server doesn't support it. The module will try to negotiate a TLS connection with STARTTLS, and silently fall back to an unencrypted SMTP connection if the server doesn't support it.
If the SMTP server replies to the :code:`RCPT TO` command with a code 250 (success) response, the check is considered successful.
If the SMTP server replies with a code 5xx (permanent error) response at any point in the conversation, the check is considered failed.
If the SMTP server cannot be connected, unexpectedly closes the connection, or replies with a code 4xx (temporary error) at any stage of the conversation, the check is considered ambiguous.
If there is more than one valid MX record for the domain, they are tried in order of priority until the first time the check is either successful or failed. Only in case of an ambiguous check result, the next server is tried, and only if the check result is ambiguous for all servers, the overall check is considered ambigous as well.
On failure of this check or on ambiguous result, :code:`validate_email_or_fail()` raises one of the following exceptions, all of which descend from :code:`SMTPError`:
:code:`AddressNotDeliverableError`
The SMTP server permanently refused the email address. Technically, this means that the server replied to the :code:`RCPT TO` command with a code 5xx response.
:code:`SMTPCommunicationError`
The SMTP server refused to even let us get to the point where we could ask it about the email address. Technically, this means that the server sent a code 5xx response either immediately after connection, or as a reply to the :code:`EHLO` (or :code:`HELO`) or :code:`MAIL FROM` commands.
:code:`SMTPTemporaryError`
A temporary error occured during the check for all available MX servers. This is considered an ambigous check result. For example, greylisting is a frequent cause for this.
All of the above three exceptions provide further detail about the error response(s) in the exception's instance variable :code:`error_messages`.
Auto-updater Auto-updater
============================ ============================
The package contains an auto-updater for downloading and updating the built-in blacklist.txt. It will run on each module load (and installation), but will try to update the content only if the file is older than 5 days, and if the content is not the same that's already downloaded. The package contains an auto-updater for downloading and updating the built-in blacklist.txt. It will run on each module load (and installation), but will try to update the content only if the file is older than 5 days, and if the content is not the same that's already downloaded.
The update can be triggered manually:: The update can be triggered manually::
@ -68,4 +171,5 @@ The update can be triggered manually::
Read the FAQ_! Read the FAQ_!
============================ ============================
.. _FAQ: https://github.com/karolyi/py3-validate-email/blob/master/FAQ.md .. _FAQ: https://github.com/karolyi/py3-validate-email/blob/master/FAQ.md

View File

@ -20,20 +20,20 @@ class BlacklistCheckTestCase(TestCase):
domainlist_check(EmailAddress('pm2@mailinator.com')) domainlist_check(EmailAddress('pm2@mailinator.com'))
with self.assertRaises(DomainBlacklistedError): with self.assertRaises(DomainBlacklistedError):
validate_email_or_fail( validate_email_or_fail(
email_address='pm2@mailinator.com', check_regex=False, email_address='pm2@mailinator.com', check_format=False,
use_blacklist=True) check_blacklist=True)
with self.assertRaises(DomainBlacklistedError): with self.assertRaises(DomainBlacklistedError):
validate_email_or_fail( validate_email_or_fail(
email_address='pm2@mailinator.com', check_regex=True, email_address='pm2@mailinator.com', check_format=True,
use_blacklist=True) check_blacklist=True)
with self.assertLogs(): with self.assertLogs():
self.assertFalse(expr=validate_email( self.assertFalse(expr=validate_email(
email_address='pm2@mailinator.com', check_regex=False, email_address='pm2@mailinator.com', check_format=False,
use_blacklist=True, debug=True)) check_blacklist=True))
with self.assertLogs(): with self.assertLogs():
self.assertFalse(expr=validate_email( self.assertFalse(expr=validate_email(
email_address='pm2@mailinator.com', check_regex=True, email_address='pm2@mailinator.com', check_format=True,
use_blacklist=True, debug=True)) check_blacklist=True))
def test_blacklist_negative(self): def test_blacklist_negative(self):
'Allows a domain not in the blacklist.' 'Allows a domain not in the blacklist.'

View File

@ -19,6 +19,21 @@ class DnsNameStub(object):
return self.value return self.value
class DnsRRsetStub(object):
'Stub for `dns.rrset.RRset`.'
def __init__(self, hostnames: list):
self.names = [
SimpleNamespace(exchange=DnsNameStub(value=x)) for x in hostnames]
def processing_order(self):
return self.names
def _answer(hostnames: list):
return SimpleNamespace(rrset=DnsRRsetStub(hostnames=hostnames))
TEST_QUERY = Mock() TEST_QUERY = Mock()
@ -28,8 +43,7 @@ class GetMxRecordsTestCase(TestCase):
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY) @patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_fails_with_invalid_hostnames(self): def test_fails_with_invalid_hostnames(self):
'Fails when an MX hostname is "."' 'Fails when an MX hostname is "."'
TEST_QUERY.return_value = [ TEST_QUERY.return_value = _answer(hostnames=['.'])
SimpleNamespace(exchange=DnsNameStub(value='.'))]
with self.assertRaises(NoValidMXError) as exc: with self.assertRaises(NoValidMXError) as exc:
_get_cleaned_mx_records(domain='testdomain1', timeout=10) _get_cleaned_mx_records(domain='testdomain1', timeout=10)
self.assertTupleEqual(exc.exception.args, ()) self.assertTupleEqual(exc.exception.args, ())
@ -37,8 +51,7 @@ class GetMxRecordsTestCase(TestCase):
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY) @patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_fails_with_null_hostnames(self): def test_fails_with_null_hostnames(self):
'Fails when an MX hostname is invalid.' 'Fails when an MX hostname is invalid.'
TEST_QUERY.return_value = [ TEST_QUERY.return_value = _answer(hostnames=['asdqwe'])
SimpleNamespace(exchange=DnsNameStub(value='asdqwe'))]
with self.assertRaises(NoValidMXError) as exc: with self.assertRaises(NoValidMXError) as exc:
_get_cleaned_mx_records(domain='testdomain2', timeout=10) _get_cleaned_mx_records(domain='testdomain2', timeout=10)
self.assertTupleEqual(exc.exception.args, ()) self.assertTupleEqual(exc.exception.args, ())
@ -46,14 +59,13 @@ class GetMxRecordsTestCase(TestCase):
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY) @patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_filters_out_invalid_hostnames(self): def test_filters_out_invalid_hostnames(self):
'Returns only the valid hostnames.' 'Returns only the valid hostnames.'
TEST_QUERY.return_value = [ TEST_QUERY.return_value = _answer(hostnames=[
SimpleNamespace(exchange=DnsNameStub(value='asdqwe.')), 'asdqwe.',
SimpleNamespace(exchange=DnsNameStub(value='.')), '.',
SimpleNamespace(exchange=DnsNameStub(value='valid.host.')), 'valid.host.',
# This is an intentional duplicate. 'valid.host.', # This is an intentional duplicate.
SimpleNamespace(exchange=DnsNameStub(value='valid.host.')), 'valid2.host.',
SimpleNamespace(exchange=DnsNameStub(value='valid2.host.')), ])
]
result = _get_cleaned_mx_records(domain='testdomain3', timeout=10) result = _get_cleaned_mx_records(domain='testdomain3', timeout=10)
self.assertListEqual(result, ['valid.host', 'valid2.host']) self.assertListEqual(result, ['valid.host', 'valid2.host'])

View File

@ -1,39 +1,100 @@
from smtplib import SMTP from smtplib import SMTPServerDisconnected
from socket import timeout
from unittest.case import TestCase from unittest.case import TestCase
from unittest.mock import patch from unittest.mock import patch
from validate_email.email_address import EmailAddress
from validate_email.exceptions import ( from validate_email.exceptions import (
SMTPCommunicationError, SMTPMessage, SMTPTemporaryError) AddressNotDeliverableError, SMTPCommunicationError, SMTPTemporaryError)
from validate_email.smtp_check import _SMTPChecker from validate_email.smtp_check import _SMTPChecker, smtp_check
class SMTPCheckerTest(TestCase): class SMTPMock(_SMTPChecker):
'Checking the `_SMTPChecker` class methods.' """
Mock replacement for the SMTP connection.
@patch.object(target=SMTP, attribute='connect') Instead of really communicating with an SMTP server, this class
def test_connect_raises_serverdisconnected(self, mock_connect): works with predefined fake responses. By default, the responses
'Connect raises `SMTPServerDisconnected`.' emulate a successful SMTP conversation, but it can be turned into an
mock_connect.side_effect = OSError('test message') unsuccessful one by patching the `reply` dictionary.
checker = _SMTPChecker( """
local_hostname='localhost', timeout=5, debug=False, reply = {
sender='test@example.com', recip='test@example.com') None: (220, b'Welcome'),
with self.assertRaises(SMTPCommunicationError) as exc: "EHLO": (502, b'Please use HELO'),
checker.check(hosts=['testhost']) 'HELO': (220, b'HELO successful'),
self.assertDictEqual(exc.exception.error_messages, { 'MAIL': (250, b'MAIL FROM successful'),
'testhost': SMTPMessage( 'RCPT': (250, b'RCPT TO successful'),
command='connect', code=0, text='test message') 'QUIT': (221, b'QUIT successful'),
}) }
@patch.object(target=SMTP, attribute='connect') last_command = None
def test_connect_with_error(self, mock_connect):
'Connect raises `SMTPTemporaryError`.' def _get_socket(self, host, port, timeout):
checker = _SMTPChecker( return None
local_hostname='localhost', timeout=5, debug=False,
sender='test@example.com', recip='test@example.com') def send(self, s):
mock_connect.return_value = (400, b'test delay message') self.last_command = s[:4].upper()
with self.assertRaises(SMTPTemporaryError) as exc:
checker.check(hosts=['testhost']) def getreply(self):
self.assertDictEqual(exc.exception.error_messages, { if isinstance(self.reply[self.last_command], Exception):
'testhost': SMTPMessage( self.close()
command='connect', code=400, text='test delay message') raise self.reply[self.last_command]
}) return self.reply[self.last_command]
class SMTPCheckTest(TestCase):
'Collection of tests the `smtp_check` method.'
# All the possible ways to fail we want to test, listed as tuples
# containing (command, reply, expected exception).
failures = [
# Timeout on connection
(None, timeout(), SMTPTemporaryError),
# Connection unexpectedly closed during any stage
(None, SMTPServerDisconnected('Test'), SMTPTemporaryError),
('EHLO', SMTPServerDisconnected('Test'), SMTPTemporaryError),
('HELO', SMTPServerDisconnected('Test'), SMTPTemporaryError),
('MAIL', SMTPServerDisconnected('Test'), SMTPTemporaryError),
('RCPT', SMTPServerDisconnected('Test'), SMTPTemporaryError),
# Temporary error codes
(None, (421, b'Connect failed'), SMTPTemporaryError),
('HELO', (421, b'HELO failed'), SMTPTemporaryError),
('MAIL', (451, b'MAIL FROM failed'), SMTPTemporaryError),
('RCPT', (451, b'RCPT TO failed'), SMTPTemporaryError),
# Permanent error codes
(None, (554, b'Connect failed'), SMTPCommunicationError),
('HELO', (504, b'HELO failed'), SMTPCommunicationError),
('MAIL', (550, b'MAIL FROM failed'), SMTPCommunicationError),
('RCPT', (550, b'RCPT TO failed'), AddressNotDeliverableError),
]
@patch(target='validate_email.smtp_check._SMTPChecker', new=SMTPMock)
def test_smtp_success(self):
'Succeeds on successful SMTP conversation'
self.assertTrue(
smtp_check(
email_address=EmailAddress('alice@example.com'),
mx_records=['smtp.example.com'],
)
)
def _test_one_smtp_failure(self, cmd, reply, exception):
with patch.dict(in_dict=SMTPMock.reply, values={cmd: reply}):
with self.assertRaises(exception) as context:
smtp_check(
email_address=EmailAddress('alice@example.com'),
mx_records=['smtp.example.com'],
)
if isinstance(reply, tuple):
error_messages = context.exception.error_messages
error_info = error_messages['smtp.example.com']
self.assertEqual(error_info.command[:4].upper(), cmd or 'CONN')
self.assertEqual(error_info.code, reply[0])
self.assertEqual(error_info.text, reply[1].decode())
@patch(target='validate_email.smtp_check._SMTPChecker', new=SMTPMock)
def test_smtp_failure(self):
'Fails on unsuccessful SMTP conversation.'
for cmd, reply, exception in self.failures:
with self.subTest(cmd=cmd, reply=reply):
self._test_one_smtp_failure(cmd, reply, exception)

View File

@ -11,12 +11,12 @@ from .exceptions import (
NoNameserverError, NoValidMXError) NoNameserverError, NoValidMXError)
def _get_mx_records(domain: str, timeout: int) -> list: def _get_mx_records(domain: str, timeout: int) -> Answer:
'Return the DNS response for checking, optionally raise exceptions.' 'Return the DNS response for checking, optionally raise exceptions.'
try: try:
return resolve( return resolve(
qname=domain, rdtype=rdtype_mx, lifetime=timeout, qname=domain, rdtype=rdtype_mx, lifetime=timeout,
search=True) # type: Answer search=True)
except NXDOMAIN: except NXDOMAIN:
raise DomainNotFoundError raise DomainNotFoundError
except NoNameservers: except NoNameservers:
@ -34,10 +34,10 @@ def _get_cleaned_mx_records(domain: str, timeout: int) -> list:
Return a list of hostnames in the MX record, raise an exception on Return a list of hostnames in the MX record, raise an exception on
any issues. any issues.
""" """
records = _get_mx_records(domain=domain, timeout=timeout) answer = _get_mx_records(domain=domain, timeout=timeout)
to_check = list() to_check = list()
host_set = set() host_set = set()
for record in records: # type: MX for record in answer.rrset.processing_order(): # type: MX
dns_str = record.exchange.to_text().rstrip('.') # type: str dns_str = record.exchange.to_text().rstrip('.') # type: str
if dns_str in host_set: if dns_str in host_set:
continue continue
@ -49,7 +49,7 @@ def _get_cleaned_mx_records(domain: str, timeout: int) -> list:
return result return result
def dns_check(email_address: EmailAddress, dns_timeout: int = 10) -> list: def dns_check(email_address: EmailAddress, timeout: int = 10) -> list:
""" """
Check whether there are any responsible SMTP servers for the email Check whether there are any responsible SMTP servers for the email
address by looking up the DNS MX records. address by looking up the DNS MX records.
@ -62,4 +62,4 @@ def dns_check(email_address: EmailAddress, dns_timeout: int = 10) -> list:
return [email_address.domain_literal_ip] return [email_address.domain_literal_ip]
else: else:
return _get_cleaned_mx_records( return _get_cleaned_mx_records(
domain=email_address.domain, timeout=dns_timeout) domain=email_address.domain, timeout=timeout)

View File

@ -56,11 +56,11 @@ class DomainListValidator(object):
self.domain_blacklist = set( self.domain_blacklist = set(
x.strip().lower() for x in lines if x.strip()) x.strip().lower() for x in lines if x.strip())
def __call__(self, address: EmailAddress) -> bool: def __call__(self, email_address: EmailAddress) -> bool:
'Do the checking here.' 'Do the checking here.'
if address.domain in self.domain_whitelist: if email_address.domain in self.domain_whitelist:
return True return True
if address.domain in self.domain_blacklist: if email_address.domain in self.domain_blacklist:
raise DomainBlacklistedError raise DomainBlacklistedError
return True return True

View File

@ -44,41 +44,41 @@ class DomainBlacklistedError(EmailValidationError):
message = 'Domain blacklisted.' message = 'Domain blacklisted.'
class MXError(EmailValidationError): class DNSError(EmailValidationError):
""" """
Base class of all exceptions that indicate failure to determine a Base class of all exceptions that indicate failure to determine a
valid MX for the domain of email address. valid MX for the domain of email address.
""" """
class DomainNotFoundError(MXError): class DomainNotFoundError(DNSError):
'Raised when the domain is not found.' 'Raised when the domain is not found.'
message = 'Domain not found.' message = 'Domain not found.'
class NoNameserverError(MXError): class NoNameserverError(DNSError):
'Raised when the domain does not resolve by nameservers in time.' 'Raised when the domain does not resolve by nameservers in time.'
message = 'No nameserver found for domain.' message = 'No nameserver found for domain.'
class DNSTimeoutError(MXError): class DNSTimeoutError(DNSError):
'Raised when the domain lookup times out.' 'Raised when the domain lookup times out.'
message = 'Domain lookup timed out.' message = 'Domain lookup timed out.'
class DNSConfigurationError(MXError): class DNSConfigurationError(DNSError):
""" """
Raised when the DNS entries for this domain are falsely configured. Raised when the DNS entries for this domain are falsely configured.
""" """
message = 'Misconfigurated DNS entries for domain.' message = 'Misconfigurated DNS entries for domain.'
class NoMXError(MXError): class NoMXError(DNSError):
'Raised when the domain has no MX records configured.' 'Raised when the domain has no MX records configured.'
message = 'No MX record for domain found.' message = 'No MX record for domain found.'
class NoValidMXError(MXError): class NoValidMXError(DNSError):
""" """
Raised when the domain has MX records configured, but none of them Raised when the domain has MX records configured, but none of them
has a valid format. has a valid format.

View File

@ -28,22 +28,22 @@ def _validate_ipv46_address(value: str) -> bool:
return _validate_ipv4_address(value) or _validate_ipv6_address(value) return _validate_ipv4_address(value) or _validate_ipv6_address(value)
def regex_check(address: EmailAddress) -> bool: def regex_check(email_address: EmailAddress) -> bool:
'Slightly adjusted email regex checker from the Django project.' 'Slightly adjusted email regex checker from the Django project.'
# Validate user part. # Validate user part.
if not USER_REGEX.match(address.user): if not USER_REGEX.match(email_address.user):
raise AddressFormatError raise AddressFormatError
# Validate domain part. # Validate domain part.
if address.domain_literal_ip: if email_address.domain_literal_ip:
literal_match = LITERAL_REGEX.match(address.ace_domain) literal_match = LITERAL_REGEX.match(email_address.ace_domain)
if literal_match is None: if literal_match is None:
raise AddressFormatError raise AddressFormatError
if not _validate_ipv46_address(literal_match[1]): if not _validate_ipv46_address(literal_match[1]):
raise AddressFormatError raise AddressFormatError
else: else:
if HOST_REGEX.match(address.ace_domain) is None: if HOST_REGEX.match(email_address.ace_domain) is None:
raise AddressFormatError raise AddressFormatError
# All validations successful. # All validations successful.

View File

@ -38,7 +38,6 @@ class _SMTPChecker(SMTP):
self.set_debuglevel(debuglevel=2 if debug else False) self.set_debuglevel(debuglevel=2 if debug else False)
self.__sender = sender self.__sender = sender
self.__recip = recip self.__recip = recip
self.__communication_errors = {}
self.__temporary_errors = {} self.__temporary_errors = {}
# Avoid error on close() after unsuccessful connect # Avoid error on close() after unsuccessful connect
self.sock = None self.sock = None
@ -144,15 +143,16 @@ class _SMTPChecker(SMTP):
self.mail(sender=self.__sender.ace) self.mail(sender=self.__sender.ace)
code, message = self.rcpt(recip=self.__recip.ace) code, message = self.rcpt(recip=self.__recip.ace)
except SMTPServerDisconnected as e: except SMTPServerDisconnected as e:
self.__communication_errors[self._host] = SMTPMessage( self.__temporary_errors[self._host] = SMTPMessage(
command=self.__command, code=0, text=str(e)) command=self.__command, code=451, text=str(e))
return False return False
except SMTPResponseException as e: except SMTPResponseException as e:
smtp_message = SMTPMessage( smtp_message = SMTPMessage(
command=self.__command, code=e.smtp_code, command=self.__command, code=e.smtp_code,
text=e.smtp_error.decode(errors='ignore')) text=e.smtp_error.decode(errors='ignore'))
if e.smtp_code >= 500: if e.smtp_code >= 500:
self.__communication_errors[self._host] = smtp_message raise SMTPCommunicationError(
error_messages={self._host: smtp_message})
else: else:
self.__temporary_errors[self._host] = smtp_message self.__temporary_errors[self._host] = smtp_message
return False return False
@ -169,18 +169,16 @@ class _SMTPChecker(SMTP):
LOGGER.debug(msg=f'Trying {host} ...') LOGGER.debug(msg=f'Trying {host} ...')
if self._check_one(host=host): if self._check_one(host=host):
return True return True
# Raise appropriate exceptions when necessary # Raise exception for collected temporary errors
if self.__communication_errors: if self.__temporary_errors:
raise SMTPCommunicationError(
error_messages=self.__communication_errors)
elif self.__temporary_errors:
raise SMTPTemporaryError(error_messages=self.__temporary_errors) raise SMTPTemporaryError(error_messages=self.__temporary_errors)
def smtp_check( def smtp_check(
email_address: EmailAddress, mx_records: list, debug: bool, email_address: EmailAddress, mx_records: List[str], timeout: float = 10,
from_address: Optional[EmailAddress] = None, helo_host: Optional[str] = None,
helo_host: Optional[str] = None, smtp_timeout: int = 10) -> bool: from_address: Optional[EmailAddress] = None, debug: bool = False
) -> bool:
""" """
Returns `True` as soon as the any of the given server accepts the Returns `True` as soon as the any of the given server accepts the
recipient address. recipient address.
@ -188,16 +186,17 @@ def smtp_check(
Raise an `AddressNotDeliverableError` if any server unambiguously Raise an `AddressNotDeliverableError` if any server unambiguously
and permanently refuses to accept the recipient address. and permanently refuses to accept the recipient address.
Raise `SMTPTemporaryError` if the server answers with a temporary Raise `SMTPTemporaryError` if all the servers answer with a
error code when validity of the email address can not be determined. temporary error code during the SMTP communication. This means that
Greylisting or server delivery issues can be a cause for this. the validity of the email address can not be determined. Greylisting
or server delivery issues can be a cause for this.
Raise `SMTPCommunicationError` if the SMTP server(s) reply with an Raise `SMTPCommunicationError` if any SMTP server replies with an
error message to any of the communication steps before the recipient error message to any of the communication steps before the recipient
address is checked, and the validity of the email address can not be address is checked, and the validity of the email address can not be
determined either. determined either.
""" """
smtp_checker = _SMTPChecker( smtp_checker = _SMTPChecker(
local_hostname=helo_host, timeout=smtp_timeout, debug=debug, local_hostname=helo_host, timeout=timeout, debug=debug,
sender=from_address or email_address, recip=email_address) sender=from_address or email_address, recip=email_address)
return smtp_checker.check(hosts=mx_records) return smtp_checker.check(hosts=mx_records)

View File

@ -12,6 +12,7 @@ from .smtp_check import smtp_check
LOGGER = getLogger(name=__name__) LOGGER = getLogger(name=__name__)
__all__ = ['validate_email', 'validate_email_or_fail']
__doc__ = """\ __doc__ = """\
Verify the given email address by determining the SMTP servers Verify the given email address by determining the SMTP servers
responsible for the domain and then asking them to deliver an email to responsible for the domain and then asking them to deliver an email to
@ -26,39 +27,39 @@ simply accept everything and send a bounce notification later. Hence, a
def validate_email_or_fail( def validate_email_or_fail(
email_address: str, check_regex: bool = True, check_mx: bool = True, email_address: str, *, check_format: bool = True,
from_address: Optional[str] = None, helo_host: Optional[str] = None, check_blacklist: bool = True, check_dns: bool = True,
smtp_timeout: int = 10, dns_timeout: int = 10, dns_timeout: float = 10, check_smtp: bool = True,
use_blacklist: bool = True, debug: bool = False, smtp_timeout: float = 10, smtp_helo_host: Optional[str] = None,
skip_smtp: bool = False) -> Optional[bool]: smtp_from_address: Optional[str] = None, smtp_debug: bool = False
) -> Optional[bool]:
""" """
Return `True` if the email address validation is successful, `None` Return `True` if the email address validation is successful, `None`
if the validation result is ambigious, and raise an exception if the if the validation result is ambigious, and raise an exception if the
validation fails. validation fails.
""" """
email_address = EmailAddress(address=email_address) email_address = EmailAddress(address=email_address)
if from_address is not None: if check_format:
regex_check(email_address=email_address)
if check_blacklist:
domainlist_check(email_address=email_address)
if not check_dns and not check_smtp: # check_smtp implies check_dns.
return True
mx_records = dns_check(email_address=email_address, timeout=dns_timeout)
if not check_smtp:
return True
if smtp_from_address is not None:
try: try:
from_address = EmailAddress(address=from_address) smtp_from_address = EmailAddress(address=smtp_from_address)
except AddressFormatError: except AddressFormatError:
raise FromAddressFormatError raise FromAddressFormatError
if check_regex:
regex_check(address=email_address)
if use_blacklist:
domainlist_check(address=email_address)
if not check_mx:
return True
mx_records = dns_check(
email_address=email_address, dns_timeout=dns_timeout)
if skip_smtp:
return True
return smtp_check( return smtp_check(
email_address=email_address, mx_records=mx_records, email_address=email_address, mx_records=mx_records,
from_address=from_address, helo_host=helo_host, timeout=smtp_timeout, helo_host=smtp_helo_host,
smtp_timeout=smtp_timeout, debug=debug) from_address=smtp_from_address, debug=smtp_debug)
def validate_email(email_address: str, *args, **kwargs): def validate_email(email_address: str, **kwargs):
""" """
Return `True` or `False` depending if the email address exists Return `True` or `False` depending if the email address exists
or/and can be delivered. or/and can be delivered.
@ -66,7 +67,7 @@ def validate_email(email_address: str, *args, **kwargs):
Return `None` if the result is ambigious. Return `None` if the result is ambigious.
""" """
try: try:
return validate_email_or_fail(email_address, *args, **kwargs) return validate_email_or_fail(email_address, **kwargs)
except SMTPTemporaryError as error: except SMTPTemporaryError as error:
LOGGER.info(msg=f'Validation for {email_address!r} ambigious: {error}') LOGGER.info(msg=f'Validation for {email_address!r} ambigious: {error}')
return return