icy-governor/src/middleware.py

58 lines
2.2 KiB
Python

from ipaddress import (
IPv4Address, IPv4Network, IPv6Address, IPv6Network, ip_address, ip_network)
from types import FunctionType, MethodType
from typing import Iterable, Set, Union
from flask.app import Flask
from werkzeug.wrappers import Request, Response
from .config import CONFIG
class RequesterFilterMiddleWare(object):
'Filtering requester IPs, only allowing configured ones.'
_IP4_NETS: Set[IPv4Network] = set()
_IP6_NETS: Set[IPv6Network] = set()
def __init__(self, app: MethodType, deny_code: int = 404):
self._app = app
self._flask_app: Flask = app.__self__ # type: ignore
self._deny_code = deny_code
for item in CONFIG['allowed_server_nets']:
addr = ip_network(address=item)
if type(addr) is IPv4Network:
self._IP4_NETS.add(addr)
elif type(addr) is IPv6Network:
self._IP6_NETS.add(addr)
def _deny_return(
self, addr: Union[IPv4Address, IPv6Address], environ: dict,
start_response: FunctionType
) -> Iterable[bytes]:
'Return a deny response.'
self._flask_app.logger.warning(f'API request from {addr} disallowed.')
res = Response(u'', status=self._deny_code)
return res(environ=environ, start_response=start_response)
def __call__(
self, environ: dict, start_response: FunctionType
) -> Iterable[bytes]:
'Evaluating remote IP, returning 404 if not allowed.'
# start_response is
# WSGIRequestHandler.run_wsgi.<locals>.start_response from
# werkzeug/serving.py
request = Request(environ=environ)
addr = ip_address(address=request.remote_addr)
try:
if type(addr) is IPv4Address:
# https://stackoverflow.com/a/8534381/1067833
next(filter(lambda net: addr in net, self._IP4_NETS))
elif type(addr) is IPv6Address:
next(filter(lambda net: addr in net, self._IP6_NETS))
except StopIteration:
return self._deny_return(
addr=addr, environ=environ, start_response=start_response)
return self._app(
environ=environ, start_response=start_response)