Refactoring to contain results in an object

This commit is contained in:
László Károlyi 2022-02-13 22:56:34 +01:00
parent 2794aa1d9a
commit 4baa684e1d
Signed by: karolyi
GPG Key ID: 2DCAF25E55735BFE
10 changed files with 145 additions and 105 deletions

View File

@ -31,7 +31,7 @@ def _amazon_match_v4(address: IPv4Address) -> Optional[str]:
'Try to match an IPv4 address, return data if it matches.'
for network, description in _IP4_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
return f'Amazon blacklist: {network} {description}'
@lru_cache(maxsize=50)
@ -39,7 +39,7 @@ def _amazon_match_v6(address: IPv6Address) -> Optional[str]:
'Try to match an IPv6 address, return data if it matches.'
for network, description in _IP6_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
return f'Amazon blacklist: {network} {description}'
def _amazon_load():

View File

@ -1,12 +1,8 @@
__doc__ = 'Blacklisted ASNs module'
from functools import lru_cache
from pathlib import Path
from typing import Optional, Set
from geoip2.database import Reader
from geoip2.errors import AddressNotFoundError
from geoip2.types import IPAddress
from geoip2.models import ASN
from yaml import load
try:
@ -26,34 +22,14 @@ if not _PATH_CONF.is_file():
_PATH_CONF.write_text(_DEFAULT_CFG)
@lru_cache(maxsize=50)
def _match_asn_bl(address: IPAddress) -> Optional[str]:
'Match ASN, return info if banned.'
try:
with Reader(fileish=CONFIG['geoip']['asn_db_path']) as reader:
response = reader.asn(ip_address=address)
except AddressNotFoundError:
return
asn = response.autonomous_system_number
if asn in _ASN_BL_SET:
return (
f'ASN BLACKLISTED: {address} is of ASN {asn} ({response.network}): '
f'{response.autonomous_system_organization}')
def is_asn_blacklisted(asn: ASN) -> bool:
'Return `True` when ASN is blacklisted `False` otherwise.'
return asn.autonomous_system_number in _ASN_BL_SET
@lru_cache(maxsize=50)
def _match_asn_wl(address: IPAddress) -> Optional[str]:
'Match ASN, return info if whitelisted.'
try:
with Reader(fileish=CONFIG['geoip']['asn_db_path']) as reader:
response = reader.asn(ip_address=address)
except AddressNotFoundError:
return
asn = response.autonomous_system_number
if asn in _ASN_WL_SET:
return (
f'ASN WHITELISTED: {address} is of ASN {asn} ({response.network}): '
f'{response.autonomous_system_organization}')
def is_asn_whitelisted(asn: ASN) -> bool:
'Return `True` when ASN is blacklisted `False` otherwise.'
return asn.autonomous_system_number in _ASN_WL_SET
def _reload_asns():
@ -63,5 +39,3 @@ def _reload_asns():
_ASN_CONFIG = load(stream=fd, Loader=Loader)
_ASN_BL_SET = set(_ASN_CONFIG['blacklist'])
_ASN_WL_SET = set(_ASN_CONFIG['whitelist'])
_match_asn_bl.cache_clear()
_match_asn_wl.cache_clear()

View File

@ -25,7 +25,7 @@ def _custom_blmatch_v4(address: IPv4Address) -> Optional[str]:
'Try to match an IPv4 address, return data if it matches.'
for network, description in _IP4_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
return f'Custom blacklist: {network} {description}'
@lru_cache(maxsize=50)
@ -33,7 +33,7 @@ def _custom_blmatch_v6(address: IPv6Address) -> Optional[str]:
'Try to match an IPv6 address, return data if it matches.'
for network, description in _IP6_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
return f'Custom blacklist: {network} {description}'
def _custom_load_bl():

View File

@ -25,7 +25,7 @@ def _custom_wlmatch_v4(address: IPv4Address) -> Optional[str]:
'Try to match an IPv4 address, return data if it matches.'
for network, description in _IP4_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
return f'Custom whitelist: {network} {description}'
@lru_cache(maxsize=50)
@ -33,9 +33,7 @@ def _custom_wlmatch_v6(address: IPv6Address) -> Optional[str]:
'Try to match an IPv6 address, return data if it matches.'
for network, description in _IP6_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
if address in network:
return f'{address} in {network}: {description}'
return f'Custom whitelist: {network} {description}'
def _custom_load_wl():

View File

@ -32,7 +32,7 @@ def _google_match_v4(address: IPv4Address) -> Optional[str]:
'Try to match an IPv4 address, return data if it matches.'
for network, description in _IP4_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
return f'Google blacklist: {network} {description}'
@lru_cache(maxsize=50)
@ -40,7 +40,7 @@ def _google_match_v6(address: IPv6Address) -> Optional[str]:
'Try to match an IPv6 address, return data if it matches.'
for network, description in _IP6_DICT.items():
if address in network:
return f'{address} in {network}: {description}'
return f'Google blacklist: {network} {description}'
def _google_load():

View File

@ -1,51 +1,8 @@
from functools import lru_cache
from ipaddress import IPv4Address, IPv6Address
from typing import Optional, Union
from .amazon import (
_amazon_download, _amazon_load, _amazon_match_v4, _amazon_match_v6)
from .asn_checker import _match_asn_bl, _match_asn_wl, _reload_asns
from .custom_blacklist import (
_custom_blmatch_v4, _custom_blmatch_v6, _custom_load_bl)
from .custom_whitelist import (
_custom_load_wl, _custom_wlmatch_v4, _custom_wlmatch_v6)
from .google import (
_google_download, _google_load, _google_match_v4, _google_match_v6)
_MATCH_FUNCS_BL_V4 = [
_match_asn_bl, _custom_blmatch_v4, _amazon_match_v4, _google_match_v4]
_MATCH_FUNCS_BL_V6 = [
_match_asn_bl, _custom_blmatch_v6, _amazon_match_v6, _google_match_v6]
_MATCH_FUNCS_WL_V4 = [_custom_wlmatch_v4, _match_asn_wl]
_MATCH_FUNCS_WL_V6 = [_custom_wlmatch_v6, _match_asn_wl]
@lru_cache(maxsize=50)
def _match_v4(address: IPv4Address) -> Optional[str]:
for func in _MATCH_FUNCS_WL_V4:
if func(address=address):
# Listener IP was whitelisted, return None
return
for func in _MATCH_FUNCS_BL_V4:
if result := func(address=address):
return result
@lru_cache(maxsize=50)
def _match_v6(address: IPv6Address) -> Optional[str]:
for func in _MATCH_FUNCS_WL_V6:
if func(address=address):
# Listener IP was whitelisted, return None
return
for func in _MATCH_FUNCS_BL_V6:
if result := func(address=address):
return result
def match(address: Union[IPv4Address, IPv6Address]) -> Optional[str]:
if type(address) is IPv4Address:
return _match_v4(address=address)
return _match_v6(address=address)
from .amazon import _amazon_download, _amazon_load
from .asn_checker import _reload_asns
from .custom_blacklist import _custom_load_bl
from .custom_whitelist import _custom_load_wl
from .google import _google_download, _google_load
def load():
@ -62,8 +19,6 @@ def redownload_ranges():
_amazon_download()
_google_download()
load()
_match_v4.cache_clear()
_match_v6.cache_clear()
load()

View File

@ -1,5 +1,6 @@
from ipaddress import IPv4Network, IPv6Network
from typing import Dict
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import Dict, Union
Ip4DictType = Dict[IPv4Network, str]
Ip6DictType = Dict[IPv6Network, str]
IP46AddressType = Union[IPv4Address, IPv6Address]

View File

@ -1,15 +1,14 @@
#!/usr/bin/env python3
from ipaddress import ip_address
from flask.app import Flask
from flask.globals import request
from flask.helpers import make_response
from flask.wrappers import Response
from werkzeug.middleware.proxy_fix import ProxyFix
from .ip_collector.reader import match, redownload_ranges
from .ip_collector.reader import redownload_ranges
from .middleware import RequesterFilterMiddleWare
from .utils import IcyGovIpAddress
flask_app = Flask(import_name='icy_governor')
flask_app.wsgi_app = RequesterFilterMiddleWare( # type: ignore
@ -35,13 +34,12 @@ def listener_add() -> Response:
if not ip:
response.headers['Icecast-Auth-Message'] = 'IP not specified'
return response
ip_addr = ip_address(address=ip)
result = match(address=ip_addr)
flask_app.logger.debug('AUTH: %r', result)
if result:
response.headers['Icecast-Auth-Message'] = 'Stream not found'
else:
icygov_addr = IcyGovIpAddress(address=ip)
flask_app.logger.debug(icygov_addr)
if icygov_addr.can_listen:
response.headers['Icecast-Auth-User'] = 1
else:
response.headers['Icecast-Auth-Message'] = 'Stream not found'
# response.headers['Icecast-Auth-Timelimit'] = 10
return response

View File

@ -47,7 +47,6 @@ class RequesterFilterMiddleWare(object):
try:
if type(addr) is IPv4Address:
# https://stackoverflow.com/a/8534381/1067833
print(self._IP4_NETS)
next(filter(lambda net: addr in net, self._IP4_NETS))
elif type(addr) is IPv6Address:
next(filter(lambda net: addr in net, self._IP6_NETS))

115
src/utils.py Normal file
View File

@ -0,0 +1,115 @@
from enum import Enum
from functools import cached_property
from ipaddress import IPv4Address, ip_address
from typing import Optional
from geoip2.database import Reader
from geoip2.errors import AddressNotFoundError
from geoip2.models import ASN
from .config import CONFIG
from .ip_collector.amazon import _amazon_match_v4, _amazon_match_v6
from .ip_collector.asn_checker import is_asn_blacklisted, is_asn_whitelisted
from .ip_collector.custom_blacklist import (
_custom_blmatch_v4, _custom_blmatch_v6)
from .ip_collector.custom_whitelist import (
_custom_wlmatch_v4, _custom_wlmatch_v6)
from .ip_collector.google import _google_match_v4, _google_match_v6
_MATCH_FUNCS_BL_V4 = [_custom_blmatch_v4, _amazon_match_v4, _google_match_v4]
_MATCH_FUNCS_BL_V6 = [_custom_blmatch_v6, _amazon_match_v6, _google_match_v6]
_MATCH_FUNCS_WL_V4 = [_custom_wlmatch_v4]
_MATCH_FUNCS_WL_V6 = [_custom_wlmatch_v6]
class ListenerType(Enum):
UNKNOWN = 0
WHITELISTED = 1
BLACKLISTED = 2
class IcyGovIpAddress(object):
'Depicting IP addresses internally.'
status: ListenerType = ListenerType.UNKNOWN
_asn_description = 'ASN unknown'
_result_description = 'no result'
can_listen = False
def __init__(self, address: str):
self.address = ip_address(address=address)
self.process()
def _match_v4(self):
for func in _MATCH_FUNCS_WL_V4:
if result := func(address=self.address):
# Listener IP was whitelisted
return self.set_result(
status=ListenerType.WHITELISTED, description=result,
can_listen=True)
for func in _MATCH_FUNCS_BL_V4:
if result := func(address=self.address):
# Listener IP was blacklisted
return self.set_result(
status=ListenerType.BLACKLISTED, description=result,
can_listen=False)
self.set_result(
status=ListenerType.UNKNOWN, description='', can_listen=True)
def _match_v6(self):
for func in _MATCH_FUNCS_WL_V6:
if result := func(address=self.address):
# Listener IP was whitelisted, return None
return self.set_result(
status=ListenerType.WHITELISTED, description=result,
can_listen=True)
for func in _MATCH_FUNCS_BL_V6:
if result := func(address=self.address):
# Listener IP was blacklisted
return self.set_result(
status=ListenerType.BLACKLISTED, description=result,
can_listen=False)
self.set_result(
status=ListenerType.UNKNOWN, description='', can_listen=True)
def _match(self):
if type(self.address) is IPv4Address:
return self._match_v4()
self._match_v6()
@cached_property
def asn(self) -> Optional[ASN]:
'Do the DB lookup, return results.'
try:
with Reader(fileish=CONFIG['geoip']['asn_db_path']) as reader:
_asn = reader.asn(ip_address=self.address)
self._asn_description = (
f'AS{_asn.autonomous_system_number} ({_asn.network}): '
f'{_asn.autonomous_system_organization}')
return _asn
except AddressNotFoundError:
pass
def __str__(self) -> str:
return (
f'{self.address}: {self.status}, <{self._asn_description}>, '
f'<{self._result_description}>')
def set_result(
self, status: ListenerType, description: str, can_listen: bool):
'Set internal result status.'
self.status = status
self._result_description = description
self.can_listen = can_listen
def process(self):
if self.asn:
if is_asn_whitelisted(asn=self.asn):
return self.set_result(
status=ListenerType.WHITELISTED,
description='ASN whitelisted', can_listen=True)
if is_asn_blacklisted(asn=self.asn):
return self.set_result(
status=ListenerType.BLACKLISTED,
description='ASN blacklisted', can_listen=False)
self._match()