#!/usr/bin/env python2.5
"""WSGI middleware implementing parts of HTTP Digest authentication (RFC 2617).

The module provides a token generator (L{AuthTokenGenerator}), two nonce
stores (L{StatelessNonceStore}, L{MemoryNonceStore}) and the middleware itself
(L{AuthDigestMiddleware}).
"""

__all__ = []

import base64
import binascii
import bisect
import random
import time
try:
    from hashlib import md5
except ImportError:
    from md5 import md5

try:
    long
except NameError:
    long = int

try:
    unicode
except NameError:
    # Python 3: the text type is str.
    unicode = str

sysrand = random.SystemRandom()


def _md5hex(data):
    """internal; returns the hexadecimal MD5 digest of data

    Text is encoded as UTF-8 before hashing, because the hash function only
    accepts byte strings on Python 3.

    @rtype: str
    """
    if isinstance(data, unicode):
        data = data.encode("utf-8")
    return md5(data).hexdigest()


def gen_rand_str(bytes=33):
    """
    Generates a random string from the system random source.
    @type bytes: int
    @param bytes: is the number of random bytes to draw before encoding
    @rtype: str
    @returns: the base64 encoding of the random bytes, without line breaks
    """
    randnum = sysrand.getrandbits(bytes * 8)
    # Zero-padded hexadecimal, two digits per byte, so unhexlify always gets
    # an even number of digits.
    hexstr = ("%%0%dX" % (2 * bytes)) % randnum
    encoded = base64.b64encode(binascii.unhexlify(hexstr))
    if not isinstance(encoded, str):
        # Python 3 returns bytes; the callers format the result into text.
        encoded = encoded.decode("ascii")
    return encoded


def parse_digest_response(data, ret=None):
    """internal
    @type data: str
    @param data: comma separated key=value or key="value" pairs
    @type ret: dict
    @param ret: is an optional dictionary to store the pairs into. A fresh
            dictionary is used for each call when omitted.
    @rtype: dict
    @raises ValueError:

    >>> parse_digest_response('foo=bar')
    {'foo': 'bar'}
    >>> parse_digest_response('foo="bar"')
    {'foo': 'bar'}
    >>> sorted(parse_digest_response('foo="bar=qux",spam=egg').items())
    [('foo', 'bar=qux'), ('spam', 'egg')]
    >>> try:
    ...     parse_digest_response('spam')
    ... except ValueError:
    ...     print("ValueError")
    ValueError
    """
    if ret is None:
        ret = {}
    rest = data
    while True:
        rest = rest.strip()
        key, rest = rest.split('=', 1)  # raises ValueError
        if rest.startswith('"'):
            value, rest = rest[1:].split('"', 1)  # raises ValueError
            if not rest:
                ret[key] = value
                return ret
            if rest[0] != ',':
                raise ValueError("invalid digest response")
            rest = rest[1:]
        else:
            if ',' not in rest:
                ret[key] = rest
                return ret
            value, rest = rest.split(',', 1)
        ret[key] = value


class AuthenticationRequired(Exception):
    """internal; raised when a request has to be answered with a challenge"""
    pass


__all__.append("AuthTokenGenerator")
class AuthTokenGenerator:
    """Generates authentication tokens for L{AuthDigestMiddleware}. The
    interface consists of being callable with a username and having a realm
    attribute being a string."""
    def __init__(self, realm, getpass):
        """
        @type realm: str
        @param realm: is a string according to RFC2617.
        @type getpass: str -> str
        @param getpass: this function is called with a username and password
                is expected as result. C{None} may be used as an invalid
                password.
        """
        self.realm = realm
        self.getpass = getpass

    def __call__(self, username, algo="md5"):
        """Generates an authentication token from a username.
        @type username: str
        @type algo: str
        @param algo: has to be "md5" or "md5-sess" (case insensitive)
        @rtype: str
        @returns: the hexadecimal MD5 digest of "username:realm:password" or
                C{None} if getpass returned C{None}
        """
        assert algo.lower() in ["md5", "md5-sess"]
        password = self.getpass(username)
        if password is None:
            return None
        a1 = "%s:%s:%s" % (username, self.realm, password)
        return _md5hex(a1)


__all__.append("NonceStoreBase")
class NonceStoreBase:
    """Nonce storage interface."""
    def __init__(self):
        pass

    def newnonce(self, ident=None):
        """
        This method is to be overridden and should return new nonces.
        @type ident: str
        @param ident: is an identifier to be associated with this nonce
        @rtype: str
        """
        raise NotImplementedError

    def isnonce(self, nonce, ident=None):
        """
        This method is to be overridden and should do a quick check for
        whether the given nonce has a chance to be a valid one. This function
        must not return false for a stale nonce.
        @type nonce: str
        @type ident: str
        @param ident: it is also checked that the nonce was associated to this
                identifier when given
        @rtype: bool
        """
        raise NotImplementedError

    def checknonce(self, nonce, count=1, ident=None):
        """
        This method is to be overridden and should do a thorough check for
        whether the given nonce is a valid as being used count times.
        @type nonce: str
        @type count: int
        @param count: indicates how often the nonce has been used (including
                this check)
        @type ident: str
        @param ident: it is also checked that the nonce was associated to this
                identifier when given
        @rtype: bool
        """
        raise NotImplementedError


def format_time(seconds):
    """
    internal method formatting a unix time to a fixed-length string
    @type seconds: float
    @rtype: str
    @returns: the time in microseconds as upper case hexadecimal, padded to a
            width of 13 characters. The nonce stores compare these strings
            instead of the numbers.
    """
    return "%13X" % long(seconds * 1000000)


def _split_nonce(nonce):
    """internal; splits a nonce into its time, value and hash parts

    @type nonce: str
    @rtype: (str, str, str) or None
    @returns: C{None} unless the nonce has exactly three colon separated
            parts
    """
    parts = nonce.split(':')
    if len(parts) != 3:
        return None
    return tuple(parts)


def _nonce_hash(nonce_time, nonce_value, secret, ident=None):
    """internal; computes the hash part that authenticates a nonce

    Creation and verification both use this function, so that they cannot
    disagree on how the identifier enters the hash.

    @rtype: str
    """
    token = "%s:%s:%s" % (nonce_time, nonce_value, secret)
    if ident is not None:
        token = "%s:%s" % (token, ident)
    return _md5hex(token)


__all__.append("StatelessNonceStore")
class StatelessNonceStore(NonceStoreBase):
    """
    This is a stateless nonce storage that cannot check the usage count for
    a nonce and thus cannot protect against replay attacks. It however can make
    it difficult by posing a timeout on nonces and making it difficult to forge
    nonces.

    This nonce store is usable with L{scgi.forkpool}.

    >>> s = StatelessNonceStore()
    >>> n = s.newnonce()
    >>> s.checknonce("spam")
    False
    >>> s.isnonce(n)
    True
    >>> s.checknonce(n)
    True
    >>> s.checknonce(n)
    True
    """
    def __init__(self, maxage=300, secret=None):
        """
        @type maxage: int
        @param maxage: is the number of seconds a nonce may be valid. Choosing a
                large value may result in more memory usage whereas a smaller
                value results in more requests. Defaults to 5 minutes.
        @type secret: str
        @param secret: if not given, a secret is generated and is therefore
                shared after forks. Knowing this secret permits creating nonces.
        """
        NonceStoreBase.__init__(self)
        self.maxage = maxage
        if secret:
            self.server_secret = secret
        else:
            self.server_secret = gen_rand_str()

    def newnonce(self, ident=None):
        """
        Generates a new nonce string.
        @type ident: str
        @param ident: is an identifier to be associated with this nonce
        @rtype: str
        """
        nonce_time = format_time(time.time())
        nonce_value = gen_rand_str()
        token = _nonce_hash(nonce_time, nonce_value, self.server_secret, ident)
        return "%s:%s:%s" % (nonce_time, nonce_value, token)

    def isnonce(self, nonce, ident=None):
        """
        Do a quick stateless check for whether the provided string might be
        a nonce.
        @type nonce: str
        @type ident: str
        @rtype: bool
        """
        parts = _split_nonce(nonce)
        if parts is None:
            return False
        nonce_time, nonce_value, nonce_hash = parts
        return nonce_hash == _nonce_hash(nonce_time, nonce_value,
                                         self.server_secret, ident)

    def checknonce(self, nonce, count=1, ident=None):
        """
        Do a thorough check for whether the provided string is a nonce that
        has not yet exceeded maxage.
        @type nonce: str
        @type count: int
        @param count: any value other than 1 is rejected, because this store
                keeps no usage counts
        @type ident: str
        @rtype: bool
        """
        if count != 1:
            return False
        if not self.isnonce(nonce, ident):
            return False
        nonce_time = _split_nonce(nonce)[0]
        # The time strings have a fixed width and are compared as strings.
        if nonce_time < format_time(time.time() - self.maxage):
            return False
        return True


