Compare commits
13 Commits
pypi-faggo
...
master
Author | SHA1 | Date |
---|---|---|
László Károlyi | 8eda0d946b | |
László Károlyi | 3f3fcc241c | |
László Károlyi | 8569fd2128 | |
László Károlyi | f5ced87b0d | |
László Károlyi | 883856adaf | |
László Károlyi | 9e19f5bf6c | |
László Károlyi | 79dc667cb0 | |
László Károlyi | 32c8f4cdfa | |
László Károlyi | 86ac6934e9 | |
László Károlyi | fd351ba3a4 | |
László Károlyi | 69be4f4b9c | |
László Károlyi | 5f406473eb | |
László Károlyi | e8646ffe6c |
|
@ -5,11 +5,10 @@ title: "[BUG] "
|
|||
# Test this when https://github.com/go-gitea/gitea/issues/17877 gets fixed
|
||||
# labels: ''
|
||||
# assignees: ''
|
||||
|
||||
---
|
||||
<!-- Please don't delete this template or we'll close your issue -->
|
||||
|
||||
- [ ] I have read and understood the [FAQ](https://gitea.ksol.io/karolyi/py3-validate-email/src/branch/master/FAQ.md)
|
||||
- [ ] I have read and understood the [FAQ](https://gitea.ksol.io/karolyi/py3-validate-email/src/branch/master/FAQ.md). I understand that if I delete this, my issue will be closed and ignored without a response.
|
||||
|
||||
**Describe the bug**
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
blank_issues_enabled: false
|
|
@ -1,5 +1,21 @@
|
|||
1.0.5.post1:
|
||||
- Final package release on pypi.
|
||||
1.0.11:
|
||||
- Treat connection related errors as ambiguous (e.g. "Connection reset by peer").
|
||||
|
||||
1.0.10:
|
||||
- First pypi-free release (they removed everything between 1.0.6 and 1.0.9).
|
||||
- Removing unnecessary files from the source tarball.
|
||||
|
||||
1.0.9:
|
||||
- License adjustments, help on installing directly from repository
|
||||
|
||||
1.0.8:
|
||||
- Fix backward compatibility for python<=3.7: https://gitea.ksol.io/karolyi/py3-validate-email/issues/98
|
||||
|
||||
1.0.7:
|
||||
- Handle STARTTLS timeouts: https://gitea.ksol.io/karolyi/py3-validate-email/issues/95
|
||||
|
||||
1.0.6:
|
||||
- Add the option to exclusively use certain address schemas (IPv4/IPV6)
|
||||
|
||||
1.0.5:
|
||||
- Remove lockfile creation when the updater process is skipped per the environment variable introduced in 1.0.4
|
||||
|
|
18
LICENSE
18
LICENSE
|
@ -1,15 +1,9 @@
|
|||
Validate_email
|
||||
Copyright (c) 2014, Syrus Akbary, All rights reserved.
|
||||
MIT+NIGGER License
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation; either
|
||||
version 3.0 of the License, or (at your option) any later version.
|
||||
Copyright (c) <current year> <László Károlyi>
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library
|
||||
The above copyright notice, this permission notice and the word "NIGGER" shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
|
@ -2,8 +2,10 @@ include AUTHORS
|
|||
include LICENSE
|
||||
include README.md
|
||||
include CHANGELOG.txt
|
||||
recursive-exclude .gitea *
|
||||
recursive-exclude tests *
|
||||
recursive-exclude * __pycache__
|
||||
recursive-exclude * *.pyc
|
||||
recursive-exclude * *.pyo
|
||||
recursive-exclude * *.orig
|
||||
exclude .gitignore .isort.cfg .travis.yml
|
||||
|
|
184
README.md
184
README.md
|
@ -1,9 +1,183 @@
|
|||
# HEADS UP !
|
||||
[![](https://app.travis-ci.com/karolyi/py3-validate-email.svg?branch=master "Travis status")](https://app.travis-ci.com/karolyi/py3-validate-email)
|
||||
[![](https://bmc-cdn.nyc3.digitaloceanspaces.com/BMC-button-images/custom_images/orange_img.png "Buy me a coffee!")](https://buymeacoff.ee/karolyi)
|
||||
|
||||
This is not actually a new release, but a final one on pypi.
|
||||
# py3-validate-email
|
||||
|
||||
Pypi has proven to be anti free speech. Hence, you're not gonna see any more updates to this module on here. I keep updating the module on [https://gitea.ksol.io](https://gitea.ksol.io), in fact there are already a couple new versions released ever since 1.0.5.
|
||||
py3-validate-email is a package for Python that check if an email is valid, not blacklisted, properly formatted and really exists.
|
||||
|
||||
For future updates, you'll have to update your module source. For more information, see the [updated README](https://gitea.ksol.io/karolyi/py3-validate-email/src/branch/master/README.md)
|
||||
This module is for Python 3.6 and above!
|
||||
|
||||
## INSTALLATION
|
||||
|
||||
Pypi removed newer versions due to license issues. You can install the packages right out from gitea, by keeping tags specified as versions. E.g. for version 1.0.9, you should use the following `pip install` syntax:
|
||||
|
||||
python -m pip install git+https://gitea.ksol.io/karolyi/py3-validate-email@v1.0.9
|
||||
|
||||
Available version tags: [see here](https://gitea.ksol.io/karolyi/py3-validate-email/tags)
|
||||
|
||||
## USAGE
|
||||
|
||||
Basic usage:
|
||||
|
||||
from validate_email import validate_email
|
||||
is_valid = validate_email(
|
||||
email_address='example@example.com',
|
||||
check_format=True,
|
||||
check_blacklist=True,
|
||||
check_dns=True,
|
||||
dns_timeout=10,
|
||||
check_smtp=True,
|
||||
smtp_timeout=10,
|
||||
smtp_helo_host='my.host.name',
|
||||
smtp_from_address='my@from.addr.ess',
|
||||
smtp_skip_tls=False,
|
||||
smtp_tls_context=None,
|
||||
smtp_debug=False,
|
||||
address_types=frozenset([IPv4Address, IPv6Address]))
|
||||
|
||||
### Parameters
|
||||
|
||||
`email_address`: the email address to check
|
||||
|
||||
`check_format`: check whether the email address has a valid structure; defaults to `True`
|
||||
|
||||
`check_blacklist`: check the email against the blacklist of domains downloaded from <https://github.com/disposable-email-domains/disposable-email-domains>; defaults to `True`
|
||||
|
||||
`check_dns`: check the DNS mx-records, defaults to `True`
|
||||
|
||||
`dns_timeout`: seconds until DNS timeout; defaults to 10 seconds
|
||||
|
||||
`check_smtp`: check whether the email actually exists by initiating an SMTP conversation; defaults to `True`
|
||||
|
||||
`smtp_timeout`: seconds until SMTP timeout; defaults to 10 seconds
|
||||
|
||||
`smtp_helo_host`: the hostname to use in SMTP HELO/EHLO; if set to `None` (the default), the fully qualified domain name of the local host is used
|
||||
|
||||
`smtp_from_address`: the email address used for the sender in the SMTP conversation; if set to `None` (the default), the email_address parameter is used as the sender as well
|
||||
|
||||
`smtp_skip_tls`: skip the TLS negotiation with the server, even when available. defaults to `False`
|
||||
|
||||
`smtp_tls_context`: an SSLContext to use with the TLS negotiation when the server supports it. defaults to `None`
|
||||
|
||||
`smtp_debug`: activate smtplib's debug output which always goes to stderr; defaults to `False`
|
||||
|
||||
`address_types`: The IP address types to use. pass a `frozenset` if you want to change the default `frozenset([IPv6Address, IPv4Address])`. Useful when you only deliver emails through one interface, but you have dual stack. For a detailed explanation, see [this issue](https://gitea.ksol.io/karolyi/py3-validate-email/issues/94).
|
||||
|
||||
### Result
|
||||
|
||||
The function `validate_email()` returns the following results:
|
||||
|
||||
_`True`_
|
||||
All requested checks were successful for the given email address.
|
||||
|
||||
_`False`_
|
||||
At least one of the requested checks failed for the given email address.
|
||||
|
||||
_`None`_
|
||||
None of the requested checks failed, but at least one of them yielded an ambiguous result. Currently, the SMTP check is the only check which can actually yield an ambiguous result.
|
||||
|
||||
### Getting more information
|
||||
|
||||
The function `validate_email_or_fail()` works exactly like `validate_email()`, except that it raises an exception in the case of validation failure and ambiguous result instead of returning `False` or `None`, respectively.
|
||||
|
||||
All these exceptions descend from `EmailValidationError`. Please see below for the exact exceptions raised by the various checks. Note that all exception classes are defined in the module `validate_email.exceptions`.
|
||||
|
||||
Please note that `SMTPTemporaryError` indicates an ambiguous check result rather than a check failure, so if you use `validate_email_or_fail()`, you probably want to catch this exception.
|
||||
|
||||
## The checks
|
||||
|
||||
By default, all checks are enabled, but each of them can be disabled by one of the `check_...` parameters. Note that, however, `check_smtp` implies `check_dns`.
|
||||
|
||||
### `check_format`
|
||||
|
||||
Check whether the given email address conforms to the general format requirements of valid email addresses.
|
||||
|
||||
validate_email_or_fail() raises AddressFormatError on any failure of this test.
|
||||
|
||||
### `check_blacklist`
|
||||
|
||||
Check whether the domain part of the given email address (the part behind the "@" is known as a disposable and temporary email address domain. These are often used to register dummy users in order to spam or abuse some services.
|
||||
|
||||
A list of such domains is maintained at <https://github.com/disposable-email-domains/disposable-email-domains>, and this module uses that list.
|
||||
|
||||
`validate_email_or_fail()` raises `DomainBlacklistedError` if the email address belongs to a blacklisted domain.
|
||||
|
||||
### `check_dns`
|
||||
|
||||
Check whether there is a valid list of servers responsible for delivering emails to the given email address.
|
||||
|
||||
First, a DNS query is issued for the email address' domain to retrieve a list of all MX records. That list is then stripped of duplicates and malformatted entries. If at the end of this procedure, at least one valid MX record remains, the check is considered successful.
|
||||
|
||||
On failure of this check, `validate_email_or_fail()` raises one of the following exceptions, all of which descend from `DNSError`:
|
||||
|
||||
_`DomainNotFoundError`_
|
||||
The domain of the email address cannot be found at all.
|
||||
|
||||
_`NoNameserverError`_
|
||||
There is no nameserver for the domain.
|
||||
|
||||
_`DNSTimeoutError`_
|
||||
A timeout occured when querying the nameserver. Note that the timeout period can be changed with the `dns_timeout` parameter.
|
||||
|
||||
_`DNSConfigurationError`_
|
||||
The nameserver is misconfigured.
|
||||
|
||||
_`NoMXError`_
|
||||
The nameserver does not list any MX records for the domain.
|
||||
|
||||
_`NoValidMXError`_
|
||||
The nameserver lists MX records for the domain, but none of them is valid.
|
||||
|
||||
### `check_smtp`
|
||||
|
||||
Check whether the given email address exists by simulating an actual email delivery.
|
||||
|
||||
A connection to the SMTP server identified through the domain's MX record is established, and an SMTP conversation is initiated up to the point where the server confirms the existence of the email address. After that, instead of actually sending an email, the conversation is cancelled.
|
||||
|
||||
Unless you set `smtp_skip_tls` to `True`, the module will try to negotiate a TLS connection with STARTTLS, and silently fall back to an unencrypted SMTP connection if the server doesn't support it. Additionally, depending on your client configuration, the TLS negotiation might fail which will result in an ambiguous response for the given host as the module will be unable to communicate with the host after the negotiation fails. In trying to succeed, you can pass an `SSLContext` as an `smtp_tls_context` parameter, but remember that the server might still deny the negotiation based on how you set the `SSLContext` up, and based on its security settings as well.
|
||||
|
||||
If the SMTP server replies to the `RCPT TO` command with a code `250` (success) response, the check is considered successful.
|
||||
|
||||
If the SMTP server replies with a code `5xx` (permanent error) response at any point in the conversation, the check is considered failed.
|
||||
|
||||
If the SMTP server cannot be connected, unexpectedly closes the connection, or replies with a code `4xx` (temporary error) at any stage of the conversation, the check is considered ambiguous.
|
||||
|
||||
If there is more than one valid MX record for the domain, they are tried in order of priority until the first time the check is either successful or failed. Only in case of an ambiguous check result, the next server is tried, and only if the check result is ambiguous for all servers, the overall check is considered ambiguous as well.
|
||||
|
||||
On failure of this check or on ambiguous result, `validate_email_or_fail()` raises one of the following exceptions, all of which descend from `SMTPError`:
|
||||
|
||||
_`AddressNotDeliverableError`_
|
||||
The SMTP server permanently refused the email address. Technically, this means that the server replied to the `RCPT TO` command with a code 5xx response.
|
||||
|
||||
_`SMTPCommunicationError`_
|
||||
The SMTP server refused to even let us get to the point where we could ask it about the email address. Technically, this means that the server sent a code `5xx` response either immediately after connection, or as a reply to the `EHLO` (or `HELO`) or `MAIL FROM` commands.
|
||||
|
||||
_`SMTPTemporaryError`_
|
||||
A temporary error occured during the check for all available MX servers. This is considered an ambiguous check result. For example, greylisting is a frequent cause for this. Make sure you check the contents of the message.
|
||||
|
||||
All of the above three exceptions provide further details about the error response(s) in the exception's instance variable error_messages.
|
||||
|
||||
## Auto-updater
|
||||
|
||||
The package contains an auto-updater for downloading and updating the built-in blacklist.txt. It will run on each module load (and installation), but will try to update the content only if the file is older than 5 days, and if the content is not the same that's already downloaded.
|
||||
|
||||
The update can be triggered manually:
|
||||
|
||||
from validate_email.updater import update_builtin_blacklist
|
||||
|
||||
update_builtin_blacklist(
|
||||
force: bool = False,
|
||||
background: bool = True,
|
||||
callback: Callable = None
|
||||
) -> Optional[Thread]
|
||||
|
||||
`force`: forces the update even if the downloaded/installed file is fresh enough.
|
||||
|
||||
`background`: starts the update in a `Thread` so it won't make your code hang while it's updating. If you set this to `True`, the function will return the Thread used for starting the update so you can `join()` it if necessary.
|
||||
|
||||
`callback`: An optional Callable (function/method) to be called when the update is done.
|
||||
|
||||
You can completely skip the auto-update on startup by setting the environment variable `PY3VE_IGNORE_UPDATER` to any value.
|
||||
|
||||
# Read the [FAQ](https://gitea.ksol.io/karolyi/py3-validate-email/src/branch/master/FAQ.md)!
|
||||
|
||||
Let's keep things free and decentralized.
|
||||
|
|
|
@ -1,2 +1,5 @@
|
|||
[metadata]
|
||||
description_file = README.md
|
||||
|
||||
[pycodestyle]
|
||||
max_line_length = 79
|
||||
|
|
8
setup.py
8
setup.py
|
@ -57,9 +57,11 @@ class BuildPyCommand(build_py):
|
|||
|
||||
setup(
|
||||
name='py3-validate-email',
|
||||
version='1.0.5.post1',
|
||||
version='1.0.11',
|
||||
packages=find_packages(exclude=['tests']),
|
||||
install_requires=['dnspython~=2.1', 'idna~=3.0', 'filelock~=3.0'],
|
||||
install_requires=[
|
||||
'dnspython~=2.2', 'idna~=3.3', 'filelock~=3.7',
|
||||
'typing_extensions~=4.4'],
|
||||
author='László Károlyi',
|
||||
author_email='laszlo@karolyi.hu',
|
||||
description=(
|
||||
|
@ -70,4 +72,4 @@ setup(
|
|||
url='http://gitea.ksol.io/karolyi/py3-validate-email',
|
||||
cmdclass=dict(
|
||||
build_py=BuildPyCommand, develop=DevelopCommand), # type: ignore
|
||||
license='LGPL')
|
||||
license='MIT+NIGGER')
|
||||
|
|
|
@ -19,6 +19,16 @@ class DnsNameStub(object):
|
|||
return self.value
|
||||
|
||||
|
||||
class DnsASetStub(object):
|
||||
'Stub for `dns.rdtypes.IN.A.A`.'
|
||||
|
||||
def __init__(self, addresses: list):
|
||||
self.names = [DnsNameStub(value=x) for x in addresses]
|
||||
|
||||
def processing_order(self):
|
||||
return self.names
|
||||
|
||||
|
||||
class DnsRRsetStub(object):
|
||||
'Stub for `dns.rrset.RRset`.'
|
||||
|
||||
|
@ -30,10 +40,14 @@ class DnsRRsetStub(object):
|
|||
return self.names
|
||||
|
||||
|
||||
def _answer(hostnames: list):
|
||||
def _mx_answer(hostnames: list):
|
||||
return SimpleNamespace(rrset=DnsRRsetStub(hostnames=hostnames))
|
||||
|
||||
|
||||
def _ip_answer(addresses: list):
|
||||
return SimpleNamespace(rrset=DnsASetStub(addresses=addresses))
|
||||
|
||||
|
||||
TEST_QUERY = Mock()
|
||||
|
||||
|
||||
|
@ -43,7 +57,7 @@ class GetMxRecordsTestCase(TestCase):
|
|||
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
|
||||
def test_fails_with_invalid_hostnames(self):
|
||||
'Fails when an MX hostname is "."'
|
||||
TEST_QUERY.return_value = _answer(hostnames=['.'])
|
||||
TEST_QUERY.return_value = _mx_answer(hostnames=['.'])
|
||||
with self.assertRaises(NoValidMXError) as exc:
|
||||
_get_cleaned_mx_records(domain='testdomain1', timeout=10)
|
||||
self.assertTupleEqual(exc.exception.args, ())
|
||||
|
@ -51,7 +65,7 @@ class GetMxRecordsTestCase(TestCase):
|
|||
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
|
||||
def test_fails_with_null_hostnames(self):
|
||||
'Fails when an MX hostname is invalid.'
|
||||
TEST_QUERY.return_value = _answer(hostnames=['asdqwe'])
|
||||
TEST_QUERY.return_value = _mx_answer(hostnames=['asdqwe'])
|
||||
with self.assertRaises(NoValidMXError) as exc:
|
||||
_get_cleaned_mx_records(domain='testdomain2', timeout=10)
|
||||
self.assertTupleEqual(exc.exception.args, ())
|
||||
|
@ -59,15 +73,25 @@ class GetMxRecordsTestCase(TestCase):
|
|||
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
|
||||
def test_filters_out_invalid_hostnames(self):
|
||||
'Returns only the valid hostnames.'
|
||||
TEST_QUERY.return_value = _answer(hostnames=[
|
||||
'asdqwe.',
|
||||
'.',
|
||||
'valid.host.',
|
||||
'valid.host.', # This is an intentional duplicate.
|
||||
'valid2.host.',
|
||||
])
|
||||
TEST_QUERY.side_effect = [
|
||||
# dns.rdtypes.ANY.MX.MX
|
||||
_mx_answer(hostnames=[
|
||||
'asdqwe.',
|
||||
'.',
|
||||
'valid.host.',
|
||||
'valid.host.', # This is an intentional duplicate.
|
||||
'valid2.host.']),
|
||||
# dns.rdtypes.IN.A.A
|
||||
_ip_answer(addresses=['1.2.3.4', '5.6.7.8']),
|
||||
_ip_answer(addresses=['9.10.11.12']),
|
||||
_ip_answer(addresses=['ffe0::1']),
|
||||
_ip_answer(addresses=['ffe1::2']),
|
||||
_ip_answer(addresses=['ffe1::3']),
|
||||
]
|
||||
result = _get_cleaned_mx_records(domain='testdomain3', timeout=10)
|
||||
self.assertListEqual(result, ['valid.host', 'valid2.host'])
|
||||
self.assertListEqual(
|
||||
result,
|
||||
['1.2.3.4', '5.6.7.8', '9.10.11.12', 'ffe0::1', 'ffe1::2'])
|
||||
|
||||
@patch.object(target=dns_check, attribute='resolve', new=TEST_QUERY)
|
||||
def test_raises_exception_on_dns_timeout(self):
|
||||
|
|
|
@ -1,8 +1,18 @@
|
|||
from dns.exception import Timeout
|
||||
from datetime import datetime
|
||||
from ipaddress import IPv4Address, IPv6Address, ip_address
|
||||
from logging import getLogger
|
||||
from socket import has_ipv6
|
||||
from typing import FrozenSet, List, Type, Union
|
||||
|
||||
from dns.exception import DNSException, Timeout
|
||||
from dns.rdataclass import IN as rdcl_in
|
||||
from dns.rdatatype import AAAA as rdtype_aaaa
|
||||
from dns.rdatatype import MX as rdtype_mx
|
||||
from dns.rdtypes.ANY.MX import MX
|
||||
from dns.rdatatype import A as rdtype_a
|
||||
from dns.rdtypes.ANY.MX import MX as restype_mx
|
||||
from dns.resolver import (
|
||||
NXDOMAIN, YXDOMAIN, Answer, NoAnswer, NoNameservers, resolve)
|
||||
from typing_extensions import Literal
|
||||
|
||||
from .constants import HOST_REGEX
|
||||
from .email_address import EmailAddress
|
||||
|
@ -10,13 +20,16 @@ from .exceptions import (
|
|||
DNSConfigurationError, DNSTimeoutError, DomainNotFoundError, NoMXError,
|
||||
NoNameserverError, NoValidMXError)
|
||||
|
||||
LOGGER = getLogger(name=__name__)
|
||||
AddressTypes = FrozenSet[Union[Type[IPv4Address], Type[IPv6Address]]]
|
||||
DefaultAddressTypes = frozenset([IPv4Address, IPv6Address])
|
||||
|
||||
|
||||
def _get_mx_records(domain: str, timeout: float) -> Answer:
|
||||
'Return the DNS response for checking, optionally raise exceptions.'
|
||||
try:
|
||||
return resolve(
|
||||
qname=domain, rdtype=rdtype_mx, lifetime=timeout,
|
||||
search=True)
|
||||
qname=domain, rdtype=rdtype_mx, lifetime=timeout, search=True)
|
||||
except NXDOMAIN:
|
||||
raise DomainNotFoundError
|
||||
except NoNameservers:
|
||||
|
@ -29,27 +42,88 @@ def _get_mx_records(domain: str, timeout: float) -> Answer:
|
|||
raise NoMXError
|
||||
|
||||
|
||||
def _get_cleaned_mx_records(domain: str, timeout: float) -> list:
|
||||
def _resolve_one_recordtype(
|
||||
hostname: str, records: List[str], timeout: float,
|
||||
rdtype: Literal[rdtype_aaaa, rdtype_a], result_set: set) -> float:
|
||||
"""
|
||||
Resolve one recordtype, add to results, return the new timeout
|
||||
value.
|
||||
"""
|
||||
if timeout <= 0:
|
||||
return 0
|
||||
time_current = datetime.now()
|
||||
try:
|
||||
query_result = resolve(
|
||||
qname=hostname, rdtype=rdtype, rdclass=rdcl_in, lifetime=timeout)
|
||||
for item in query_result.rrset.processing_order():
|
||||
text: str = item.to_text()
|
||||
if text in result_set:
|
||||
LOGGER.debug(msg=(
|
||||
f'{hostname} resolved to {text!r} already in results,'
|
||||
' not adding'))
|
||||
continue
|
||||
records.append(text)
|
||||
result_set.add(text)
|
||||
LOGGER.debug(msg=f'{hostname} resolved to {text}')
|
||||
except DNSException as exc:
|
||||
LOGGER.warning(msg=f'{hostname} resolve error: {exc}')
|
||||
return timeout - (datetime.now() - time_current).total_seconds()
|
||||
|
||||
|
||||
def _get_resolved_mx_records(
|
||||
records: list, timeout: float,
|
||||
address_types: AddressTypes = DefaultAddressTypes
|
||||
) -> List[str]:
|
||||
'Return a resolved & sorted list of IP addresses from MX records.'
|
||||
result = []
|
||||
result_set = set()
|
||||
for record in records:
|
||||
if timeout <= 0:
|
||||
break
|
||||
if IPv6Address in address_types and has_ipv6:
|
||||
timeout = _resolve_one_recordtype(
|
||||
hostname=record, records=result, timeout=timeout,
|
||||
rdtype=rdtype_aaaa, result_set=result_set)
|
||||
if IPv4Address in address_types:
|
||||
timeout = _resolve_one_recordtype(
|
||||
hostname=record, records=result, timeout=timeout,
|
||||
rdtype=rdtype_a, result_set=result_set)
|
||||
return result
|
||||
|
||||
|
||||
def _get_cleaned_mx_records(
|
||||
domain: str, timeout: float,
|
||||
address_types: AddressTypes = DefaultAddressTypes
|
||||
) -> List[str]:
|
||||
"""
|
||||
Return a list of hostnames in the MX record, raise an exception on
|
||||
any issues.
|
||||
"""
|
||||
time_start = datetime.now()
|
||||
answer = _get_mx_records(domain=domain, timeout=timeout)
|
||||
to_check = list()
|
||||
host_set = set()
|
||||
for record in answer.rrset.processing_order(): # type: MX
|
||||
record: restype_mx
|
||||
for record in answer.rrset.processing_order():
|
||||
dns_str = record.exchange.to_text().rstrip('.') # type: str
|
||||
if dns_str in host_set:
|
||||
LOGGER.debug(msg=f'{dns_str} is already in results, not adding')
|
||||
continue
|
||||
to_check.append(dns_str)
|
||||
host_set.add(dns_str)
|
||||
result = [x for x in to_check if HOST_REGEX.search(string=x)]
|
||||
LOGGER.debug(msg=f'{domain} resolved (MX): {result}')
|
||||
if not result:
|
||||
raise NoValidMXError
|
||||
time_diff = timeout - (datetime.now() - time_start).total_seconds()
|
||||
result = _get_resolved_mx_records(
|
||||
records=result, timeout=time_diff, address_types=address_types)
|
||||
return result
|
||||
|
||||
|
||||
def dns_check(email_address: EmailAddress, timeout: float = 10) -> list:
|
||||
def dns_check(
|
||||
email_address: EmailAddress, timeout: float = 10,
|
||||
address_types: AddressTypes = DefaultAddressTypes) -> List[str]:
|
||||
"""
|
||||
Check whether there are any responsible SMTP servers for the email
|
||||
address by looking up the DNS MX records.
|
||||
|
@ -59,7 +133,11 @@ def dns_check(email_address: EmailAddress, timeout: float = 10) -> list:
|
|||
`MXError`. Otherwise, return the list of MX hostnames.
|
||||
"""
|
||||
if email_address.domain_literal_ip:
|
||||
ip = ip_address(address=email_address.domain_literal_ip)
|
||||
if type(ip) not in address_types:
|
||||
raise NoValidMXError
|
||||
return [email_address.domain_literal_ip]
|
||||
else:
|
||||
return _get_cleaned_mx_records(
|
||||
domain=email_address.domain, timeout=timeout)
|
||||
domain=email_address.domain, timeout=timeout,
|
||||
address_types=address_types)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
from logging import getLogger
|
||||
from smtplib import (
|
||||
SMTP, SMTPNotSupportedError, SMTPResponseException, SMTPServerDisconnected)
|
||||
from socket import timeout
|
||||
from ssl import SSLContext, SSLError
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
|
@ -30,7 +31,7 @@ class _SMTPChecker(SMTP):
|
|||
def __init__(
|
||||
self, local_hostname: Optional[str], timeout: float, debug: bool,
|
||||
sender: EmailAddress, recip: EmailAddress,
|
||||
skip_tls: bool = False, tls_context: SSLContext = None):
|
||||
skip_tls: bool = False, tls_context: Optional[SSLContext] = None):
|
||||
"""
|
||||
Initialize the object with all the parameters which remain
|
||||
constant during the check of one email address on all the SMTP
|
||||
|
@ -59,7 +60,7 @@ class _SMTPChecker(SMTP):
|
|||
|
||||
def connect(
|
||||
self, host: str = 'localhost', port: int = 0,
|
||||
source_address: str = None) -> Tuple[int, str]:
|
||||
source_address: Optional[str] = None) -> Tuple[int, str]:
|
||||
"""
|
||||
Like `smtplib.SMTP.connect`, but raise appropriate exceptions on
|
||||
connection failure or negative SMTP server response.
|
||||
|
@ -91,7 +92,7 @@ class _SMTPChecker(SMTP):
|
|||
except RuntimeError:
|
||||
# SSL/TLS support is not available to your Python interpreter
|
||||
pass
|
||||
except SSLError as exc:
|
||||
except (SSLError, timeout) as exc:
|
||||
raise TLSNegotiationError(exc)
|
||||
|
||||
def mail(self, sender: str, options: tuple = ()):
|
||||
|
@ -174,7 +175,7 @@ class _SMTPChecker(SMTP):
|
|||
return False
|
||||
except SMTPResponseException as exc:
|
||||
return self._handle_smtpresponseexception(exc=exc)
|
||||
except TLSNegotiationError as exc:
|
||||
except (TLSNegotiationError, ConnectionError) as exc:
|
||||
self.__temporary_errors[self._host] = SMTPMessage(
|
||||
command=self.__command, code=-1, text=str(exc),
|
||||
exceptions=exc.args)
|
||||
|
|
|
@ -2,7 +2,7 @@ from logging import getLogger
|
|||
from ssl import SSLContext
|
||||
from typing import Optional
|
||||
|
||||
from .dns_check import dns_check
|
||||
from .dns_check import dns_check, DefaultAddressTypes, AddressTypes
|
||||
from .domainlist_check import domainlist_check
|
||||
from .email_address import EmailAddress
|
||||
from .exceptions import (
|
||||
|
@ -34,7 +34,7 @@ def validate_email_or_fail(
|
|||
smtp_timeout: float = 10, smtp_helo_host: Optional[str] = None,
|
||||
smtp_from_address: Optional[str] = None,
|
||||
smtp_skip_tls: bool = False, smtp_tls_context: Optional[SSLContext] = None,
|
||||
smtp_debug: bool = False
|
||||
smtp_debug: bool = False, address_types: AddressTypes = DefaultAddressTypes
|
||||
) -> Optional[bool]:
|
||||
"""
|
||||
Return `True` if the email address validation is successful, `None`
|
||||
|
@ -48,7 +48,9 @@ def validate_email_or_fail(
|
|||
domainlist_check(email_address=email_address_to)
|
||||
if not check_dns and not check_smtp: # check_smtp implies check_dns.
|
||||
return True
|
||||
mx_records = dns_check(email_address=email_address_to, timeout=dns_timeout)
|
||||
mx_records = dns_check(
|
||||
email_address=email_address_to, timeout=dns_timeout,
|
||||
address_types=address_types)
|
||||
if not check_smtp:
|
||||
return True
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue