diff --git a/tests/test_blacklist_check.py b/tests/test_blacklist_check.py index 5f4e4f6..5bf97a1 100644 --- a/tests/test_blacklist_check.py +++ b/tests/test_blacklist_check.py @@ -2,6 +2,7 @@ from unittest.case import TestCase from validate_email.domainlist_check import ( domainlist_check, update_builtin_blacklist) +from validate_email.email_address import EmailAddress from validate_email.exceptions import DomainBlacklistedError from validate_email.validate_email import ( validate_email, validate_email_or_fail) @@ -16,7 +17,7 @@ class BlacklistCheckTestCase(TestCase): def test_blacklist_positive(self): 'Disallows blacklist item: mailinator.com.' with self.assertRaises(DomainBlacklistedError): - domainlist_check(user_part='pa2', domain_part='mailinator.com') + domainlist_check(EmailAddress('pa2@mailinator.com')) with self.assertRaises(DomainBlacklistedError): validate_email_or_fail( email_address='pa2@mailinator.com', check_regex=False, @@ -37,5 +38,4 @@ class BlacklistCheckTestCase(TestCase): def test_blacklist_negative(self): 'Allows a domain not in the blacklist.' self.assertTrue(expr=domainlist_check( - user_part='pa2', - domain_part='some-random-domain-thats-not-blacklisted.com')) + EmailAddress('pa2@some-random-domain-thats-not-blacklisted.com'))) diff --git a/tests/test_email_address.py b/tests/test_email_address.py new file mode 100644 index 0000000..6821489 --- /dev/null +++ b/tests/test_email_address.py @@ -0,0 +1,68 @@ +from unittest.case import TestCase + +from validate_email import validate_email +from validate_email.email_address import EmailAddress +from validate_email.exceptions import AddressFormatError + + +class UserDomainTestCase(TestCase): + 'Test the split of an email address into user and domain.' 
+ + valid_tests = { + 'email@domain.com': ('email', 'domain.com'), + 'email@subdomain.domain.com': ('email', 'subdomain.domain.com'), + 'email@123.123.123.123': ('email', '123.123.123.123'), + 'email@[123.123.123.123]': ('email', '[123.123.123.123]'), + 'email@domain-one.com': ('email', 'domain-one.com'), + 'email@domain.co.jp': ('email', 'domain.co.jp'), + } + + invalid_tests = [ + 'plainaddress', # missing @ sign and domain + 'email.domain.com', # missing @ + ] + + def test_user_domain_valid(self): + 'Splits email address into user and domain parts.' + for address, (user, domain) in self.valid_tests.items(): + self.assertEqual(EmailAddress(address).user, user) + self.assertEqual(EmailAddress(address).domain, domain) + + def test_user_domain_invalid(self): + 'Rejects unparseable email address.' + for address in self.invalid_tests: + # This must be rejected directly by the EmailAddress constructor... + with self.assertRaises(AddressFormatError) as exc: + EmailAddress(address) + self.assertTupleEqual(exc.exception.args, ()) + # ...and indirectly by validate_email(). + self.assertFalse(validate_email(address)) + + +class IdnaTestCase(TestCase): + 'Testing IDNA conversion.' + + valid_tests = { + 'email@address.com': 'email@address.com', + 'email@motörhéád.com': 'email@xn--motrhd-tta7d3f.com', + 'email@[123.123.123.123]': ('email@[123.123.123.123]'), + } + + invalid_tests = [ + 'test@♥web.de', + ] + + def test_idna_conversion_valid(self): + 'Converts email address into ASCII-compatible encoding.' + for address, ace in self.valid_tests.items(): + self.assertEqual(EmailAddress(address).ace, ace) + + def test_idna_conversion_invalid(self): + 'Rejects email address which is not IDNA-convertible.' + for address in self.invalid_tests: + # This must be rejected directly by the EmailAddress constructor... + with self.assertRaises(AddressFormatError) as exc: + EmailAddress(address) + self.assertTupleEqual(exc.exception.args, ()) + # ...and indirectly by validate_email(). 
+ self.assertFalse(validate_email(address)) diff --git a/tests/test_mx_check.py b/tests/test_mx_check.py index f1979b5..a2cde64 100644 --- a/tests/test_mx_check.py +++ b/tests/test_mx_check.py @@ -5,19 +5,8 @@ from unittest.mock import Mock, patch from dns.exception import Timeout from validate_email import mx_check as mx_module -from validate_email.exceptions import ( - AddressFormatError, DNSTimeoutError, NoValidMXError) -from validate_email.mx_check import ( - _dissect_email, _get_idna_address, _get_mx_records) - -DOMAINS = { - 'email@domain.com': 'domain.com', - 'email@subdomain.domain.com': 'subdomain.domain.com', - 'email@123.123.123.123': '123.123.123.123', - 'email@[123.123.123.123]': '123.123.123.123', - 'email@domain-one.com': 'domain-one.com', - 'email@domain.co.jp': 'domain.co.jp', -} +from validate_email.exceptions import DNSTimeoutError, NoValidMXError +from validate_email.mx_check import _get_mx_records class DnsNameStub(object): @@ -33,30 +22,6 @@ class DnsNameStub(object): TEST_QUERY = Mock() -class DomainTestCase(TestCase): - - def test_domain_from_email_address(self): - for address, domain in DOMAINS.items(): - _user, domain_from_function = _dissect_email(email_address=address) - self.assertEqual(domain_from_function, domain) - - -class IdnaTestCase(TestCase): - 'Testing IDNA converting.' - - def test_resolves_idna_domains(self): - 'Resolves email@motörhéád.com.' - self.assertEqual( - first=_get_idna_address(email_address='email@motörhéád.com'), - second='email@xn--motrhd-tta7d3f.com') - - def test_resolves_conventional_domains(self): - 'Resolves email@address.com.' - self.assertEqual( - first=_get_idna_address(email_address='email@address.com'), - second='email@address.com') - - class GetMxRecordsTestCase(TestCase): 'Testing `_get_mx_records`.' 
@@ -97,10 +62,3 @@ class GetMxRecordsTestCase(TestCase): with self.assertRaises(DNSTimeoutError) as exc: _get_mx_records(domain='testdomain3', timeout=10) self.assertTupleEqual(exc.exception.args, ()) - - def test_returns_false_on_idna_failure(self): - 'Returns `False` on IDNA failure.' - with self.assertRaises(AddressFormatError) as exc: - mx_module.mx_check( - email_address='test@♥web.de', from_address='mail@example.com') - self.assertTupleEqual(exc.exception.args, ()) diff --git a/tests/test_regex_check.py b/tests/test_regex_check.py index efba69f..e7cdf79 100644 --- a/tests/test_regex_check.py +++ b/tests/test_regex_check.py @@ -1,8 +1,8 @@ from unittest.case import TestCase +from validate_email.email_address import EmailAddress from validate_email.exceptions import AddressFormatError from validate_email.regex_check import regex_check -from validate_email.validate_email import validate_email VALID_EXAMPLES = [ 'email@domain.com', # basic valid email @@ -35,11 +35,6 @@ INVALID_EXAMPLES = [ 'email@domain..com', # multiple dot in the domain portion is invalid ] -UNPARSEABLE_EXAMPLES = [ - 'plainaddress', # missing @ sign and domain - 'email.domain.com', # missing @ -] - class FormatValidity(TestCase): 'Testing regex validation + format validity.' @@ -47,21 +42,14 @@ class FormatValidity(TestCase): def test_valid_email_structure_regex(self): 'Accepts an email with a valid structure.' for address in VALID_EXAMPLES: - user_part, domain_part = address.rsplit('@', 1) self.assertTrue( - expr=regex_check(user_part=user_part, domain_part=domain_part), + expr=regex_check(EmailAddress(address)), msg=f'Check is not true with {address}') def test_invalid_email_structure_regex(self): 'Rejects an email with an invalid structure.' 
for address in INVALID_EXAMPLES: - user_part, domain_part = address.rsplit('@', 1) with self.assertRaises( expected_exception=AddressFormatError, msg=f'Test failed for {address}'): - regex_check(user_part=user_part, domain_part=domain_part), - - def test_unparseable_email(self): - 'Rejects an unparseable email.' - for address in UNPARSEABLE_EXAMPLES: - self.assertFalse(expr=validate_email(email_address=address)) + regex_check(EmailAddress(address)) diff --git a/validate_email/constants.py b/validate_email/constants.py index 6f41c9e..b4827f1 100644 --- a/validate_email/constants.py +++ b/validate_email/constants.py @@ -5,7 +5,6 @@ HOST_REGEX = re_compile( # max length for domain name labels is 63 characters per RFC 1034 r'((?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+)' r'(?:[A-Z0-9-]{2,63}(? bool: + def __call__(self, address: EmailAddress) -> bool: 'Do the checking here.' - if domain_part in self.domain_whitelist: + if address.domain in self.domain_whitelist: return True - if domain_part in self.domain_blacklist: + if address.domain in self.domain_blacklist: raise DomainBlacklistedError return True diff --git a/validate_email/email_address.py b/validate_email/email_address.py new file mode 100644 index 0000000..52e768f --- /dev/null +++ b/validate_email/email_address.py @@ -0,0 +1,60 @@ +from idna.core import IDNAError, encode + +from .exceptions import AddressFormatError + + +class EmailAddress(object): + """ + Internally used class to hold an email address. + + This class features splitting the email address into user and domain + part as well as converting internationalized domain name into the + ASCII-compatible encoding (ACE) according to the IDNA standard. + """ + + def __init__(self, address: str): + self._address = address + + # Split email address into user and domain part. 
+ try: + self._user, self._domain = self._address.rsplit('@', 1) + except ValueError: + raise AddressFormatError + + # Convert internationalized domain name into the ACE encoding + if self._domain.startswith('[') and self._domain.endswith(']'): + self._ace_domain = self._domain + else: + try: + self._ace_domain = encode(self._domain).decode('ascii') + except IDNAError: + raise AddressFormatError + + @property + def user(self) -> str: + """ + The username part of the email address, that is the part before + the "@" sign. + """ + return self._user + + @property + def domain(self) -> str: + """ + The domain part of the email address, that is the part after the + "@" sign. + """ + return self._domain + + @property + def ace(self) -> str: + 'The ASCII-compatible encoding for the email address.' + return '@'.join((self._user, self._ace_domain)) + + @property + def ace_domain(self) -> str: + """ + The ASCII-compatible encoding for the domain part of the email + address. + """ + return self._ace_domain diff --git a/validate_email/exceptions.py b/validate_email/exceptions.py index 5a067af..d08a714 100644 --- a/validate_email/exceptions.py +++ b/validate_email/exceptions.py @@ -14,6 +14,14 @@ class AddressFormatError(EmailValidationError): message = 'Invalid email address.' +class FromAddressFormatError(EmailValidationError): + """ + Raised when the from email address used for the MX check has an + invalid format. + """ + message = 'Invalid "From:" email address.' 
+ + class DomainBlacklistedError(EmailValidationError): """ Raised when the domain of the email address is blacklisted on diff --git a/validate_email/mx_check.py b/validate_email/mx_check.py index 0717063..3e99c6f 100644 --- a/validate_email/mx_check.py +++ b/validate_email/mx_check.py @@ -1,41 +1,19 @@ -from functools import lru_cache from smtplib import SMTP, SMTPServerDisconnected from socket import error as SocketError from socket import gethostname -from typing import Optional, Tuple +from typing import Optional from dns.exception import Timeout from dns.rdatatype import MX as rdtype_mx from dns.rdtypes.ANY.MX import MX from dns.resolver import ( NXDOMAIN, YXDOMAIN, Answer, NoAnswer, NoNameservers, query) -from idna.core import IDNAError, encode -from .constants import EMAIL_EXTRACT_HOST_REGEX, HOST_REGEX +from .constants import HOST_REGEX +from .email_address import EmailAddress from .exceptions import ( - AddressFormatError, AddressNotDeliverableError, DNSConfigurationError, - DNSTimeoutError, DomainNotFoundError, NoMXError, NoNameserverError, - NoValidMXError) - - -@lru_cache(maxsize=10) -def _dissect_email(email_address: str) -> Tuple[str, str]: - 'Return a tuple of the user and domain part.' - try: - domain = EMAIL_EXTRACT_HOST_REGEX.search(string=email_address)[1] - except TypeError: - raise AddressFormatError - except IndexError: - raise AddressFormatError - return email_address[:-(len(domain) + 1)], domain - - -@lru_cache(maxsize=10) -def _get_idna_address(email_address: str) -> str: - 'Return an IDNA converted email address.' 
- user, domain = _dissect_email(email_address=email_address) - idna_resolved_domain = encode(s=domain).decode('ascii') - return f'{user}@{idna_resolved_domain}' + AddressNotDeliverableError, DNSConfigurationError, DNSTimeoutError, + DomainNotFoundError, NoMXError, NoNameserverError, NoValidMXError) def _get_mx_records(domain: str, timeout: int) -> list: @@ -68,7 +46,7 @@ def _get_mx_records(domain: str, timeout: int) -> list: def _check_one_mx( smtp: SMTP, error_messages: list, mx_record: str, helo_host: str, - from_address: str, email_address: str) -> bool: + from_address: EmailAddress, email_address: EmailAddress) -> bool: """ Check one MX server, return the `is_ambigious` boolean or raise `StopIteration` if this MX accepts the email. @@ -76,8 +54,8 @@ def _check_one_mx( try: smtp.connect(host=mx_record) smtp.helo(name=helo_host) - smtp.mail(sender=from_address) - code, message = smtp.rcpt(recip=email_address) + smtp.mail(sender=from_address.ace) + code, message = smtp.rcpt(recip=email_address.ace) smtp.quit() except SMTPServerDisconnected: return True @@ -96,8 +74,8 @@ def _check_one_mx( def _check_mx_records( - mx_records: list, smtp_timeout: int, helo_host: str, from_address: str, - email_address: str + mx_records: list, smtp_timeout: int, helo_host: str, + from_address: EmailAddress, email_address: EmailAddress ) -> Optional[bool]: 'Check the mx records for a given email address.' smtp = SMTP(timeout=smtp_timeout) @@ -119,7 +97,7 @@ def _check_mx_records( def mx_check( - email_address: str, from_address: Optional[str] = None, + email_address: EmailAddress, from_address: Optional[EmailAddress] = None, helo_host: Optional[str] = None, smtp_timeout: int = 10, dns_timeout: int = 10 ) -> Optional[bool]: @@ -130,13 +108,9 @@ def mx_check( (e.g. temporary errors or graylisting). 
""" host = helo_host or gethostname() - idna_from = _get_idna_address(email_address=from_address or email_address) - try: - idna_to = _get_idna_address(email_address=email_address) - except IDNAError: - raise AddressFormatError - _user, domain = _dissect_email(email_address=email_address) - mx_records = _get_mx_records(domain=domain, timeout=dns_timeout) + from_address = from_address or email_address + mx_records = _get_mx_records( + domain=email_address.domain, timeout=dns_timeout) return _check_mx_records( mx_records=mx_records, smtp_timeout=smtp_timeout, helo_host=host, - from_address=idna_from, email_address=idna_to) + from_address=from_address, email_address=email_address) diff --git a/validate_email/regex_check.py b/validate_email/regex_check.py index b8fc64a..eb4335a 100644 --- a/validate_email/regex_check.py +++ b/validate_email/regex_check.py @@ -1,11 +1,9 @@ from ipaddress import IPv4Address, IPv6Address -from typing import Optional from .constants import HOST_REGEX, LITERAL_REGEX, USER_REGEX +from .email_address import EmailAddress from .exceptions import AddressFormatError -SetOrNone = Optional[set] - def _validate_ipv4_address(value: str): try: @@ -27,41 +25,26 @@ def _validate_ipv6_address(value: str) -> bool: def _validate_ipv46_address(value: str) -> bool: - if _validate_ipv4_address(value): - return True - return _validate_ipv6_address(value) + return _validate_ipv4_address(value) or _validate_ipv6_address(value) -class RegexValidator(object): +def regex_check(address: EmailAddress) -> bool: 'Slightly adjusted email regex checker from the Django project.' - def __call__( - self, user_part: str, domain_part: str, - use_blacklist: bool = True) -> bool: - if not USER_REGEX.match(user_part): - raise AddressFormatError + # Validate user part. 
+ if not USER_REGEX.match(address.user): + raise AddressFormatError - if not self.validate_domain_part(domain_part): - # Try for possible IDN domain-part - try: - domain_part = domain_part.encode('idna').decode('ascii') - except UnicodeError: - pass - else: - if self.validate_domain_part(domain_part): - return True - raise AddressFormatError + # Validate domain part: a) hostname. + if HOST_REGEX.match(address.ace_domain): return True - def validate_domain_part(self, domain_part: str): - if HOST_REGEX.match(domain_part): + # Validate domain part: b) literal IP address. + literal_match = LITERAL_REGEX.match(address.ace_domain) + if literal_match: + ip_address = literal_match.group(1) + if _validate_ipv46_address(ip_address): return True - literal_match = LITERAL_REGEX.match(domain_part) - if literal_match: - ip_address = literal_match.group(1) - return _validate_ipv46_address(ip_address) - return False - - -regex_check = RegexValidator() + # Domain part not successfully validated. + raise AddressFormatError diff --git a/validate_email/validate_email.py b/validate_email/validate_email.py index 430bd54..b28ceaf 100644 --- a/validate_email/validate_email.py +++ b/validate_email/validate_email.py @@ -2,7 +2,9 @@ from logging import getLogger from typing import Optional from .domainlist_check import domainlist_check -from .exceptions import AddressFormatError, EmailValidationError +from .email_address import EmailAddress +from .exceptions import ( + AddressFormatError, EmailValidationError, FromAddressFormatError) from .mx_check import mx_check from .regex_check import regex_check @@ -19,13 +21,17 @@ def validate_email_or_fail( validation result is ambigious, and raise an exception if the validation fails. 
""" - if not email_address or '@' not in email_address: - raise AddressFormatError - user_part, domain_part = email_address.rsplit('@', 1) + email_address = EmailAddress(email_address) + if from_address is not None: + try: + from_address = EmailAddress(from_address) + except AddressFormatError: + raise FromAddressFormatError + if check_regex: - regex_check(user_part=user_part, domain_part=domain_part) + regex_check(email_address) if use_blacklist: - domainlist_check(user_part=user_part, domain_part=domain_part) + domainlist_check(email_address) if not check_mx: return True return mx_check(