icy-governor/src/utils.py

226 lines
7.8 KiB
Python

from datetime import timedelta
from functools import cached_property
from ipaddress import IPv4Address, IPv6Address, ip_address
from logging import Logger
from typing import Optional
from flask.helpers import make_response
from flask.wrappers import Response
from geoip2.database import Reader
from geoip2.errors import AddressNotFoundError
from geoip2.models import ASN
from geoip2.types import IPAddress
from werkzeug.datastructures import ImmutableMultiDict
from .config import CONFIG
from .ip_collector.asn_checker import is_asn_blacklisted, is_asn_whitelisted
from .ip_collector.reader import (
ListingType, match_bl_v4, match_bl_v6, match_wl_v4, match_wl_v6)
from .ip_collector.useragent import useragent_match_bl, useragent_match_wl
class ListenerEvaluatorBase(object):
'Base for evaluation classes.'
_status: ListingType = ListingType.UNLISTED
_asn_description = '-'
_result_description = '-'
def __init__(self, form: ImmutableMultiDict, logger: Logger):
self._logger = logger
self._form = form
self.process()
@cached_property
def response(self) -> Response:
response = make_response('alrighty', 200)
return response
def set_result(self, status: ListingType, description: str):
'Set internal result status.'
self._status = status
self._result_description = description
@cached_property
def _address(self) -> Optional[IPAddress]:
'Evaluate IP address.'
ip = self._form.get('ip')
if not ip:
self.response.headers['Icecast-Auth-Message'] = \
'IP address not specified'
self._status = ListingType.BLACKLISTED
self.set_result(
status=ListingType.BLACKLISTED,
description=f'IP not passed: {self._form}')
return
return ip_address(address=ip)
@cached_property
def _client_id(self) -> str:
return self._form.get('client', '-')
@cached_property
def _mount_user_pass(self) -> str:
mount = self._form.get('mount')
user = self._form.get('user')
user = repr(user) if user else '-'
passwd = self._form.get('pass')
passwd = repr(passwd) if passwd else '-'
return f'{mount!r}|{user}|{passwd}'
@cached_property
def _city(self) -> str:
if not self._address:
return '-'
try:
with Reader(fileish=CONFIG['geoip']['city_db_path']) as reader:
res = reader.city(ip_address=self._address)
# from IPython import embed; embed()
county = res.subdivisions.most_specific.name or '-'
city = res.city.name or '-'
return f'{res.country.name}|{county}|{city}'
except AddressNotFoundError:
return '-'
@cached_property
def asn(self) -> Optional[ASN]:
'Do the DB lookup, return results.'
if not self._address:
return
try:
with Reader(fileish=CONFIG['geoip']['asn_db_path']) as reader:
asn = reader.asn(ip_address=self._address)
self._asn_description = (
f'AS{asn.autonomous_system_number} ({asn.network}): '
f'{asn.autonomous_system_organization}')
return asn
except AddressNotFoundError:
pass
@cached_property
def _user_agent(self) -> Optional[str]:
'Evaluate useragent.'
return self._form.get('agent', '-')
@cached_property
def _server_port(self) -> str:
server = self._form.get('server') or '-'
port = self._form.get('port') or '-'
return f'{server}:{port}'
def process(self):
'Placeholder method.'
class ListenerAddEvaluator(ListenerEvaluatorBase):
'Evaluating listener_add requests.'
@cached_property
def _referer(self) -> str:
if referer := self._form.get('ClientHeader.referer'):
return repr(referer)
return '-'
@cached_property
def _metadata(self) -> str:
if metadata := self._form.get('ClientHeader.icy-metadata'):
return f'Metadata: True({metadata})'
return 'Metadata: False'
def _match_wl(self) -> bool:
'Return `True` when whitelisted, `False` otherwise.'
# ASN lists
if self.asn and self.asn.autonomous_system_number:
if result := is_asn_whitelisted(
asn_id=self.asn.autonomous_system_number):
self.set_result(
status=ListingType.WHITELISTED, description=result)
return True
# Useragent whitelists
if self._user_agent:
if result := useragent_match_wl(user_agent=self._user_agent):
self.set_result(
status=ListingType.WHITELISTED, description=result)
return True
# Network whitelists
result = None
if type(self._address) is IPv4Address:
result = match_wl_v4(address=self._address)
elif type(self._address) is IPv6Address:
result = match_wl_v6(address=self._address)
if result:
self.set_result(status=ListingType.WHITELISTED, description=result)
return True
return False
def _match_bl(self) -> bool:
'Return `True` when blacklisted, `False` otherwise.'
# ASN lists
if self.asn and self.asn.autonomous_system_number:
if result := is_asn_blacklisted(
asn_id=self.asn.autonomous_system_number):
self.set_result(
status=ListingType.BLACKLISTED, description=result)
return True
# Useragent blacklists
if self._user_agent:
if result := useragent_match_bl(user_agent=self._user_agent):
self.set_result(
status=ListingType.BLACKLISTED, description=result)
return True
# Network blacklists
result = None
if type(self._address) is IPv4Address:
result = match_bl_v4(address=self._address)
elif type(self._address) is IPv6Address:
result = match_bl_v6(address=self._address)
if result:
self.set_result(status=ListingType.BLACKLISTED, description=result)
return True
return False
def __str__(self) -> str:
return (
f'ADD: <{self._client_id}|{self._address}> <{self._status}: '
f'{self._result_description}> <{self._city}> '
f'<{self._asn_description}> <{self._user_agent}> '
f'<{self._mount_user_pass}> <{self._metadata}> '
f'<{self._server_port}> <{self._referer}>')
def process(self):
'Start processing.'
super().process()
if not self._address:
self._logger.debug(msg=self)
return
self._match_wl() or self._match_bl() # type: ignore
if self._status == ListingType.BLACKLISTED:
self.response.headers['Icecast-Auth-Message'] = 'Stream not found'
else:
self.response.headers['Icecast-Auth-User'] = 1
self._logger.debug(msg=self)
class ListenerRemoveEvaluator(ListenerEvaluatorBase):
'Evaluating disconnecting listeners.'
@cached_property
def _duration(self) -> str:
seconds = int(self._form.get('duration') or 0)
return str(timedelta(seconds=seconds))
def __str__(self) -> str:
return (
f'REMOVE: <{self._client_id}|{self._address}> <{self._duration}> '
f'<{self._city}> <{self._asn_description}> <{self._user_agent}> '
f'<{self._mount_user_pass}> <{self._server_port}>')
def process(self):
'Start processing.'
super().process()
if not self._address:
self._logger.debug(msg=self)
return
self.asn # Do the ASN variables setting for logging
self._logger.debug(msg=self)