Introduce EmailAddress class

This unifies the tasks of splitting an email address into user and
domain parts as well as converting an international domain name into the
ASCII-compatible encoding (ACE).
This commit is contained in:
Reinhard Müller 2020-04-13 09:44:50 +02:00
parent e3d2bf24c7
commit d7666bd6ad
11 changed files with 190 additions and 145 deletions

View File

@ -2,6 +2,7 @@ from unittest.case import TestCase
from validate_email.domainlist_check import (
domainlist_check, update_builtin_blacklist)
from validate_email.email_address import EmailAddress
from validate_email.exceptions import DomainBlacklistedError
from validate_email.validate_email import (
validate_email, validate_email_or_fail)
@ -16,7 +17,7 @@ class BlacklistCheckTestCase(TestCase):
def test_blacklist_positive(self):
'Disallows blacklist item: mailinator.com.'
with self.assertRaises(DomainBlacklistedError):
domainlist_check(user_part='pa2', domain_part='mailinator.com')
domainlist_check(EmailAddress('pa2@mailinator.com'))
with self.assertRaises(DomainBlacklistedError):
validate_email_or_fail(
email_address='pa2@mailinator.com', check_regex=False,
@ -37,5 +38,4 @@ class BlacklistCheckTestCase(TestCase):
def test_blacklist_negative(self):
'Allows a domain not in the blacklist.'
self.assertTrue(expr=domainlist_check(
user_part='pa2',
domain_part='some-random-domain-thats-not-blacklisted.com'))
EmailAddress('pa2@some-random-domain-thats-not-blacklisted.com')))

View File

@ -0,0 +1,68 @@
from unittest.case import TestCase
from validate_email import validate_email
from validate_email.email_address import EmailAddress
from validate_email.exceptions import AddressFormatError
class UserDomainTestCase(TestCase):
    'Test the split of an email address into user and domain.'

    # Maps each parseable address to the expected (user, domain) pair.
    valid_tests = {
        'email@domain.com': ('email', 'domain.com'),
        'email@subdomain.domain.com': ('email', 'subdomain.domain.com'),
        'email@123.123.123.123': ('email', '123.123.123.123'),
        'email@[123.123.123.123]': ('email', '[123.123.123.123]'),
        'email@domain-one.com': ('email', 'domain-one.com'),
        'email@domain.co.jp': ('email', 'domain.co.jp'),
    }

    # Addresses that cannot be split because they contain no "@" sign.
    invalid_tests = [
        'plainaddress',  # missing @ sign and domain
        'email.domain.com',  # missing @
    ]

    def test_user_domain_valid(self):
        'Splits email address into user and domain parts.'
        for address, (user, domain) in self.valid_tests.items():
            # subTest keeps the loop going after a failure and reports
            # which address was responsible.
            with self.subTest(address=address):
                parsed = EmailAddress(address)
                self.assertEqual(parsed.user, user)
                self.assertEqual(parsed.domain, domain)

    def test_user_domain_invalid(self):
        'Rejects unparseable email address.'
        for address in self.invalid_tests:
            with self.subTest(address=address):
                # This must be rejected directly by the EmailAddress
                # constructor...
                with self.assertRaises(AddressFormatError) as exc:
                    EmailAddress(address)
                self.assertTupleEqual(exc.exception.args, ())
                # ...and indirectly by validate_email().
                self.assertFalse(validate_email(address))