__all__.append("MemoryNonceStore")
class MemoryNonceStore(NonceStoreBase):
    """
    Simple in-memory mechanism to store nonces.

    >>> s = MemoryNonceStore(maxuses=1)
    >>> n = s.newnonce()
    >>> s.checknonce("spam")
    False
    >>> s.isnonce(n)
    True
    >>> s.checknonce(n)
    True
    >>> s.checknonce(n)
    False
    """
    def __init__(self, maxage=300, maxuses=5):
        """
        @type maxage: int
        @param maxage: is the number of seconds a nonce may be valid. Choosing a
                large value may result in more memory usage whereas a smaller
                value results in more requests. Defaults to 5 minutes.
        @type maxuses: int
        @param maxuses: is the number of times a nonce may be used (with
                different nc values). A value of 1 makes nonces usable exactly
                once resulting in more requests. Defaults to 5.
        """
        NonceStoreBase.__init__(self)
        self.maxage = maxage
        self.maxuses = maxuses
        # [(creation_time, nonce_value, usage_count)] as [(str, str, int)]
        # in order of creation; creation_time is a format_time string.
        self.nonces = []
        self.server_secret = gen_rand_str()

    def _cleanup(self):
        """internal method cleaning the list of valid nonces"""
        old = format_time(time.time() - self.maxage)
        # A 1-tuple sorts before every entry with the same time, so this is
        # the index of the first entry that is not older than old.
        cut = bisect.bisect_left(self.nonces, (old,))
        del self.nonces[:cut]

    def _find(self, nonce_time, nonce_value):
        """internal method locating a stored nonce

        @rtype: int or None
        @returns: the index into self.nonces or C{None} if there is no entry
                with this time and value
        """
        index = bisect.bisect_left(self.nonces, (nonce_time,))
        # Several nonces may carry the same time, so look at all of them.
        while (index < len(self.nonces)
               and self.nonces[index][0] == nonce_time):
            if self.nonces[index][1] == nonce_value:
                return index
            index += 1
        return None

    def newnonce(self, ident=None):
        """
        Generates a new nonce string.
        @type ident: str
        @param ident: is an identifier to be associated with this nonce
        @rtype: str
        """
        self._cleanup()  # avoid growing self.nonces
        nonce_time = format_time(time.time())
        nonce_value = gen_rand_str()
        self.nonces.append((nonce_time, nonce_value, 1))
        token = _nonce_hash(nonce_time, nonce_value, self.server_secret, ident)
        return "%s:%s:%s" % (nonce_time, nonce_value, token)

    def isnonce(self, nonce, ident=None):
        """
        Do a quick stateless check for whether the provided string might be
        a nonce.
        @type nonce: str
        @type ident: str
        @rtype: bool
        """
        parts = _split_nonce(nonce)
        if parts is None:
            return False
        nonce_time, nonce_value, nonce_hash = parts
        return nonce_hash == _nonce_hash(nonce_time, nonce_value,
                                         self.server_secret, ident)

    def checknonce(self, nonce, count=1, ident=None):
        """
        Do a thorough check for whether the provided string is a nonce and
        increase usage count on returning True.
        @type nonce: str
        @type count: int
        @param count: has to equal the stored usage count. The nonce is
                dropped on a mismatch.
        @type ident: str
        @rtype: bool
        """
        if not self.isnonce(nonce, ident):
            return False
        nonce_time, nonce_value, _ = _split_nonce(nonce)

        self._cleanup()  # avoid stale nonces

        index = self._find(nonce_time, nonce_value)
        if index is None:
            return False
        (nt, nv, uses) = self.nonces[index]
        if count != uses:
            del self.nonces[index]
            return False
        if uses >= self.maxuses:
            # This was the last permitted use.
            del self.nonces[index]
        else:
            self.nonces[index] = (nt, nv, uses + 1)
        return True


