summaryrefslogtreecommitdiff
path: root/wsgitools/digest.py
diff options
context:
space:
mode:
Diffstat (limited to 'wsgitools/digest.py')
-rw-r--r--wsgitools/digest.py95
1 files changed, 55 insertions, 40 deletions
diff --git a/wsgitools/digest.py b/wsgitools/digest.py
index 5ed05f8..2f49ff7 100644
--- a/wsgitools/digest.py
+++ b/wsgitools/digest.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python2.5
"""
This module contains an C{AuthDigestMiddleware} for authenticating HTTP
requests using the method described in RFC2617. The credentials are to be
@@ -15,32 +14,39 @@ database using C{DBAPI2NonceStore}.
__all__ = []
import random
-try:
- from hashlib import md5
-except ImportError:
- from md5 import md5
-import binascii
import base64
+import hashlib
import time
import os
+from wsgitools.internal import bytes2str, str2bytes, textopen
from wsgitools.authentication import AuthenticationRequired, \
ProtocolViolation, AuthenticationMiddleware
sysrand = random.SystemRandom()
-def gen_rand_str(bytes=33):
+def md5hex(data):
+ """
+ @type data: str
+ @rtype: str
+ """
+ return hashlib.md5(str2bytes(data)).hexdigest()
+
+def gen_rand_str(bytesentropy=33):
"""
Generates a string of random base64 characters.
- @param bytes: is the number of random 8bit values to be used
+ @param bytesentropy: is the number of random 8bit values to be used
+ @rtype: str
>>> gen_rand_str() != gen_rand_str()
True
"""
- randnum = sysrand.getrandbits(bytes*8)
- randstr = ("%%0%dX" % (2*bytes)) % randnum
- randstr = binascii.unhexlify(randstr)
- randstr = base64.encodestring(randstr).strip()
+ randnum = sysrand.getrandbits(bytesentropy*8)
+ randstr = ("%%0%dX" % (2*bytesentropy)) % randnum
+ randbytes = str2bytes(randstr)
+ randbytes = base64.b16decode(randbytes)
+ randbytes = base64.b64encode(randbytes)
+ randstr = bytes2str(randbytes)
return randstr
def parse_digest_response(data):
@@ -121,6 +127,8 @@ def format_digest(mapping):
assert isinstance(mapping, dict)
result = []
for key, (value, needsquoting) in mapping.items():
+ assert isinstance(key, str)
+ assert isinstance(value, str)
if needsquoting:
value = '"%s"' % value.replace('\\', '\\\\').replace('"', '\\"')
else:
@@ -173,8 +181,8 @@ class AbstractTokenGenerator(object):
"""
assert isinstance(username, str)
assert isinstance(password, str)
- token = md5("%s:%s:%s" % (username, self.realm, password)).hexdigest()
- return token == self(username)
+ token = "%s:%s:%s" % (username, self.realm, password)
+ return md5hex(token) == self(username)
__all__.append("AuthTokenGenerator")
class AuthTokenGenerator(AbstractTokenGenerator):
@@ -200,7 +208,7 @@ class AuthTokenGenerator(AbstractTokenGenerator):
if password is None:
return None
a1 = "%s:%s:%s" % (username, self.realm, password)
- return md5(a1).hexdigest()
+ return md5hex(a1)
__all__.append("HtdigestTokenGenerator")
class HtdigestTokenGenerator(AbstractTokenGenerator):
@@ -231,18 +239,19 @@ class HtdigestTokenGenerator(AbstractTokenGenerator):
"""
assert isinstance(htdigestfile, str)
self.users = {}
- for line in file(htdigestfile):
- parts = line.rstrip("\n").split(":")
- if len(parts) != 3:
- if ignoreparseerrors:
+ with textopen(htdigestfile, "r") as htdigest:
+ for line in htdigest:
+ parts = line.rstrip("\n").split(":")
+ if len(parts) != 3:
+ if ignoreparseerrors:
+ continue
+ raise ValueError("invalid number of colons in htdigest file")
+ user, realm, token = parts
+ if realm != self.realm:
continue
- raise ValueError("invalid number of colons in htdigest file")
- user, realm, token = parts
- if realm != self.realm:
- continue
- if user in self.users and not ignoreparseerrors:
- raise ValueError("duplicate user in htdigest file")
- self.users[user] = token
+ if user in self.users and not ignoreparseerrors:
+ raise ValueError("duplicate user in htdigest file")
+ self.users[user] = token
def __call__(self, user, algo="md5"):
assert algo.lower() in ["md5", "md5-sess"]
@@ -259,7 +268,7 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator):
# modifications.
try:
self.statcache = os.stat(htdigestfile)
- except OSError, err:
+ except OSError as err:
raise IOError(str(err))
HtdigestTokenGenerator.__init__(self, realm, htdigestfile,
ignoreparseerrors)
@@ -276,7 +285,9 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator):
if self.statcache != statcache:
try:
self.readhtdigest(self.htdigestfile, self.ignoreparseerrors)
- except (IOError, ValueError):
+ except IOError:
+ return None
+ except ValueError:
return None
return HtdigestTokenGenerator.__call__(self, user, algo)
@@ -366,7 +377,7 @@ class StatelessNonceStore(NonceStoreBase):
token = "%s:%s:%s" % (nonce_time, nonce_value, self.server_secret)
if ident is not None:
token = "%s:%s" % (token, ident)
- token = md5(token).hexdigest()
+ token = md5hex(token)
return "%s:%s:%s" % (nonce_time, nonce_value, token)
def checknonce(self, nonce, count=1, ident=None):
@@ -386,7 +397,7 @@ class StatelessNonceStore(NonceStoreBase):
token = "%s:%s:%s" % (nonce_time, nonce_value, self.server_secret)
if ident is not None:
token = "%s:%s" % (token, ident)
- token = md5(token).hexdigest()
+ token = md5hex(token)
if token != nonce_hash:
return False
@@ -448,7 +459,7 @@ class MemoryNonceStore(NonceStoreBase):
token = "%s:%s:%s" % (nonce_time, nonce_value, self.server_secret)
if ident is not None:
token = "%s:%s" % (token, ident)
- token = md5(token).hexdigest()
+ token = md5hex(token)
return "%s:%s:%s" % (nonce_time, nonce_value, token)
def checknonce(self, nonce, count=1, ident=None):
@@ -467,7 +478,7 @@ class MemoryNonceStore(NonceStoreBase):
token = "%s:%s:%s" % (nonce_time, nonce_value, self.server_secret)
if ident is not None:
token = "%s:%s" % (token, ident)
- token = md5(token).hexdigest()
+ token = md5hex(token)
if token != nonce_hash:
return False
@@ -594,7 +605,7 @@ class DBAPI2NonceStore(NonceStoreBase):
token = "%s:%s" % (dbkey, self.server_secret)
if ident is not None:
token = "%s:%s" % (token, ident)
- token = md5(token).hexdigest()
+ token = md5hex(token)
return "%s:%s:%s" % (nonce_time, nonce_value, token)
def checknonce(self, nonce, count=1, ident=None):
@@ -603,19 +614,22 @@ class DBAPI2NonceStore(NonceStoreBase):
count on returning True.
@type nonce: str
@type count: int
+ @type ident: str or None
@rtype: bool
"""
try:
nonce_time, nonce_value, nonce_hash = nonce.split(':')
except ValueError:
return False
- if not nonce_time.isalnum() or not nonce_value.replace("+", ""). \
- replace("/", "").replace("=", "").isalnum():
+ # use bytes.isalnum to avoid locale specific interpretation
+ if not str2bytes(nonce_time).isalnum() or \
+ not str2bytes(nonce_value.replace("+", "").replace("/", "") \
+ .replace("=", "")).isalnum():
return False
token = "%s:%s:%s" % (nonce_time, nonce_value, self.server_secret)
if ident is not None:
token = "%s:%s" % (token, ident)
- token = md5(token).hexdigest()
+ token = md5hex(token)
if token != nonce_hash:
return False
@@ -680,7 +694,7 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
by a REMOTE_USER key before being passed to the wrapped
application."""
authorization_method = "digest"
- algorithms = {"md5": lambda data: md5(data).hexdigest()}
+ algorithms = {"md5": md5hex}
def __init__(self, app, gentoken, maxage=300, maxuses=5, store=None):
"""
@param app: is the wsgi application to be served with authentication.
@@ -707,6 +721,7 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
self.noncestore = store
def authenticate(self, auth, environ):
+ assert isinstance(auth, str)
try:
credentials = parse_digest_response(auth)
except ValueError:
@@ -724,7 +739,7 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
try:
nonce = credentials["nonce"]
credresponse = credentials["response"]
- except KeyError, err:
+ except KeyError as err:
raise ProtocolViolation("%s missing in credentials" %
err.args[0])
noncecount = 1
@@ -765,7 +780,7 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
username = credentials["username"]
algo = credentials["algorithm"]
uri = credentials["uri"]
- except KeyError, err:
+ except KeyError as err:
raise ProtocolViolation("%s missing in credentials" % err.args[0])
try:
dig = [credentials["nonce"]]
@@ -778,7 +793,7 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
try:
dig.append(credentials["nc"])
dig.append(credentials["cnonce"])
- except KeyError, err:
+ except KeyError as err:
raise ProtocolViolation(
"missing %s in credentials with qop=auth" % err.args[0])
dig.append(qop)