Fixing a rare error where aol.co returns '.' as MX

This commit is contained in:
László Károlyi 2019-04-03 21:47:37 +02:00
parent 807b966ad6
commit acad51a5bb
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
4 changed files with 92 additions and 26 deletions

View File

@ -1,6 +1,9 @@
from unittest.case import TestCase
from unittest.mock import Mock, patch
from types import SimpleNamespace
from validate_email.mx_check import _get_domain_from_email_address
from validate_email import mx_check as mx_module
from validate_email.mx_check import _get_domain_from_email_address, _get_mx_records
DOMAINS = {
'email@domain.com': 'domain.com',
@ -12,9 +15,60 @@ DOMAINS = {
}
class MxTestCase(TestCase):
class DnsNameStub:
    'Minimal stand-in for `dns.name.Name`, exposing only `to_text`.'

    def __init__(self, value: str):
        # Text that `to_text` hands back verbatim.
        self.value = value

    def to_text(self) -> str:
        'Return the stored name text unchanged.'
        return self.value
TEST_QUERY = Mock()
class DomainTestCase(TestCase):
    'Testing `_get_domain_from_email_address`.'

    def test_domain_from_email_address(self):
        'Every address in `DOMAINS` yields its expected domain part.'
        for address, domain in DOMAINS.items():
            # subTest keeps the loop going after a failure and names the
            # offending address in the report.
            with self.subTest(address=address):
                domain_from_function = _get_domain_from_email_address(
                    address)
                self.assertEqual(domain_from_function, domain)
class GetMxRecordsTest(TestCase):
    'Testing `_get_mx_records`.'

    @patch.object(target=mx_module, attribute='query', new=TEST_QUERY)
    def test_fails_with_invalid_hostnames(self):
        'Fails when the only MX hostname is not a valid host name.'
        # A single label without a dot does not satisfy the host regex.
        TEST_QUERY.return_value = [
            SimpleNamespace(exchange=DnsNameStub(value='asdqwe'))]
        with self.assertRaises(ValueError) as exc:
            _get_mx_records(domain='testdomain1')
        self.assertEqual(
            exc.exception.args[0],
            'Domain testdomain1 does not have a valid MX record')

    @patch.object(target=mx_module, attribute='query', new=TEST_QUERY)
    def test_fails_with_null_hostnames(self):
        'Fails when the only MX hostname is the null host ".".'
        TEST_QUERY.return_value = [
            SimpleNamespace(exchange=DnsNameStub(value='.'))]
        with self.assertRaises(ValueError) as exc:
            _get_mx_records(domain='testdomain2')
        self.assertEqual(
            exc.exception.args[0],
            'Domain testdomain2 does not have a valid MX record')

    @patch.object(target=mx_module, attribute='query', new=TEST_QUERY)
    def test_filters_out_invalid_hostnames(self):
        'Returns only the valid hostnames, in the order they were answered.'
        TEST_QUERY.return_value = [
            SimpleNamespace(exchange=DnsNameStub(value='asdqwe')),
            SimpleNamespace(exchange=DnsNameStub(value='.')),
            SimpleNamespace(exchange=DnsNameStub(value='valid.host')),
            SimpleNamespace(exchange=DnsNameStub(value='valid2.host')),
        ]
        result = _get_mx_records(domain='testdomain3')
        self.assertListEqual(result, ['valid.host', 'valid2.host'])

View File

@ -0,0 +1,17 @@
from re import compile as re_compile
from re import IGNORECASE
# Host name: one or more dot-terminated labels followed by a final label of
# 2-63 characters that does not end with a hyphen.
HOST_REGEX = re_compile(
    # max length for domain name labels is 63 characters per RFC 1034
    r'((?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+)'
    r'(?:[A-Z0-9-]{2,63}(?<!-))\Z', IGNORECASE)

# Captures the text after '@', skipping an optional opening bracket and
# stopping at the first '[' or ']'.
EMAIL_EXTRACT_HOST_REGEX = re_compile(r'(?<=@)\[?([^\[\]]+)')

LITERAL_REGEX = re_compile(
    # literal form, ipv4 or ipv6 address (SMTP 4.1.3)
    # The range is A-F (hex digits): the former A-f also covered the ASCII
    # punctuation between 'Z' and 'a' and, with IGNORECASE, every letter.
    r'\[([A-F0-9:.]+)\]\Z', IGNORECASE)

# Local part of an address: either a dot-atom or a quoted string.
USER_REGEX = re_compile(
    # dot-atom
    r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*\Z"
    # quoted-string
    r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-\011\013'
    r'\014\016-\177])*"\Z)', IGNORECASE)

View File

@ -4,14 +4,14 @@ from socket import error as SocketError
from socket import gethostname
from typing import Optional
from dns.resolver import NXDOMAIN, NoAnswer, query
DOMAIN_REGEX = re_compile(r'(?<=@)\[?([^\[\]]+)')
from dns.rdtypes.ANY.MX import MX
from dns.resolver import NXDOMAIN, NoAnswer, query, Answer
from .constants import EMAIL_EXTRACT_HOST_REGEX, HOST_REGEX
def _get_domain_from_email_address(email_address):
try:
return DOMAIN_REGEX.search(string=email_address)[1]
return EMAIL_EXTRACT_HOST_REGEX.search(string=email_address)[1]
except TypeError:
raise ValueError('Invalid email address')
except IndexError:
@ -19,13 +19,21 @@ def _get_domain_from_email_address(email_address):
def _get_mx_records(domain: str) -> list:
    """
    Return a list of hostnames in the MX record.

    Each MX exchange is rendered with `to_text()`; one trailing dot is
    stripped before the name is checked against `HOST_REGEX`. Names that
    fail the check (for example a bare ".") are dropped. The returned
    entries are the texts as rendered by `to_text()`, de-duplicated, in
    answer order.

    Raises `ValueError` when the domain does not exist, has no MX answer,
    or has no MX hostname that passes the check.
    """
    try:
        records = query(domain, 'MX')  # type: Answer
    except NXDOMAIN:
        raise ValueError(f'Domain {domain} does not seem to exist')
    except NoAnswer:
        raise ValueError(f'Domain {domain} does not have an MX record')
    # Maps the rendered name to the form without its trailing dot; the dict
    # also collapses duplicate exchanges while keeping insertion order.
    to_check = {}
    for record in records:  # type: MX
        dns_str = record.exchange.to_text()  # type: str
        to_check[dns_str] = dns_str[:-1] if dns_str.endswith('.') else dns_str
    result = [k for k, v in to_check.items() if HOST_REGEX.search(string=v)]
    if not result:
        raise ValueError(f'Domain {domain} does not have a valid MX record')
    return result
def _check_mx_records(

View File

@ -1,9 +1,9 @@
from ipaddress import IPv4Address, IPv6Address
from os.path import dirname, join
from re import IGNORECASE
from re import compile as re_compile
from typing import Optional
from .constants import HOST_REGEX, LITERAL_REGEX, USER_REGEX
SetOrNone = Optional[set]
@ -34,19 +34,6 @@ def _validate_ipv46_address(value: str) -> bool:
class EmailValidator(object):
'Slightly adjusted email regex checker from the Django project.'
user_regex = re_compile(
# dot-atom
r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*\Z"
# quoted-string
r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-\011\013'
r'\014\016-\177])*"\Z)', IGNORECASE)
domain_regex = re_compile(
# max length for domain name labels is 63 characters per RFC 1034
r'((?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+)'
r'(?:[A-Z0-9-]{2,63}(?<!-))\Z', IGNORECASE)
literal_regex = re_compile(
# literal form, ipv4 or ipv6 address (SMTP 4.1.3)
r'\[([A-f0-9:\.]+)\]\Z', IGNORECASE)
# frozenset() iterates its argument, so a bare string would produce a set of
# single characters; wrap the host name in a tuple to store it whole.
domain_whitelist = frozenset(('localhost',))
domain_blacklist = frozenset()
@ -75,7 +62,7 @@ class EmailValidator(object):
user_part, domain_part = value.rsplit('@', 1)
if not self.user_regex.match(user_part):
if not USER_REGEX.match(user_part):
return False
if domain_part in self.domain_whitelist:
@ -96,10 +83,10 @@ class EmailValidator(object):
return True
def validate_domain_part(self, domain_part):
if self.domain_regex.match(domain_part):
if HOST_REGEX.match(domain_part):
return True
literal_match = self.literal_regex.match(domain_part)
literal_match = LITERAL_REGEX.match(domain_part)
if literal_match:
ip_address = literal_match.group(1)
return _validate_ipv46_address(ip_address)