class IdnaTestCase(TestCase):
    'Testing IDNA conversion.'

    # Maps each address to its expected ASCII-compatible encoding (ACE).
    valid_tests = {
        'email@address.com': 'email@address.com',
        'email@motörhéád.com': 'email@xn--motrhd-tta7d3f.com',
        'email@[123.123.123.123]': 'email@[123.123.123.123]',
    }

    # Addresses whose domain part is expected to fail IDNA conversion.
    invalid_tests = [
        'test@♥web.de',
    ]

    def test_idna_conversion_valid(self):
        'Converts email address into ASCII-compatible encoding.'
        for address, ace in self.valid_tests.items():
            # subTest keeps the loop going after a failure and reports
            # which address was responsible.
            with self.subTest(address=address):
                self.assertEqual(EmailAddress(address).ace, ace)

    def test_idna_conversion_invalid(self):
        'Rejects email address which is not IDNA-convertible.'
        for address in self.invalid_tests:
            with self.subTest(address=address):
                # This must be rejected directly by the EmailAddress
                # constructor...
                with self.assertRaises(AddressFormatError) as exc:
                    EmailAddress(address)
                self.assertTupleEqual(exc.exception.args, ())
                # ...and indirectly by validate_email().
                self.assertFalse(validate_email(address))

View File

