148 lines
5.3 KiB
Python
148 lines
5.3 KiB
Python
# RFC 2822 - style email validation for Python
|
|
# (c) 2012 Syrus Akbary <me@syrusakbary.com>
|
|
# Extended from (c) 2011 Noel Bush <noel@aitools.org>
|
|
# for support of mx and user check
|
|
# This code is made available to you under the GNU LGPL v3.
|
|
#
|
|
# This module provides a single method, valid_email_address(),
|
|
# which returns True or False to indicate whether a given address
|
|
# is valid according to the 'addr-spec' part of the specification
|
|
# given in RFC 2822. Ideally, we would like to find this
|
|
# in some other library, already thoroughly tested and well-
|
|
# maintained. The standard Python library email.utils
|
|
# contains a parse_addr() function, but it is not sufficient
|
|
# to detect many malformed addresses.
|
|
#
|
|
# This implementation aims to be faithful to the RFC, with the
|
|
# exception of a circular definition (see comments below), and
|
|
# with the omission of the pattern components marked as "obsolete".
|
|
|
|
import smtplib
|
|
import logging
|
|
import socket
|
|
|
|
class ServerError(Exception):
|
|
pass
|
|
|
|
|
|
|
|
MX_DNS_CACHE = {}
|
|
MX_CHECK_CACHE = {}
|
|
|
|
|
|
def get_mx_ip(hostname):
|
|
if hostname not in MX_DNS_CACHE:
|
|
try:
|
|
MX_DNS_CACHE[hostname] = DNS.mxlookup(hostname)
|
|
except ServerError as e:
|
|
if e.rcode == 3 or e.rcode == 2: # NXDOMAIN (Non-Existent Domain) or SERVFAIL
|
|
MX_DNS_CACHE[hostname] = None
|
|
else:
|
|
raise
|
|
|
|
return MX_DNS_CACHE[hostname]
|
|
|
|
|
|
def validate_email(email, check_mx=False, verify=False, debug=False, smtp_timeout=10):
|
|
"""Indicate whether the given string is a valid email address
|
|
according to the 'addr-spec' portion of RFC 2822 (see section
|
|
3.4.1). Parts of the spec that are marked obsolete are *not*
|
|
included in this test, and certain arcane constructions that
|
|
depend on circular definitions in the spec may not pass, but in
|
|
general this should correctly identify any email address likely
|
|
to be in use as of 2011."""
|
|
if debug:
|
|
logger = logging.getLogger('validate_email')
|
|
logger.setLevel(logging.DEBUG)
|
|
else:
|
|
logger = None
|
|
|
|
if not email_has_valid_structure(email_address):
|
|
return False
|
|
|
|
check_mx |= verify
|
|
if check_mx:
|
|
if not DNS:
|
|
raise Exception('For check the mx records or check if the email exists you must '
|
|
'have installed pyDNS python package')
|
|
hostname = email[email.find('@') + 1:]
|
|
mx_hosts = get_mx_ip(hostname)
|
|
if mx_hosts is None:
|
|
return False
|
|
for mx in mx_hosts:
|
|
try:
|
|
if not verify and mx[1] in MX_CHECK_CACHE:
|
|
return MX_CHECK_CACHE[mx[1]]
|
|
smtp = smtplib.SMTP(timeout=smtp_timeout)
|
|
smtp.connect(mx[1])
|
|
MX_CHECK_CACHE[mx[1]] = True
|
|
if not verify:
|
|
try:
|
|
smtp.quit()
|
|
except smtplib.SMTPServerDisconnected:
|
|
pass
|
|
return True
|
|
status, _ = smtp.helo()
|
|
if status != 250:
|
|
smtp.quit()
|
|
if debug:
|
|
logger.debug(u'%s answer: %s - %s', mx[1], status, _)
|
|
continue
|
|
smtp.mail('')
|
|
status, _ = smtp.rcpt(email)
|
|
if status == 250:
|
|
smtp.quit()
|
|
return True
|
|
if debug:
|
|
logger.debug(u'%s answer: %s - %s', mx[1], status, _)
|
|
smtp.quit()
|
|
except smtplib.SMTPServerDisconnected: # Server not permits verify user
|
|
if debug:
|
|
logger.debug(u'%s disconected.', mx[1])
|
|
except smtplib.SMTPConnectError:
|
|
if debug:
|
|
logger.debug(u'Unable to connect to %s.', mx[1])
|
|
return None
|
|
except AssertionError:
|
|
return False
|
|
except (ServerError, socket.error) as e:
|
|
if debug:
|
|
logger.debug('ServerError or socket.error exception raised (%s).', e)
|
|
return None
|
|
return True
|
|
|
|
if __name__ == "__main__":
|
|
import time
|
|
while True:
|
|
email = raw_input('Enter email for validation: ')
|
|
|
|
mx = raw_input('Validate MX record? [yN] ')
|
|
if mx.strip().lower() == 'y':
|
|
mx = True
|
|
else:
|
|
mx = False
|
|
|
|
validate = raw_input('Try to contact server for address validation? [yN] ')
|
|
if validate.strip().lower() == 'y':
|
|
validate = True
|
|
else:
|
|
validate = False
|
|
|
|
logging.basicConfig()
|
|
|
|
result = validate_email(email, mx, validate, debug=True, smtp_timeout=1)
|
|
if result:
|
|
print("Valid!")
|
|
elif result is None:
|
|
print("I'm not sure.")
|
|
else:
|
|
print("Invalid!")
|
|
|
|
time.sleep(1)
|
|
|
|
|
|
# import sys
|
|
|
|
# sys.modules[__name__],sys.modules['validate_email_module'] = validate_email,sys.modules[__name__]
|
|
# from validate_email_module import *
|