Merge branch 'version-1.0.0'

This commit is contained in:
László Károlyi 2021-03-14 14:11:28 +01:00
commit fc117e087e
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
15 changed files with 784 additions and 322 deletions


@ -24,4 +24,24 @@ A clear and concise description of what you expected to happen.
- Your network environment (ISP provided home connection, or testing from an actual whitelisted server)
**Additional context**
=======
A clear and concise description of what the bug is.
**My debug output**
Output from the debug run described in the FAQ:
**Expected behavior**
A clear and concise description of what you expected to happen.
**Please complete the following information:**
- OS: [e.g. Linux, FreeBSD, Windows]
- Flavor and Version [e.g. Debian 22, FreeBSD 12.2]
- Your network environment (ISP provided home connection, or testing from an actual whitelisted server)
- Your exact `py3-validate-email` module version
**Additional context**
Add any other context about the problem here.


@ -1,3 +1,25 @@
1.0.0:
- New major release with breaking changes! They are:
- Parameter names for validate_email() and validate_email_or_fail() have changed:
- check_regex -> check_format
- use_blacklist -> check_blacklist
- check_mx -> check_dns
- skip_smtp -> check_smtp (with inverted logic)
- helo_host -> smtp_helo_host
- from_address -> smtp_from_address
- debug -> smtp_debug
- All parameters except for the first one (the email address to check) are now keyword-only.
- Ambiguous results and the possibility of more of them, to reflect a real world SMTP delivery process:
- The module tries all MX hosts in order of priority.
- An acceptance of the email address will yield a positive verification result; no further MX hosts will be tried.
- Any permanent SMTP error (5xx) will yield a negative verification result; no further MX hosts will be tried.
- Any temporary SMTP error (4xx) or any connection issue will cause the next MX host to be tried. Only if all MX hosts yield these kinds of errors will the overall verification result be ambiguous, for example because of greylisting or because no server gives a definitive positive or negative answer.
- The validate_email_or_fail() function will now raise an SMTPTemporaryError() on an ambiguous result.
- All exceptions raised by the SMTP check will contain the collected communication results in their error_messages instance variable.
- Internal API changes (refactorings)
- Check results are now logged with info level, instead of emitting warnings when debug is turned on.
- Props to @reinhard-mueller for coming up with the new proposals and helping in refining the idea.
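The parameter renames in the 1.0.0 entry above can be applied mechanically to existing call sites. The following is a minimal sketch, not part of the module: `RENAMED` and `migrate_kwargs` are hypothetical names introduced here for illustration. It assumes the old behavior documented for `skip_smtp` (it was forced to `True` whenever `check_mx` was `False`) should be preserved by setting `check_smtp=False` in that case.

```python
# Old (pre-1.0.0) keyword -> new (1.0.0) keyword, as listed in the changelog.
RENAMED = {
    'check_regex': 'check_format',
    'use_blacklist': 'check_blacklist',
    'check_mx': 'check_dns',
    'helo_host': 'smtp_helo_host',
    'from_address': 'smtp_from_address',
    'debug': 'smtp_debug',
}


def migrate_kwargs(old_kwargs: dict) -> dict:
    """Translate pre-1.0.0 keyword arguments to their 1.0.0 names."""
    new_kwargs = {}
    for name, value in old_kwargs.items():
        if name == 'skip_smtp':
            # Inverted logic: skip_smtp=True becomes check_smtp=False.
            new_kwargs['check_smtp'] = not value
        else:
            # Names that did not change (e.g. smtp_timeout) pass through.
            new_kwargs[RENAMED.get(name, name)] = value
    if old_kwargs.get('check_mx') is False:
        # Assumption: keep the old behavior where disabling the MX check
        # also skipped the SMTP conversation.
        new_kwargs['check_smtp'] = False
    return new_kwargs


old_call = {'check_regex': True, 'check_mx': True, 'skip_smtp': True,
            'smtp_timeout': 10}
print(migrate_kwargs(old_call))
```

Since everything except the email address is keyword-only in 1.0.0, the translated dictionary would be passed as `validate_email('user@example.com', **new_kwargs)`.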
0.2.16:
- Workaround for a bug in the built-in python 3.8 smtp library: https://github.com/karolyi/py3-validate-email/issues/50
@ -104,4 +126,4 @@
- Handle 'No MX record' exception
0.1.3:
- Added ambigious (4xx) response code handling
- Added ambiguous (4xx) response code handling

71
FAQ.md Normal file

@ -0,0 +1,71 @@
# FAQ:
## The module provides false positives:
The function of this module, and specifically of the SMTP check, relies
on the assumption that the mail server declared responsible for an email
domain will immediately reject any nonexistent address.
Some SMTP servers (Yahoo's servers, for example) reject nonexistent
addresses only after the `DATA` command has been completed in the
conversation with the server. This module only goes as far as the
`RCPT TO` command and reports the address as valid if it isn't rejected
there, since the `DATA` part of the conversation is the email body itself.
Other SMTP servers accept emails even for nonexistent recipient
addresses and forward them to a different server which will create a
bounce message in a second step. This is the case for many email domains
hosted at Microsoft.
In both cases, there's nothing we can do about it, as the mail server
we talk to seemingly accepts the email address.
## Everything gets rejected:
Check if you have port 25 access from your IP to the accepting server's
IP. Even if you do, the server might use RBLs (spamhaus.org lists, for
example), and your IP might get rejected because it is listed in one of
the lists the email server uses. Your best bet is to use this module on
another server that delivers emails, thus eliminating the chance of
being blacklisted.
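A quick way to test the first point (port 25 reachability) is a plain TCP connection attempt. This is a minimal sketch using only the standard library; `can_connect` is a hypothetical helper, not part of this module, and a successful connection says nothing about RBL listings.

```python
import socket


def can_connect(host: str, port: int = 25, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection resolves the name and tries each address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS resolution errors.
        return False
```

Calling `can_connect('mx.example.com')` with the recipient domain's MX hostname (the name here is a placeholder) returns `False` if your provider blocks outgoing port 25.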
## I can't check thousands of emails!
This module is a tool; every tool can become a weapon if not used
properly. In my case, I use this module to check email address validity
at registration time, so not thousands at once. Doing so might make you
(your IP) end up in one of the aforementioned blocklists, as providers
will detect you as a possible spammer. In short, I would advise against
your use case.
## My email doesn't check out!
Run this code with the module installed (use your parameters within),
and see the output:
```shell
python -c 'import logging, sys; logging.basicConfig(stream=sys.stderr, level=logging.DEBUG); from validate_email import validate_email; print(validate_email("your.email@address.com", smtp_debug=True))'
```
If, after looking at the logs, you still don't understand why your code
doesn't work as expected, then (and only then) open an issue explaining
your problem with a REPRODUCIBLE example, and the output of your test
run.
## How can I pass my email account's credentials? How can I use port 465 or 587 when my provider blocks port 25?
The credentials you got from your email provider, as well as the
instruction to use port 465 or 587, refer to *your provider's* server
for *outgoing* emails.
This module, however, directly talks to the *recipient's* server for
*incoming* emails, so neither your credentials nor the switch to port
465 or 587 is of any use here.
If your internet connection is within an IP pool (often the case for
private use) or it doesn't have a proper reverse DNS entry, the servers
for many email domains (depending on their configuration) will reject
connections from you. This can *not* be solved by using your provider's
mail server. Instead, you have to use the library on a machine with an
internet connection with static IP address and a proper reverse DNS
entry.


@ -25,32 +25,135 @@ USAGE
Basic usage::
from validate_email import validate_email
is_valid = validate_email(email_address='example@example.com', check_regex=True, check_mx=True, from_address='my@from.addr.ess', helo_host='my.host.name', smtp_timeout=10, dns_timeout=10, use_blacklist=True, debug=False)
is_valid = validate_email(email_address='example@example.com', check_format=True, check_blacklist=True, check_dns=True, dns_timeout=10, check_smtp=True, smtp_timeout=10, smtp_helo_host='my.host.name', smtp_from_address='my@from.addr.ess', smtp_debug=False)
:code:`check_regex` will check will the email address has a valid structure and defaults to True
Parameters
----------------------------
:code:`check_mx`: check the mx-records and check whether the email actually exists
:code:`email_address`: the email address to check
:code:`from_address`: the email address the probe will be sent from,
:code:`check_format`: check whether the email address has a valid structure; defaults to :code:`True`
:code:`helo_host`: the host to use in SMTP HELO when checking for an email,
:code:`check_blacklist`: check the email against the blacklist of domains downloaded from https://github.com/martenson/disposable-email-domains; defaults to :code:`True`
:code:`smtp_timeout`: seconds until SMTP timeout
:code:`check_dns`: check the DNS mx-records, defaults to :code:`True`
:code:`dns_timeout`: seconds until DNS timeout; defaults to 10 seconds
:code:`dns_timeout`: seconds until DNS timeout
:code:`check_smtp`: check whether the email actually exists by initiating an SMTP conversation; defaults to :code:`True`
:code:`use_blacklist`: use the blacklist of domains downloaded from https://github.com/martenson/disposable-email-domains
:code:`smtp_timeout`: seconds until SMTP timeout; defaults to 10 seconds
:code:`debug`: emit debug/warning messages while checking email
:code:`smtp_helo_host`: the hostname to use in SMTP HELO/EHLO; if set to :code:`None` (the default), the fully qualified domain name of the local host is used
:code:`skip_smtp`: (default :code:`False`) skip the SMTP conversation with the server, after MX checks. Will automatically be set to :code:`True` when :code:`check_mx` is :code:`False`!
:code:`smtp_from_address`: the email address used for the sender in the SMTP conversation; if set to :code:`None` (the default), the :code:`email_address` parameter is used as the sender as well
The function :code:`validate_email_or_fail()` works exactly like :code:`validate_email`, except that it raises an exception in the case of validation failure instead of returning :code:`False`.
:code:`smtp_debug`: activate :code:`smtplib`'s debug output which always goes to stderr; defaults to :code:`False`
Result
----------------------------
The function :code:`validate_email()` returns the following results:
:code:`True`
All requested checks were successful for the given email address.
:code:`False`
At least one of the requested checks failed for the given email address.
:code:`None`
None of the requested checks failed, but at least one of them yielded an ambiguous result. Currently, the SMTP check is the only check which can actually yield an ambiguous result.
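Because there are three possible results, callers should compare by identity rather than rely on truthiness: `if not result` would treat an ambiguous `None` the same as a definite `False`. A minimal sketch, where `describe_result` is a hypothetical helper and not part of the module:

```python
from typing import Optional


def describe_result(result: Optional[bool]) -> str:
    """Map the three possible return values to a readable verdict."""
    if result is True:
        return 'valid'
    if result is False:
        return 'invalid'
    # None: no check failed, but at least one was ambiguous.
    return 'ambiguous'
```

In a registration form, for instance, one might accept `'valid'` and `'ambiguous'` addresses and reject only `'invalid'` ones; which policy fits is an application decision.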
Getting more information
----------------------------
The function :code:`validate_email_or_fail()` works exactly like :code:`validate_email`, except that it raises an exception in case of a validation failure or an ambiguous result instead of returning :code:`False` or :code:`None`, respectively.
All these exceptions descend from :code:`EmailValidationError`. Please see below for the exact exceptions raised by the various checks. Note that all exception classes are defined in the module :code:`validate_email.exceptions`.
Please note that :code:`SMTPTemporaryError` indicates an ambiguous check result rather than a check failure, so if you use :code:`validate_email_or_fail()`, you probably want to catch this exception.
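The order of the :code:`except` clauses matters here, because :code:`SMTPTemporaryError` is itself a descendant of :code:`EmailValidationError`. The sketch below uses local stand-in classes (a simplified hierarchy with the same names) so that it runs without the package installed; with the package, the classes would be imported from :code:`validate_email.exceptions`. The helper `to_tristate` and the two fake checkers are hypothetical.

```python
# Local stand-ins mirroring the documented names; the real hierarchy has
# intermediate base classes that are omitted here.
class EmailValidationError(Exception):
    pass


class SMTPTemporaryError(EmailValidationError):
    pass


class AddressFormatError(EmailValidationError):
    pass


def to_tristate(check, email_address):
    """Fold the exceptions of an ..._or_fail style call into True/False/None."""
    try:
        check(email_address)
    except SMTPTemporaryError:
        # Must come before the base class: ambiguous, not failed.
        return None
    except EmailValidationError:
        return False
    return True


# Hypothetical checkers standing in for validate_email_or_fail().
def always_greylisted(email_address):
    raise SMTPTemporaryError()


def always_malformed(email_address):
    raise AddressFormatError()


print(to_tristate(always_greylisted, 'alice@example.com'))  # None
print(to_tristate(always_malformed, 'alice@example.com'))   # False
```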
The checks
============================
By default, all checks are enabled, but each of them can be disabled by one of the :code:`check_...` parameters. Note, however, that :code:`check_smtp` implies :code:`check_dns`.
:code:`check_format`
----------------------------
Check whether the given email address conforms to the general format requirements of valid email addresses.
:code:`validate_email_or_fail()` raises :code:`AddressFormatError` on any failure of this test.
:code:`check_blacklist`
----------------------------
Check whether the domain part of the given email address (the part after the "@") is known as a disposable and temporary email address domain. These are often used to register dummy users in order to spam or abuse some services.
A list of such domains is maintained at https://github.com/martenson/disposable-email-domains, and this module uses that list.
:code:`validate_email_or_fail()` raises :code:`DomainBlacklistedError` if the email address belongs to a blacklisted domain.
:code:`check_dns`
----------------------------
Check whether there is a valid list of servers responsible for delivering emails to the given email address.
First, a DNS query is issued for the email address's domain to retrieve a list of all MX records. That list is then stripped of duplicates and malformed entries. If at least one valid MX record remains at the end of this procedure, the check is considered successful.
On failure of this check, :code:`validate_email_or_fail()` raises one of the following exceptions, all of which descend from :code:`DNSError`:
:code:`DomainNotFoundError`
The domain of the email address cannot be found at all.
:code:`NoNameserverError`
There is no nameserver for the domain.
:code:`DNSTimeoutError`
A timeout occurred when querying the nameserver. Note that the timeout period can be changed with the :code:`dns_timeout` parameter.
:code:`DNSConfigurationError`
The nameserver is misconfigured.
:code:`NoMXError`
The nameserver does not list any MX records for the domain.
:code:`NoValidMXError`
The nameserver lists MX records for the domain, but none of them is valid.
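The cleaning step described in this section (dropping duplicates and malformed entries) can be sketched as follows. This is an illustration under assumptions: `HOSTNAME_RE` is a hypothetical stand-in for the module's own host pattern, whose exact definition is not shown here, and `clean_mx_hostnames` is not the module's function.

```python
import re

# Hypothetical hostname pattern: dot-separated labels plus a
# letters-only top-level label.
HOSTNAME_RE = re.compile(
    r'^(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z]{2,63}$',
    re.IGNORECASE)


def clean_mx_hostnames(raw_hostnames):
    """Strip trailing dots, drop duplicates, keep well-formed names only."""
    seen = set()
    cleaned = []
    for raw in raw_hostnames:
        host = raw.rstrip('.')
        if host in seen:
            continue
        seen.add(host)
        if HOSTNAME_RE.search(host):
            cleaned.append(host)
    return cleaned


print(clean_mx_hostnames(
    ['asdqwe.', '.', 'valid.host.', 'valid.host.', 'valid2.host.']))
# ['valid.host', 'valid2.host']
```

An empty result from this step corresponds to the :code:`NoValidMXError` case above.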
:code:`check_smtp`
----------------------------
Check whether the given email address exists by simulating an actual email delivery.
A connection to the SMTP server identified through the domain's MX record is established, and an SMTP conversation is initiated up to the point where the server confirms the existence of the email address. After that, instead of actually sending an email, the conversation is cancelled.
The module will try to negotiate a TLS connection with STARTTLS, and silently fall back to an unencrypted SMTP connection if the server doesn't support it.
If the SMTP server replies to the :code:`RCPT TO` command with a code 250 (success) response, the check is considered successful.
If the SMTP server replies with a code 5xx (permanent error) response at any point in the conversation, the check is considered failed.
If the SMTP server cannot be reached, unexpectedly closes the connection, or replies with a code 4xx (temporary error) at any stage of the conversation, the check is considered ambiguous.
If there is more than one valid MX record for the domain, they are tried in order of priority until the check either succeeds or fails. The next server is tried only if the result for the current one is ambiguous, and the overall check is considered ambiguous only if the result is ambiguous for all servers.
On failure of this check or on ambiguous result, :code:`validate_email_or_fail()` raises one of the following exceptions, all of which descend from :code:`SMTPError`:
:code:`AddressNotDeliverableError`
The SMTP server permanently refused the email address. Technically, this means that the server replied to the :code:`RCPT TO` command with a code 5xx response.
:code:`SMTPCommunicationError`
The SMTP server refused to even let us get to the point where we could ask it about the email address. Technically, this means that the server sent a code 5xx response either immediately after connection, or as a reply to the :code:`EHLO` (or :code:`HELO`) or :code:`MAIL FROM` commands.
:code:`SMTPTemporaryError`
A temporary error occurred during the check for all available MX servers. This is considered an ambiguous check result. For example, greylisting is a frequent cause for this.
All of the above three exceptions provide further detail about the error response(s) in the exception's instance variable :code:`error_messages`.
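The decision logic across several MX hosts, as described in this section, can be summarized in a few lines. This is a simplified sketch rather than the module's implementation: the whole SMTP conversation with one host is collapsed into a hypothetical `probe` callable that returns the final reply code, or `None` if the host could not be reached or dropped the connection.

```python
from typing import Callable, Iterable, Optional


def check_all_mx(mx_hosts: Iterable[str],
                 probe: Callable[[str], Optional[int]]) -> Optional[bool]:
    """Return True (accepted), False (rejected) or None (ambiguous)."""
    for host in mx_hosts:  # assumed to be sorted by MX priority already
        code = probe(host)
        if code is None:
            continue  # connection issue: ambiguous, try the next host
        if code == 250:
            return True  # accepted: stop, no further hosts are tried
        if 500 <= code <= 599:
            return False  # permanent error: stop as well
        # Anything else (4xx, for example) is ambiguous: try the next host.
    return None  # every host was ambiguous


replies = {'mx1.example.com': 451, 'mx2.example.com': None,
           'mx3.example.com': 550}
print(check_all_mx(replies, replies.get))  # False
```

In the example, the first two hosts are ambiguous (greylisting, then a connection failure), so the third host's permanent rejection decides the result.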
Auto-updater
============================
The package contains an auto-updater for downloading and updating the built-in blacklist.txt. It runs on each module load (and installation), but tries to update the content only if the file is older than 5 days and the remote content differs from what has already been downloaded.
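The age condition can be pictured as a simple file timestamp comparison. The sketch below is an assumption about how such a check might look, not the updater's actual code: it uses the file's modification time, and `is_stale` and `MAX_AGE_SECONDS` are hypothetical names.

```python
import os
import time
from typing import Optional

MAX_AGE_SECONDS = 5 * 24 * 60 * 60  # 5 days, as stated above


def is_stale(path: str, now: Optional[float] = None) -> bool:
    """Return True if the file is missing or older than MAX_AGE_SECONDS."""
    if now is None:
        now = time.time()
    try:
        mtime = os.path.getmtime(path)
    except OSError:
        # A missing or unreadable file counts as stale.
        return True
    return now - mtime > MAX_AGE_SECONDS
```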
The update can be triggered manually::
@ -66,25 +169,7 @@ The update can be triggered manually::
:code:`callback`: An optional `Callable` (function/method) to be called when the update is done.
FAQ:
========
The module provides false positives:
------------------------------------
Some SMTP Servers (Yahoo's servers for example) are only rejecting nonexistent emails after the end of ``DATA`` command has been provided in the conversation with the server. This module only goes until the ``RCPT TO`` and says it's valid if it doesn't get rejected there, since the ``DATA`` part of the email is the email body itself. There's not much one can do with it, you have to accept false positives in the case of yahoo.com and some other providers. I'm not sure if rejecting emails after the ``DATA`` command is a valid behavior based on the SMTP RFC, but I wouldn't wonder if not.
Read the FAQ_!
============================
Everything gets rejected:
-------------------------
Check if you have port 25 access from your IP to the accepting server's IP. Even if you do, the server might use RBL's (spamhaus.org lists, for example), and your IP might get rejected because of being listed in one of the used lists by the email server. Your best bet is to use this module on another server that delivers emails, thus eliminating the chance of being blacklisted.
I can't check thousands of emails!
----------------------------------
This module is a tool; every tool can become a weapon if not used properly. In my case, I use this module to check email address validity at registration time, so not thousands at once. Doing so might make you (your IP) end up in one of the aforementioned blocklists, as providers will detect you as a possible spammer. In short, I would advise against your use case.
My email doesn't check out!
---------------------------
Run this code with the module installed (use your parameters within), and see the output::
python -c 'import logging, sys; logging.basicConfig(stream=sys.stderr, level=logging.DEBUG); from validate_email import validate_email; print(validate_email(\'your.email@address.com\', check_mx=True, debug=True))'
If you still don't understand why your code doesn't work as expected by looking at the the logs, then (and only then) add an issue explaining your problem with a REPRODUCIBLE example, and the output of your test run.
.. _FAQ: https://github.com/karolyi/py3-validate-email/blob/master/FAQ.md


@ -58,7 +58,7 @@ setup(
name='py3-validate-email',
version='0.2.16',
packages=find_packages(exclude=['tests']),
install_requires=['dnspython~=2.0', 'idna~=2.10', 'filelock~=3.0'],
install_requires=['dnspython~=2.0', 'idna~=3.0', 'filelock~=3.0'],
author='László Károlyi',
author_email='laszlo@karolyi.hu',
description=(


@ -20,20 +20,20 @@ class BlacklistCheckTestCase(TestCase):
domainlist_check(EmailAddress('pm2@mailinator.com'))
with self.assertRaises(DomainBlacklistedError):
validate_email_or_fail(
email_address='pm2@mailinator.com', check_regex=False,
use_blacklist=True)
email_address='pm2@mailinator.com', check_format=False,
check_blacklist=True)
with self.assertRaises(DomainBlacklistedError):
validate_email_or_fail(
email_address='pm2@mailinator.com', check_regex=True,
use_blacklist=True)
email_address='pm2@mailinator.com', check_format=True,
check_blacklist=True)
with self.assertLogs():
self.assertFalse(expr=validate_email(
email_address='pm2@mailinator.com', check_regex=False,
use_blacklist=True, debug=True))
email_address='pm2@mailinator.com', check_format=False,
check_blacklist=True))
with self.assertLogs():
self.assertFalse(expr=validate_email(
email_address='pm2@mailinator.com', check_regex=True,
use_blacklist=True, debug=True))
email_address='pm2@mailinator.com', check_format=True,
check_blacklist=True))
def test_blacklist_negative(self):
'Allows a domain not in the blacklist.'


@ -4,10 +4,9 @@ from unittest.mock import Mock, patch
from dns.exception import Timeout
from validate_email import mx_check as mx_module
from validate_email.email_address import EmailAddress
from validate_email import dns_check
from validate_email.dns_check import _get_cleaned_mx_records
from validate_email.exceptions import DNSTimeoutError, NoValidMXError
from validate_email.mx_check import _get_cleaned_mx_records, mx_check
class DnsNameStub(object):
@ -20,55 +19,60 @@ class DnsNameStub(object):
return self.value
class DnsRRsetStub(object):
'Stub for `dns.rrset.RRset`.'
def __init__(self, hostnames: list):
self.names = [
SimpleNamespace(exchange=DnsNameStub(value=x)) for x in hostnames]
def processing_order(self):
return self.names
def _answer(hostnames: list):
return SimpleNamespace(rrset=DnsRRsetStub(hostnames=hostnames))
TEST_QUERY = Mock()
class GetMxRecordsTestCase(TestCase):
'Testing `_get_mx_records`.'
@patch.object(target=mx_module, attribute='resolve', new=TEST_QUERY)
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_fails_with_invalid_hostnames(self):
'Fails when an MX hostname is "."'
TEST_QUERY.return_value = [
SimpleNamespace(exchange=DnsNameStub(value='.'))]
TEST_QUERY.return_value = _answer(hostnames=['.'])
with self.assertRaises(NoValidMXError) as exc:
_get_cleaned_mx_records(domain='testdomain1', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@patch.object(target=mx_module, attribute='resolve', new=TEST_QUERY)
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_fails_with_null_hostnames(self):
'Fails when an MX hostname is invalid.'
TEST_QUERY.return_value = [
SimpleNamespace(exchange=DnsNameStub(value='asdqwe'))]
TEST_QUERY.return_value = _answer(hostnames=['asdqwe'])
with self.assertRaises(NoValidMXError) as exc:
_get_cleaned_mx_records(domain='testdomain2', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@patch.object(target=mx_module, attribute='resolve', new=TEST_QUERY)
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_filters_out_invalid_hostnames(self):
'Returns only the valid hostnames.'
TEST_QUERY.return_value = [
SimpleNamespace(exchange=DnsNameStub(value='asdqwe.')),
SimpleNamespace(exchange=DnsNameStub(value='.')),
SimpleNamespace(exchange=DnsNameStub(value='valid.host.')),
# This is an intentional duplicate.
SimpleNamespace(exchange=DnsNameStub(value='valid.host.')),
SimpleNamespace(exchange=DnsNameStub(value='valid2.host.')),
]
TEST_QUERY.return_value = _answer(hostnames=[
'asdqwe.',
'.',
'valid.host.',
'valid.host.', # This is an intentional duplicate.
'valid2.host.',
])
result = _get_cleaned_mx_records(domain='testdomain3', timeout=10)
self.assertListEqual(result, ['valid.host', 'valid2.host'])
@patch.object(target=mx_module, attribute='resolve', new=TEST_QUERY)
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_raises_exception_on_dns_timeout(self):
'Raises exception on DNS timeout.'
TEST_QUERY.side_effect = Timeout()
with self.assertRaises(DNSTimeoutError) as exc:
_get_cleaned_mx_records(domain='testdomain3', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@patch.object(target=mx_module, attribute='_check_mx_records')
def test_skip_smtp_argument(self, check_mx_records_mock):
'Check correct work of `skip_smtp` argument.'
self.assertTrue(mx_check(
EmailAddress('test@mail.ru'), debug=False, skip_smtp=True))
self.assertEqual(check_mx_records_mock.call_count, 0)

100
tests/test_smtp_check.py Normal file

@ -0,0 +1,100 @@
from smtplib import SMTPServerDisconnected
from socket import timeout
from unittest.case import TestCase
from unittest.mock import patch

from validate_email.email_address import EmailAddress
from validate_email.exceptions import (
    AddressNotDeliverableError, SMTPCommunicationError, SMTPTemporaryError)
from validate_email.smtp_check import _SMTPChecker, smtp_check


class SMTPMock(_SMTPChecker):
    """
    Mock replacement for the SMTP connection.

    Instead of really communicating with an SMTP server, this class
    works with predefined fake responses. By default, the responses
    emulate a successful SMTP conversation, but it can be turned into an
    unsuccessful one by patching the `reply` dictionary.
    """
    reply = {
        None: (220, b'Welcome'),
        "EHLO": (502, b'Please use HELO'),
        'HELO': (220, b'HELO successful'),
        'MAIL': (250, b'MAIL FROM successful'),
        'RCPT': (250, b'RCPT TO successful'),
        'QUIT': (221, b'QUIT successful'),
    }
    last_command = None

    def _get_socket(self, host, port, timeout):
        return None

    def send(self, s):
        self.last_command = s[:4].upper()

    def getreply(self):
        if isinstance(self.reply[self.last_command], Exception):
            self.close()
            raise self.reply[self.last_command]
        return self.reply[self.last_command]


class SMTPCheckTest(TestCase):
    'Collection of tests for the `smtp_check` function.'

    # All the possible ways to fail we want to test, listed as tuples
    # containing (command, reply, expected exception).
    failures = [
        # Timeout on connection
        (None, timeout(), SMTPTemporaryError),
        # Connection unexpectedly closed during any stage
        (None, SMTPServerDisconnected('Test'), SMTPTemporaryError),
        ('EHLO', SMTPServerDisconnected('Test'), SMTPTemporaryError),
        ('HELO', SMTPServerDisconnected('Test'), SMTPTemporaryError),
        ('MAIL', SMTPServerDisconnected('Test'), SMTPTemporaryError),
        ('RCPT', SMTPServerDisconnected('Test'), SMTPTemporaryError),
        # Temporary error codes
        (None, (421, b'Connect failed'), SMTPTemporaryError),
        ('HELO', (421, b'HELO failed'), SMTPTemporaryError),
        ('MAIL', (451, b'MAIL FROM failed'), SMTPTemporaryError),
        ('RCPT', (451, b'RCPT TO failed'), SMTPTemporaryError),
        # Permanent error codes
        (None, (554, b'Connect failed'), SMTPCommunicationError),
        ('HELO', (504, b'HELO failed'), SMTPCommunicationError),
        ('MAIL', (550, b'MAIL FROM failed'), SMTPCommunicationError),
        ('RCPT', (550, b'RCPT TO failed'), AddressNotDeliverableError),
    ]

    @patch(target='validate_email.smtp_check._SMTPChecker', new=SMTPMock)
    def test_smtp_success(self):
        'Succeeds on successful SMTP conversation'
        self.assertTrue(
            smtp_check(
                email_address=EmailAddress('alice@example.com'),
                mx_records=['smtp.example.com'],
            )
        )

    def _test_one_smtp_failure(self, cmd, reply, exception):
        with patch.dict(in_dict=SMTPMock.reply, values={cmd: reply}):
            with self.assertRaises(exception) as context:
                smtp_check(
                    email_address=EmailAddress('alice@example.com'),
                    mx_records=['smtp.example.com'],
                )
            if isinstance(reply, tuple):
                error_messages = context.exception.error_messages
                error_info = error_messages['smtp.example.com']
                self.assertEqual(
                    error_info.command[:4].upper(), cmd or 'CONN')
                self.assertEqual(error_info.code, reply[0])
                self.assertEqual(error_info.text, reply[1].decode())

    @patch(target='validate_email.smtp_check._SMTPChecker', new=SMTPMock)
    def test_smtp_failure(self):
        'Fails on unsuccessful SMTP conversation.'
        for cmd, reply, exception in self.failures:
            with self.subTest(cmd=cmd, reply=reply):
                self._test_one_smtp_failure(cmd, reply, exception)


@ -0,0 +1,65 @@
from dns.exception import Timeout
from dns.rdatatype import MX as rdtype_mx
from dns.rdtypes.ANY.MX import MX
from dns.resolver import (
    NXDOMAIN, YXDOMAIN, Answer, NoAnswer, NoNameservers, resolve)

from .constants import HOST_REGEX
from .email_address import EmailAddress
from .exceptions import (
    DNSConfigurationError, DNSTimeoutError, DomainNotFoundError, NoMXError,
    NoNameserverError, NoValidMXError)


def _get_mx_records(domain: str, timeout: int) -> Answer:
    'Return the DNS response for checking, optionally raise exceptions.'
    try:
        return resolve(
            qname=domain, rdtype=rdtype_mx, lifetime=timeout,
            search=True)
    except NXDOMAIN:
        raise DomainNotFoundError
    except NoNameservers:
        raise NoNameserverError
    except Timeout:
        raise DNSTimeoutError
    except YXDOMAIN:
        raise DNSConfigurationError
    except NoAnswer:
        raise NoMXError


def _get_cleaned_mx_records(domain: str, timeout: int) -> list:
    """
    Return a list of hostnames in the MX record, raise an exception on
    any issues.
    """
    answer = _get_mx_records(domain=domain, timeout=timeout)
    to_check = list()
    host_set = set()
    for record in answer.rrset.processing_order():  # type: MX
        dns_str = record.exchange.to_text().rstrip('.')  # type: str
        if dns_str in host_set:
            continue
        to_check.append(dns_str)
        host_set.add(dns_str)
    result = [x for x in to_check if HOST_REGEX.search(string=x)]
    if not result:
        raise NoValidMXError
    return result


def dns_check(email_address: EmailAddress, timeout: int = 10) -> list:
    """
    Check whether there are any responsible SMTP servers for the email
    address by looking up the DNS MX records.

    In case no responsible SMTP servers can be determined, a variety of
    exceptions is raised depending on the exact issue, all derived from
    `DNSError`. Otherwise, return the list of MX hostnames.
    """
    if email_address.domain_literal_ip:
        return [email_address.domain_literal_ip]
    else:
        return _get_cleaned_mx_records(
            domain=email_address.domain, timeout=timeout)


@ -56,11 +56,11 @@ class DomainListValidator(object):
self.domain_blacklist = set(
x.strip().lower() for x in lines if x.strip())
def __call__(self, address: EmailAddress) -> bool:
def __call__(self, email_address: EmailAddress) -> bool:
'Do the checking here.'
if address.domain in self.domain_whitelist:
if email_address.domain in self.domain_whitelist:
return True
if address.domain in self.domain_blacklist:
if email_address.domain in self.domain_blacklist:
raise DomainBlacklistedError
return True


@ -1,20 +1,25 @@
from typing import Iterable
from collections import namedtuple
from typing import Dict
SMTPMessage = namedtuple(
typename='SmtpErrorMessage', field_names=['command', 'code', 'text'])
class EmailValidationError(Exception):
'Base class for all exceptions indicating validation failure.'
class Error(Exception):
'Base class for all exceptions of this module.'
message = 'Unknown error.'
def __str__(self):
return self.message
class AddressFormatError(EmailValidationError):
'Raised when the email address has an invalid format.'
message = 'Invalid email address.'
class ParameterError(Error):
"""
Base class for all exceptions indicating a wrong function parameter.
"""
class FromAddressFormatError(EmailValidationError):
class FromAddressFormatError(ParameterError):
"""
Raised when the from email address used for the MX check has an
invalid format.
@ -22,6 +27,15 @@ class FromAddressFormatError(EmailValidationError):
message = 'Invalid "From:" email address.'
class EmailValidationError(Error):
'Base class for all exceptions indicating validation failure.'
class AddressFormatError(EmailValidationError):
'Raised when the email address has an invalid format.'
message = 'Invalid email address.'
class DomainBlacklistedError(EmailValidationError):
"""
Raised when the domain of the email address is blacklisted on
@ -30,43 +44,101 @@ class DomainBlacklistedError(EmailValidationError):
message = 'Domain blacklisted.'
class DomainNotFoundError(EmailValidationError):
class DNSError(EmailValidationError):
"""
Base class of all exceptions that indicate failure to determine a
valid MX for the domain of email address.
"""
class DomainNotFoundError(DNSError):
'Raised when the domain is not found.'
message = 'Domain not found.'
class NoNameserverError(EmailValidationError):
class NoNameserverError(DNSError):
'Raised when the domain does not resolve by nameservers in time.'
message = 'No nameserver found for domain.'
class DNSTimeoutError(EmailValidationError):
class DNSTimeoutError(DNSError):
'Raised when the domain lookup times out.'
message = 'Domain lookup timed out.'
class DNSConfigurationError(EmailValidationError):
class DNSConfigurationError(DNSError):
"""
Raised when the DNS entries for this domain are falsely configured.
"""
message = 'Misconfigurated DNS entries for domain.'
class NoMXError(EmailValidationError):
'Raised then the domain has no MX records configured.'
class NoMXError(DNSError):
'Raised when the domain has no MX records configured.'
message = 'No MX record for domain found.'
class NoValidMXError(EmailValidationError):
class NoValidMXError(DNSError):
"""
Raised when the domain has MX records configured, but none of them
has a valid format.
"""
message = 'No valid MX record for domain found.'
class AddressNotDeliverableError(EmailValidationError):
'Raised when a non-ambigious resulted lookup fails.'
message = 'Email address undeliverable:'
class SMTPError(EmailValidationError):
"""
Base class for exceptions raised in the end from unsuccessful SMTP
communication.
def __init__(self, error_messages: Iterable):
`error_messages` is a dictionary with a `SMTPMessage` per MX record,
where the hostname is the key and a tuple of command, error code,
and error message is the value.
"""
def __init__(self, error_messages: Dict[str, SMTPMessage]):
self.error_messages = error_messages
def __str__(self) -> str:
return '\n'.join([self.message] + self.error_messages)
return '\n'.join([self.message] + [
f'{host}: {message.code} {message.text} '
f'(in reply to {message.command!r})'
for host, message in self.error_messages.items()
])
class AddressNotDeliverableError(SMTPError):
"""
Raised when at least one of the MXs sends an SMTP reply which
unambiguously indicates an invalid (nonexistent, blocked, expired...)
recipient email address.
This exception indicates that the email address is clearly invalid.
"""
message = 'Email address undeliverable:'
class SMTPCommunicationError(SMTPError):
"""
Raised when the SMTP communication with all MX was unsuccessful for
other reasons than an invalid recipient email address.
This exception indicates a configuration issue either on the host
where this program runs or on the MX. A possible reason is that the
local host is blacklisted on the MX.
"""
message = 'SMTP communication failure:'
class SMTPTemporaryError(SMTPError):
"""
Raised when the email address cannot be verified because none of the
MX gave a clear "yes" or "no" about the existence of the address,
but at least one gave a temporary error reply to the "RCPT TO:"
command.
This exception indicates that the validity of the email address
cannot be verified, either for reasons of MX configuration (like
greylisting) or due to temporary server issues on the MX.
"""
message = 'Temporary error in email address verification:'
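
The `__str__` formatting of the new `SMTPError` base class can be tried in isolation. The sketch below is only an illustration: `SMTPMessage` is assumed to be a named tuple with the fields `command`, `code` and `text` (the diff uses these attributes but does not show its definition), `EmailValidationError` is a bare stand-in, and the host name and reply text are made up.

```python
from typing import Dict, NamedTuple


class SMTPMessage(NamedTuple):
    # Assumed shape: the diff only shows that these three fields exist.
    command: str
    code: int
    text: str


class EmailValidationError(Exception):
    # Bare stand-in for the package's base exception.
    message = 'Email validation error.'


class SMTPError(EmailValidationError):
    # Mirrors the added lines of the diff: one SMTPMessage per MX host.
    def __init__(self, error_messages: Dict[str, SMTPMessage]):
        self.error_messages = error_messages

    def __str__(self) -> str:
        return '\n'.join([self.message] + [
            f'{host}: {message.code} {message.text} '
            f'(in reply to {message.command!r})'
            for host, message in self.error_messages.items()
        ])


class SMTPTemporaryError(SMTPError):
    message = 'Temporary error in email address verification:'


# Made-up example data: a greylisting reply from a single MX host.
error = SMTPTemporaryError(error_messages={
    'mx1.example.com': SMTPMessage(
        command='RCPT TO', code=450, text='Greylisted, try again later'),
})
rendered = str(error)
print(rendered)
```

Each MX host contributes one line after the class-level `message`, so a caller that logs the exception sees which server gave which reply to which command.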

View File

@ -1,198 +0,0 @@
from logging import getLogger
from smtplib import SMTP, SMTPNotSupportedError, SMTPServerDisconnected
from socket import error as SocketError
from socket import gethostname
from typing import Optional
from dns.exception import Timeout
from dns.rdatatype import MX as rdtype_mx
from dns.rdtypes.ANY.MX import MX
from dns.resolver import (
NXDOMAIN, YXDOMAIN, Answer, NoAnswer, NoNameservers, resolve)
from .constants import HOST_REGEX
from .email_address import EmailAddress
from .exceptions import (
AddressNotDeliverableError, DNSConfigurationError, DNSTimeoutError,
DomainNotFoundError, NoMXError, NoNameserverError, NoValidMXError)
LOGGER = getLogger(name=__name__)
class _ProtocolError(Exception):
"""
Raised when there is an error during the SMTP conversation.
Used only internally.
"""
def __init__(self, command: str, code: int, message: bytes):
self.command = command
self.code = code
self.message = message.decode(errors='ignore')
def __str__(self):
return f'{self.code} {self.message} (in reply to {self.command})'
def _get_mx_records(domain: str, timeout: int) -> list:
'Return the DNS response for checking, optionally raise exceptions.'
try:
return resolve(
qname=domain, rdtype=rdtype_mx, lifetime=timeout,
search=True) # type: Answer
except NXDOMAIN:
raise DomainNotFoundError
except NoNameservers:
raise NoNameserverError
except Timeout:
raise DNSTimeoutError
except YXDOMAIN:
raise DNSConfigurationError
except NoAnswer:
raise NoMXError
def _get_cleaned_mx_records(domain: str, timeout: int) -> list:
"""
Return a list of hostnames in the MX record, raise an exception on
any issues.
"""
records = _get_mx_records(domain=domain, timeout=timeout)
to_check = list()
host_set = set()
for record in records: # type: MX
dns_str = record.exchange.to_text().rstrip('.') # type: str
if dns_str in host_set:
continue
to_check.append(dns_str)
host_set.add(dns_str)
result = [x for x in to_check if HOST_REGEX.search(string=x)]
if not result:
raise NoValidMXError
return result
def _smtp_ehlo_tls(smtp: SMTP, helo_host: str):
"""
Try and start the TLS session, fall back to unencrypted when
unavailable.
"""
code, message = smtp.ehlo(name=helo_host)
if code >= 300:
# EHLO bails out, no further SMTP commands are acceptable
raise _ProtocolError('EHLO', code, message)
try:
smtp.starttls()
code, message = smtp.ehlo(name=helo_host)
except SMTPNotSupportedError:
# The server does not support the STARTTLS extension
pass
except RuntimeError:
# SSL/TLS support is not available to your Python interpreter
pass
def _smtp_mail(smtp: SMTP, from_address: EmailAddress):
'Send and evaluate the `MAIL FROM` command.'
code, message = smtp.mail(sender=from_address.ace)
if code >= 300:
# MAIL FROM bails out, no further SMTP commands are acceptable
raise _ProtocolError('MAIL FROM', code, message)
def _smtp_converse(
mx_record: str, smtp_timeout: int, debug: bool, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress):
"""
Do the `SMTP` conversation, handle errors in the caller.
Raise `_ProtocolError` on error, and `StopIteration` if the
conversation points out an existing email.
"""
if debug:
LOGGER.debug(msg=f'Trying {mx_record} ...')
with SMTP(timeout=smtp_timeout) as smtp:
smtp._host = mx_record # Workaround for bug in smtplib
smtp.set_debuglevel(debuglevel=2 if debug else False)
code, message = smtp.connect(host=mx_record)
if code >= 400:
raise _ProtocolError('connect', code, message)
_smtp_ehlo_tls(smtp=smtp, helo_host=helo_host)
_smtp_mail(smtp=smtp, from_address=from_address)
code, message = smtp.rcpt(recip=email_address.ace)
if code == 250:
# Address valid, early exit
raise StopIteration
elif code >= 500:
raise _ProtocolError('RCPT TO', code, message)
def _check_one_mx(
error_messages: list, mx_record: str, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress,
smtp_timeout: int, debug: bool) -> bool:
"""
Check one MX server, return the `is_ambigious` boolean or raise
`StopIteration` if this MX accepts the email.
"""
try:
_smtp_converse(
mx_record=mx_record, smtp_timeout=smtp_timeout, debug=debug,
helo_host=helo_host, from_address=from_address,
email_address=email_address)
except SMTPServerDisconnected:
return True
except (SocketError, _ProtocolError) as error:
error_messages.append(f'{mx_record}: {error}')
return False
return True
def _check_mx_records(
mx_records: list, smtp_timeout: int, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress,
debug: bool) -> Optional[bool]:
'Check the mx records for a given email address.'
# TODO: Raise an ambigious exception, containing the messages? Will
# be a breaking change.
error_messages = []
found_ambigious = False
for mx_record in mx_records:
try:
found_ambigious |= _check_one_mx(
error_messages=error_messages, mx_record=mx_record,
helo_host=helo_host, from_address=from_address,
email_address=email_address, smtp_timeout=smtp_timeout,
debug=debug)
except StopIteration:
# Address valid, early exit
return True
# If any of the mx servers behaved ambigious, return None, otherwise raise
# an exception containing the collected error messages.
if not found_ambigious:
raise AddressNotDeliverableError(error_messages=error_messages)
def mx_check(
email_address: EmailAddress, debug: bool,
from_address: Optional[EmailAddress] = None,
helo_host: Optional[str] = None, smtp_timeout: int = 10,
dns_timeout: int = 10, skip_smtp: bool = False
) -> Optional[bool]:
"""
Return `True` if the host responds with a deliverable response code,
`False` if not-deliverable. Also, return `None` if there if couldn't
provide a conclusive result (e.g. temporary errors or graylisting).
"""
host = helo_host or gethostname()
from_address = from_address or email_address
if email_address.domain_literal_ip:
mx_records = [email_address.domain_literal_ip]
else:
mx_records = _get_cleaned_mx_records(
domain=email_address.domain, timeout=dns_timeout)
if skip_smtp:
return True
return _check_mx_records(
mx_records=mx_records, smtp_timeout=smtp_timeout, helo_host=host,
from_address=from_address, email_address=email_address, debug=debug)

View File

@ -28,22 +28,22 @@ def _validate_ipv46_address(value: str) -> bool:
return _validate_ipv4_address(value) or _validate_ipv6_address(value)
def regex_check(address: EmailAddress) -> bool:
def regex_check(email_address: EmailAddress) -> bool:
'Slightly adjusted email regex checker from the Django project.'
# Validate user part.
if not USER_REGEX.match(address.user):
if not USER_REGEX.match(email_address.user):
raise AddressFormatError
# Validate domain part.
if address.domain_literal_ip:
literal_match = LITERAL_REGEX.match(address.ace_domain)
if email_address.domain_literal_ip:
literal_match = LITERAL_REGEX.match(email_address.ace_domain)
if literal_match is None:
raise AddressFormatError
if not _validate_ipv46_address(literal_match[1]):
raise AddressFormatError
else:
if HOST_REGEX.match(address.ace_domain) is None:
if HOST_REGEX.match(email_address.ace_domain) is None:
raise AddressFormatError
# All validations successful.

View File

@ -0,0 +1,202 @@
from logging import getLogger
from smtplib import (
SMTP, SMTPNotSupportedError, SMTPResponseException, SMTPServerDisconnected)
from typing import List, Optional, Tuple
from .email_address import EmailAddress
from .exceptions import (
AddressNotDeliverableError, SMTPCommunicationError, SMTPMessage,
SMTPTemporaryError)
LOGGER = getLogger(name=__name__)
class _SMTPChecker(SMTP):
"""
A specialized variant of `smtplib.SMTP` for checking the validity of
email addresses.
All the commands used in the check process are modified to raise
appropriate exceptions: `SMTPServerDisconnected` on connection
issues and `SMTPResponseException` on negative SMTP server
responses. Note that the methods of `smtplib.SMTP` already raise
these exceptions on some conditions.
Also, a new method `check` is added to run the check for a given
list of SMTP servers.
"""
def __init__(
self, local_hostname: str, timeout: float, debug: bool,
sender: EmailAddress, recip: EmailAddress):
"""
Initialize the object with all the parameters which remain
constant during the check of one email address on all the SMTP
servers.
"""
super().__init__(local_hostname=local_hostname, timeout=timeout)
self.set_debuglevel(debuglevel=2 if debug else False)
self.__sender = sender
self.__recip = recip
self.__temporary_errors = {}
# Avoid error on close() after unsuccessful connect
self.sock = None
def putcmd(self, cmd: str, args: str = ''):
"""
Like `smtplib.SMTP.putcmd`, but remember the command for later
use in error messages.
"""
if args:
self.__command = f'{cmd} {args}'
else:
self.__command = cmd
super().putcmd(cmd=cmd, args=args)
def connect(
self, host: str = 'localhost', port: int = 0,
source_address: str = None) -> Tuple[int, str]:
"""
Like `smtplib.SMTP.connect`, but raise appropriate exceptions on
connection failure or negative SMTP server response.
"""
self.__command = 'connect' # Used for error messages.
self._host = host # Workaround: Missing in standard smtplib!
try:
code, message = super().connect(
host=host, port=port, source_address=source_address)
except OSError as error:
raise SMTPServerDisconnected(str(error))
if code >= 400:
raise SMTPResponseException(code=code, msg=message)
return code, message
def starttls(self, *args, **kwargs):
"""
Like `smtplib.SMTP.starttls`, but continue without TLS in case
either end of the connection does not support it.
"""
try:
super().starttls(*args, **kwargs)
except SMTPNotSupportedError:
# The server does not support the STARTTLS extension
pass
except RuntimeError:
# SSL/TLS support is not available to your Python interpreter
pass
def mail(self, sender: str, options: tuple = ()):
"""
Like `smtplib.SMTP.mail`, but raise an appropriate exception on
negative SMTP server response.
A code >= 400 is an error here.
"""
code, message = super().mail(sender=sender, options=options)
if code >= 400:
raise SMTPResponseException(code=code, msg=message)
return code, message
def rcpt(self, recip: str, options: tuple = ()):
"""
Like `smtplib.SMTP.rcpt`, but handle negative SMTP server
responses directly.
"""
code, message = super().rcpt(recip=recip, options=options)
if code >= 500:
# Address clearly invalid: issue negative result
raise AddressNotDeliverableError({
self._host: SMTPMessage(
command='RCPT TO', code=code,
text=message.decode(errors='ignore'))})
elif code >= 400:
raise SMTPResponseException(code=code, msg=message)
return code, message
def quit(self):
"""
Like `smtplib.SMTP.quit`, but make sure that everything is
cleaned up properly even if the connection has been lost before.
"""
try:
return super().quit()
except SMTPServerDisconnected:
self.ehlo_resp = self.helo_resp = None
self.esmtp_features = {}
self.does_esmtp = False
self.close()
def _check_one(self, host: str) -> bool:
"""
Run the check for one SMTP server.
Return `True` on positive result.
Return `False` on ambiguous result (4xx response to `RCPT TO`),
while collecting the error message for later use.
Raise `AddressNotDeliverableError` on negative result.
"""
try:
self.connect(host=host)
self.starttls()
self.ehlo_or_helo_if_needed()
self.mail(sender=self.__sender.ace)
code, message = self.rcpt(recip=self.__recip.ace)
except SMTPServerDisconnected as e:
self.__temporary_errors[self._host] = SMTPMessage(
command=self.__command, code=451, text=str(e))
return False
except SMTPResponseException as e:
smtp_message = SMTPMessage(
command=self.__command, code=e.smtp_code,
text=e.smtp_error.decode(errors='ignore'))
if e.smtp_code >= 500:
raise SMTPCommunicationError(
error_messages={self._host: smtp_message})
else:
self.__temporary_errors[self._host] = smtp_message
return False
finally:
self.quit()
return code < 400
def check(self, hosts: List[str]) -> bool:
"""
Run the check for all given SMTP servers. On positive result,
return `True`, else raise exceptions described in `smtp_check`.
"""
for host in hosts:
LOGGER.debug(msg=f'Trying {host} ...')
if self._check_one(host=host):
return True
# Raise exception for collected temporary errors
if self.__temporary_errors:
raise SMTPTemporaryError(error_messages=self.__temporary_errors)
def smtp_check(
email_address: EmailAddress, mx_records: List[str], timeout: float = 10,
helo_host: Optional[str] = None,
from_address: Optional[EmailAddress] = None, debug: bool = False
) -> bool:
"""
Return `True` as soon as any of the given servers accepts the
recipient address.
Raise an `AddressNotDeliverableError` if any server unambiguously
and permanently refuses to accept the recipient address.
Raise `SMTPTemporaryError` if all the servers answer with a
temporary error code during the SMTP communication. This means that
the validity of the email address can not be determined. Greylisting
or server delivery issues can be a cause for this.
Raise `SMTPCommunicationError` if any SMTP server replies with an
error message to any of the communication steps before the recipient
address is checked, and the validity of the email address can not be
determined either.
"""
smtp_checker = _SMTPChecker(
local_hostname=helo_host, timeout=timeout, debug=debug,
sender=from_address or email_address, recip=email_address)
return smtp_checker.check(hosts=mx_records)
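
The decision rule described in the `smtp_check` docstring can be sketched without any network access. This is a simplified model, not the module's code: `classify_rcpt_replies` is a hypothetical helper, the reply codes are supplied by hand, and the outcomes are mapped to `True`/`False`/`None` (as `validate_email` does) instead of raising `AddressNotDeliverableError` or `SMTPTemporaryError`. Errors from steps before `RCPT TO` are left out.

```python
from typing import Dict, List, Optional


def classify_rcpt_replies(
        hosts: List[str], replies: Dict[str, int]) -> Optional[bool]:
    """
    Hypothetical helper: walk the MX hosts in priority order and apply
    the rule from the docstring to each host's `RCPT TO` reply code.
    """
    for host in hosts:
        code = replies[host]
        if code >= 500:
            # Permanent refusal: negative result, stop trying hosts.
            return False
        if code < 400:
            # Address accepted: positive result, stop trying hosts.
            return True
        # 4xx: temporary error, fall through to the next host.
    # Only temporary errors were seen: the result is ambiguous.
    return None


hosts = ['mx1.example.com', 'mx2.example.com']
accepted = classify_rcpt_replies(
    hosts, {'mx1.example.com': 450, 'mx2.example.com': 250})
refused = classify_rcpt_replies(
    hosts, {'mx1.example.com': 550, 'mx2.example.com': 250})
ambiguous = classify_rcpt_replies(
    hosts, {'mx1.example.com': 450, 'mx2.example.com': 451})
print(accepted, refused, ambiguous)
```

Note that a permanent refusal from the first host ends the check even if a later host would have accepted the address, which matches the early exit in `_SMTPChecker.rcpt`.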

View File

@ -1,47 +1,65 @@
from logging import getLogger
from typing import Optional
from .dns_check import dns_check
from .domainlist_check import domainlist_check
from .email_address import EmailAddress
from .exceptions import (
AddressFormatError, EmailValidationError, FromAddressFormatError)
from .mx_check import mx_check
AddressFormatError, EmailValidationError, FromAddressFormatError,
SMTPTemporaryError)
from .regex_check import regex_check
from .smtp_check import smtp_check
LOGGER = getLogger(name=__name__)
__all__ = ['validate_email', 'validate_email_or_fail']
__doc__ = """\
Verify the given email address by determining the SMTP servers
responsible for the domain and then asking them to deliver an email to
the address. Before the actual message is sent, the process is
interrupted.
PLEASE NOTE: Some email providers only tell the actual delivery failure
AFTER having delivered the body which this module doesn't, while others
simply accept everything and send a bounce notification later. Hence, a
100% proper response is not guaranteed.
"""
def validate_email_or_fail(
email_address: str, check_regex: bool = True, check_mx: bool = True,
from_address: Optional[str] = None, helo_host: Optional[str] = None,
smtp_timeout: int = 10, dns_timeout: int = 10,
use_blacklist: bool = True, debug: bool = False,
skip_smtp: bool = False) -> Optional[bool]:
email_address: str, *, check_format: bool = True,
check_blacklist: bool = True, check_dns: bool = True,
dns_timeout: float = 10, check_smtp: bool = True,
smtp_timeout: float = 10, smtp_helo_host: Optional[str] = None,
smtp_from_address: Optional[str] = None, smtp_debug: bool = False
) -> Optional[bool]:
"""
Return `True` if the email address validation is successful, `None` if the
validation result is ambigious, and raise an exception if the validation
fails.
Return `True` if the email address validation is successful, `None`
if the validation result is ambiguous, and raise an exception if the
validation fails.
"""
email_address = EmailAddress(email_address)
if from_address is not None:
email_address = EmailAddress(address=email_address)
if check_format:
regex_check(email_address=email_address)
if check_blacklist:
domainlist_check(email_address=email_address)
if not check_dns and not check_smtp: # check_smtp implies check_dns.
return True
mx_records = dns_check(email_address=email_address, timeout=dns_timeout)
if not check_smtp:
return True
if smtp_from_address is not None:
try:
from_address = EmailAddress(from_address)
smtp_from_address = EmailAddress(address=smtp_from_address)
except AddressFormatError:
raise FromAddressFormatError
if check_regex:
regex_check(email_address)
if use_blacklist:
domainlist_check(email_address)
if not check_mx:
return True
return mx_check(
email_address=email_address, from_address=from_address,
helo_host=helo_host, smtp_timeout=smtp_timeout,
dns_timeout=dns_timeout, skip_smtp=skip_smtp, debug=debug)
return smtp_check(
email_address=email_address, mx_records=mx_records,
timeout=smtp_timeout, helo_host=smtp_helo_host,
from_address=smtp_from_address, debug=smtp_debug)
def validate_email(email_address: str, *args, **kwargs):
def validate_email(email_address: str, **kwargs):
"""
Return `True` or `False` depending if the email address exists
or/and can be delivered.
@ -49,9 +67,10 @@ def validate_email(email_address: str, *args, **kwargs):
Return `None` if the result is ambiguous.
"""
try:
return validate_email_or_fail(email_address, *args, **kwargs)
return validate_email_or_fail(email_address, **kwargs)
except SMTPTemporaryError as error:
LOGGER.info(msg=f'Validation for {email_address!r} ambiguous: {error}')
return
except EmailValidationError as error:
message = f'Validation for {email_address!r} failed: {error}'
if kwargs.get('debug'):
LOGGER.warning(msg=message)
LOGGER.info(msg=f'Validation for {email_address!r} failed: {error}')
return False
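
Callers written against the old signature need their keyword arguments renamed. A minimal sketch of such a translation follows, assuming the rename table from the 1.0.0 changelog; `migrate_kwargs` and `_RENAMED` are hypothetical names and not part of the package. The special case for `check_mx=False` is one reading of this diff: the old code returned early for it, whereas the new code only skips DNS when `check_smtp` is also false.

```python
from typing import Any, Dict

# Old keyword name -> new keyword name, per the 1.0.0 changelog.
_RENAMED = {
    'check_regex': 'check_format',
    'use_blacklist': 'check_blacklist',
    'check_mx': 'check_dns',
    'helo_host': 'smtp_helo_host',
    'from_address': 'smtp_from_address',
    'debug': 'smtp_debug',
}


def migrate_kwargs(old_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    'Hypothetical helper: translate pre-1.0.0 keyword arguments.'
    new_kwargs = {}
    for name, value in old_kwargs.items():
        if name == 'skip_smtp':
            # Renamed with inverted logic.
            new_kwargs['check_smtp'] = not value
        else:
            # Unlisted names (e.g. smtp_timeout) are passed through.
            new_kwargs[_RENAMED.get(name, name)] = value
    if old_kwargs.get('check_mx') is False:
        # The old code skipped DNS and SMTP together; check_smtp now
        # implies check_dns, so it has to be switched off explicitly.
        new_kwargs['check_smtp'] = False
    return new_kwargs


migrated = migrate_kwargs(
    {'check_mx': True, 'skip_smtp': True, 'debug': False, 'smtp_timeout': 5})
no_mx = migrate_kwargs({'check_mx': False})
print(migrated)
print(no_mx)
```

Because every parameter after the email address is now keyword-only, the translated dictionary would be passed as `validate_email(address, **migrated)`; positional calls from older code need to be rewritten by hand.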