#!/usr/bin/env python2.5
"""HTTP digest authentication (RFC 2617) implemented as WSGI middleware.

The module provides a token generator mapping user names to password hashes,
two nonce stores (a stateless one and an in-memory one that counts nonce
usage) and the middleware that ties them together.
"""

__all__ = []

import base64
import binascii
import bisect
import random
try:
    from hashlib import md5
except ImportError:
    from md5 import md5
import time

sysrand = random.SystemRandom()


def _md5hex(data):
    """internal: hexadecimal MD5 digest of a string

    Byte strings are hashed unchanged. Where md5 refuses text (Python 3), the
    text is encoded as ISO-8859-1 first, which is the encoding WSGI (PEP 3333)
    uses to expose request bytes as native strings.

    @type data: str
    @rtype: str
    @raises ValueError: (UnicodeEncodeError) if text cannot be encoded
    """
    try:
        return md5(data).hexdigest()
    except TypeError:
        return md5(data.encode("iso-8859-1")).hexdigest()


def _random_token():
    """internal: 33 bytes from the system random source as base64 text

    33 is a multiple of 3, so the result is exactly 44 characters without
    padding.

    @rtype: str
    """
    raw = binascii.unhexlify("%066X" % sysrand.getrandbits(33 * 8))
    token = base64.b64encode(raw)
    if not isinstance(token, str):  # Python 3 hands back bytes
        token = token.decode("ascii")
    return token


def _sign_nonce(nonce_time, nonce_value, secret):
    """internal: hash binding a nonce's time and value to the server secret

    @type nonce_time: str
    @type nonce_value: str
    @type secret: str
    @rtype: str
    """
    return _md5hex("%s:%s:%s" % (nonce_time, nonce_value, secret))


def _verify_nonce(nonce, secret):
    """internal: split a nonce and check its signature

    @type nonce: str
    @type secret: str
    @rtype: (str, str) or None
    @returns: (nonce_time, nonce_value) if the nonce consists of exactly
            three colon separated fields and the last one is the signature
            of the first two, None otherwise
    """
    try:
        nonce_time, nonce_value, nonce_hash = nonce.split(':')
        expected = _sign_nonce(nonce_time, nonce_value, secret)
    except ValueError:
        # wrong number of fields or a string that cannot be hashed
        return None
    if nonce_hash != expected:
        return None
    return nonce_time, nonce_value


def _uri_matches_environ(uri, environ):
    """internal: check the uri of a digest response against the request

    The uri cannot be reconstructed from the environment exactly, so the
    known parts (QUERY_STRING at the end, then SCRIPT_NAME and PATH_INFO at
    the start) are stripped from it and only something trivial may remain.

    @type uri: str
    @type environ: dict
    @rtype: bool
    """
    query = environ.get("QUERY_STRING")
    if query:
        if not uri.endswith(query):
            return False
        uri = uri[:-len(query)]
    for key in ("SCRIPT_NAME", "PATH_INFO"):
        if key in environ:
            if not uri.startswith(environ[key]):
                return False
            uri = uri[len(environ[key]):]
    return uri in ('', '?')


def parse_digest_response(data, ret=None):
    """internal

    Parses comma separated key=value pairs where a value is either enclosed
    in double quotes or extends up to the next comma.

    @type data: str
    @type ret: dict or None
    @param ret: dictionary to store the pairs in. A new dictionary is created
            for each call if omitted, so nothing is shared between calls.
    @rtype: dict
    @raises ValueError: if data is malformed

    >>> parse_digest_response('foo=bar')
    {'foo': 'bar'}
    >>> parse_digest_response('foo="bar"')
    {'foo': 'bar'}
    >>> sorted(parse_digest_response('foo="bar=qux",spam=egg').items())
    [('foo', 'bar=qux'), ('spam', 'egg')]
    """
    if ret is None:
        ret = {}
    # A loop instead of recursion: the number of pairs is chosen by the
    # client and must not be able to exhaust the recursion limit.
    while True:
        data = data.strip()
        key, rest = data.split('=', 1)  # raises ValueError
        if rest.startswith('"'):
            value, rest = rest[1:].split('"', 1)  # raises ValueError
            if not rest:
                ret[key] = value
                return ret
            if rest[0] != ',':
                raise ValueError("invalid digest response")
            data = rest[1:]
        else:
            if ',' not in rest:
                ret[key] = rest
                return ret
            value, data = rest.split(',', 1)
        ret[key] = value


class AuthenticationRequired(Exception):
    """Raised internally when a request has to be answered with a 401."""
    pass


__all__.append("AuthTokenGenerator")
class AuthTokenGenerator:
    """Generates authentication tokens for AuthDigestMiddleware. The
    interface consists of being callable with a username and having a
    realm attribute being a string."""

    def __init__(self, realm, getpass):
        """
        @type realm: str
        @param realm: is a string according to RFC2617.
        @param getpass: this function is called with a username and a
                password is expected as result. None may be used as an
                invalid password.
        """
        self.realm = realm
        self.getpass = getpass

    def __call__(self, username, algo="md5"):
        """Generates an authentication token from a username.

        @type username: str
        @type algo: str
        @param algo: "md5" or "md5-sess" in any case
        @rtype: str or None
        @returns: the hex digest of "username:realm:password" or None if
                getpass returned None
        """
        assert algo.lower() in ["md5", "md5-sess"]
        password = self.getpass(username)
        if password is None:
            return None
        return _md5hex("%s:%s:%s" % (username, self.realm, password))


__all__.append("NonceStoreBase")
class NonceStoreBase:
    """Nonce storage interface."""

    def __init__(self):
        pass

    def newnonce(self):
        """
        This method is to be overridden and should return new nonces.

        @rtype: str
        """
        raise NotImplementedError

    def isnonce(self, nonce):
        """
        This method is to be overridden and should do a quick check for
        whether the given nonce has a chance to be a valid one. This function
        must not return false for a stale nonce.

        @type nonce: str
        @rtype: bool
        """
        raise NotImplementedError

    def checknonce(self, nonce, qop, nc):
        """
        This method is to be overridden and should do a thorough check for
        whether the given nonce is a valid one taking qop and nc into account.

        @type nonce: str
        @type qop: str or None
        @type nc: str or None
        @rtype: bool
        """
        raise NotImplementedError


def format_time(seconds):
    """
    internal method formatting a unix time to a fixed-length string

    The time is converted to microseconds and written as upper case
    hexadecimal, right-aligned in 13 characters, so that the strings compare
    like the times they represent.

    @type seconds: float
    @rtype: str
    """
    return "%13X" % int(seconds * 1000000)


__all__.append("StatelessNonceStore")
class StatelessNonceStore(NonceStoreBase):
    """
    This is a stateless nonce storage that cannot check the usage count for
    a nonce and thus cannot protect against replay attacks. It however can
    make them difficult by imposing a timeout on nonces and making it
    difficult to forge nonces.

    This nonce store is usable with scgi.forkpool.
    """

    def __init__(self, maxage=300, secret=None):
        """
        @type maxage: int
        @param maxage: is the number of seconds a nonce may be valid. Choosing
                a large value may result in more memory usage whereas a
                smaller value results in more requests. Defaults to 5 minutes.
        @type secret: str
        @param secret: if not given, a secret is generated and is therefore
                shared after forks. Knowing this secret permits creating
                nonces.
        """
        NonceStoreBase.__init__(self)
        self.maxage = maxage
        if secret:
            self.server_secret = secret
        else:
            self.server_secret = _random_token()

    def newnonce(self):
        """
        Generates a new nonce string of the form "time:value:signature".

        @rtype: str
        """
        nonce_time = format_time(time.time())
        nonce_value = _random_token()
        token = _sign_nonce(nonce_time, nonce_value, self.server_secret)
        return "%s:%s:%s" % (nonce_time, nonce_value, token)

    def isnonce(self, nonce):
        """
        Do a quick stateless check for whether the provided string might
        be a nonce, that is whether it carries a valid signature.

        @type nonce: str
        @rtype: bool
        """
        return _verify_nonce(nonce, self.server_secret) is not None

    def checknonce(self, nonce, qop, nc):
        """
        Do a thorough check for whether the provided string is a nonce: it
        has to carry a valid signature and must not be older than maxage
        seconds. No usage count is kept, so qop and nc are ignored.

        @type nonce: str
        @type qop: str or None
        @type nc: str or None
        @rtype: bool
        """
        verified = _verify_nonce(nonce, self.server_secret)
        if verified is None:
            return False
        nonce_time = verified[0]
        if nonce_time < format_time(time.time() - self.maxage):
            return False
        return True


