diff --git a/.gitignore b/.gitignore index f8b73e7..5adf2aa 100644 --- a/.gitignore +++ b/.gitignore @@ -138,3 +138,6 @@ dmypy.json # Cython debug symbols cython_debug/ +icygov.yaml +pyrightconfig.json +data/ diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 0000000..73017e5 --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,3 @@ +[settings] +multi_line_output=4 +src_paths=backend diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/config.py b/config.py new file mode 100644 index 0000000..8f6a5ba --- /dev/null +++ b/config.py @@ -0,0 +1,20 @@ +from os import environ +from pathlib import Path + +from yaml import load + +try: + from yaml import CLoader as Loader +except ImportError: + from yaml import Loader + +_CONFPATH = environ.get('ICYGOV_CONFIG') +if not _CONFPATH: + raise FileNotFoundError('ICYGOV_CONFIG must be an environment variable!') + +_PATH_CONF = Path(_CONFPATH) +if not _PATH_CONF.exists(): + raise FileNotFoundError(_CONFPATH) + +with _PATH_CONF.open() as fd: + CONFIG = load(stream=fd, Loader=Loader) diff --git a/icygov-sample.yaml b/icygov-sample.yaml new file mode 100644 index 0000000..dcf015a --- /dev/null +++ b/icygov-sample.yaml @@ -0,0 +1 @@ +path_datadir: /etc/icygov/data diff --git a/ip_collector/__init__.py b/ip_collector/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ip_collector/dl_amazon.py b/ip_collector/dl_amazon.py new file mode 100644 index 0000000..a99c8b8 --- /dev/null +++ b/ip_collector/dl_amazon.py @@ -0,0 +1,89 @@ +__doc__ = 'Downloader for amazon IP ranges' + +from functools import lru_cache +from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network +from json import loads +from pathlib import Path +from tempfile import mkstemp +from typing import Optional +from urllib.request import urlopen + +from ..config import CONFIG +from .typing import Ip4DictType, Ip6DictType + +# https://docs.aws.amazon.com/general/latest/gr/aws-ip-ranges.html + +URL = 'https://ip-ranges.amazonaws.com/ip-ranges.json' +_SEP = '||' +_PATH_MY_DATA_V4 = Path(CONFIG['path_datadir'], 'amazon-ipv4.txt') +_PATH_MY_DATA_V6 = Path(CONFIG['path_datadir'], 'amazon-ipv6.txt') +_IP4_DICT: Ip4DictType = {} +_IP6_DICT: Ip6DictType = {} + +if not _PATH_MY_DATA_V4.is_file(): + _PATH_MY_DATA_V4.touch() +if not _PATH_MY_DATA_V6.is_file(): + _PATH_MY_DATA_V6.touch() + + +@lru_cache(maxsize=50) +def _amazon_match_v4(address: IPv4Address) -> Optional[str]: + 'Try to match an IPv4 address, return data if it matches.' + for network, description in _IP4_DICT.items(): + if address in network: + return f'{address} in {network}: {description}' + + +@lru_cache(maxsize=50) +def _amazon_match_v6(address: IPv6Address) -> Optional[str]: + 'Try to match an IPv6 address, return data if it matches.' + for network, description in _IP6_DICT.items(): + if address in network: + return f'{address} in {network}: {description}' + + +def _amazon_load(): + 'Load data from files' + global _IP4_DICT, _IP6_DICT + my_ip4dict: Ip4DictType = {} + my_ip6dict: Ip6DictType = {} + with _PATH_MY_DATA_V4.open('r') as fd: + for line in fd: + line = line.strip() + if not line: + continue + address, description = line.split(_SEP) + my_ip4dict[IPv4Network(address=address)] = description + with _PATH_MY_DATA_V6.open('r') as fd: + for line in fd: + line = line.strip() + if not line: + continue + address, description = line.split(_SEP) + my_ip6dict[IPv6Network(address=address)] = description + _IP4_DICT = my_ip4dict + _IP6_DICT = my_ip6dict + + +def _amazon_download(): + 'Download, parse and save files.' + with urlopen(url=URL) as fd: + data = loads(s=fd.read()) + my_tmp_fd_v4, my_tmp_path_v4 = mkstemp() + with open(my_tmp_fd_v4, 'w') as fd: + for ip4_items in data['prefixes']: + fd.write( + f'{ip4_items["ip_prefix"]}{_SEP}' + f'Amazon: {ip4_items["service"]} ' + f'{ip4_items["network_border_group"]}\n') + my_tmp_fd_v6, my_tmp_path_v6 = mkstemp() + Path(_PATH_MY_DATA_V4).write_text(Path(my_tmp_path_v4).read_text()) + Path(my_tmp_path_v4).unlink() + with open(my_tmp_fd_v4, 'w') as fd: + for ip6_items in data['ipv6_prefixes']: + fd.write( + f'{ip6_items["ipv6_prefix"]}{_SEP}' + f'Amazon: {ip6_items["service"]} ' + f'{ip6_items["network_border_group"]}\n') + Path(_PATH_MY_DATA_V6).write_text(Path(my_tmp_path_v6).read_text()) + Path(my_tmp_path_v6).unlink() diff --git a/ip_collector/reader.py b/ip_collector/reader.py new file mode 100644 index 0000000..335b7a0 --- /dev/null +++ b/ip_collector/reader.py @@ -0,0 +1,35 @@ +from functools import lru_cache +from ipaddress import IPv4Address, IPv6Address +from typing import Optional, Union + +from .dl_amazon import ( + _amazon_download, _amazon_load, _amazon_match_v4, _amazon_match_v6) + + +@lru_cache(maxsize=50) +def _match_v4(address: IPv4Address) -> Optional[str]: + return _amazon_match_v4(address=address) + + +@lru_cache(maxsize=50) +def _match_v6(address: IPv6Address) -> Optional[str]: + return _amazon_match_v6(address=address) + + +def match(address: Union[IPv4Address, IPv6Address]) -> Optional[str]: + if type(address) is IPv4Address: + return _match_v4(address=address) + return _match_v6(address=address) + + +def redownload_ranges(): + 'Redownload the ranges from all available providers.' + _amazon_download() + + +def load(): + 'Load values from file.' + _amazon_load() + + +load() diff --git a/ip_collector/typing.py b/ip_collector/typing.py new file mode 100644 index 0000000..88bf891 --- /dev/null +++ b/ip_collector/typing.py @@ -0,0 +1,5 @@ +from ipaddress import IPv4Network, IPv6Network +from typing import Dict + +Ip4DictType = Dict[IPv4Network, str] +Ip6DictType = Dict[IPv6Network, str] diff --git a/main.py b/main.py index 628da84..e163b2a 100755 --- a/main.py +++ b/main.py @@ -1,11 +1,50 @@ #!/usr/bin/env python3 -from time import sleep + +from ipaddress import IPv4Network, ip_address + +from flask.app import Flask +from flask.globals import request +from flask.helpers import make_response +from flask.wrappers import Response + +from .ip_collector.reader import redownload_ranges, match + +app = Flask(import_name="icy_governor") +BASE_PATH = '/icy-governor' -def main(): - while True: - sleep(10) +@app.route(rule=f'{BASE_PATH}/reload-ranges/', methods=['GET']) +def reload_ranges() -> Response: + redownload_ranges() + response = make_response('', 200) + return response + + +@app.route(rule=f'{BASE_PATH}/listener-add/', methods=['POST']) +def listener_add() -> Response: + app.logger.debug('Add data received: %s', request.form) + response = make_response('alrighty', 200) + ip = request.form.get('ip') + if not ip: + response.headers['Icecast-Auth-Message'] = 'IP not specified' + return response + ip_addr = ip_address(address=ip) + result = match(address=ip_addr) + app.logger.debug('IP is: %s, result: %r', ip_addr, result) + if result: + response.headers['Icecast-Auth-Message'] = result + else: + response.headers['Icecast-Auth-User'] = 1 + # response.headers['icecast-auth-timelimit'] = 10 + return response + + +@app.route(rule=f'{BASE_PATH}/listener-remove/', methods=['POST']) +def listener_remove() -> Response: + app.logger.debug('Remove data received: %s', request.form) + response = make_response('alrighty', 200) + return response if __name__ == '__main__': - main() + app.run()