Compare commits

...

13 Commits

12 changed files with 176 additions and 46 deletions

View File

@ -5,11 +5,10 @@ title: "[BUG] "
# Test this when https://github.com/go-gitea/gitea/issues/17877 gets fixed
# labels: ''
# assignees: ''
---
<!-- Please don't delete this template or we'll close your issue -->
- [ ] I have read and understood the [FAQ](https://gitea.ksol.io/karolyi/py3-validate-email/src/branch/master/FAQ.md)
- [ ] I have read and understood the [FAQ](https://gitea.ksol.io/karolyi/py3-validate-email/src/branch/master/FAQ.md). I understand that if I delete this, my issue will be closed and ignored without a response.
**Describe the bug**

View File

@ -0,0 +1 @@
blank_issues_enabled: false

View File

@ -1,3 +1,22 @@
1.0.11:
- Treat connection related errors as ambiguous (e.g. "Connection reset by peer").
1.0.10:
- First pypi-free release (they removed everything between 1.0.6 and 1.0.9).
- Removing unnecessary files from the source tarball.
1.0.9:
- License adjustments, help on installing directly from repository
1.0.8:
- Fix backward compatibility for python<=3.7: https://gitea.ksol.io/karolyi/py3-validate-email/issues/98
1.0.7:
- Handle STARTTLS timeouts: https://gitea.ksol.io/karolyi/py3-validate-email/issues/95
1.0.6:
- Add the option to exclusively use certain address schemas (IPv4/IPV6)
1.0.5:
- Remove lockfile creation when the updater process is skipped per the environment variable introduced in 1.0.4

18
LICENSE
View File

@ -1,15 +1,9 @@
Validate_email
Copyright (c) 2014, Syrus Akbary, All rights reserved.
MIT+NIGGER License
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 3.0 of the License, or (at your option) any later version.
Copyright (c) <current year> <László Károlyi>
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
You should have received a copy of the GNU Lesser General Public
License along with this library
The above copyright notice, this permission notice and the word "NIGGER" shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -2,8 +2,10 @@ include AUTHORS
include LICENSE
include README.md
include CHANGELOG.txt
recursive-exclude .gitea *
recursive-exclude tests *
recursive-exclude * __pycache__
recursive-exclude * *.pyc
recursive-exclude * *.pyo
recursive-exclude * *.orig
exclude .gitignore .isort.cfg .travis.yml

View File

@ -9,9 +9,11 @@ This module is for Python 3.6 and above!
## INSTALLATION
You can install the package with pip:
Pypi removed newer versions due to license issues. You can install the packages right out from gitea, by keeping tags specified as versions. E.g. for version 1.0.9, you should use the following `pip install` syntax:
python -m pip install py3-validate-email
python -m pip install git+https://gitea.ksol.io/karolyi/py3-validate-email@v1.0.9
Available version tags: [see here](https://gitea.ksol.io/karolyi/py3-validate-email/tags)
## USAGE
@ -30,7 +32,8 @@ Basic usage:
smtp_from_address='my@from.addr.ess',
smtp_skip_tls=False,
smtp_tls_context=None,
smtp_debug=False)
smtp_debug=False,
address_types=frozenset([IPv4Address, IPv6Address]))
### Parameters
@ -58,6 +61,8 @@ Basic usage:
`smtp_debug`: activate smtplib's debug output which always goes to stderr; defaults to `False`
`address_types`: The IP address types to use. pass a `frozenset` if you want to change the default `frozenset([IPv6Address, IPv4Address])`. Useful when you only deliver emails through one interface, but you have dual stack. For a detailed explanation, see [this issue](https://gitea.ksol.io/karolyi/py3-validate-email/issues/94).
### Result
The function `validate_email()` returns the following results:

View File

@ -1,2 +1,5 @@
[metadata]
description_file = README.md
[pycodestyle]
max_line_length = 79

View File

@ -57,9 +57,11 @@ class BuildPyCommand(build_py):
setup(
name='py3-validate-email',
version='1.0.5',
version='1.0.11',
packages=find_packages(exclude=['tests']),
install_requires=['dnspython~=2.1', 'idna~=3.0', 'filelock~=3.0'],
install_requires=[
'dnspython~=2.2', 'idna~=3.3', 'filelock~=3.7',
'typing_extensions~=4.4'],
author='László Károlyi',
author_email='laszlo@karolyi.hu',
description=(
@ -70,4 +72,4 @@ setup(
url='http://gitea.ksol.io/karolyi/py3-validate-email',
cmdclass=dict(
build_py=BuildPyCommand, develop=DevelopCommand), # type: ignore
license='LGPL')
license='MIT+NIGGER')

View File

@ -19,6 +19,16 @@ class DnsNameStub(object):
return self.value
class DnsASetStub(object):
'Stub for `dns.rdtypes.IN.A.A`.'
def __init__(self, addresses: list):
self.names = [DnsNameStub(value=x) for x in addresses]
def processing_order(self):
return self.names
class DnsRRsetStub(object):
'Stub for `dns.rrset.RRset`.'
@ -30,10 +40,14 @@ class DnsRRsetStub(object):
return self.names
def _answer(hostnames: list):
def _mx_answer(hostnames: list):
return SimpleNamespace(rrset=DnsRRsetStub(hostnames=hostnames))
def _ip_answer(addresses: list):
return SimpleNamespace(rrset=DnsASetStub(addresses=addresses))
TEST_QUERY = Mock()
@ -43,7 +57,7 @@ class GetMxRecordsTestCase(TestCase):
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_fails_with_invalid_hostnames(self):
'Fails when an MX hostname is "."'
TEST_QUERY.return_value = _answer(hostnames=['.'])
TEST_QUERY.return_value = _mx_answer(hostnames=['.'])
with self.assertRaises(NoValidMXError) as exc:
_get_cleaned_mx_records(domain='testdomain1', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@ -51,7 +65,7 @@ class GetMxRecordsTestCase(TestCase):
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_fails_with_null_hostnames(self):
'Fails when an MX hostname is invalid.'
TEST_QUERY.return_value = _answer(hostnames=['asdqwe'])
TEST_QUERY.return_value = _mx_answer(hostnames=['asdqwe'])
with self.assertRaises(NoValidMXError) as exc:
_get_cleaned_mx_records(domain='testdomain2', timeout=10)
self.assertTupleEqual(exc.exception.args, ())
@ -59,15 +73,25 @@ class GetMxRecordsTestCase(TestCase):
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_filters_out_invalid_hostnames(self):
'Returns only the valid hostnames.'
TEST_QUERY.return_value = _answer(hostnames=[
'asdqwe.',
'.',
'valid.host.',
'valid.host.', # This is an intentional duplicate.
'valid2.host.',
])
TEST_QUERY.side_effect = [
# dns.rdtypes.ANY.MX.MX
_mx_answer(hostnames=[
'asdqwe.',
'.',
'valid.host.',
'valid.host.', # This is an intentional duplicate.
'valid2.host.']),
# dns.rdtypes.IN.A.A
_ip_answer(addresses=['1.2.3.4', '5.6.7.8']),
_ip_answer(addresses=['9.10.11.12']),
_ip_answer(addresses=['ffe0::1']),
_ip_answer(addresses=['ffe1::2']),
_ip_answer(addresses=['ffe1::3']),
]
result = _get_cleaned_mx_records(domain='testdomain3', timeout=10)
self.assertListEqual(result, ['valid.host', 'valid2.host'])
self.assertListEqual(
result,
['1.2.3.4', '5.6.7.8', '9.10.11.12', 'ffe0::1', 'ffe1::2'])
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
def test_raises_exception_on_dns_timeout(self):

View File

@ -1,8 +1,18 @@
from dns.exception import Timeout
from datetime import datetime
from ipaddress import IPv4Address, IPv6Address, ip_address
from logging import getLogger
from socket import has_ipv6
from typing import FrozenSet, List, Type, Union
from dns.exception import DNSException, Timeout
from dns.rdataclass import IN as rdcl_in
from dns.rdatatype import AAAA as rdtype_aaaa
from dns.rdatatype import MX as rdtype_mx
from dns.rdtypes.ANY.MX import MX
from dns.rdatatype import A as rdtype_a
from dns.rdtypes.ANY.MX import MX as restype_mx
from dns.resolver import (
NXDOMAIN, YXDOMAIN, Answer, NoAnswer, NoNameservers, resolve)
from typing_extensions import Literal
from .constants import HOST_REGEX
from .email_address import EmailAddress
@ -10,13 +20,16 @@ from .exceptions import (
DNSConfigurationError, DNSTimeoutError, DomainNotFoundError, NoMXError,
NoNameserverError, NoValidMXError)
LOGGER = getLogger(name=__name__)
AddressTypes = FrozenSet[Union[Type[IPv4Address], Type[IPv6Address]]]
DefaultAddressTypes = frozenset([IPv4Address, IPv6Address])
def _get_mx_records(domain: str, timeout: float) -> Answer:
'Return the DNS response for checking, optionally raise exceptions.'
try:
return resolve(
qname=domain, rdtype=rdtype_mx, lifetime=timeout,
search=True)
qname=domain, rdtype=rdtype_mx, lifetime=timeout, search=True)
except NXDOMAIN:
raise DomainNotFoundError
except NoNameservers:
@ -29,27 +42,88 @@ def _get_mx_records(domain: str, timeout: float) -> Answer:
raise NoMXError
def _get_cleaned_mx_records(domain: str, timeout: float) -> list:
def _resolve_one_recordtype(
hostname: str, records: List[str], timeout: float,
rdtype: Literal[rdtype_aaaa, rdtype_a], result_set: set) -> float:
"""
Resolve one recordtype, add to results, return the new timeout
value.
"""
if timeout <= 0:
return 0
time_current = datetime.now()
try:
query_result = resolve(
qname=hostname, rdtype=rdtype, rdclass=rdcl_in, lifetime=timeout)
for item in query_result.rrset.processing_order():
text: str = item.to_text()
if text in result_set:
LOGGER.debug(msg=(
f'{hostname} resolved to {text!r} already in results,'
' not adding'))
continue
records.append(text)
result_set.add(text)
LOGGER.debug(msg=f'{hostname} resolved to {text}')
except DNSException as exc:
LOGGER.warning(msg=f'{hostname} resolve error: {exc}')
return timeout - (datetime.now() - time_current).total_seconds()
def _get_resolved_mx_records(
records: list, timeout: float,
address_types: AddressTypes = DefaultAddressTypes
) -> List[str]:
'Return a resolved & sorted list of IP addresses from MX records.'
result = []
result_set = set()
for record in records:
if timeout <= 0:
break
if IPv6Address in address_types and has_ipv6:
timeout = _resolve_one_recordtype(
hostname=record, records=result, timeout=timeout,
rdtype=rdtype_aaaa, result_set=result_set)
if IPv4Address in address_types:
timeout = _resolve_one_recordtype(
hostname=record, records=result, timeout=timeout,
rdtype=rdtype_a, result_set=result_set)
return result
def _get_cleaned_mx_records(
domain: str, timeout: float,
address_types: AddressTypes = DefaultAddressTypes
) -> List[str]:
"""
Return a list of hostnames in the MX record, raise an exception on
any issues.
"""
time_start = datetime.now()
answer = _get_mx_records(domain=domain, timeout=timeout)
to_check = list()
host_set = set()
for record in answer.rrset.processing_order(): # type: MX
record: restype_mx
for record in answer.rrset.processing_order():
dns_str = record.exchange.to_text().rstrip('.') # type: str
if dns_str in host_set:
LOGGER.debug(msg=f'{dns_str} is already in results, not adding')
continue
to_check.append(dns_str)
host_set.add(dns_str)
result = [x for x in to_check if HOST_REGEX.search(string=x)]
LOGGER.debug(msg=f'{domain} resolved (MX): {result}')
if not result:
raise NoValidMXError
time_diff = timeout - (datetime.now() - time_start).total_seconds()
result = _get_resolved_mx_records(
records=result, timeout=time_diff, address_types=address_types)
return result
def dns_check(email_address: EmailAddress, timeout: float = 10) -> list:
def dns_check(
email_address: EmailAddress, timeout: float = 10,
address_types: AddressTypes = DefaultAddressTypes) -> List[str]:
"""
Check whether there are any responsible SMTP servers for the email
address by looking up the DNS MX records.
@ -59,7 +133,11 @@ def dns_check(email_address: EmailAddress, timeout: float = 10) -> list:
`MXError`. Otherwise, return the list of MX hostnames.
"""
if email_address.domain_literal_ip:
ip = ip_address(address=email_address.domain_literal_ip)
if type(ip) not in address_types:
raise NoValidMXError
return [email_address.domain_literal_ip]
else:
return _get_cleaned_mx_records(
domain=email_address.domain, timeout=timeout)
domain=email_address.domain, timeout=timeout,
address_types=address_types)

View File

@ -1,6 +1,7 @@
from logging import getLogger
from smtplib import (
SMTP, SMTPNotSupportedError, SMTPResponseException, SMTPServerDisconnected)
from socket import timeout
from ssl import SSLContext, SSLError
from typing import List, Optional, Tuple
@ -30,7 +31,7 @@ class _SMTPChecker(SMTP):
def __init__(
self, local_hostname: Optional[str], timeout: float, debug: bool,
sender: EmailAddress, recip: EmailAddress,
skip_tls: bool = False, tls_context: SSLContext = None):
skip_tls: bool = False, tls_context: Optional[SSLContext] = None):
"""
Initialize the object with all the parameters which remain
constant during the check of one email address on all the SMTP
@ -59,7 +60,7 @@ class _SMTPChecker(SMTP):
def connect(
self, host: str = 'localhost', port: int = 0,
source_address: str = None) -> Tuple[int, str]:
source_address: Optional[str] = None) -> Tuple[int, str]:
"""
Like `smtplib.SMTP.connect`, but raise appropriate exceptions on
connection failure or negative SMTP server response.
@ -91,7 +92,7 @@ class _SMTPChecker(SMTP):
except RuntimeError:
# SSL/TLS support is not available to your Python interpreter
pass
except SSLError as exc:
except (SSLError, timeout) as exc:
raise TLSNegotiationError(exc)
def mail(self, sender: str, options: tuple = ()):
@ -174,7 +175,7 @@ class _SMTPChecker(SMTP):
return False
except SMTPResponseException as exc:
return self._handle_smtpresponseexception(exc=exc)
except TLSNegotiationError as exc:
except (TLSNegotiationError, ConnectionError) as exc:
self.__temporary_errors[self._host] = SMTPMessage(
command=self.__command, code=-1, text=str(exc),
exceptions=exc.args)

View File

@ -2,7 +2,7 @@ from logging import getLogger
from ssl import SSLContext
from typing import Optional
from .dns_check import dns_check
from .dns_check import dns_check, DefaultAddressTypes, AddressTypes
from .domainlist_check import domainlist_check
from .email_address import EmailAddress
from .exceptions import (
@ -34,7 +34,7 @@ def validate_email_or_fail(
smtp_timeout: float = 10, smtp_helo_host: Optional[str] = None,
smtp_from_address: Optional[str] = None,
smtp_skip_tls: bool = False, smtp_tls_context: Optional[SSLContext] = None,
smtp_debug: bool = False
smtp_debug: bool = False, address_types: AddressTypes = DefaultAddressTypes
) -> Optional[bool]:
"""
Return `True` if the email address validation is successful, `None`
@ -48,7 +48,9 @@ def validate_email_or_fail(
domainlist_check(email_address=email_address_to)
if not check_dns and not check_smtp: # check_smtp implies check_dns.
return True
mx_records = dns_check(email_address=email_address_to, timeout=dns_timeout)
mx_records = dns_check(
email_address=email_address_to, timeout=dns_timeout,
address_types=address_types)
if not check_smtp:
return True
try: