#!/usr/bin/env python2.5
"""WSGI middleware implementing a subset of HTTP digest authentication.

Public names are AuthTokenGenerator and AuthDigestMiddleware. Only the
"md5" algorithm and the "auth" quality of protection are implemented.

The code sticks to syntax that is valid on Python 2.5 and on Python 3.
"""

__all__ = ["AuthTokenGenerator", "AuthDigestMiddleware"]

import base64
import binascii
import hashlib
import random
import time

try:
    # Deprecated since Python 2.5 in favour of hashlib and no longer used
    # below; the import is retained only so the module-level name survives.
    import md5
except ImportError:  # the module does not exist on Python 3
    md5 = None

sysrand = random.SystemRandom()

# ``str`` on Python 2 and ``bytes`` on Python 3 (avoids the b"" literal,
# which Python 2.5 does not understand).
_BYTES_TYPE = type("".encode("ascii"))


def _md5_hexdigest(data):
    """Return the hexadecimal MD5 digest of data as a native string.

    Byte strings are hashed as they are. Text (unicode on Python 2, str on
    Python 3) is encoded as UTF-8 first, because hashlib only accepts bytes.
    """
    if not isinstance(data, _BYTES_TYPE):
        data = data.encode("utf-8")
    return hashlib.md5(data).hexdigest()


def _format_params(params):
    """Render a dict as comma separated key="value" header parameters."""
    return ", ".join(['%s="%s"' % item for item in params.items()])


def parse_digest_response(data, ret=None):
    """Parse the parameter list of a digest Authorization header.

    data is the text following the "Digest " scheme name: comma separated
    key=value pairs whose values are either double-quoted or bare tokens.

    ret is an optional dict receiving the parsed pairs. A fresh dict is
    created for every call when it is omitted, so fields of one request can
    never leak into the next one.

    Returns the dict of parsed pairs. Raises ValueError on malformed input;
    pairs parsed before the error are already stored in ret.
    """
    if ret is None:
        ret = {}
    # Iterative on purpose: a header with many fields must not be able to
    # exhaust the recursion limit.
    while True:
        data = data.strip()
        key, rest = data.split('=', 1)  # raises ValueError
        if rest.startswith('"'):
            value, rest = rest[1:].split('"', 1)  # raises ValueError
            if not rest:
                ret[key] = value
                return ret
            if rest[0] != ',':
                raise ValueError("invalid digest response")
            rest = rest[1:]
        else:
            if ',' not in rest:
                # Last pair of the header, unquoted.
                ret[key] = rest
                return ret
            value, rest = rest.split(',', 1)
        ret[key] = value
        data = rest


class AuthenticationRequired(Exception):
    """Raised internally when a request has to be answered with a 401."""


class AuthTokenGenerator:
    """Generates authentication tokens for AuthDigestMiddleware.

    The interface consists of being callable with a username and having a
    realm attribute that is a string.
    """

    def __init__(self, realm, getpass):
        """Realm is a string according to RFC2617. The provided getpass
        function is called with a username and is expected to return the
        password. None may be returned to signal an invalid user."""
        self.realm = realm
        self.getpass = getpass

    def __call__(self, username, algo="md5"):
        """Generate the authentication token H(A1) for a username.

        Returns the hexadecimal MD5 digest of "username:realm:password", or
        None when getpass returned None for the user.
        """
        # NOTE: kept as an assert so callers see the same exception type as
        # before; it is skipped when Python runs with -O.
        assert algo.lower() in ("md5", "md5-sess")
        password = self.getpass(username)
        if password is None:
            return None
        a1 = "%s:%s:%s" % (username, self.realm, password)
        return _md5_hexdigest(a1)


