Fix tests, prepare a new release

This commit is contained in:
László Károlyi 2021-02-11 17:49:29 +01:00
parent 81f5769ac8
commit 8439d94742
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
6 changed files with 49 additions and 39 deletions

View File

@ -1,3 +1,6 @@
0.2.15:
- Added a `skip_smtp` option to optionally skip the SMTP protocol check after the DNS level checks, by @SergeyKons
0.2.14:
- More improvements, courtesy of Reinhard Müller: https://github.com/karolyi/py3-validate-email/pull/48

View File

@ -43,7 +43,7 @@ Basic usage::
:code:`debug`: emit debug/warning messages while checking email
:code:`no_smtp`: not send SMTP HELO requests when checking for an email
:code:`skip_smtp`: (default :code:`False`) skip the SMTP conversation with the server, after MX checks. Will automatically be set to :code:`True` when :code:`check_mx` is :code:`False`!
The function :code:`validate_email_or_fail()` works exactly like :code:`validate_email`, except that it raises an exception in the case of validation failure instead of returning :code:`False`.

View File

@ -56,7 +56,7 @@ class BuildPyCommand(build_py):
setup(
name='py3-validate-email',
version='0.2.14',
version='0.2.15',
packages=find_packages(exclude=['tests']),
install_requires=['dnspython~=2.0', 'idna~=2.10', 'filelock~=3.0'],
author='László Károlyi',
@ -68,5 +68,4 @@ setup(
keywords='email validation verification mx verify',
url='http://github.com/karolyi/py3-validate-email',
cmdclass=dict(build_py=BuildPyCommand, develop=DevelopCommand),
license='LGPL',
)
license='LGPL')

View File