@ -5,19 +5,8 @@ from unittest.mock import Mock, patch
from dns.exception import Timeout
from validate_email import mx_check as mx_module
from validate_email.exceptions import (
AddressFormatError, DNSTimeoutError, NoValidMXError)
from validate_email.mx_check import (
_dissect_email, _get_idna_address, _get_mx_records)
DOMAINS = {
'email@domain.com': 'domain.com',
'email@subdomain.domain.com': 'subdomain.domain.com',
'email@123.123.123.123': '123.123.123.123',
'email@[123.123.123.123]': '123.123.123.123',
'email@domain-one.com': 'domain-one.com',
'email@domain.co.jp': 'domain.co.jp',
}
from validate_email.exceptions import DNSTimeoutError, NoValidMXError
from validate_email.mx_check import _get_mx_records
class DnsNameStub(object):
@ -33,30 +22,6 @@ class DnsNameStub(object):
TEST_QUERY = Mock()
class DomainTestCase(TestCase):
def test_domain_from_email_address(self):
for address, domain in DOMAINS.items():
_user, domain_from_function = _dissect_email(email_address=address)
self.assertEqual(domain_from_function, domain)
class IdnaTestCase(TestCase):
'Testing IDNA converting.'
def test_resolves_idna_domains(self):
'Resolves email@motörhéád.com.'
self.assertEqual(
first=_get_idna_address(email_address='email@motörhéád.com'),
second='email@xn--motrhd-tta7d3f.com')
def test_resolves_conventional_domains(self):
'Resolves email@address.com.'
self.assertEqual(
first=_get_idna_address(email_address='email@address.com'),
second='email@address.com')
class GetMxRecordsTestCase(TestCase):
'Testing `_get_mx_records`.'
@ -97,10 +62,3 @@ class GetMxRecordsTestCase(TestCase):
with self.assertRaises(DNSTimeoutError) as exc:
_get_mx_records(domain='testdomain3', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
def test_returns_false_on_idna_failure(self):
'Returns `False` on IDNA failure.'
with self.assertRaises(AddressFormatError) as exc:
mx_module.mx_check(
email_address='test@♥web.de', from_address='mail@example.com')
self.assertTupleEqual(exc.exception.args, ())

View File

@ -1,8 +1,8 @@
from unittest.case import TestCase
from validate_email.email_address import EmailAddress
from validate_email.exceptions import AddressFormatError
from validate_email.regex_check import regex_check
from validate_email.validate_email import validate_email
VALID_EXAMPLES = [
'email@domain.com', # basic valid email
@ -35,11 +35,6 @@ INVALID_EXAMPLES = [
'email@domain..com', # multiple dot in the domain portion is invalid
]
UNPARSEABLE_EXAMPLES = [
'plainaddress', # missing @ sign and domain
'email.domain.com', # missing @
]
class FormatValidity(TestCase):
'Testing regex validation + format validity.'
@ -47,21 +42,14 @@ class FormatValidity(TestCase):
def test_valid_email_structure_regex(self):
'Accepts an email with a valid structure.'
for address in VALID_EXAMPLES:
user_part, domain_part = address.rsplit('@', 1)
self.assertTrue(
expr=regex_check(user_part=user_part, domain_part=domain_part),
expr=regex_check(EmailAddress(address)),
msg=f'Check is not true with {address}')
def test_invalid_email_structure_regex(self):
'Rejects an email with an invalid structure.'
for address in INVALID_EXAMPLES:
user_part, domain_part = address.rsplit('@', 1)
with self.assertRaises(
expected_exception=AddressFormatError,
msg=f'Test failed for {address}'):
regex_check(user_part=user_part, domain_part=domain_part),
def test_unparseable_email(self):
'Rejects an unparseable email.'
for address in UNPARSEABLE_EXAMPLES:
self.assertFalse(expr=validate_email(email_address=address))
regex_check(EmailAddress(address))

View File

@ -5,7 +5,6 @@ HOST_REGEX = re_compile(
# max length for domain name labels is 63 characters per RFC 1034
r'((?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+)'
r'(?:[A-Z0-9-]{2,63}(?<!-))\Z', IGNORECASE)
EMAIL_EXTRACT_HOST_REGEX = re_compile(r'(?<=@)\[?([^\[\]]+)')
LITERAL_REGEX = re_compile(
# literal form, ipv4 or ipv6 address (SMTP 4.1.3)
r'\[([A-f0-9:\.]+)\]\Z', IGNORECASE)

View File

@ -3,6 +3,7 @@ from typing import Optional
from filelock import FileLock
from .email_address import EmailAddress
from .exceptions import DomainBlacklistedError
from .updater import (
BLACKLIST_FILEPATH_INSTALLED, BLACKLIST_FILEPATH_TMP, LOCK_PATH,
@ -55,11 +56,11 @@ class DomainListValidator(object):
self.domain_blacklist = set(
x.strip().lower() for x in lines if x.strip())
def __call__(self, user_part: str, domain_part: str) -> bool:
def __call__(self, address: EmailAddress) -> bool:
'Do the checking here.'
if domain_part in self.domain_whitelist:
if address.domain in self.domain_whitelist:
return True
if domain_part in self.domain_blacklist:
if address.domain in self.domain_blacklist:
raise DomainBlacklistedError
return True

View File

@ -0,0 +1,60 @@
from idna.core import IDNAError, encode
from .exceptions import AddressFormatError
class EmailAddress(object):
    """
    Internally used class to hold an email address.

    This class features splitting the email address into user and domain
    part as well as converting an internationalized domain name into the
    ASCII-compatible encoding (ACE) according to the IDNA standard.

    The constructor raises `AddressFormatError` (without arguments) if
    the address is not a string, contains no "@" sign, or has a domain
    part that cannot be IDNA-encoded.
    """

    def __init__(self, address: str):
        # Anything but a string (e.g. `None`) can never be a valid
        # address; report it as a format error rather than failing with
        # an AttributeError on the split below.
        if not isinstance(address, str):
            raise AddressFormatError
        self._address = address

        # Split email address into user and domain part at the last "@".
        try:
            self._user, self._domain = self._address.rsplit('@', 1)
        except ValueError:
            # No "@" present: rsplit() yielded a single element.
            raise AddressFormatError from None

        # Convert internationalized domain name into the ACE encoding.
        if self._domain.startswith('[') and self._domain.endswith(']'):
            # Bracketed domain (e.g. a literal IP address): keep verbatim,
            # it is not run through the IDNA encoder.
            self._ace_domain = self._domain
        else:
            try:
                self._ace_domain = encode(self._domain).decode('ascii')
            except IDNAError as error:
                raise AddressFormatError from error

    @property
    def user(self) -> str:
        """
        The username part of the email address, that is the part before
        the "@" sign.
        """
        return self._user

    @property
    def domain(self) -> str:
        """
        The domain part of the email address, that is the part after the
        "@" sign.
        """
        return self._domain

    @property
    def ace(self) -> str:
        'The ASCII-compatible encoding for the email address.'
        return '@'.join((self._user, self._ace_domain))

    @property
    def ace_domain(self) -> str:
        """
        The ASCII-compatible encoding for the domain part of the email
        address.
        """
        return self._ace_domain

View File

@ -14,6 +14,14 @@ class AddressFormatError(EmailValidationError):
message = 'Invalid email address.'
class FromAddressFormatError(EmailValidationError):
    """
    Raised when the sender ("From:") email address supplied for the MX
    check is not a well-formed email address.
    """

    message = 'Invalid "From:" email address.'
class DomainBlacklistedError(EmailValidationError):
"""
Raised when the domain of the email address is blacklisted on

View File

@ -1,41 +1,19 @@
from functools import lru_cache
from smtplib import SMTP, SMTPServerDisconnected
from socket import error as SocketError
from socket import gethostname
from typing import Optional, Tuple
from typing import Optional
from dns.exception import Timeout
from dns.rdatatype import MX as rdtype_mx
from dns.rdtypes.ANY.MX import MX
from dns.resolver import (
NXDOMAIN, YXDOMAIN, Answer, NoAnswer, NoNameservers, query)
from idna.core import IDNAError, encode
from .constants import EMAIL_EXTRACT_HOST_REGEX, HOST_REGEX
from .constants import HOST_REGEX
from .email_address import EmailAddress
from .exceptions import (
AddressFormatError, AddressNotDeliverableError, DNSConfigurationError,
DNSTimeoutError, DomainNotFoundError, NoMXError, NoNameserverError,
NoValidMXError)
@lru_cache(maxsize=10)
def _dissect_email(email_address: str) -> Tuple[str, str]:
'Return a tuple of the user and domain part.'
try:
domain = EMAIL_EXTRACT_HOST_REGEX.search(string=email_address)[1]
except TypeError:
raise AddressFormatError
except IndexError:
raise AddressFormatError
return email_address[:-(len(domain) + 1)], domain
@lru_cache(maxsize=10)
def _get_idna_address(email_address: str) -> str:
'Return an IDNA converted email address.'
user, domain = _dissect_email(email_address=email_address)
idna_resolved_domain = encode(s=domain).decode('ascii')
return f'{user}@{idna_resolved_domain}'
AddressNotDeliverableError, DNSConfigurationError, DNSTimeoutError,
DomainNotFoundError, NoMXError, NoNameserverError, NoValidMXError)
def _get_mx_records(domain: str, timeout: int) -> list:
@ -68,7 +46,7 @@ def _get_mx_records(domain: str, timeout: int) -> list:
def _check_one_mx(
smtp: SMTP, error_messages: list, mx_record: str, helo_host: str,
from_address: str, email_address: str) -> bool:
from_address: EmailAddress, email_address: EmailAddress) -> bool:
"""
Check one MX server, return the `is_ambigious` boolean or raise
`StopIteration` if this MX accepts the email.
@ -76,8 +54,8 @@ def _check_one_mx(
try:
smtp.connect(host=mx_record)
smtp.helo(name=helo_host)
smtp.mail(sender=from_address)
code, message = smtp.rcpt(recip=email_address)
smtp.mail(sender=from_address.ace)
code, message = smtp.rcpt(recip=email_address.ace)
smtp.quit()
except SMTPServerDisconnected:
return True
@ -96,8 +74,8 @@ def _check_one_mx(
def _check_mx_records(
mx_records: list, smtp_timeout: int, helo_host: str, from_address: str,
email_address: str
mx_records: list, smtp_timeout: int, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress
) -> Optional[bool]:
'Check the mx records for a given email address.'
smtp = SMTP(timeout=smtp_timeout)
@ -119,7 +97,7 @@ def _check_mx_records(
def mx_check(
email_address: str, from_address: Optional[str] = None,
email_address: EmailAddress, from_address: Optional[EmailAddress] = None,
helo_host: Optional[str] = None, smtp_timeout: int = 10,
dns_timeout: int = 10
) -> Optional[bool]:
@ -130,13 +108,9 @@ def mx_check(
(e.g. temporary errors or graylisting).
"""
host = helo_host or gethostname()
idna_from = _get_idna_address(email_address=from_address or email_address)
try:
idna_to = _get_idna_address(email_address=email_address)
except IDNAError:
raise AddressFormatError
_user, domain = _dissect_email(email_address=email_address)
mx_records = _get_mx_records(domain=domain, timeout=dns_timeout)
from_address = from_address or email_address
mx_records = _get_mx_records(
domain=email_address.domain, timeout=dns_timeout)
return _check_mx_records(
mx_records=mx_records, smtp_timeout=smtp_timeout, helo_host=host,
from_address=idna_from, email_address=idna_to)
from_address=from_address, email_address=email_address)

View File

@ -1,11 +1,9 @@
from ipaddress import IPv4Address, IPv6Address
from typing import Optional
from .constants import HOST_REGEX, LITERAL_REGEX, USER_REGEX
from .email_address import EmailAddress
from .exceptions import AddressFormatError
SetOrNone = Optional[set]
def _validate_ipv4_address(value: str):
try:
@ -27,41 +25,26 @@ def _validate_ipv6_address(value: str) -> bool:
def _validate_ipv46_address(value: str) -> bool:
if _validate_ipv4_address(value):
return True
return _validate_ipv6_address(value)
return _validate_ipv4_address(value) or _validate_ipv6_address(value)
class RegexValidator(object):
def regex_check(address: EmailAddress) -> bool:
'Slightly adjusted email regex checker from the Django project.'
def __call__(
self, user_part: str, domain_part: str,
use_blacklist: bool = True) -> bool:
if not USER_REGEX.match(user_part):
raise AddressFormatError
# Validate user part.
if not USER_REGEX.match(address.user):
raise AddressFormatError
if not self.validate_domain_part(domain_part):
# Try for possible IDN domain-part
try:
domain_part = domain_part.encode('idna').decode('ascii')
except UnicodeError:
pass
else:
if self.validate_domain_part(domain_part):
return True
raise AddressFormatError
# Validate domain part: a) hostname.
if HOST_REGEX.match(address.ace_domain):
return True
def validate_domain_part(self, domain_part: str):
if HOST_REGEX.match(domain_part):
# Validate domain part: b) literal IP address.
literal_match = LITERAL_REGEX.match(address.ace_domain)
if literal_match:
ip_address = literal_match.group(1)
if _validate_ipv46_address(ip_address):
return True
literal_match = LITERAL_REGEX.match(domain_part)
if literal_match:
ip_address = literal_match.group(1)
return _validate_ipv46_address(ip_address)
return False
regex_check = RegexValidator()
# Domain part not successfully validated.
raise AddressFormatError

View File

@ -2,7 +2,9 @@ from logging import getLogger
from typing import Optional
from .domainlist_check import domainlist_check
from .exceptions import AddressFormatError, EmailValidationError
from .email_address import EmailAddress
from .exceptions import (
AddressFormatError, EmailValidationError, FromAddressFormatError)
from .mx_check import mx_check
from .regex_check import regex_check
@ -19,13 +21,17 @@ def validate_email_or_fail(
validation result is ambiguous, and raise an exception if the validation
fails.
"""
if not email_address or '@' not in email_address:
raise AddressFormatError
user_part, domain_part = email_address.rsplit('@', 1)
email_address = EmailAddress(email_address)
if from_address is not None:
try:
from_address = EmailAddress(from_address)
except AddressFormatError:
raise FromAddressFormatError
if check_regex:
regex_check(user_part=user_part, domain_part=domain_part)
regex_check(email_address)
if use_blacklist:
domainlist_check(user_part=user_part, domain_part=domain_part)
domainlist_check(email_address)
if not check_mx:
return True
return mx_check(