__all__ = []

import time
import sys
import cgitb
import binascii
from wsgitools.filters import CloseableList, CloseableIterator
try:
    import io
except ImportError:
    # No io module available: fall back to the older StringIO modules.
    try:
        import cStringIO as io
    except ImportError:
        import StringIO as io

try:
    next
except NameError:
    # Interpreters without the next() builtin: delegate to the iterator's
    # own next() method.
    def next(it):
        return it.next()


__all__.append("SubdirMiddleware")
class SubdirMiddleware:
    """Middleware choosing wsgi applications based on a dict.

    C{PATH_INFO} is shortened from the right, one C{'/'}-separated
    component at a time, until it is a key of the mapping. The matched
    part is appended to C{SCRIPT_NAME} and the remainder becomes the new
    C{PATH_INFO}.
    """
    def __init__(self, default, mapping=None):
        """
        @type default: wsgi app
        @param default: is the application used when no key matches.
        @type mapping: {str: wsgi app}
        @param mapping: maps path prefixes to applications. When omitted,
                a new empty dict is created for this instance.
        """
        self.default = default
        # A "{}" default argument would be one dict shared by every
        # instance created without a mapping.
        if mapping is None:
            mapping = {}
        self.mapping = mapping

    def __call__(self, environ, start_response):
        """wsgi interface
        @type environ: {str: str}
        @rtype: gen([str])
        """
        assert isinstance(environ, dict)
        app = None
        # PEP 3333 allows PATH_INFO and SCRIPT_NAME to be omitted when they
        # are empty, so neither may be read with a plain subscript.
        script = environ.get("PATH_INFO", "")
        path_info = ""
        while '/' in script:
            if script in self.mapping:
                app = self.mapping[script]
                break
            script, tail = script.rsplit('/', 1)
            path_info = "/%s%s" % (tail, path_info)
        if app is None:
            # No key containing a '/' matched; try what is left of the path
            # before giving up and using the default application.
            app = self.mapping.get(script, None)
            if app is None:
                app = self.default
        environ["SCRIPT_NAME"] = environ.get("SCRIPT_NAME", "") + script
        environ["PATH_INFO"] = path_info
        return app(environ, start_response)


__all__.append("NoWriteCallableMiddleware")
class NoWriteCallableMiddleware:
    """This middleware wraps a wsgi application that needs the return value
    of C{start_response} function to a wsgi application that doesn't need
    one by writing the data to an in-memory buffer and then making it be
    the first result element."""
    def __init__(self, app):
        """Wraps wsgi application app."""
        self.app = app

    def __call__(self, environ, start_response):
        """wsgi interface
        @type environ: {str, str}
        @rtype: gen([str])
        """
        assert isinstance(environ, dict)
        todo = []

        def modified_start_response(status, headers, exc_info=None):
            assert isinstance(status, str)
            assert isinstance(headers, list)
            if exc_info is not None:
                # Error responses are forwarded untouched; None marks that
                # nothing is left to do afterwards.
                todo.append(None)
                return start_response(status, headers, exc_info)
            # The write callable receives byte strings, which io.StringIO
            # rejects. Prefer io.BytesIO; the cStringIO/StringIO fallback
            # modules only offer StringIO.
            sio = getattr(io, "BytesIO", io.StringIO)()
            todo.append((status, headers, sio))
            return sio.write

        ret = self.app(environ, modified_start_response)
        assert hasattr(ret, "__iter__")

        # NOTE(review): this requires the application to have called
        # start_response before returning its iterable.
        assert len(todo) == 1
        if todo[0] is None:
            return ret

        status, headers, data = todo[0]
        data = data.getvalue()

        if not data:
            # Nothing was written, so the result can be passed on as is.
            start_response(status, headers)
            return ret

        if isinstance(ret, list):
            ret.insert(0, data)
            start_response(status, headers)
            return ret

        ret = iter(ret)
        stopped = False
        try:
            first = next(ret)
        except StopIteration:
            stopped = True

        start_response(status, headers)

        if stopped:
            return CloseableList(getattr(ret, "close", None), (data,))

        return CloseableIterator(getattr(ret, "close", None),
                                 (data, first), ret)


