226 lines
7.8 KiB
Python
226 lines
7.8 KiB
Python
from datetime import timedelta
|
|
from functools import cached_property
|
|
from ipaddress import IPv4Address, IPv6Address, ip_address
|
|
from logging import Logger
|
|
from typing import Optional
|
|
|
|
from flask.helpers import make_response
|
|
from flask.wrappers import Response
|
|
from geoip2.database import Reader
|
|
from geoip2.errors import AddressNotFoundError
|
|
from geoip2.models import ASN
|
|
from geoip2.types import IPAddress
|
|
from werkzeug.datastructures import ImmutableMultiDict
|
|
|
|
from .config import CONFIG
|
|
from .ip_collector.asn_checker import is_asn_blacklisted, is_asn_whitelisted
|
|
from .ip_collector.reader import (
|
|
ListingType, match_bl_v4, match_bl_v6, match_wl_v4, match_wl_v6)
|
|
from .ip_collector.useragent import useragent_match_bl, useragent_match_wl
|
|
|
|
|
|
class ListenerEvaluatorBase(object):
|
|
'Base for evaluation classes.'
|
|
|
|
_status: ListingType = ListingType.UNLISTED
|
|
_asn_description = '-'
|
|
_result_description = '-'
|
|
|
|
def __init__(self, form: ImmutableMultiDict, logger: Logger):
|
|
self._logger = logger
|
|
self._form = form
|
|
self.process()
|
|
|
|
@cached_property
|
|
def response(self) -> Response:
|
|
response = make_response('alrighty', 200)
|
|
return response
|
|
|
|
def set_result(self, status: ListingType, description: str):
|
|
'Set internal result status.'
|
|
self._status = status
|
|
self._result_description = description
|
|
|
|
@cached_property
|
|
def _address(self) -> Optional[IPAddress]:
|
|
'Evaluate IP address.'
|
|
ip = self._form.get('ip')
|
|
if not ip:
|
|
self.response.headers['Icecast-Auth-Message'] = \
|
|
'IP address not specified'
|
|
self._status = ListingType.BLACKLISTED
|
|
self.set_result(
|
|
status=ListingType.BLACKLISTED,
|
|
description=f'IP not passed: {self._form}')
|
|
return
|
|
return ip_address(address=ip)
|
|
|
|
@cached_property
|
|
def _client_id(self) -> str:
|
|
return self._form.get('client', '-')
|
|
|
|
@cached_property
|
|
def _mount_user_pass(self) -> str:
|
|
mount = self._form.get('mount')
|
|
user = self._form.get('user')
|
|
user = repr(user) if user else '-'
|
|
passwd = self._form.get('pass')
|
|
passwd = repr(passwd) if passwd else '-'
|
|
return f'{mount!r}|{user}|{passwd}'
|
|
|
|
@cached_property
|
|
def _city(self) -> str:
|
|
if not self._address:
|
|
return '-'
|
|
try:
|
|
with Reader(fileish=CONFIG['geoip']['city_db_path']) as reader:
|
|
res = reader.city(ip_address=self._address)
|
|
# from IPython import embed; embed()
|
|
county = res.subdivisions.most_specific.name or '-'
|
|
city = res.city.name or '-'
|
|
return f'{res.country.name}|{county}|{city}'
|
|
except AddressNotFoundError:
|
|
return '-'
|
|
|
|
@cached_property
|
|
def asn(self) -> Optional[ASN]:
|
|
'Do the DB lookup, return results.'
|
|
if not self._address:
|
|
return
|
|
try:
|
|
with Reader(fileish=CONFIG['geoip']['asn_db_path']) as reader:
|
|
asn = reader.asn(ip_address=self._address)
|
|
self._asn_description = (
|
|
f'AS{asn.autonomous_system_number} ({asn.network}): '
|
|
f'{asn.autonomous_system_organization}')
|
|
return asn
|
|
except AddressNotFoundError:
|
|
pass
|
|
|
|
@cached_property
|
|
def _user_agent(self) -> Optional[str]:
|
|
'Evaluate useragent.'
|
|
return self._form.get('agent')
|
|
|
|
@cached_property
|
|
def _server_port(self) -> str:
|
|
server = self._form.get('server') or '-'
|
|
port = self._form.get('port') or '-'
|
|
return f'{server}:{port}'
|
|
|
|
def process(self):
|
|
'Placeholder method.'
|
|
|
|
|
|
class ListenerAddEvaluator(ListenerEvaluatorBase):
|
|
'Evaluating listener_add requests.'
|
|
|
|
@cached_property
|
|
def _referer(self) -> str:
|
|
if referer := self._form.get('ClientHeader.referer'):
|
|
return repr(referer)
|
|
return '-'
|
|
|
|
@cached_property
|
|
def _metadata(self) -> str:
|
|
if metadata := self._form.get('ClientHeader.icy-metadata'):
|
|
return f'Metadata: True({metadata})'
|
|
return 'Metadata: False'
|
|
|
|
def _match_wl(self) -> bool:
|
|
'Return `True` when whitelisted, `False` otherwise.'
|
|
# ASN lists
|
|
if self.asn and self.asn.autonomous_system_number:
|
|
if result := is_asn_whitelisted(
|
|
asn_id=self.asn.autonomous_system_number):
|
|
self.set_result(
|
|
status=ListingType.WHITELISTED, description=result)
|
|
return True
|
|
# Useragent whitelists
|
|
if self._user_agent:
|
|
if result := useragent_match_wl(user_agent=self._user_agent):
|
|
self.set_result(
|
|
status=ListingType.WHITELISTED, description=result)
|
|
return True
|
|
# Network whitelists
|
|
result = None
|
|
if type(self._address) is IPv4Address:
|
|
result = match_wl_v4(address=self._address)
|
|
elif type(self._address) is IPv6Address:
|
|
result = match_wl_v6(address=self._address)
|
|
if result:
|
|
self.set_result(status=ListingType.WHITELISTED, description=result)
|
|
return True
|
|
return False
|
|
|
|
def _match_bl(self) -> bool:
|
|
'Return `True` when blacklisted, `False` otherwise.'
|
|
# ASN lists
|
|
if self.asn and self.asn.autonomous_system_number:
|
|
if result := is_asn_blacklisted(
|
|
asn_id=self.asn.autonomous_system_number):
|
|
self.set_result(
|
|
status=ListingType.BLACKLISTED, description=result)
|
|
return True
|
|
# Useragent blacklists
|
|
if self._user_agent:
|
|
if result := useragent_match_bl(user_agent=self._user_agent):
|
|
self.set_result(
|
|
status=ListingType.BLACKLISTED, description=result)
|
|
return True
|
|
# Network blacklists
|
|
result = None
|
|
if type(self._address) is IPv4Address:
|
|
result = match_bl_v4(address=self._address)
|
|
elif type(self._address) is IPv6Address:
|
|
result = match_bl_v6(address=self._address)
|
|
if result:
|
|
self.set_result(status=ListingType.BLACKLISTED, description=result)
|
|
return True
|
|
return False
|
|
|
|
def __str__(self) -> str:
|
|
return (
|
|
f'ADD: <{self._client_id}|{self._address}> <{self._status}|'
|
|
f'{self._result_description}> <{self._city}> '
|
|
f'<{self._asn_description}> <{self._user_agent}> '
|
|
f'<{self._mount_user_pass}> <{self._metadata}> '
|
|
f'<{self._server_port}> <{self._referer}>')
|
|
|
|
def process(self):
|
|
'Start processing.'
|
|
super().process()
|
|
if not self._address:
|
|
self._logger.debug(msg=self)
|
|
return
|
|
self._match_wl() or self._match_bl() # type: ignore
|
|
if self._status == ListingType.BLACKLISTED:
|
|
self.response.headers['Icecast-Auth-Message'] = 'Stream not found'
|
|
else:
|
|
self.response.headers['Icecast-Auth-User'] = 1
|
|
self._logger.debug(msg=self)
|
|
|
|
|
|
class ListenerRemoveEvaluator(ListenerEvaluatorBase):
|
|
'Evaluating disconnecting listeners.'
|
|
|
|
@cached_property
|
|
def _duration(self) -> str:
|
|
seconds = int(self._form.get('duration') or 0)
|
|
return str(timedelta(seconds=seconds))
|
|
|
|
def __str__(self) -> str:
|
|
return (
|
|
f'REMOVE: <{self._client_id}|{self._address}> <{self._duration}> '
|
|
f'<{self._city}> <{self._asn_description}> <{self._user_agent}> '
|
|
f'<{self._mount_user_pass}> <{self._server_port}>')
|
|
|
|
def process(self):
|
|
'Start processing.'
|
|
super().process()
|
|
if not self._address:
|
|
self._logger.debug(msg=self)
|
|
return
|
|
self.asn # Do the ASN variables setting for logging
|
|
self._logger.debug(msg=self)
|