summaryrefslogtreecommitdiff
path: root/wsgitools/digest.py
diff options
context:
space:
mode:
Diffstat (limited to 'wsgitools/digest.py')
-rw-r--r--wsgitools/digest.py220
1 files changed, 109 insertions, 111 deletions
diff --git a/wsgitools/digest.py b/wsgitools/digest.py
index 6eb4cb3..18925df 100644
--- a/wsgitools/digest.py
+++ b/wsgitools/digest.py
@@ -23,25 +23,22 @@ except ImportError:
import random
sysrand = random.SystemRandom()
randbits = sysrand.getrandbits
- def compare_digest(a, b):
+ def compare_digest(a: str, b: str) -> bool:
return a == b
+import sys
+import typing
-from wsgitools.internal import bytes2str, str2bytes, textopen
+from wsgitools.internal import bytes2str, Environ, str2bytes, textopen, WsgiApp
from wsgitools.authentication import AuthenticationRequired, \
ProtocolViolation, AuthenticationMiddleware
-def md5hex(data):
- """
- @type data: str
- @rtype: str
- """
+def md5hex(data: str) -> str:
return hashlib.md5(str2bytes(data)).hexdigest()
-def gen_rand_str(bytesentropy=33):
+def gen_rand_str(bytesentropy: int = 33) -> str:
"""
Generates a string of random base64 characters.
@param bytesentropy: is the number of random 8bit values to be used
- @rtype: str
>>> gen_rand_str() != gen_rand_str()
True
@@ -53,7 +50,7 @@ def gen_rand_str(bytesentropy=33):
randstr = bytes2str(randbytes)
return randstr
-def parse_digest_response(data):
+def parse_digest_response(data: str) -> typing.Dict[str, str]:
"""internal
@raises ValueError:
@@ -118,13 +115,11 @@ def parse_digest_response(data):
value, data = data.split(',', 1)
result[key] = value
-def format_digest(mapping):
+def format_digest(mapping: typing.Dict[str, typing.Tuple[str, bool]]) -> str:
"""internal
- @type mapping: {str: (str, bool)}
@param mapping: a mapping of keys to values and a boolean that
determines whether the value needs quoting.
- @rtype: str
@note: the RFC specifies which values must be quoted and which must not be
quoted.
"""
@@ -150,38 +145,36 @@ class AbstractTokenGenerator:
L{AuthDigestMiddleware}.
@ivar realm: is a string according to RFC2617.
- @type realm: str
"""
- def __init__(self, realm):
- """
- @type realm: str
- """
+ realm: str
+ def __init__(self, realm: str):
assert isinstance(realm, str)
self.realm = realm
- def __call__(self, username, algo="md5"):
+ def __call__(
+ self, username: str, algo: str = "md5"
+ ) -> typing.Optional[str]:
"""Generates an authentication token from a username.
- @type username: str
- @type algo: str
@param algo: currently the only value supported by
L{AuthDigestMiddleware} is "md5"
- @rtype: str or None
@returns: a valid token or None to signal that authentication should
fail
"""
raise NotImplementedError
- def check_password(self, username, password, environ=None):
+ def check_password(
+ self,
+ username: str,
+ password: str,
+ environ: typing.Optional[Environ] = None,
+ ) -> bool:
"""
This function implements the interface for verifying passwords
used by L{BasicAuthMiddleware}. It works by computing a token
from the user and comparing it to the token returned by the
__call__ method.
- @type username: str
- @type password: str
@param environ: ignored
- @rtype: bool
"""
assert isinstance(username, str)
assert isinstance(password, str)
@@ -191,16 +184,26 @@ class AbstractTokenGenerator:
return False
return compare_digest(md5hex(token), expected)
+if sys.version_info >= (3, 11):
+ class TokenGenerator(typing.Protocol):
+ realm: str
+ def __call__(
+ self, username: str, algo: str = "md5"
+ ) -> typing.Optional[str]:
+ ...
+else:
+ TokenGenerator = typing.Callable[[str, str], typing.Optional[str]]
+
__all__.append("AuthTokenGenerator")
class AuthTokenGenerator(AbstractTokenGenerator):
"""Generates authentication tokens for L{AuthDigestMiddleware}. The
interface consists of beeing callable with a username and having a
realm attribute being a string."""
- def __init__(self, realm, getpass):
+ def __init__(
+ self, realm: str, getpass: typing.Callable[[str], typing.Optional[str]]
+ ):
"""
- @type realm: str
@param realm: is a string according to RFC2617.
- @type getpass: str -> (str or None)
@param getpass: this function is called with a username and password is
expected as result. C{None} may be used as an invalid password.
An example for getpass would be C{{username: password}.get}.
@@ -208,7 +211,9 @@ class AuthTokenGenerator(AbstractTokenGenerator):
AbstractTokenGenerator.__init__(self, realm)
self.getpass = getpass
- def __call__(self, username, algo="md5"):
+ def __call__(
+ self, username: str, algo: str = "md5"
+ ) -> typing.Optional[str]:
assert isinstance(username, str)
assert algo.lower() in ["md5", "md5-sess"]
password = self.getpass(username)
@@ -222,12 +227,13 @@ class HtdigestTokenGenerator(AbstractTokenGenerator):
"""Reads authentication tokens for L{AuthDigestMiddleware} from an
apache htdigest file.
"""
- def __init__(self, realm, htdigestfile, ignoreparseerrors=False):
+ users: typing.Dict[str, str]
+
+ def __init__(
+ self, realm: str, htdigestfile: str, ignoreparseerrors: bool = False
+ ):
"""
- @type realm: str
- @type htdigestfile: str
@param htdigestfile: path to the .htdigest file
- @type ignoreparseerrors: bool
@param ignoreparseerrors: passed to readhtdigest
@raises IOError:
@raises ValueError:
@@ -236,10 +242,10 @@ class HtdigestTokenGenerator(AbstractTokenGenerator):
self.users = {}
self.readhtdigest(htdigestfile, ignoreparseerrors)
- def readhtdigest(self, htdigestfile, ignoreparseerrors=False):
+ def readhtdigest(
+ self, htdigestfile: str, ignoreparseerrors: bool = False
+ ) -> None:
"""
- @type htdigestfile: str
- @type ignoreparseerrors: bool
@param ignoreparseerrors: do not raise ValueErrors for bad files
@raises IOError:
@raises ValueError:
@@ -260,7 +266,7 @@ class HtdigestTokenGenerator(AbstractTokenGenerator):
raise ValueError("duplicate user in htdigest file")
self.users[user] = token
- def __call__(self, user, algo="md5"):
+ def __call__(self, user: str, algo: str = "md5") -> typing.Optional[str]:
assert algo.lower() in ["md5", "md5-sess"]
return self.users.get(user)
@@ -269,7 +275,9 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator):
"""Behaves like L{HtdigestTokenGenerator}, checks the htdigest file
for changes on each invocation.
"""
- def __init__(self, realm, htdigestfile, ignoreparseerrors=False):
+ def __init__(
+ self, realm: str, htdigestfile: str, ignoreparseerrors: bool = False
+ ):
assert isinstance(htdigestfile, str)
# Need to stat the file before calling parent ctor to detect
# modifications.
@@ -282,7 +290,7 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator):
self.htdigestfile = htdigestfile
self.ignoreparseerrors = ignoreparseerrors
- def __call__(self, user, algo="md5"):
+ def __call__(self, user: str, algo: str = "md5") -> typing.Optional[str]:
# The interface does not permit raising exceptions, so all we can do is
# fail by returning None.
try:
@@ -301,36 +309,30 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator):
__all__.append("NonceStoreBase")
class NonceStoreBase:
"""Nonce storage interface."""
- def __init__(self):
+ def __init__(self) -> None:
pass
- def newnonce(self, ident=None):
+ def newnonce(self, ident: typing.Optional[str] = None) -> str:
"""
This method is to be overriden and should return new nonces.
- @type ident: str
@param ident: is an identifier to be associated with this nonce
- @rtype: str
"""
raise NotImplementedError
- def checknonce(self, nonce, count=1, ident=None):
+ def checknonce(
+ self, nonce: str, count: int = 1, ident: typing.Optional[str] = None
+ ) -> bool:
"""
This method is to be overridden and should do a check for whether the
given nonce is valid as being used count times.
- @type nonce: str
- @type count: int
@param count: indicates how often the nonce has been used (including
this check)
- @type ident: str
@param ident: it is also checked that the nonce was associated to this
identifier when given
- @rtype: bool
"""
raise NotImplementedError
-def format_time(seconds):
+def format_time(seconds: float) -> str:
"""
internal method formatting a unix time to a fixed-length string
- @type seconds: float
- @rtype: str
"""
# the overflow will happen about 2112
return "%013X" % int(seconds * 1000000)
@@ -356,13 +358,11 @@ class StatelessNonceStore(NonceStoreBase):
>>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash")
False
"""
- def __init__(self, maxage=300, secret=None):
+ def __init__(self, maxage: int = 300, secret: typing.Optional[str] = None):
"""
- @type maxage: int
@param maxage: is the number of seconds a nonce may be valid. Choosing a
large value may result in more memory usage whereas a smaller
value results in more requests. Defaults to 5 minutes.
- @type secret: str
@param secret: if not given, a secret is generated and is therefore
shared after forks. Knowing this secret permits creating nonces.
"""
@@ -373,12 +373,8 @@ class StatelessNonceStore(NonceStoreBase):
else:
self.server_secret = gen_rand_str()
- def newnonce(self, ident=None):
- """
- Generates a new nonce string.
- @type ident: None or str
- @rtype: str
- """
+ def newnonce(self, ident: typing.Optional[str] = None) -> str:
+ """Generates a new nonce string."""
nonce_time = format_time(time.time())
nonce_value = gen_rand_str()
token = "%s:%s:%s" % (nonce_time, nonce_value, self.server_secret)
@@ -387,14 +383,10 @@ class StatelessNonceStore(NonceStoreBase):
token = md5hex(token)
return "%s:%s:%s" % (nonce_time, nonce_value, token)
- def checknonce(self, nonce, count=1, ident=None):
- """
- Check whether the provided string is a nonce.
- @type nonce: str
- @type count: int
- @type ident: None or str
- @rtype: bool
- """
+ def checknonce(
+ self, nonce: str, count: int = 1, ident: typing.Optional[str] = None
+ ) -> bool:
+ """Check whether the provided string is a nonce."""
if count != 1:
return False
try:
@@ -429,13 +421,13 @@ class MemoryNonceStore(NonceStoreBase):
>>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash")
False
"""
- def __init__(self, maxage=300, maxuses=5):
+ nonces: typing.List[typing.Tuple[str, str, int]]
+
+ def __init__(self, maxage: int = 300, maxuses: int = 5):
"""
- @type maxage: int
@param maxage: is the number of seconds a nonce may be valid. Choosing a
large value may result in more memory usage whereas a smaller
value results in more requests. Defaults to 5 minutes.
- @type maxuses: int
@param maxuses: is the number of times a nonce may be used (with
different nc values). A value of 1 makes nonces usable exactly
once resulting in more requests. Defaults to 5.
@@ -447,18 +439,14 @@ class MemoryNonceStore(NonceStoreBase):
# as [(str (hex encoded), str, int)]
self.server_secret = gen_rand_str()
- def _cleanup(self):
+ def _cleanup(self) -> None:
"""internal methods cleaning list of valid nonces"""
old = format_time(time.time() - self.maxage)
while self.nonces and self.nonces[0][0] < old:
self.nonces.pop(0)
- def newnonce(self, ident=None):
- """
- Generates a new nonce string.
- @type ident: None or str
- @rtype: str
- """
+ def newnonce(self, ident: typing.Optional[str] = None) -> str:
+ """Generates a new nonce string."""
self._cleanup() # avoid growing self.nonces
nonce_time = format_time(time.time())
nonce_value = gen_rand_str()
@@ -469,14 +457,12 @@ class MemoryNonceStore(NonceStoreBase):
token = md5hex(token)
return "%s:%s:%s" % (nonce_time, nonce_value, token)
- def checknonce(self, nonce, count=1, ident=None):
+ def checknonce(
+ self, nonce: str, count: int = 1, ident: typing.Optional[str] = None
+ ) -> bool:
"""
Do a check for whether the provided string is a nonce and increase usage
count on returning True.
- @type nonce: str
- @type count: int
- @type ident: None or str
- @rtype: bool
"""
try:
nonce_time, nonce_value, nonce_hash = nonce.split(':')
@@ -522,19 +508,28 @@ class LazyDBAPI2Opener:
because this way each worker child opens a new database connection when
the first request is to be answered.
"""
- def __init__(self, function, *args, **kwargs):
+ _function: typing.Optional[typing.Callable[..., typing.Any]]
+ def __init__(
+ self,
+ function: typing.Callable[..., typing.Any],
+ *args,
+ **kwargs,
+ ):
"""
The database will be connected on the first method call. This is done
by calling the given function with the remaining parameters.
@param function: is the function that connects to the database
"""
self._function = function
- self._args = args
- self._kwargs = kwargs
+ self._args: typing.Optional[typing.Tuple[typing.Any, ...]] = args
+ self._kwargs: typing.Optional[typing.Dict[str, typing.Any]] = kwargs
self._dbhandle = None
- def _getdbhandle(self):
+ def _getdbhandle(self) -> typing.Any:
"""Returns an open database connection. Open if necessary."""
if self._dbhandle is None:
+ assert self._function is not None
+ assert self._args is not None
+ assert self._kwargs is not None
self._dbhandle = self._function(*self._args, **self._kwargs)
self._function = self._args = self._kwargs = None
return self._dbhandle
@@ -573,14 +568,14 @@ class DBAPI2NonceStore(NonceStoreBase):
>>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash")
False
"""
- def __init__(self, dbhandle, maxage=300, maxuses=5, table="nonces"):
+ def __init__(
+ self, dbhandle, maxage: int = 300, maxuses: int = 5, table="nonces"
+ ):
"""
@param dbhandle: is a dbapi2 connection
- @type maxage: int
@param maxage: is the number of seconds a nonce may be valid. Choosing a
large value may result in more memory usage whereas a smaller
value results in more requests. Defaults to 5 minutes.
- @type maxuses: int
@param maxuses: is the number of times a nonce may be used (with
different nc values). A value of 1 makes nonces usable exactly
once resulting in more requests. Defaults to 5.
@@ -592,16 +587,13 @@ class DBAPI2NonceStore(NonceStoreBase):
self.table = table
self.server_secret = gen_rand_str()
- def _cleanup(self, cur):
+ def _cleanup(self, cur) -> None:
"""internal methods cleaning list of valid nonces"""
old = format_time(time.time() - self.maxage)
cur.execute("DELETE FROM %s WHERE key < '%s:';" % (self.table, old))
- def newnonce(self, ident=None):
- """
- Generates a new nonce string.
- @rtype: str
- """
+ def newnonce(self, ident: typing.Optional[str] = None) -> str:
+ """Generates a new nonce string."""
nonce_time = format_time(time.time())
nonce_value = gen_rand_str()
dbkey = "%s:%s" % (nonce_time, nonce_value)
@@ -615,14 +607,12 @@ class DBAPI2NonceStore(NonceStoreBase):
token = md5hex(token)
return "%s:%s:%s" % (nonce_time, nonce_value, token)
- def checknonce(self, nonce, count=1, ident=None):
+ def checknonce(
+ self, nonce: str, count: int = 1, ident: typing.Optional[str] = None
+ ) -> bool:
"""
Do a check for whether the provided string is a nonce and increase usage
count on returning True.
- @type nonce: str
- @type count: int
- @type ident: str or None
- @rtype: bool
"""
try:
nonce_time, nonce_value, nonce_hash = nonce.split(':')
@@ -668,7 +658,7 @@ class DBAPI2NonceStore(NonceStoreBase):
self.dbhandle.commit()
return True
-def check_uri(credentials, environ):
+def check_uri(credentials: typing.Dict[str, str], environ: Environ) -> None:
"""internal method for verifying the uri credential
@raises AuthenticationRequired:
"""
@@ -706,19 +696,23 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
application."""
authorization_method = "digest"
algorithms = {"md5": md5hex}
- def __init__(self, app, gentoken, maxage=300, maxuses=5, store=None):
+ noncestore: NonceStoreBase
+ def __init__(
+ self,
+ app: WsgiApp,
+ gentoken: TokenGenerator,
+ maxage: int = 300,
+ maxuses: int = 5,
+ store: typing.Optional[NonceStoreBase] = None,
+ ):
"""
@param app: is the wsgi application to be served with authentication.
- @type gentoken: str -> (str or None)
@param gentoken: has to have the same functionality and interface as the
L{AuthTokenGenerator} class.
- @type maxage: int
@param maxage: deprecated, see L{MemoryNonceStore} or
L{StatelessNonceStore} and pass an instance to store
- @type maxuses: int
@param maxuses: deprecated, see L{MemoryNonceStore} and pass an
instance to store
- @type store: L{NonceStoreBase}
@param store: a nonce storage implementation object. Usage of this
parameter will override maxage and maxuses.
"""
@@ -731,7 +725,7 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
assert hasattr(store, "checknonce")
self.noncestore = store
- def authenticate(self, auth, environ):
+ def authenticate(self, auth: str, environ: Environ) -> Environ:
assert isinstance(auth, str)
try:
credentials = parse_digest_response(auth)
@@ -783,7 +777,9 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
return dict(user=credentials["username"],
outheaders=[("Authentication-Info", format_digest(digest))])
- def auth_response(self, credentials, reqmethod):
+ def auth_response(
+ self, credentials: typing.Dict[str, str], reqmethod: str
+ ) -> typing.Optional[str]:
"""internal method generating authentication tokens
@raises AuthenticationRequired:
"""
@@ -818,7 +814,9 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
dig.insert(0, a1h)
return self.algorithms[algo](":".join(dig))
- def www_authenticate(self, exception):
+ def www_authenticate(
+ self, exception: AuthenticationRequired
+ ) -> typing.Tuple[str, str]:
digest = dict(nonce=(self.noncestore.newnonce(), True),
realm=(self.gentoken.realm, True),
algorithm=("MD5", False),