__all__.append("ContentLengthMiddleware")
class ContentLengthMiddleware:
    """Guesses the content length header if possible.
    @note: The application used must not use the C{write} callable returned
           by C{start_response}."""
    def __init__(self, app, maxstore=0):
        """Wraps wsgi application app. It can also store the first result
        bytes to possibly return a list of strings which will make guessing
        the size of iterators possible. At most maxstore bytes will be
        accumulated. Please note that a value larger than 0 will violate
        the wsgi standard. The magical value C{()} will make it always
        gather all data.
        @type maxstore: int or ()
        """
        self.app = app
        self.maxstore = maxstore

    def __call__(self, environ, start_response):
        """wsgi interface"""
        assert isinstance(environ, dict)
        todo = []

        def modified_start_response(status, headers, exc_info=None):
            assert isinstance(status, str)
            assert isinstance(headers, list)
            has_length = any(name.lower() == "content-length"
                             for name, _ in headers)
            if exc_info is not None or has_length:
                # Nothing to add: forward the call right away.
                todo[:] = (None,)
                return start_response(status, headers, exc_info)
            # Delay start_response until the length is known.
            todo[:] = ((status, headers),)
            def raise_not_imp(*args):
                raise NotImplementedError
            return raise_not_imp

        ret = self.app(environ, modified_start_response)
        assert hasattr(ret, "__iter__")

        if todo and todo[0] is None:
            # start_response was already forwarded
            return ret

        if isinstance(ret, list):
            status, headers = todo[0]
            length = sum(map(len, ret))
            headers.append(("Content-length", str(length)))
            start_response(status, headers)
            return ret

        ret = iter(ret)
        stopped = False
        data = CloseableList(getattr(ret, "close", None))
        length = 0
        try:
            data.append(next(ret)) # fills todo
            length += len(data[-1])
        except StopIteration:
            stopped = True

        if todo[0] is None:
            # A lazy application called start_response during the first
            # iteration and it was forwarded already (error response or
            # explicit Content-Length): only the consumed element has to be
            # put back in front of the remaining ones.
            return CloseableIterator(getattr(ret, "close", None), data, ret)

        status, headers = todo[0]

        # The magic value () means "no limit". It is tested explicitly
        # because ordering an int against a tuple is a TypeError on
        # Python 3.
        gather_all = self.maxstore == ()
        while not stopped and (gather_all or length < self.maxstore):
            try:
                data.append(next(ret))
                length += len(data[-1])
            except StopIteration:
                stopped = True

        if stopped:
            # The whole response was gathered, so its length is known.
            headers.append(("Content-length", str(length)))
            start_response(status, headers)
            return data

        # Too much data: pass everything through without a length.
        start_response(status, headers)

        return CloseableIterator(getattr(ret, "close", None), data, ret)


def storable(environ):
    """Default storable predicate: only C{GET} requests may be cached.
    @type environ: {str: str}
    @rtype: bool
    """
    return environ["REQUEST_METHOD"] == "GET"


def cacheable(environ):
    """Default cacheable predicate: a request whose C{Cache-Control} header
    is exactly C{max-age=0} must not be answered from the cache.
    @type environ: {str: str}
    @rtype: bool
    """
    return environ.get("HTTP_CACHE_CONTROL", "") != "max-age=0"


