58 lines
2.2 KiB
Python
58 lines
2.2 KiB
Python
from ipaddress import (
|
|
IPv4Address, IPv4Network, IPv6Address, IPv6Network, ip_address, ip_network)
|
|
from types import FunctionType, MethodType
|
|
from typing import Iterable, Set, Union
|
|
|
|
from flask.app import Flask
|
|
from werkzeug.wrappers import Request, Response
|
|
|
|
from .config import CONFIG
|
|
|
|
|
|
class RequesterFilterMiddleWare(object):
|
|
'Filtering requester IPs, only allowing configured ones.'
|
|
|
|
_IP4_NETS: Set[IPv4Network] = set()
|
|
_IP6_NETS: Set[IPv6Network] = set()
|
|
|
|
def __init__(self, app: MethodType, deny_code: int = 404):
|
|
self._app = app
|
|
self._flask_app: Flask = app.__self__ # type: ignore
|
|
self._deny_code = deny_code
|
|
for item in CONFIG['allowed_server_nets']:
|
|
addr = ip_network(address=item)
|
|
if type(addr) is IPv4Network:
|
|
self._IP4_NETS.add(addr)
|
|
elif type(addr) is IPv6Network:
|
|
self._IP6_NETS.add(addr)
|
|
|
|
def _deny_return(
|
|
self, addr: Union[IPv4Address, IPv6Address], environ: dict,
|
|
start_response: FunctionType
|
|
) -> Iterable[bytes]:
|
|
'Return a deny response.'
|
|
self._flask_app.logger.warning(f'API request from {addr} disallowed.')
|
|
res = Response(u'', status=self._deny_code)
|
|
return res(environ=environ, start_response=start_response)
|
|
|
|
def __call__(
|
|
self, environ: dict, start_response: FunctionType
|
|
) -> Iterable[bytes]:
|
|
'Evaluating remote IP, returning 404 if not allowed.'
|
|
# start_response is
|
|
# WSGIRequestHandler.run_wsgi.<locals>.start_response from
|
|
# werkzeug/serving.py
|
|
request = Request(environ=environ)
|
|
addr = ip_address(address=request.remote_addr)
|
|
try:
|
|
if type(addr) is IPv4Address:
|
|
# https://stackoverflow.com/a/8534381/1067833
|
|
next(filter(lambda net: addr in net, self._IP4_NETS))
|
|
elif type(addr) is IPv6Address:
|
|
next(filter(lambda net: addr in net, self._IP6_NETS))
|
|
except StopIteration:
|
|
return self._deny_return(
|
|
addr=addr, environ=environ, start_response=start_response)
|
|
return self._app(
|
|
environ=environ, start_response=start_response)
|