__all__.append("MemoryNonceStore")
class MemoryNonceStore(NonceStoreBase):
    """Simple in-memory mechanism to store nonces."""

    def __init__(self, maxage=300, maxuses=5):
        """
        @type maxage: int
        @param maxage: is the number of seconds a nonce may be valid. Choosing
                a large value may result in more memory usage whereas a
                smaller value results in more requests. Defaults to 5 minutes.
        @type maxuses: int
        @param maxuses: is the number of times a nonce may be used (with
                different nc values). A value of 1 makes nonces usable exactly
                once resulting in more requests. Defaults to 5.
        """
        NonceStoreBase.__init__(self)
        self.maxage = maxage
        self.maxuses = maxuses
        # [(creation_time, nonce_value, usage_count)] as [(str, str, int)]
        # where creation_time is a format_time string. Entries are appended
        # in creation order, which keeps the list sorted by creation_time.
        self.nonces = []
        self.server_secret = _random_token()

    def _cleanup(self):
        """internal method dropping expired nonces from the list"""
        old = format_time(time.time() - self.maxage)
        # (old,) sorts before every entry whose creation time is >= old, so
        # this index is the number of entries that are older than old.
        del self.nonces[:bisect.bisect_left(self.nonces, (old,))]

    def newnonce(self):
        """
        Generates a new nonce string of the form "time:value:signature" and
        remembers it with a usage count of 1.

        @rtype: str
        """
        self._cleanup()  # avoid growing self.nonces
        nonce_time = format_time(time.time())
        nonce_value = _random_token()
        self.nonces.append((nonce_time, nonce_value, 1))
        token = _sign_nonce(nonce_time, nonce_value, self.server_secret)
        return "%s:%s:%s" % (nonce_time, nonce_value, token)

    def isnonce(self, nonce):
        """
        Do a quick stateless check for whether the provided string might
        be a nonce, that is whether it carries a valid signature.

        @type nonce: str
        @rtype: bool
        """
        return _verify_nonce(nonce, self.server_secret) is not None

    def checknonce(self, nonce, qop, nc):
        """
        Do a thorough check for whether the provided string is a nonce and
        increase usage count on returning True.

        The nonce has to be known, not expired and nc has to equal its
        current usage count (1 is assumed when qop is None). A nonce used
        with an unexpected nc is forgotten; so is a nonce that has been used
        maxuses times.

        @type nonce: str
        @type qop: str or None
        @type nc: str or None
        @param nc: hexadecimal usage count, only looked at if qop is given
        @rtype: bool
        """
        verified = _verify_nonce(nonce, self.server_secret)
        if verified is None:
            return False
        nonce_time, nonce_value = verified

        if qop is None:
            nc = 1
        else:
            try:
                nc = int(nc, 16)
            except (TypeError, ValueError):
                return False

        self._cleanup()  # avoid stale nonces

        # Locate the first entry created at nonce_time, then walk over all
        # entries sharing that timestamp. Running off the end of the list
        # (which may well be empty) means the nonce is unknown.
        index = bisect.bisect_left(self.nonces, (nonce_time,))
        while (index < len(self.nonces)
               and self.nonces[index][0] == nonce_time):
            if self.nonces[index][1] == nonce_value:
                break
            index += 1
        else:
            return False

        (nt, nv, uses) = self.nonces[index]
        if nc != uses:
            del self.nonces[index]
            return False
        if uses >= self.maxuses:
            del self.nonces[index]
        else:
            self.nonces[index] = (nt, nv, uses + 1)
        return True


__all__.append("AuthDigestMiddleware")
class AuthDigestMiddleware:
    """Middleware partly implementing RFC2617. (md5-sess was omitted)
    This middleware does not work with cgi servers or fork servers like
    scgi.forkpool.
    """
    # maps lower case algorithm names to functions hashing a string to a
    # hexadecimal digest
    algorithms = {"md5": _md5hex}

    def __init__(self, app, gentoken, maxage=300, maxuses=5, store=None):
        """
        @param app: is the wsgi application to be served with authentication.
        @type gentoken: str -> str
        @param gentoken: has to have the same functionality and interface as
                the AuthTokenGenerator class.
        @type maxage: int
        @param maxage: deprecated, see MemoryNonceStore or StatelessNonceStore
        @type maxuses: int
        @param maxuses: deprecated, see MemoryNonceStore
        @type store: NonceStoreBase
        @param store: a nonce storage implementation object. Usage of this
                parameter will override maxage and maxuses.
        """
        self.app = app
        self.gentoken = gentoken
        if store is None:
            self.noncestore = MemoryNonceStore(maxage, maxuses)
        else:
            assert hasattr(store, "newnonce")
            assert hasattr(store, "isnonce")
            assert hasattr(store, "checknonce")
            self.noncestore = store

    def __call__(self, environ, start_response):
        """wsgi interface

        Serves a 401 page unless the request carries a valid digest
        response. Otherwise REMOTE_USER is set in environ, the wrapped
        application is invoked and an Authentication-Info header is added
        to its response.
        """
        try:
            auth = environ["HTTP_AUTHORIZATION"]  # raises KeyError
            method, rest = auth.split(' ', 1)  # raises ValueError
            if method.lower() != "digest":
                raise AuthenticationRequired
            credentials = parse_digest_response(rest)  # raises ValueError

            ### Check algorithm field
            credentials["algorithm"] = credentials.get("algorithm",
                                                       "md5").lower()
            if not credentials["algorithm"] in self.algorithms:
                raise AuthenticationRequired

            ### Check uri field
            # raises KeyError
            if not _uri_matches_environ(credentials["uri"], environ):
                raise AuthenticationRequired

            if ("username" not in credentials or
                "nonce" not in credentials or
                "response" not in credentials or
                "qop" in credentials and (
                    credentials["qop"] != "auth" or
                    "nc" not in credentials or
                    credentials["nc"].lower().strip("0123456789abcdef") or
                    "cnonce" not in credentials)):
                raise AuthenticationRequired

            if not self.noncestore.isnonce(credentials["nonce"]):
                raise AuthenticationRequired

            # raises KeyError, ValueError
            response = self.auth_response(credentials,
                                          environ["REQUEST_METHOD"])
            if response is None or response != credentials["response"]:
                raise AuthenticationRequired

            # Only now that the response is known to be correct the nonce
            # is looked up (and thereby used up) in the store.
            if not self.noncestore.checknonce(credentials["nonce"],
                                              credentials.get("qop"),
                                              credentials.get("nc")):
                return self.authorization_required(environ, start_response,
                                                   stale=True)  # stale nonce!
        except (KeyError, ValueError, AuthenticationRequired):
            return self.authorization_required(environ, start_response)
        else:
            environ["REMOTE_USER"] = credentials["username"]

        def modified_start_response(status, headers, exc_info=None):
            digest = dict(nextnonce=self.noncestore.newnonce())
            if "qop" in credentials:
                digest["qop"] = "auth"
                digest["cnonce"] = credentials["cnonce"]  # no KeyError
                digest["rspauth"] = self.auth_response(credentials, "")
            challenge = ", ".join('%s="%s"' % item
                                  for item in digest.items())
            headers.append(("Authentication-Info", challenge))
            return start_response(status, headers, exc_info)

        return self.app(environ, modified_start_response)

    def auth_response(self, credentials, reqmethod):
        """internal method generating authentication tokens

        @type credentials: dict
        @param credentials: the parsed digest response; "username",
                "algorithm", "uri" and "nonce" are required, "nc" and
                "cnonce" are required in addition if "qop" is present
        @type reqmethod: str
        @param reqmethod: the request method or "" for computing rspauth
        @rtype: str
        @raise KeyError: if a required field is missing
        @raise ValueError: if the user is unknown or qop is not "auth"
        """
        username = credentials["username"]
        algo = credentials["algorithm"]
        uri = credentials["uri"]
        nonce = credentials["nonce"]

        a1h = self.gentoken(username, algo)
        if a1h is None:
            raise ValueError
        a2h = self.algorithms[algo]("%s:%s" % (reqmethod, uri))

        qop = credentials.get("qop", None)
        if qop is None:
            dig = ":".join((a1h, nonce, a2h))
        else:
            nc = credentials["nc"]  # raises KeyError
            cnonce = credentials["cnonce"]  # raises KeyError
            if qop != "auth":
                raise ValueError
            dig = ":".join((a1h, nonce, nc, cnonce, qop, a2h))
        return self.algorithms[algo](dig)

    def authorization_required(self, environ, start_response, stale=False):
        """internal method implementing wsgi interface, serving 401 page

        @type stale: bool
        @param stale: tells the client that its credentials were fine but
                the nonce has to be replaced
        """
        digest = dict(nonce=self.noncestore.newnonce(),
                      realm=self.gentoken.realm,
                      algorithm="md5",
                      qop="auth")
        if stale:
            digest["stale"] = "TRUE"
        challenge = ", ".join('%s="%s"' % item for item in digest.items())
        status = "401 Not authorized"
        headers = [("Content-type", "text/html"),
                   ("WWW-Authenticate", "Digest %s" % challenge)]
        # NOTE(review): the page is declared as text/html but contains no
        # markup; it may have been lost at some point - confirm.
        data = "401 Not authorized\n\n"
        data += "401 Not authorized\n\n"
        # wsgi bodies are byte strings; this is a no-op where str is bytes
        data = data.encode("ascii")
        headers.append(("Content-length", str(len(data))))
        start_response(status, headers)
        if environ.get("REQUEST_METHOD") == "HEAD":
            return []
        return [data]