class AuthDigestMiddleware:
    """Middleware partly implementing RFC2617. (md5-sess was omitted)"""

    # Maps the lower-cased algorithm name to a function returning a hex digest.
    algorithms = {"md5": _md5_hexdigest}

    def __init__(self, app, gentoken, maxage=300, maxuses=5):
        """app is the wsgi application to be served with authentication.

        gentoken has to have the same functionality and interface as the
        AuthTokenGenerator class.

        maxage is the number of seconds a nonce may be valid. Choosing a
        large value may result in more memory usage whereas a smaller value
        results in more requests. Defaults to 5 minutes.

        maxuses is the number of times a nonce may be used (with different
        nc values). A value of 1 makes nonces usable exactly once resulting
        in more requests. Defaults to 5.
        """
        self.app = app
        self.gentoken = gentoken
        self.maxage = maxage
        self.maxuses = maxuses
        # TODO implement better nonce-lookup (i.e. not O(n))
        # [(creation_time, nonce_value, usage_count)] as [(float, str, int)],
        # ordered by creation time because new nonces are always appended.
        self.nonces = []

    def __call__(self, environ, start_response):
        """wsgi interface

        Serves the wrapped application when the request carries valid digest
        credentials (REMOTE_USER is set in environ and an
        Authentication-Info header is added to the response); otherwise a
        401 response with a fresh challenge is served.
        """
        self.cleanup_nonces()
        try:
            auth = environ["HTTP_AUTHORIZATION"]  # raises KeyError
            method, rest = auth.split(' ', 1)  # raises ValueError
            if method.lower() != "digest":
                raise AuthenticationRequired
            credentials = parse_digest_response(rest)  # raises ValueError

            ### Check algorithm field
            credentials["algorithm"] = credentials.get("algorithm",
                                                       "md5").lower()
            if credentials["algorithm"] not in self.algorithms:
                raise AuthenticationRequired

            ### Check uri field
            # Doing this by stripping known parts from the passed uri field
            # until something trivial remains, as the uri cannot be
            # reconstructed from the environment exactly.
            remainder = credentials["uri"]  # raises KeyError
            query = environ.get("QUERY_STRING")
            if query:
                if not remainder.endswith(query):
                    raise AuthenticationRequired
                remainder = remainder[:-len(query)]
            for part in ("SCRIPT_NAME", "PATH_INFO"):
                if part in environ:
                    if not remainder.startswith(environ[part]):
                        raise AuthenticationRequired
                    remainder = remainder[len(environ[part]):]
            if remainder not in ('', '?'):
                raise AuthenticationRequired

            ### Check presence and shape of the remaining fields
            if ("username" not in credentials or
                    "nonce" not in credentials or
                    "response" not in credentials or
                    "qop" in credentials and (
                        credentials["qop"] != "auth" or
                        "nc" not in credentials or
                        # anything left after stripping hex digits is junk
                        credentials["nc"].lower().strip("0123456789abcdef") or
                        "cnonce" not in credentials)):
                raise AuthenticationRequired

            if not self.check_nonce(credentials):  # KeyError, ValueError
                return self.authorization_required(environ, start_response,
                                                   stale=True)  # stale nonce!

            # raises KeyError, ValueError
            response = self.auth_response(credentials,
                                          environ["REQUEST_METHOD"])
            if response is None or response != credentials["response"]:
                raise AuthenticationRequired
        except (KeyError, ValueError, AuthenticationRequired):
            return self.authorization_required(environ, start_response)
        else:
            environ["REMOTE_USER"] = credentials["username"]

            def modified_start_response(status, headers, exc_info=None):
                digest = dict(nextnonce=self.new_nonce())
                if "qop" in credentials:
                    digest["qop"] = "auth"
                    digest["cnonce"] = credentials["cnonce"]  # no KeyError
                    # rspauth is computed like the request digest, but with
                    # an empty method.
                    digest["rspauth"] = self.auth_response(credentials, "")
                headers.append(("Authentication-Info",
                                _format_params(digest)))
                return start_response(status, headers, exc_info)

            return self.app(environ, modified_start_response)

    def auth_response(self, credentials, reqmethod):
        """internal method generating authentication tokens

        Computes the digest a client has to send for the given credentials
        and request method. Raises KeyError when a required field is
        missing and ValueError when the user is unknown to gentoken or the
        qop value is not "auth".
        """
        username = credentials["username"]
        algo = credentials["algorithm"]
        uri = credentials["uri"]
        nonce = credentials["nonce"]
        hashfunc = self.algorithms[algo]

        a1h = self.gentoken(username, algo)
        if a1h is None:
            raise ValueError("no authentication token for this user")
        a2h = hashfunc("%s:%s" % (reqmethod, uri))

        qop = credentials.get("qop", None)
        if qop is None:
            parts = (a1h, nonce, a2h)
        else:
            nc = credentials["nc"]  # raises KeyError
            cnonce = credentials["cnonce"]  # raises KeyError
            if qop != "auth":
                # Used to *return* the exception class, which callers would
                # have mistaken for a digest.
                raise ValueError("unsupported qop value")
            parts = (a1h, nonce, nc, cnonce, qop, a2h)
        return hashfunc(":".join(parts))

    def cleanup_nonces(self):
        """internal method dropping expired nonces from the list"""
        oldest_allowed = time.time() - self.maxage
        expired = 0
        for created, _value, _uses in self.nonces:
            if created >= oldest_allowed:
                break
            expired += 1
        if expired:
            # One slice deletion instead of repeated pop(0) calls.
            del self.nonces[:expired]

    def check_nonce(self, credentials):
        """internal method checking nonce validity

        Returns True when the nonce is known and the request counter equals
        the number of uses recorded for it; the counter is then advanced, or
        the nonce dropped once maxuses is reached. A known nonce with a
        wrong counter is dropped. Raises KeyError or ValueError on missing
        or malformed fields.
        """
        nonce = credentials["nonce"]
        if credentials.get("qop", None) is None:
            nc = 1  # without qop there is no counter, so only one use
        else:
            nc = int(credentials["nc"], 16)  # raises KeyError, ValueError
        for pos, (created, value, uses) in enumerate(self.nonces):
            if value != nonce:
                continue
            if uses != nc:
                del self.nonces[pos]
                return False
            if uses >= self.maxuses:
                del self.nonces[pos]
            else:
                self.nonces[pos] = (created, value, uses + 1)
            return True
        return False

    def new_nonce(self):
        """internal method generating and registering a new nonce

        The nonce is the base64 text of 33 random bytes, i.e. 44 characters
        without padding or line breaks.
        """
        nonce_time = time.time()
        randval = sysrand.getrandbits(33 * 8)
        raw = binascii.unhexlify(("%066X" % randval).encode("ascii"))
        nonce_value = base64.b64encode(raw)
        if not isinstance(nonce_value, str):
            # Python 3 yields bytes; headers need native strings.
            nonce_value = nonce_value.decode("ascii")
        self.nonces.append((nonce_time, nonce_value, 1))
        return nonce_value

    def authorization_required(self, environ, start_response, stale=False):
        """internal method implementing wsgi interface, serving 401 page"""
        nonce = self.new_nonce()
        digest = dict(nonce=nonce,
                      realm=self.gentoken.realm,
                      algorithm="md5",
                      qop="auth")
        if stale:
            digest["stale"] = "TRUE"
        challenge = _format_params(digest)
        status = "401 Not authorized"
        headers = [("Content-type", "text/html"),
                   ("WWW-Authenticate", "Digest %s" % challenge)]
        data = "401 Not authorized\n\n"
        data += "401 Not authorized\n\n"
        # WSGI bodies are byte strings; this is a no-op on Python 2.
        data = data.encode("ascii")
        headers.append(("Content-length", str(len(data))))
        start_response(status, headers)
        if environ["REQUEST_METHOD"] == "HEAD":
            return []
        return [data]