def _format_challenge(fields):
    """internal; formats a dict as comma separated key="value" pairs

    @type fields: dict
    @rtype: str
    """
    return ", ".join(['%s="%s"' % item for item in fields.items()])


__all__.append("AuthDigestMiddleware")
class AuthDigestMiddleware:
    """Middleware partly implementing RFC2617. (md5-sess was omitted)"""
    algorithms = {"md5": _md5hex}

    def __init__(self, app, gentoken, maxage=300, maxuses=5, store=None):
        """
        @param app: is the wsgi application to be served with authentication.
        @type gentoken: str -> str
        @param gentoken: has to have the same functionality and interface as the
                L{AuthTokenGenerator} class.
        @type maxage: int
        @param maxage: deprecated, see L{MemoryNonceStore} or
                L{StatelessNonceStore}
        @type maxuses: int
        @param maxuses: deprecated, see L{MemoryNonceStore}
        @type store: L{NonceStoreBase}
        @param store: a nonce storage implementation object. Usage of this
                parameter will override maxage and maxuses.
        """
        self.app = app
        self.gentoken = gentoken
        if store is None:
            self.noncestore = MemoryNonceStore(maxage, maxuses)
        else:
            assert hasattr(store, "newnonce")
            assert hasattr(store, "isnonce")
            assert hasattr(store, "checknonce")
            self.noncestore = store

    def _uri_matches(self, uri, environ):
        """internal method checking the uri field against the environment

        Doing this by stripping known parts from the passed uri field until
        something trivial remains, as the uri cannot be reconstructed from
        the environment exactly.

        @type uri: str
        @type environ: dict
        @rtype: bool
        """
        query = environ.get("QUERY_STRING")
        if query:
            if not uri.endswith(query):
                return False
            uri = uri[:-len(query)]
        for key in ("SCRIPT_NAME", "PATH_INFO"):
            if key in environ:
                prefix = environ[key]
                if not uri.startswith(prefix):
                    return False
                uri = uri[len(prefix):]
        return uri in ('', '?')

    def __call__(self, environ, start_response):
        """wsgi interface"""
        try:
            auth = environ["HTTP_AUTHORIZATION"]  # raises KeyError
            method, rest = auth.split(' ', 1)  # raises ValueError

            if method.lower() != "digest":
                raise AuthenticationRequired

            credentials = parse_digest_response(rest)  # raises ValueError

            ### Check algorithm field
            credentials["algorithm"] = credentials.get("algorithm",
                                                       "md5").lower()
            if credentials["algorithm"] not in self.algorithms:
                raise AuthenticationRequired

            ### Check uri field
            # raises KeyError
            if not self._uri_matches(credentials["uri"], environ):
                raise AuthenticationRequired

            ### Check presence and form of the remaining fields
            if ("username" not in credentials
                    or "nonce" not in credentials
                    or "response" not in credentials
                    or ("qop" in credentials and (
                        credentials["qop"] != "auth"
                        or "nc" not in credentials
                        # anything left after stripping is not a hex digit
                        or credentials["nc"].lower().strip("0123456789abcdef")
                        or "cnonce" not in credentials))):
                raise AuthenticationRequired

            if not self.noncestore.isnonce(credentials["nonce"]):
                raise AuthenticationRequired

            # raises KeyError, ValueError
            response = self.auth_response(credentials,
                                          environ["REQUEST_METHOD"])

            if response is None or response != credentials["response"]:
                raise AuthenticationRequired

            noncecount = 1
            if credentials.get("qop") is not None:
                # raises ValueError
                noncecount = long(credentials["nc"], 16)

            if not self.noncestore.checknonce(credentials["nonce"],
                                              noncecount):
                return self.authorization_required(environ, start_response,
                                                   stale=True)  # stale nonce!

        except (KeyError, ValueError, AuthenticationRequired):
            return self.authorization_required(environ, start_response)
        else:
            environ["REMOTE_USER"] = credentials["username"]

            def modified_start_response(status, headers, exc_info=None):
                digest = dict(nextnonce=self.noncestore.newnonce())
                if "qop" in credentials:
                    digest["qop"] = "auth"
                    digest["cnonce"] = credentials["cnonce"]  # no KeyError
                    digest["rspauth"] = self.auth_response(credentials, "")
                headers.append(("Authentication-Info",
                                _format_challenge(digest)))
                return start_response(status, headers, exc_info)

            return self.app(environ, modified_start_response)

    def auth_response(self, credentials, reqmethod):
        """internal method generating authentication tokens
        @type credentials: dict
        @param credentials: the parsed fields of the Authorization header
                including a lower case "algorithm" entry
        @type reqmethod: str
        @rtype: str
        @raise KeyError:
        @raise ValueError:
        """
        username = credentials["username"]
        algo = credentials["algorithm"]
        uri = credentials["uri"]
        nonce = credentials["nonce"]
        hashfunc = self.algorithms[algo]

        a1h = self.gentoken(username, algo)
        if a1h is None:
            raise ValueError
        a2h = hashfunc("%s:%s" % (reqmethod, uri))

        qop = credentials.get("qop", None)
        if qop is None:
            dig = ":".join((a1h, nonce, a2h))
        else:
            if qop != "auth":
                raise ValueError
            # raises KeyError
            dig = ":".join((a1h, nonce, credentials["nc"],
                            credentials["cnonce"], qop, a2h))
        return hashfunc(dig)

    def authorization_required(self, environ, start_response, stale=False):
        """internal method implementing wsgi interface, serving 401 page
        @type stale: bool
        @param stale: adds stale="TRUE" to the challenge
        """
        digest = dict(nonce=self.noncestore.newnonce(),
                      realm=self.gentoken.realm,
                      algorithm="md5",
                      qop="auth")
        if stale:
            digest["stale"] = "TRUE"
        challenge = _format_challenge(digest)
        status = "401 Not authorized"
        headers = [("Content-type", "text/html"),
                   ("WWW-Authenticate", "Digest %s" % challenge)]
        # NOTE(review): given the text/html content type this body presumably
        # carried markup that got lost; confirm against the upstream file.
        data = "401 Not authorized\n\n"
        data += "401 Not authorized\n\n"
        if isinstance(data, unicode):
            # WSGI bodies are byte strings; only Python 3 gets here.
            data = data.encode("ascii")
        headers.append(("Content-length", str(len(data))))
        start_response(status, headers)
        if environ["REQUEST_METHOD"] == "HEAD":
            return []
        return [data]