From 73814d7f6d64dfb41e30bebe4beaf008731b9cbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Reinhard=20M=C3=BCller?= Date: Wed, 8 Apr 2020 23:56:49 +0200 Subject: [PATCH 1/3] Introduce distinct exception classes and logging --- README.rst | 2 + tests/test_blacklist_check.py | 35 +++++++++---- tests/test_mx_check.py | 19 +++---- tests/test_regex_check.py | 7 +-- validate_email/__init__.py | 4 +- validate_email/domainlist_check.py | 3 +- validate_email/exceptions.py | 82 ++++++++++++++++++++++++++++++ validate_email/mx_check.py | 58 ++++++++++++--------- validate_email/regex_check.py | 5 +- validate_email/validate_email.py | 37 +++++++++----- 10 files changed, 185 insertions(+), 67 deletions(-) create mode 100644 validate_email/exceptions.py diff --git a/README.rst b/README.rst index 71fa4df..e767658 100644 --- a/README.rst +++ b/README.rst @@ -41,6 +41,8 @@ Basic usage:: :code:`use_blacklist`: use the blacklist of domains downloaded from https://github.com/martenson/disposable-email-domains +The function :code:`validate_email_or_fail()` works exactly like :code:`validate_email`, except that it raises an exception in the case of validation failure instead of returning :code:`False`. + Auto-updater ============================ The package contains an auto-updater for downloading and updating the built-in blacklist.txt. It will run on each module load (and installation), but will try to update the content if the file is older than 5 days, and if the content is not the same that's already downloaded. 
diff --git a/tests/test_blacklist_check.py b/tests/test_blacklist_check.py index 0138ce4..21167bd 100644 --- a/tests/test_blacklist_check.py +++ b/tests/test_blacklist_check.py @@ -1,7 +1,8 @@ from unittest.case import TestCase -from validate_email import validate_email +from validate_email import validate_email, validate_email_or_fail from validate_email.domainlist_check import BlacklistUpdater, domainlist_check +from validate_email.exceptions import DomainBlacklistedError class BlacklistCheckTestCase(TestCase): @@ -14,14 +15,30 @@ class BlacklistCheckTestCase(TestCase): def test_blacklist_positive(self): 'Disallows blacklist item: mailinator.com.' domainlist_check._load_builtin_blacklist() - self.assertFalse(expr=domainlist_check( - user_part='pa2', domain_part='mailinator.com')) - self.assertFalse(expr=validate_email( - email_address='pa2@mailinator.com', check_regex=False, - use_blacklist=True)) - self.assertFalse(expr=validate_email( - email_address='pa2@mailinator.com', check_regex=True, - use_blacklist=True)) + with self.assertRaises(DomainBlacklistedError): + domainlist_check(user_part='pa2', domain_part='mailinator.com') + with self.assertRaises(DomainBlacklistedError): + validate_email_or_fail( + email_address='pa2@mailinator.com', + check_regex=False, + use_blacklist=True) + with self.assertRaises(DomainBlacklistedError): + validate_email_or_fail( + email_address='pa2@mailinator.com', + check_regex=True, + use_blacklist=True) + with self.assertLogs(): + self.assertFalse( + validate_email( + email_address='pa2@mailinator.com', + check_regex=False, + use_blacklist=True)) + with self.assertLogs(): + self.assertFalse( + validate_email( + email_address='pa2@mailinator.com', + check_regex=True, + use_blacklist=True)) def test_blacklist_negative(self): 'Allows a domain not in the blacklist.' 
diff --git a/tests/test_mx_check.py b/tests/test_mx_check.py index fca9fc9..244683b 100644 --- a/tests/test_mx_check.py +++ b/tests/test_mx_check.py @@ -5,6 +5,7 @@ from unittest.mock import Mock, patch from dns.exception import Timeout from validate_email import mx_check as mx_module +from validate_email.exceptions import DNSTimeoutError, NoValidMXError from validate_email.mx_check import ( _dissect_email, _get_idna_address, _get_mx_records) @@ -63,22 +64,16 @@ class GetMxRecordsTestCase(TestCase): 'Fails when an MX hostname is "."' TEST_QUERY.return_value = [ SimpleNamespace(exchange=DnsNameStub(value='.'))] - with self.assertRaises(ValueError) as exc: + with self.assertRaises(NoValidMXError): _get_mx_records(domain='testdomain1', timeout=10) - self.assertEqual( - exc.exception.args[0], - 'Domain testdomain1 does not have a valid MX record') @patch.object(target=mx_module, attribute='query', new=TEST_QUERY) def test_fails_with_null_hostnames(self): 'Fails when an MX hostname is invalid.' TEST_QUERY.return_value = [ SimpleNamespace(exchange=DnsNameStub(value='asdqwe'))] - with self.assertRaises(ValueError) as exc: + with self.assertRaises(NoValidMXError): _get_mx_records(domain='testdomain2', timeout=10) - self.assertEqual( - exc.exception.args[0], - 'Domain testdomain2 does not have a valid MX record') @patch.object(target=mx_module, attribute='query', new=TEST_QUERY) def test_filters_out_invalid_hostnames(self): @@ -93,13 +88,11 @@ class GetMxRecordsTestCase(TestCase): self.assertListEqual(result, ['valid.host.', 'valid2.host.']) @patch.object(target=mx_module, attribute='query', new=TEST_QUERY) - def test_raises_valueerror_on_dns_exception(self): - 'Raises `ValueError` on DNS exception.' + def test_raises_exception_on_dns_timeout(self): + 'Raises exception on DNS timeout.' 
TEST_QUERY.side_effect = Timeout() - with self.assertRaises(ValueError) as exc: + with self.assertRaises(DNSTimeoutError): _get_mx_records(domain='testdomain3', timeout=10) - self.assertEqual( - exc.exception.args[0], 'testdomain3 DNS resolve timed out') def test_returns_false_on_idna_failure(self): 'Returns `False` on IDNA failure.' diff --git a/tests/test_regex_check.py b/tests/test_regex_check.py index 0984026..350b61c 100644 --- a/tests/test_regex_check.py +++ b/tests/test_regex_check.py @@ -1,5 +1,6 @@ from unittest.case import TestCase +from validate_email.exceptions import AddressFormatError from validate_email.regex_check import regex_check from validate_email.validate_email import validate_email @@ -55,9 +56,9 @@ class FormatValidity(TestCase): 'Rejects an email with an invalid structure.' for address in INVALID_EXAMPLES: user_part, domain_part = address.rsplit('@', 1) - self.assertFalse( - expr=regex_check(user_part=user_part, domain_part=domain_part), - msg=f'Check is true with {address}') + with self.assertRaises( + AddressFormatError, msg=f'Test failed for {address}'): + regex_check(user_part=user_part, domain_part=domain_part), def test_unparseable_email(self): 'Rejects an unparseable email.' 
diff --git a/validate_email/__init__.py b/validate_email/__init__.py index 2edb0c3..a325e14 100644 --- a/validate_email/__init__.py +++ b/validate_email/__init__.py @@ -1,3 +1 @@ -from .validate_email import validate_email - -validate_email +from .validate_email import validate_email, validate_email_or_fail # noqa diff --git a/validate_email/domainlist_check.py b/validate_email/domainlist_check.py index 94c4cde..28abe08 100644 --- a/validate_email/domainlist_check.py +++ b/validate_email/domainlist_check.py @@ -1,5 +1,6 @@ from typing import Optional +from .exceptions import DomainBlacklistedError from .updater import BLACKLIST_FILE_PATH, BlacklistUpdater SetOrNone = Optional[set] @@ -38,7 +39,7 @@ class DomainListValidator(object): if domain_part in self.domain_whitelist: return True if domain_part in self.domain_blacklist: - return False + raise DomainBlacklistedError return True diff --git a/validate_email/exceptions.py b/validate_email/exceptions.py new file mode 100644 index 0000000..ccad1a4 --- /dev/null +++ b/validate_email/exceptions.py @@ -0,0 +1,82 @@ +class EmailValidationError(Exception): + """ + Base class for all exceptions indicating validation failure. + """ + message = 'Unknown error.' + + def __str__(self): + return self.message + + +class AddressFormatError(EmailValidationError): + """ + Raised when the email address has an invalid format. + """ + message = 'Invalid email address.' + + +class DomainBlacklistedError(EmailValidationError): + """ + Raised when the domain of the email address is blacklisted on + https://github.com/martenson/disposable-email-domains. + """ + message = 'Domain blacklisted.' + + +class DomainNotFoundError(EmailValidationError): + """ + Raised when the DNS lookup reports that the domain of the email + address does not exist (`NXDOMAIN`). + """ + message = 'Domain not found.' 
+ + +class NoNameserverError(EmailValidationError): + """ + Raised when the DNS lookup for the domain of the email address fails + because no nameserver is available (`NoNameservers`). + """ + message = 'No nameserver found for domain.' + + +class DNSTimeoutError(EmailValidationError): + """ + Raised when the DNS lookup for the domain of the email address + times out. + """ + message = 'Domain lookup timed out.' + + +class DNSConfigurationError(EmailValidationError): + """ + Raised when the DNS entries of the domain of the email address are + misconfigured (the lookup fails with `YXDOMAIN`). + """ + message = 'Misconfigurated DNS entries for domain.' + + +class NoMXError(EmailValidationError): + """ + Raised when the DNS lookup for the domain of the email address + returns no MX record (`NoAnswer`). + """ + message = 'No MX record for domain found.' + + +class NoValidMXError(EmailValidationError): + """ + Raised when none of the MX records of the domain of the email + address contains a valid hostname. + """ + message = 'No valid MX record for domain found.' + + +class AddressNotDeliverableError(EmailValidationError): + """ + Raised when no MX server of the domain accepts the email address; + the message lists the error collected from each server. 
+ """ + message = 'Non-deliverable email address:' + + def __init__(self, error_messages): + self.message = '\n'.join([self.message] + error_messages) diff --git a/validate_email/mx_check.py b/validate_email/mx_check.py index bf6828d..1bc5675 100644 --- a/validate_email/mx_check.py +++ b/validate_email/mx_check.py @@ -12,6 +12,10 @@ from dns.resolver import ( from idna.core import IDNAError, encode from .constants import EMAIL_EXTRACT_HOST_REGEX, HOST_REGEX +from .exceptions import ( + AddressFormatError, AddressNotDeliverableError, DNSConfigurationError, + DNSTimeoutError, DomainNotFoundError, NoMXError, NoNameserverError, + NoValidMXError) @lru_cache(maxsize=10) @@ -20,9 +24,9 @@ def _dissect_email(email_address: str) -> Tuple[str, str]: try: domain = EMAIL_EXTRACT_HOST_REGEX.search(string=email_address)[1] except TypeError: - raise ValueError('Invalid email address') + raise AddressFormatError(email_address) except IndexError: - raise ValueError('Invalid email address') + raise AddressFormatError(email_address) return email_address[:-(len(domain) + 1)], domain @@ -36,30 +40,29 @@ def _get_idna_address(email_address: str) -> str: def _get_mx_records(domain: str, timeout: int) -> list: """ - Return a list of hostnames in the MX record, raise `ValueError` on + Return a list of hostnames in the MX record, raise an exception on any issues. 
""" try: records = query( qname=domain, rdtype=rdtype_mx, lifetime=timeout) # type: Answer except NXDOMAIN: - raise ValueError(f'Domain {domain} does not seem to exist') - except NoAnswer: - raise ValueError(f'Domain {domain} does not have an MX record') - except Timeout: - raise ValueError(f'{domain} DNS resolve timed out') - except YXDOMAIN: - raise ValueError( - 'The DNS query name is too long after DNAME substitution.') + raise DomainNotFoundError except NoNameservers: - raise ValueError('No nameservers responded in time.') + raise NoNameserverError + except Timeout: + raise DNSTimeoutError + except YXDOMAIN: + raise DNSConfigurationError + except NoAnswer: + raise NoMXError to_check = dict() for record in records: # type: MX dns_str = record.exchange.to_text() # type: str to_check[dns_str] = dns_str[:-1] if dns_str.endswith('.') else dns_str result = [k for k, v in to_check.items() if HOST_REGEX.search(string=v)] if not len(result): - raise ValueError(f'Domain {domain} does not have a valid MX record') + raise NoValidMXError return result @@ -70,7 +73,8 @@ def _check_mx_records( 'Check the mx records for a given email address.' 
smtp = SMTP(timeout=smtp_timeout) smtp.set_debuglevel(debuglevel=0) - answers = set() + error_messages = [] + found_ambigious = False for mx_record in mx_records: try: smtp.connect(host=mx_record) @@ -79,18 +83,27 @@ def _check_mx_records( code, message = smtp.rcpt(recip=email_address) smtp.quit() except SMTPServerDisconnected: - answers.add(None) + found_ambigious = True continue - except SocketError: - answers.add(False) + except SocketError as error: + error_messages.append(f'{mx_record}: {error}') continue if code == 250: return True - if 400 <= code <= 499: + elif 400 <= code <= 499: # Ambigious return code, can be graylist, temporary # problems, quota or mailsystem error - answers.add(None) - return None if None in answers else False + found_ambigious = True + else: + message = message.decode(errors='ignore') + error_messages.append(f'{mx_record}: {code} {message}') + + # If any of the mx servers behaved ambiguously, return None, otherwise raise + # an exception containing the collected error messages. 
+ if found_ambigious: + return None + else: + raise AddressNotDeliverableError(error_messages) def mx_check( @@ -111,10 +124,7 @@ def mx_check( except IDNAError: return False _user, domain = _dissect_email(email_address=email_address) - try: - mx_records = _get_mx_records(domain=domain, timeout=dns_timeout) - except ValueError: - return False + mx_records = _get_mx_records(domain=domain, timeout=dns_timeout) return _check_mx_records( mx_records=mx_records, smtp_timeout=smtp_timeout, helo_host=host, from_address=idna_from, email_address=idna_to) diff --git a/validate_email/regex_check.py b/validate_email/regex_check.py index ed992bf..4c8d4cc 100644 --- a/validate_email/regex_check.py +++ b/validate_email/regex_check.py @@ -2,6 +2,7 @@ from ipaddress import IPv4Address, IPv6Address from typing import Optional from .constants import HOST_REGEX, LITERAL_REGEX, USER_REGEX +from .exceptions import AddressFormatError SetOrNone = Optional[set] @@ -38,7 +39,7 @@ class RegexValidator(object): self, user_part: str, domain_part: str, use_blacklist: bool = True) -> bool: if not USER_REGEX.match(user_part): - return False + raise AddressFormatError if not self.validate_domain_part(domain_part): # Try for possible IDN domain-part @@ -49,7 +50,7 @@ class RegexValidator(object): else: if self.validate_domain_part(domain_part): return True - return False + raise AddressFormatError return True def validate_domain_part(self, domain_part): diff --git a/validate_email/validate_email.py b/validate_email/validate_email.py index 963e1e9..de6e21c 100644 --- a/validate_email/validate_email.py +++ b/validate_email/validate_email.py @@ -1,33 +1,46 @@ +from logging import getLogger from typing import Optional from .domainlist_check import domainlist_check +from .exceptions import AddressFormatError, EmailValidationError from .mx_check import mx_check from .regex_check import regex_check -def validate_email( +def validate_email_or_fail( email_address: str, check_regex: bool = True, check_mx: 
bool = True, from_address: Optional[str] = None, helo_host: Optional[str] = None, smtp_timeout: int = 10, dns_timeout: int = 10, use_blacklist: bool = True) -> Optional[bool]: """ - Return `True` or `False` depending if the email address exists - or/and can be delivered. - - Return `None` if the result is ambigious. + Return `True` if the email address validation is successful, `None` if the + validation result is ambiguous, and raise an exception if the validation + fails. """ if not email_address or '@' not in email_address: - return False + raise AddressFormatError user_part, domain_part = email_address.rsplit('@', 1) - if check_regex and \ - not regex_check(user_part=user_part, domain_part=domain_part): - return False - if use_blacklist and \ - not domainlist_check(user_part=user_part, domain_part=domain_part): - return False + if check_regex: + regex_check(user_part=user_part, domain_part=domain_part) + if use_blacklist: + domainlist_check(user_part=user_part, domain_part=domain_part) if not check_mx: return True return mx_check( email_address=email_address, from_address=from_address, helo_host=helo_host, smtp_timeout=smtp_timeout, dns_timeout=dns_timeout) + + +def validate_email(*args, **kwargs): + """ + Return `True` or `False` depending if the email address exists + or/and can be delivered. + + Return `None` if the result is ambigious. 
+ """ + try: + return validate_email_or_fail(*args, **kwargs) + except EmailValidationError as error: + getLogger('validate_email').info(error) + return False From e1a634a7b32f5cd3b5db13863a36939061b87183 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Reinhard=20M=C3=BCller?= Date: Thu, 9 Apr 2020 00:04:35 +0200 Subject: [PATCH 2/3] Improve logging output --- validate_email/validate_email.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/validate_email/validate_email.py b/validate_email/validate_email.py index de6e21c..ea9905c 100644 --- a/validate_email/validate_email.py +++ b/validate_email/validate_email.py @@ -32,7 +32,7 @@ def validate_email_or_fail( dns_timeout=dns_timeout) -def validate_email(*args, **kwargs): +def validate_email(email_address: str, *args, **kwargs): """ Return `True` or `False` depending if the email address exists or/and can be delivered. @@ -40,7 +40,8 @@ def validate_email(*args, **kwargs): Return `None` if the result is ambigious. """ try: - return validate_email_or_fail(*args, **kwargs) + return validate_email_or_fail(email_address, *args, **kwargs) except EmailValidationError as error: - getLogger('validate_email').info(error) + message = f'Validation for {email_address!r} failed: {error}' + getLogger('validate_email').info(message) return False From 55112f2c054aa4c64d2ef48dcecd463eb55eecbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Reinhard=20M=C3=BCller?= Date: Thu, 9 Apr 2020 00:35:51 +0200 Subject: [PATCH 3/3] Fix IDNAError handling --- tests/test_mx_check.py | 9 ++++++--- validate_email/mx_check.py | 6 +++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/test_mx_check.py b/tests/test_mx_check.py index 244683b..dfcfdb2 100644 --- a/tests/test_mx_check.py +++ b/tests/test_mx_check.py @@ -5,7 +5,8 @@ from unittest.mock import Mock, patch from dns.exception import Timeout from validate_email import mx_check as mx_module -from validate_email.exceptions import DNSTimeoutError, NoValidMXError 
+from validate_email.exceptions import ( + AddressFormatError, DNSTimeoutError, NoValidMXError) from validate_email.mx_check import ( _dissect_email, _get_idna_address, _get_mx_records) @@ -96,5 +97,7 @@ class GetMxRecordsTestCase(TestCase): def test_returns_false_on_idna_failure(self): 'Returns `False` on IDNA failure.' - self.assertFalse(expr=mx_module.mx_check( - email_address='test@♥web.de', from_address='mail@example.com')) + with self.assertRaises(AddressFormatError): + mx_module.mx_check( + email_address='test@♥web.de', + from_address='mail@example.com') diff --git a/validate_email/mx_check.py b/validate_email/mx_check.py index 1bc5675..a75c2df 100644 --- a/validate_email/mx_check.py +++ b/validate_email/mx_check.py @@ -24,9 +24,9 @@ def _dissect_email(email_address: str) -> Tuple[str, str]: try: domain = EMAIL_EXTRACT_HOST_REGEX.search(string=email_address)[1] except TypeError: - raise AddressFormatError(email_address) + raise AddressFormatError except IndexError: - raise AddressFormatError(email_address) + raise AddressFormatError return email_address[:-(len(domain) + 1)], domain @@ -122,7 +122,7 @@ def mx_check( try: idna_to = _get_idna_address(email_address=email_address) except IDNAError: - return False + raise AddressFormatError _user, domain = _dissect_email(email_address=email_address) mx_records = _get_mx_records(domain=domain, timeout=dns_timeout) return _check_mx_records(