@ -7,7 +7,7 @@ from dns.exception import Timeout
from validate_email import mx_check as mx_module
from validate_email.email_address import EmailAddress
from validate_email.exceptions import DNSTimeoutError, NoValidMXError
from validate_email.mx_check import _get_mx_records, mx_check
from validate_email.mx_check import _get_cleaned_mx_records, mx_check
class DnsNameStub(object):
@ -26,49 +26,49 @@ TEST_QUERY = Mock()
class GetMxRecordsTestCase(TestCase):
'Testing `_get_mx_records`.'
@patch.object(target=mx_module, attribute='query', new=TEST_QUERY)
@patch.object(target=mx_module, attribute='resolve', new=TEST_QUERY)
def test_fails_with_invalid_hostnames(self):
'Fails when an MX hostname is "."'
TEST_QUERY.return_value = [
SimpleNamespace(exchange=DnsNameStub(value='.'))]
with self.assertRaises(NoValidMXError) as exc:
_get_mx_records(domain='testdomain1', timeout=10)
_get_cleaned_mx_records(domain='testdomain1', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@patch.object(target=mx_module, attribute='query', new=TEST_QUERY)
@patch.object(target=mx_module, attribute='resolve', new=TEST_QUERY)
def test_fails_with_null_hostnames(self):
'Fails when an MX hostname is invalid.'
TEST_QUERY.return_value = [
SimpleNamespace(exchange=DnsNameStub(value='asdqwe'))]
with self.assertRaises(NoValidMXError) as exc:
_get_mx_records(domain='testdomain2', timeout=10)
_get_cleaned_mx_records(domain='testdomain2', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@patch.object(target=mx_module, attribute='query', new=TEST_QUERY)
@patch.object(target=mx_module, attribute='resolve', new=TEST_QUERY)
def test_filters_out_invalid_hostnames(self):
'Returns only the valid hostnames.'
TEST_QUERY.return_value = [
SimpleNamespace(exchange=DnsNameStub(value='asdqwe.')),
SimpleNamespace(exchange=DnsNameStub(value='.')),
SimpleNamespace(exchange=DnsNameStub(value='valid.host.')),
# This is an intentional duplicate.
SimpleNamespace(exchange=DnsNameStub(value='valid.host.')),
SimpleNamespace(exchange=DnsNameStub(value='valid2.host.')),
]
result = _get_mx_records(domain='testdomain3', timeout=10)
result = _get_cleaned_mx_records(domain='testdomain3', timeout=10)
self.assertListEqual(result, ['valid.host', 'valid2.host'])
@patch.object(target=mx_module, attribute='query', new=TEST_QUERY)
@patch.object(target=mx_module, attribute='resolve', new=TEST_QUERY)
def test_raises_exception_on_dns_timeout(self):
'Raises exception on DNS timeout.'
TEST_QUERY.side_effect = Timeout()
with self.assertRaises(DNSTimeoutError) as exc:
_get_mx_records(domain='testdomain3', timeout=10)
_get_cleaned_mx_records(domain='testdomain3', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@patch.object(target=mx_module, attribute='_check_mx_records')
def test_no_smtp_argument(self, check_mx_records_mock):
'Check correct work of no_smtp argument.'
self.assertTrue(
mx_check(EmailAddress('test@mail.ru'), debug=False, no_smtp=True)
)
def test_skip_smtp_argument(self, check_mx_records_mock):
'Check correct work of `skip_smtp` argument.'
self.assertTrue(mx_check(
EmailAddress('test@mail.ru'), debug=False, skip_smtp=True))
self.assertEqual(check_mx_records_mock.call_count, 0)

View File

@ -8,7 +8,7 @@ from dns.exception import Timeout
from dns.rdatatype import MX as rdtype_mx
from dns.rdtypes.ANY.MX import MX
from dns.resolver import (
NXDOMAIN, YXDOMAIN, Answer, NoAnswer, NoNameservers, query)
NXDOMAIN, YXDOMAIN, Answer, NoAnswer, NoNameservers, resolve)
from .constants import HOST_REGEX
from .email_address import EmailAddress
@ -35,13 +35,11 @@ class _ProtocolError(Exception):
def _get_mx_records(domain: str, timeout: int) -> list:
"""
Return a list of hostnames in the MX record, raise an exception on
any issues.
"""
'Return the DNS response for checking, optionally raise exceptions.'
try:
records = query(
qname=domain, rdtype=rdtype_mx, lifetime=timeout) # type: Answer
return resolve(
qname=domain, rdtype=rdtype_mx, lifetime=timeout,
search=True) # type: Answer
except NXDOMAIN:
raise DomainNotFoundError
except NoNameservers:
@ -52,10 +50,22 @@ def _get_mx_records(domain: str, timeout: int) -> list:
raise DNSConfigurationError
except NoAnswer:
raise NoMXError
to_check = set()
def _get_cleaned_mx_records(domain: str, timeout: int) -> list:
"""
Return a list of hostnames in the MX record, raise an exception on
any issues.
"""
records = _get_mx_records(domain=domain, timeout=timeout)
to_check = list()
host_set = set()
for record in records: # type: MX
dns_str = record.exchange.to_text().rstrip('.') # type: str
to_check.add(dns_str)
if dns_str in host_set:
continue
to_check.append(dns_str)
host_set.add(dns_str)
result = [x for x in to_check if HOST_REGEX.search(string=x)]
if not result:
raise NoValidMXError
@ -91,9 +101,8 @@ def _smtp_mail(smtp: SMTP, from_address: EmailAddress):
def _smtp_converse(
mx_record: str, smtp_timeout: int, debug: bool, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress
):
mx_record: str, smtp_timeout: int, debug: bool, helo_host: str,
from_address: EmailAddress, email_address: EmailAddress):
"""
Do the `SMTP` conversation, handle errors in the caller.
@ -167,7 +176,7 @@ def mx_check(
email_address: EmailAddress, debug: bool,
from_address: Optional[EmailAddress] = None,
helo_host: Optional[str] = None, smtp_timeout: int = 10,
dns_timeout: int = 10, no_smtp: bool = False
dns_timeout: int = 10, skip_smtp: bool = False
) -> Optional[bool]:
"""
Return `True` if the host responds with a deliverable response code,
@ -179,9 +188,9 @@ def mx_check(
if email_address.domain_literal_ip:
mx_records = [email_address.domain_literal_ip]
else:
mx_records = _get_mx_records(
mx_records = _get_cleaned_mx_records(
domain=email_address.domain, timeout=dns_timeout)
if no_smtp:
if skip_smtp:
return True
return _check_mx_records(
mx_records=mx_records, smtp_timeout=smtp_timeout, helo_host=host,

View File

@ -15,8 +15,8 @@ def validate_email_or_fail(
email_address: str, check_regex: bool = True, check_mx: bool = True,
from_address: Optional[str] = None, helo_host: Optional[str] = None,
smtp_timeout: int = 10, dns_timeout: int = 10,
use_blacklist: bool = True, debug: bool = False, no_smtp: bool = False,
) -> Optional[bool]:
use_blacklist: bool = True, debug: bool = False,
skip_smtp: bool = False) -> Optional[bool]:
"""
Return `True` if the email address validation is successful, `None` if the
validation result is ambigious, and raise an exception if the validation
@ -38,11 +38,10 @@ def validate_email_or_fail(
return mx_check(
email_address=email_address, from_address=from_address,
helo_host=helo_host, smtp_timeout=smtp_timeout,
dns_timeout=dns_timeout, no_smtp=no_smtp, debug=debug)
dns_timeout=dns_timeout, skip_smtp=skip_smtp, debug=debug)
def validate_email(
email_address: str, *args, **kwargs):
def validate_email(email_address: str, *args, **kwargs):
"""
Return `True` or `False` depending if the email address exists
or/and can be delivered.