summaryrefslogtreecommitdiff
path: root/wsgitools/middlewares.py
diff options
context:
space:
mode:
Diffstat (limited to 'wsgitools/middlewares.py')
-rw-r--r--wsgitools/middlewares.py176
1 files changed, 107 insertions, 69 deletions
diff --git a/wsgitools/middlewares.py b/wsgitools/middlewares.py
index ef9fe84..8577384 100644
--- a/wsgitools/middlewares.py
+++ b/wsgitools/middlewares.py
@@ -6,27 +6,28 @@ import sys
import cgitb
import collections
import io
+import typing
from wsgitools.internal import bytes2str, str2bytes
from wsgitools.filters import CloseableList, CloseableIterator
from wsgitools.authentication import AuthenticationRequired, \
ProtocolViolation, AuthenticationMiddleware
+from wsgitools.internal import (
+ Environ, HeaderList, OptExcInfo, StartResponse, WriteCallback, WsgiApp
+)
__all__.append("SubdirMiddleware")
class SubdirMiddleware:
"""Middleware choosing wsgi applications based on a dict."""
- def __init__(self, default, mapping={}):
- """
- @type default: wsgi app
- @type mapping: {str: wsgi app}
- """
+ def __init__(
+ self, default: WsgiApp, mapping: typing.Dict[str, WsgiApp] = {}
+ ):
self.default = default
self.mapping = mapping
- def __call__(self, environ, start_response):
- """wsgi interface
- @type environ: {str: str}
- @rtype: gen([bytes])
- """
+ def __call__(
+ self, environ: Environ, start_response: StartResponse
+ ) -> typing.Iterable[bytes]:
+ """wsgi interface"""
assert isinstance(environ, dict)
app = None
script = environ["PATH_INFO"]
@@ -51,22 +52,24 @@ class NoWriteCallableMiddleware:
C{start_response} function to a wsgi application that doesn't need one by
writing the data to a C{BytesIO} and then making it be the first result
element."""
- def __init__(self, app):
+ def __init__(self, app: WsgiApp):
"""Wraps wsgi application app."""
self.app = app
- def __call__(self, environ, start_response):
- """wsgi interface
- @type environ: {str, str}
- @rtype: gen([bytes])
- """
+ def __call__(
+ self, environ: Environ, start_response: StartResponse
+ ) -> typing.Iterable[bytes]:
+ """wsgi interface"""
assert isinstance(environ, dict)
- todo = [None]
+ todo: typing.Optional[typing.Tuple[str, HeaderList]] = None
sio = io.BytesIO()
gotiterdata = False
- def write_calleable(data):
+ def write_calleable(data: bytes) -> None:
assert not gotiterdata
sio.write(data)
- def modified_start_response(status, headers, exc_info=None):
+ def modified_start_response(
+ status: str, headers: HeaderList, exc_info: OptExcInfo = None
+ ) -> WriteCallback:
+ nonlocal todo
try:
if sio.tell() > 0 or gotiterdata:
assert exc_info is not None
@@ -75,7 +78,7 @@ class NoWriteCallableMiddleware:
exc_info = None
assert isinstance(status, str)
assert isinstance(headers, list)
- todo[0] = (status, headers)
+ todo = (status, headers)
return write_calleable
ret = self.app(environ, modified_start_response)
@@ -96,8 +99,8 @@ class NoWriteCallableMiddleware:
else:
gotiterdata = True
- assert todo[0] is not None
- status, headers = todo[0]
+ assert todo is not None
+ status, headers = todo
data = sio.getvalue()
if isinstance(ret, list):
@@ -117,27 +120,36 @@ class ContentLengthMiddleware:
"""Guesses the content length header if possible.
@note: The application used must not use the C{write} callable returned by
C{start_response}."""
- def __init__(self, app, maxstore=0):
+ maxstore: typing.Union[float, int]
+ def __init__(
+ self, app: WsgiApp, maxstore: typing.Union[int, typing.Tuple[()]] = 0
+ ):
"""Wraps wsgi application app. If the application returns a list, the
total length of strings is available and the content length header is
set unless there already is one. For an iterator data is accumulated up
to a total of maxstore bytes (where maxstore=() means infinity). If the
iterator is exhaused within maxstore bytes a content length header is
added unless already present.
- @type maxstore: int or ()
@note: that setting maxstore to a value other than 0 will violate the
wsgi standard
"""
self.app = app
if maxstore == ():
- maxstore = float("inf")
- self.maxstore = maxstore
- def __call__(self, environ, start_response):
+ self.maxstore = float("inf")
+ else:
+ assert isinstance(maxstore, int)
+ self.maxstore = maxstore
+ def __call__(
+ self, environ: Environ, start_response: StartResponse
+ ) -> typing.Iterable[bytes]:
"""wsgi interface"""
assert isinstance(environ, dict)
- todo = []
+ todo: typing.Optional[typing.Tuple[str, HeaderList]] = None
gotdata = False
- def modified_start_response(status, headers, exc_info=None):
+ def modified_start_response(
+ status: str, headers: HeaderList, exc_info: OptExcInfo = None
+ ) -> WriteCallback:
+ nonlocal todo
try:
if gotdata:
assert exc_info is not None
@@ -146,8 +158,8 @@ class ContentLengthMiddleware:
exc_info = None
assert isinstance(status, str)
assert isinstance(headers, list)
- todo[:] = ((status, headers),)
- def raise_not_imp(*args):
+ todo = (status, headers)
+ def raise_not_imp(_: bytes) -> None:
raise NotImplementedError
return raise_not_imp
@@ -156,8 +168,8 @@ class ContentLengthMiddleware:
if isinstance(ret, list):
gotdata = True
- assert bool(todo)
- status, headers = todo[0]
+ assert todo is not None
+ status, headers = todo
if all(k.lower() != "content-length" for k, _ in headers):
length = sum(map(len, ret))
headers.append(("Content-Length", str(length)))
@@ -173,8 +185,8 @@ class ContentLengthMiddleware:
except StopIteration:
stopped = True
gotdata = True
- assert bool(todo)
- status, headers = todo[0]
+ assert todo is not None
+ status, headers = todo
data = CloseableList(getattr(ret, "close", None))
if first:
data.append(first)
@@ -197,12 +209,12 @@ class ContentLengthMiddleware:
return CloseableIterator(getattr(ret, "close", None), data, ret)
-def storable(environ):
+def storable(environ: Environ) -> bool:
if environ["REQUEST_METHOD"] != "GET":
return False
return True
-def cacheable(environ):
+def cacheable(environ: Environ) -> bool:
if environ.get("HTTP_CACHE_CONTROL", "") == "max-age=0":
return False
return True
@@ -213,16 +225,24 @@ class CachingMiddleware:
C{QUERY_STRING}."""
class CachedRequest:
- def __init__(self, timestamp):
+ def __init__(self, timestamp: float):
self.timestamp = timestamp
self.status = ""
- self.headers = []
- self.body = []
-
- def __init__(self, app, maxage=60, storable=storable, cacheable=cacheable):
+ self.headers: HeaderList = []
+ self.body: typing.List[bytes] = []
+
+ cache: typing.Dict[str, CachedRequest]
+ lastcached: typing.Deque[typing.Tuple[str, float]]
+
+ def __init__(
+ self,
+ app: WsgiApp,
+ maxage: int = 60,
+ storable: typing.Callable[[Environ], bool] = storable,
+ cacheable: typing.Callable[[Environ], bool] = cacheable,
+ ):
"""
@param app: is a wsgi application to be cached.
- @type maxage: int
@param maxage: is the number of seconds a reponse may be cached.
@param storable: is a predicate that determines whether the response
may be cached at all based on the C{environ} dict.
@@ -235,13 +255,20 @@ class CachingMiddleware:
self.cache = {}
self.lastcached = collections.deque()
- def insert_cache(self, key, obj, now=None):
+ def insert_cache(
+ self,
+ key: str,
+ obj: CachedRequest,
+ now: typing.Optional[float] = None,
+ ) -> None:
if now is None:
now = time.time()
self.cache[key] = obj
self.lastcached.append((key, now))
- def prune_cache(self, maxclean=16, now=None):
+ def prune_cache(
+ self, maxclean: int = 16, now: typing.Optional[float] = None
+ ) -> None:
if now is None:
now = time.time()
old = now - self.maxage
@@ -258,10 +285,10 @@ class CachingMiddleware:
if obj.timestamp <= old:
del self.cache[key]
- def __call__(self, environ, start_response):
- """wsgi interface
- @type environ: {str: str}
- """
+ def __call__(
+ self, environ: Environ, start_response: StartResponse
+ ) -> typing.Iterable[bytes]:
+ """wsgi interface"""
assert isinstance(environ, dict)
now = time.time()
self.prune_cache(now=now)
@@ -279,7 +306,9 @@ class CachingMiddleware:
else:
del self.cache[path]
cache_object = self.CachedRequest(now)
- def modified_start_respesponse(status, headers, exc_info=None):
+ def modified_start_respesponse(
+ status: str, headers: HeaderList, exc_info: OptExcInfo = None
+ ) -> WriteCallback:
try:
if cache_object.body:
assert exc_info is not None
@@ -291,7 +320,7 @@ class CachingMiddleware:
cache_object.status = status
cache_object.headers = headers
write = start_response(status, list(headers))
- def modified_write(data):
+ def modified_write(data: bytes) -> None:
cache_object.body.append(data)
write(data)
return modified_write
@@ -303,7 +332,7 @@ class CachingMiddleware:
cache_object.body.extend(ret)
self.insert_cache(path, cache_object, now)
return ret
- def pass_through():
+ def pass_through() -> typing.Iterator[bytes]:
for data in ret:
cache_object.body.append(data)
yield data
@@ -313,18 +342,13 @@ class CachingMiddleware:
__all__.append("DictAuthChecker")
class DictAuthChecker:
"""Verifies usernames and passwords by looking them up in a dict."""
- def __init__(self, users):
+ def __init__(self, users: typing.Dict[str, str]):
"""
- @type users: {str: str}
@param users: is a dict mapping usernames to password."""
self.users = users
- def __call__(self, username, password, environ):
+ def __call__(self, username: str, password: str, environ: Environ) -> bool:
"""check_function interface taking username and password and resulting
in a bool.
- @type username: str
- @type password: str
- @type environ: {str: object}
- @rtype: bool
"""
return username in self.users and self.users[username] == password
@@ -334,7 +358,13 @@ class BasicAuthMiddleware(AuthenticationMiddleware):
the warpped application the environ dictionary is augmented by a REMOTE_USER
key."""
authorization_method = "basic"
- def __init__(self, app, check_function, realm='www', app401=None):
+ def __init__(
+ self,
+ app: WsgiApp,
+ check_function: typing.Callable[[str, str, Environ], bool],
+ realm: str = 'www',
+ app401: typing.Optional[WsgiApp] = None,
+ ):
"""
@param app: is a WSGI application.
@param check_function: is a function taking three arguments username,
@@ -342,7 +372,6 @@ class BasicAuthMiddleware(AuthenticationMiddleware):
request may is allowed. The older interface of taking only the
first two arguments is still supported via catching a
C{TypeError}.
- @type realm: str
@param app401: is an optional WSGI application to be used for error
messages
"""
@@ -351,7 +380,9 @@ class BasicAuthMiddleware(AuthenticationMiddleware):
self.realm = realm
self.app401 = app401
- def authenticate(self, auth, environ):
+ def authenticate(
+ self, auth: str, environ: Environ
+ ) -> typing.Dict[str, str]:
assert isinstance(auth, str)
assert isinstance(environ, dict)
authb = str2bytes(auth)
@@ -372,10 +403,17 @@ class BasicAuthMiddleware(AuthenticationMiddleware):
return dict(user=username)
raise AuthenticationRequired("credentials not valid")
- def www_authenticate(self, exception):
+ def www_authenticate(
+ self, exception: AuthenticationRequired
+ ) -> typing.Tuple[str, str]:
return ("WWW-Authenticate", 'Basic realm="%s"' % self.realm)
- def authorization_required(self, environ, start_response, exception):
+ def authorization_required(
+ self,
+ environ: Environ,
+ start_response: StartResponse,
+ exception: AuthenticationRequired,
+ ) -> typing.Iterable[bytes]:
if self.app401 is not None:
return self.app401(environ, start_response)
return AuthenticationMiddleware.authorization_required(
@@ -385,13 +423,13 @@ __all__.append("TracebackMiddleware")
class TracebackMiddleware:
"""In case the application throws an exception this middleware will show an
html-formatted traceback using C{cgitb}."""
- def __init__(self, app):
+ def __init__(self, app: WsgiApp):
"""app is the wsgi application to proxy."""
self.app = app
- def __call__(self, environ, start_response):
- """wsgi interface
- @type environ: {str: str}
- """
+ def __call__(
+ self, environ: Environ, start_response: StartResponse
+ ) -> typing.Iterable[bytes]:
+ """wsgi interface"""
try:
assert isinstance(environ, dict)
ret = self.app(environ, start_response)