__all__.append("CachingMiddleware")
class CachingMiddleware:
    """Caches responses to requests based on C{SCRIPT_NAME}, C{PATH_INFO}
    and C{QUERY_STRING}."""
    def __init__(self, app, maxage=60, storable=storable,
                 cacheable=cacheable):
        """
        @param app: is a wsgi application to be cached.
        @type maxage: int
        @param maxage: is the number of seconds a response may be cached.
        @param storable: is a predicate that determines whether the
                response may be cached at all based on the C{environ} dict.
        @param cacheable: is a predicate on the C{environ} dict. When it
                returns a false value an existing cache entry is not used
                for this request and is replaced by the fresh response.
        """
        self.app = app
        self.maxage = maxage
        self.storable = storable
        self.cacheable = cacheable
        # maps request key -> [timestamp, status, headers, body chunks]
        self.cache = {}

    def __call__(self, environ, start_response):
        """wsgi interface
        @type environ: {str: str}
        """
        assert isinstance(environ, dict)
        if not self.storable(environ):
            return self.app(environ, start_response)
        path = environ.get("SCRIPT_NAME", "/")
        path += environ.get("PATH_INFO", '')
        path += "?" + environ.get("QUERY_STRING", "")
        if self.cacheable(environ) and path in self.cache:
            stamp, status, headers, body = self.cache[path]
            if stamp + self.maxage >= time.time():
                # Hand out copies so that callers modifying the header or
                # body list cannot alter the cached response.
                start_response(status, list(headers))
                return list(body)
            del self.cache[path]
        cache_object = [time.time(), "", [], []]
        # Becomes non-empty once start_response is called with exc_info.
        # Such a response is never stored. (A list is used because the
        # nested function cannot rebind a local of this method.)
        failed = []

        def modified_start_response(status, headers, exc_info=None):
            assert isinstance(status, str)
            assert isinstance(headers, list)
            if exc_info is not None:
                failed.append(True)
                # Forward to the server's start_response, not to the
                # wrapped application.
                return start_response(status, headers, exc_info)
            cache_object[1] = status
            # Keep a private copy: the original list is handed on and may
            # be modified afterwards.
            cache_object[2] = list(headers)
            write = start_response(status, headers)
            def modified_write(data):
                cache_object[3].append(data)
                write(data)
            return modified_write

        ret = self.app(environ, modified_start_response)
        assert hasattr(ret, "__iter__")

        if isinstance(ret, list):
            cache_object[3].extend(ret)
            if not failed:
                self.cache[path] = cache_object
            return ret

        def pass_through():
            # Record each chunk while handing it on. The entry is stored
            # only after the application's iterable is exhausted.
            for data in ret:
                cache_object[3].append(data)
                yield data
            if not failed:
                self.cache[path] = cache_object
        return CloseableIterator(getattr(ret, "close", None), pass_through())


__all__.append("DictAuthChecker")
class DictAuthChecker:
    """Verifies usernames and passwords by looking them up in a dict."""
    def __init__(self, users):
        """
        @type users: {str: str}
        @param users: is a dict mapping usernames to password."""
        self.users = users

    def __call__(self, username, password):
        """check_function interface taking username and password and
        resulting in a bool.
        @type username: str
        @type password: str
        @rtype: bool
        """
        if username not in self.users:
            return False
        return self.users[username] == password


__all__.append("BasicAuthMiddleware")
class BasicAuthMiddleware:
    """Middleware implementing HTTP Basic Auth."""
    def __init__(self, app, check_function, realm='www', app401=None):
        """
        @param app: is a WSGI application.
        @param check_function: is a function taking three arguments
                username, password and environment returning a bool
                indicating whether the request is allowed. The older
                interface of taking only the first two arguments is still
                supported via catching a C{TypeError}.
        @type realm: str
        @param app401: is an optional WSGI application to be used for error
                messages
        """
        self.app = app
        self.check_function = check_function
        self.realm = realm
        self.app401 = app401

    def __call__(self, environ, start_response):
        """wsgi interface
        @type environ: {str: str}
        """
        assert isinstance(environ, dict)
        auth = environ.get("HTTP_AUTHORIZATION")
        if not auth or ' ' not in auth:
            return self.authorization_required(environ, start_response)
        auth_type, enc_auth_info = auth.split(None, 1)
        try:
            # NOTE(review): str.decode("base64") only exists on Python 2
            # byte strings; on Python 3 this line raises AttributeError.
            # Left unchanged because the rest of this class is not visible.
            auth_info = enc_auth_info.decode("base64")
        except binascii.Error:
            return self.authorization_required(environ, start_response)
        if auth_type.lower() != "basic" or ':' not in auth_info:
            return self.authorization_required(environ, start_response)
        username, password = auth_info.split(':', 1)
        try:
            result = self.check_function(username, password, environ)
        except TypeError: # catch old interface
            result = self.check_function(username, password)
        if result:
            environ["REMOTE_USER"] = username
            return self.app(environ, start_response)
        return self.authorization_required(environ, start_response)

    def authorization_required(self, environ, start_response):
        """wsgi application for indicating authorization is required.
        @type environ: {str: str}
        """
        if self.app401 is None:
            status = "401 Authorization required"
            # NOTE(review): the visible source ends here, in the middle of
            # the statement `html = "`. The rest of this method is not
            # shown and has deliberately not been reconstructed.