parent
7e8b04275d
commit
a17fd45826
|
@ -3,6 +3,11 @@
|
|||
(base class validate_email.exceptions.EmailValidationError) when the
|
||||
passed email check fails, while logging a warning with the validation
|
||||
result.
|
||||
- The blacklist updater can now use a separate thread and writable temp
|
||||
paths to download and store its data, while logging the update
|
||||
process on DEBUG.
|
||||
- Exposed a `validate_email.updater.update_builtin_blacklist` to update
|
||||
the built-in blacklists while running.
|
||||
|
||||
0.2.0:
|
||||
- Added an auto-updater for updating built-in blacklists.
|
||||
|
|
13
README.rst
13
README.rst
|
@ -45,7 +45,18 @@ The function :code:`validate_email_or_fail()` works exactly like :code:`validate
|
|||
|
||||
Auto-updater
|
||||
============================
|
||||
The package contains an auto-updater for downloading and updating the built-in blacklist.txt. It will run on each module load (and installation), but will try to update the content if the file is older than 5 days, and if the content is not the same that's already downloaded.
|
||||
The package contains an auto-updater for downloading and updating the built-in blacklist.txt. It will run on each module load (and installation), but will only try to update the content if the file is older than 5 days, and if the content is not the same as what's already downloaded. The update can also be triggered manually:
|
||||
|
||||
from validate_email.updater import update_builtin_blacklist
|
||||
|
||||
update_builtin_blacklist(force: bool = False, background: bool = True,
|
||||
callback: MethodType = None) -> Optional[Thread]
|
||||
|
||||
:code:`force` forces the update even if the downloaded/installed file is fresh enough.
|
||||
|
||||
:code:`background` starts the update in a Thread so it won't make your code hang while it's updating. If you set this to true, the function will return the Thread used for starting the update so you can :code:`join()` it if necessary.
|
||||
|
||||
:code:`callback` is an optional function to be called when the update is done.
|
||||
|
||||
TODOs and BUGS
|
||||
============================
|
||||
|
|
10
setup.py
10
setup.py
|
@ -9,8 +9,13 @@ class PostInstallCommand(install):
|
|||
def run(self):
|
||||
if self.dry_run:
|
||||
return super().run()
|
||||
# The updater will walk code stack frames and see if this
|
||||
# variable is set in locals() to determine if it is run from the
|
||||
# setup, in which case it won't autoupdate.
|
||||
_IS_VALIDATEEMAIL_SETUP = True
|
||||
from validate_email.updater import BlacklistUpdater
|
||||
blacklist_updater = BlacklistUpdater()
|
||||
blacklist_updater._is_install_time = _IS_VALIDATEEMAIL_SETUP
|
||||
blacklist_updater.process(force=True)
|
||||
super().run()
|
||||
|
||||
|
@ -21,8 +26,13 @@ class PostDevelopCommand(develop):
|
|||
def run(self):
|
||||
if self.dry_run:
|
||||
return super().run()
|
||||
# The updater will walk code stack frames and see if this
|
||||
# variable is set in locals() to determine if it is run from the
|
||||
# setup, in which case it won't autoupdate.
|
||||
_IS_VALIDATEEMAIL_SETUP = True
|
||||
from validate_email.updater import BlacklistUpdater
|
||||
blacklist_updater = BlacklistUpdater()
|
||||
blacklist_updater._is_install_time = _IS_VALIDATEEMAIL_SETUP
|
||||
blacklist_updater.process(force=True)
|
||||
super().run()
|
||||
|
||||
|
|
|
@ -1,20 +1,20 @@
|
|||
from unittest.case import TestCase
|
||||
|
||||
from validate_email import validate_email, validate_email_or_fail
|
||||
from validate_email.domainlist_check import BlacklistUpdater, domainlist_check
|
||||
from validate_email.domainlist_check import (
|
||||
domainlist_check, update_builtin_blacklist)
|
||||
from validate_email.exceptions import DomainBlacklistedError
|
||||
from validate_email.validate_email import (
|
||||
validate_email, validate_email_or_fail)
|
||||
|
||||
|
||||
class BlacklistCheckTestCase(TestCase):
|
||||
'Testing if the included blacklist filtering works.'
|
||||
|
||||
def setUpClass():
|
||||
blacklist_updater = BlacklistUpdater()
|
||||
blacklist_updater.process()
|
||||
update_builtin_blacklist(force=False, background=False)
|
||||
|
||||
def test_blacklist_positive(self):
|
||||
'Disallows blacklist item: mailinator.com.'
|
||||
domainlist_check._load_builtin_blacklist()
|
||||
with self.assertRaises(DomainBlacklistedError):
|
||||
domainlist_check(user_part='pa2', domain_part='mailinator.com')
|
||||
with self.assertRaises(DomainBlacklistedError):
|
||||
|
|
|
@ -1 +1 @@
|
|||
from .validate_email import validate_email, validate_email_or_fail # noqa
|
||||
from .validate_email import validate_email, validate_email_or_fail # NOQA
|
||||
|
|
|
@ -1,19 +1,20 @@
|
|||
from logging import getLogger
|
||||
from typing import Optional
|
||||
|
||||
from .exceptions import DomainBlacklistedError
|
||||
from .updater import BLACKLIST_FILE_PATH, BlacklistUpdater
|
||||
from .updater import (
|
||||
BLACKLIST_FILEPATH_INSTALLED, BLACKLIST_FILEPATH_TMP,
|
||||
update_builtin_blacklist)
|
||||
|
||||
SetOrNone = Optional[set]
|
||||
|
||||
# Start an optional update on module load
|
||||
blacklist_updater = BlacklistUpdater()
|
||||
blacklist_updater.process(force=False)
|
||||
LOGGER = getLogger(__name__)
|
||||
|
||||
|
||||
class DomainListValidator(object):
|
||||
'Check the provided email against domain lists.'
|
||||
domain_whitelist = set()
|
||||
domain_blacklist = set('localhost')
|
||||
_is_builtin_bl_used: bool = False
|
||||
|
||||
def __init__(
|
||||
self, whitelist: SetOrNone = None, blacklist: SetOrNone = None):
|
||||
|
@ -22,16 +23,33 @@ class DomainListValidator(object):
|
|||
if blacklist:
|
||||
self.domain_blacklist = set(x.lower() for x in blacklist)
|
||||
else:
|
||||
self._load_builtin_blacklist()
|
||||
self._is_builtin_bl_used = True
|
||||
self.reload_builtin_blacklist()
|
||||
|
||||
def _load_builtin_blacklist(self):
|
||||
'Load our built-in blacklist.'
|
||||
@property
|
||||
def _blacklist_path(self) -> str:
|
||||
'Return the path of the `blacklist.txt` that should be loaded.'
|
||||
try:
|
||||
with open(BLACKLIST_FILE_PATH) as fd:
|
||||
# Zero size, file is touched to indicate the
|
||||
# preinstalled file is still fresh enough
|
||||
return BLACKLIST_FILEPATH_INSTALLED \
|
||||
if BLACKLIST_FILEPATH_TMP.stat().st_size == 0 \
|
||||
else BLACKLIST_FILEPATH_TMP
|
||||
except FileNotFoundError:
|
||||
return BLACKLIST_FILEPATH_INSTALLED
|
||||
|
||||
def reload_builtin_blacklist(self):
|
||||
'(Re)load our built-in blacklist.'
|
||||
if not self._is_builtin_bl_used:
|
||||
return
|
||||
bl_path = self._blacklist_path
|
||||
LOGGER.debug(msg=f'(Re)loading blacklist: {bl_path}')
|
||||
try:
|
||||
with open(bl_path) as fd:
|
||||
lines = fd.readlines()
|
||||
except FileNotFoundError:
|
||||
return
|
||||
self.domain_blacklist.update(
|
||||
self.domain_blacklist = set(
|
||||
x.strip().lower() for x in lines if x.strip())
|
||||
|
||||
def __call__(self, user_part: str, domain_part: str) -> bool:
|
||||
|
@ -44,3 +62,7 @@ class DomainListValidator(object):
|
|||
|
||||
|
||||
domainlist_check = DomainListValidator()
|
||||
# Start an optional update on module load
|
||||
update_builtin_blacklist(
|
||||
force=False, background=True,
|
||||
callback=domainlist_check.reload_builtin_blacklist)
|
||||
|
|
|
@ -1,65 +1,93 @@
|
|||
from http.client import HTTPResponse
|
||||
from os import makedirs
|
||||
from logging import getLogger
|
||||
from pathlib import Path
|
||||
from tempfile import gettempdir, gettempprefix
|
||||
from threading import Thread
|
||||
from time import time
|
||||
from types import MethodType
|
||||
from typing import Optional
|
||||
from urllib.error import HTTPError
|
||||
from urllib.request import Request, urlopen
|
||||
from tempfile import gettempdir, gettempprefix
|
||||
|
||||
from filelock import FileLock
|
||||
|
||||
TMP_PATH = Path(gettempdir())
|
||||
from .utils import is_setuptime
|
||||
|
||||
LOGGER = getLogger(__name__)
|
||||
TMP_PATH = Path(gettempdir()).joinpath(f'{gettempprefix()}-py3-validate-email')
|
||||
TMP_PATH.mkdir(exist_ok=True)
|
||||
BLACKLIST_URL = (
|
||||
'https://raw.githubusercontent.com/martenson/disposable-email-domains/'
|
||||
'master/disposable_email_blocklist.conf')
|
||||
LIB_PATH_DEFAULT = Path(__file__).resolve().parent.joinpath('data')
|
||||
BLACKLIST_FILEPATH_INSTALLED = LIB_PATH_DEFAULT.joinpath('blacklist.txt')
|
||||
BLACKLIST_FILEPATH_TEMPORARY = TMP_PATH.joinpath(
|
||||
f'{gettempprefix()}-py3-validateemail-blacklist.txt')
|
||||
LOCK_PATH = TMP_PATH.joinpath(
|
||||
f'{gettempprefix()}-py3-validateemail-blacklistupdater.lock')
|
||||
BLACKLIST_FILEPATH_TMP = TMP_PATH.joinpath('blacklist.txt')
|
||||
ETAG_FILEPATH_INSTALLED = LIB_PATH_DEFAULT.joinpath('blacklist.etag.txt')
|
||||
ETAG_FILEPATH_TMP = TMP_PATH.joinpath('blacklist.etag.txt')
|
||||
LOCK_PATH = TMP_PATH.joinpath('blacklistupdater.lock')
|
||||
|
||||
|
||||
class BlacklistUpdater(object):
|
||||
'Optionally auto-update the built-in `blacklist.txt`.'
|
||||
"""
|
||||
Optionally auto-update the built-in `blacklist.txt`, while using
|
||||
a temporary place to put the newly downloaded one to avoid read-only
|
||||
filesystem errors. If the installed `blacklist.txt` is fresh enough
|
||||
don't look for newer versions.
|
||||
"""
|
||||
|
||||
_etag_file_path = LIB_PATH_DEFAULT.joinpath('blacklist_etag.txt')
|
||||
_lock_file_path = LIB_PATH_DEFAULT.joinpath('blacklist_lock')
|
||||
_refresh_when_older_than = 5 * 24 * 60 * 60 # 5 days
|
||||
_refresh_when_older_than: int = 5 * 24 * 60 * 60 # 5 days
|
||||
_on_update_callback: MethodType = None
|
||||
_is_install_time: bool = False
|
||||
|
||||
def __init__(self, lib_path: str = LIB_PATH_DEFAULT):
|
||||
makedirs(name=lib_path, exist_ok=True)
|
||||
self._lock_file_path.touch(exist_ok=True)
|
||||
@property
|
||||
def _etag_filepath(self) -> str:
|
||||
'Return the ETag file path to use.'
|
||||
return ETAG_FILEPATH_INSTALLED \
|
||||
if self._is_install_time else ETAG_FILEPATH_TMP
|
||||
|
||||
@property
|
||||
def _blacklist_filepath(self) -> str:
|
||||
'Return the blacklist file path to use.'
|
||||
return BLACKLIST_FILEPATH_INSTALLED \
|
||||
if self._is_install_time else BLACKLIST_FILEPATH_TMP
|
||||
|
||||
def _read_etag(self) -> Optional[str]:
|
||||
'Read the etag header from the stored etag file when exists.'
|
||||
try:
|
||||
with open(self._etag_file_path) as fd:
|
||||
return fd.read().strip()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
for path in [ETAG_FILEPATH_TMP, ETAG_FILEPATH_INSTALLED]:
|
||||
try:
|
||||
with open(path) as fd:
|
||||
return fd.read().strip()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def _write_etag(self, content: str):
|
||||
'Write the etag of the newly received file to the cache.'
|
||||
with open(self._etag_file_path, 'w') as fd:
|
||||
path = self._etag_filepath
|
||||
LOGGER.debug(msg=f'Storing ETag response into {path}.')
|
||||
with open(path, 'w') as fd:
|
||||
fd.write(content)
|
||||
|
||||
@property
|
||||
def is_local_old(self) -> bool:
|
||||
def _is_old(self) -> bool:
|
||||
'Return `True` if the locally stored file is old.'
|
||||
if not BLACKLIST_FILEPATH_TEMPORARY.exists():
|
||||
return True
|
||||
true_when_older_than = time() - self._refresh_when_older_than
|
||||
try:
|
||||
ctime = BLACKLIST_FILEPATH_TMP.stat().st_ctime
|
||||
if ctime >= true_when_older_than:
|
||||
# Downloaded tmp file is still fresh enough
|
||||
return False
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
try:
|
||||
ctime = BLACKLIST_FILEPATH_INSTALLED.stat().st_ctime
|
||||
return ctime < time() - self._refresh_when_older_than
|
||||
except FileNotFoundError:
|
||||
return True
|
||||
return ctime < true_when_older_than
|
||||
|
||||
def _get_headers(self, force_update: bool = False) -> dict:
|
||||
'Compile a header with etag if available.'
|
||||
headers = dict()
|
||||
if force_update:
|
||||
if force_update or self._is_install_time:
|
||||
return headers
|
||||
etag = self._read_etag()
|
||||
if not etag:
|
||||
|
@ -71,13 +99,17 @@ class BlacklistUpdater(object):
|
|||
'Write new data file on its arrival.'
|
||||
if 'ETag' in response.headers:
|
||||
self._write_etag(response.headers.get('ETag'))
|
||||
with open(BLACKLIST_FILEPATH_TEMPORARY, 'wb') as fd:
|
||||
path = self._blacklist_filepath
|
||||
LOGGER.debug(msg=f'Writing response into {path}')
|
||||
with open(path, 'wb') as fd:
|
||||
fd.write(response.fp.read())
|
||||
|
||||
def _process(self, force: bool = False):
|
||||
'Start optionally updating the blacklist.txt file, while locked.'
|
||||
if not force and not self.is_local_old:
|
||||
if not force and not self._is_old:
|
||||
LOGGER.debug(msg='Not updating because file is fresh enough.')
|
||||
return
|
||||
LOGGER.debug(msg=f'Checking {BLACKLIST_URL}')
|
||||
request = Request(
|
||||
url=BLACKLIST_URL, headers=self._get_headers(force_update=force))
|
||||
try:
|
||||
|
@ -86,12 +118,37 @@ class BlacklistUpdater(object):
|
|||
self._write_new_file(response=response)
|
||||
except HTTPError as exc:
|
||||
if exc.code == 304:
|
||||
# Not modified, update date on the etag file
|
||||
BLACKLIST_FILE_PATH.touch()
|
||||
# Not modified, update date on the tmp file
|
||||
LOGGER.debug(msg=f'Local file is fresh enough (same ETag).')
|
||||
BLACKLIST_FILEPATH_TMP.touch()
|
||||
return
|
||||
if type(self._on_update_callback) is MethodType:
|
||||
self._on_update_callback()
|
||||
|
||||
def process(self, force: bool = False):
|
||||
def process(
|
||||
self, force: bool = False, callback: Optional[MethodType] = None):
|
||||
'Start optionally updating the blacklist.txt file.'
|
||||
# Locking for avoiding multi-process update on multi-process
|
||||
# startup
|
||||
# Locking to avoid multi-process update on multi-process startup
|
||||
self._on_update_callback = callback
|
||||
with FileLock(lock_file=LOCK_PATH):
|
||||
self._process(force=force)
|
||||
|
||||
|
||||
def update_builtin_blacklist(
        force: bool = False, background: bool = True,
        callback: MethodType = None) -> Optional[Thread]:
    """
    Trigger an update of the built-in blacklist.

    Does nothing and returns `None` when `is_setuptime()` reports that
    the call originates from the package setup.

    `force` and `callback` are handed over unchanged to
    `BlacklistUpdater.process()`.

    When `background` is true, the update runs in a freshly started
    `Thread`, which is returned so the caller can `join()` it.
    Otherwise the update runs synchronously in the calling thread and
    `None` is returned.
    """
    if is_setuptime():
        return None
    LOGGER.info(msg='Starting optional update of built-in blacklist.')
    updater = BlacklistUpdater()
    process_kwargs = {'force': force, 'callback': callback}
    if background:
        # Hand the work to a worker thread and let the caller decide
        # whether to wait for it.
        worker = Thread(target=updater.process, kwargs=process_kwargs)
        worker.start()
        return worker
    updater.process(**process_kwargs)
    return None
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
try:
|
||||
from sys import _getframe
|
||||
getframe = _getframe
|
||||
except ImportError:
|
||||
getframe = None
|
||||
|
||||
from traceback import walk_stack
|
||||
|
||||
|
||||
def is_setuptime() -> bool:
    """
    Return `True` if called from setup.

    Walks the call stack starting at this function's own frame and
    looks for a local variable named `_IS_VALIDATEEMAIL_SETUP` whose
    value is exactly `True` (set by the install commands in
    `setup.py`). Returns `False` when no frame carries the marker, or
    when frame introspection is unavailable (`getframe` is `None`).
    """
    if getframe is None:
        # This is not CPython, can't know if this is setup time
        return False
    marker = '_IS_VALIDATEEMAIL_SETUP'
    # `.get()` on an empty locals mapping yields `None`, so frames
    # without any locals are rejected by the identity check as well.
    return any(
        frame.f_locals.get(marker) is True
        for frame, _ in walk_stack(f=getframe()))
|
|
@ -6,7 +6,7 @@ from .exceptions import AddressFormatError, EmailValidationError
|
|||
from .mx_check import mx_check
|
||||
from .regex_check import regex_check
|
||||
|
||||
logger = getLogger(name='validate_email')
|
||||
LOGGER = getLogger(name=__name__)
|
||||
|
||||
|
||||
def validate_email_or_fail(
|
||||
|
@ -45,5 +45,5 @@ def validate_email(email_address: str, *args, **kwargs):
|
|||
return validate_email_or_fail(email_address, *args, **kwargs)
|
||||
except EmailValidationError as error:
|
||||
message = f'Validation for {email_address!r} failed: {error}'
|
||||
logger.warning(msg=message)
|
||||
LOGGER.warning(msg=message)
|
||||
return False
|
||||
|
|
Loading…
Reference in New Issue