summaryrefslogtreecommitdiff
path: root/wsgitools/digest.py
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2020-04-13 21:30:34 +0200
committerHelmut Grohne <helmut@subdivi.de>2023-06-18 23:16:57 +0200
commita41066b413489b407b9d99174af697563ad680b9 (patch)
tree2f08f9e886e13a7500d1eb527e30737d961deab6 /wsgitools/digest.py
parent4d52eaa4801df3f3169df8e58758bcccf22dc4de (diff)
downloadwsgitools-a41066b413489b407b9d99174af697563ad680b9.tar.gz
add type hints to all of the code
In order to use type hint syntax, we need to bump the minimum Python version to 3.7 and some of the features such as Literal and Protocol are opted in when a sufficiently recent Python is available. This does not make all of the code pass type checking with mypy. A number of typing issues remain, but the output of mypy becomes something one can read through. In adding type hints, a lot of epydoc @type annotations are removed as redundant. This update also adopts black-style line breaking.
Diffstat (limited to 'wsgitools/digest.py')
-rw-r--r--wsgitools/digest.py220
1 files changed, 109 insertions, 111 deletions
diff --git a/wsgitools/digest.py b/wsgitools/digest.py
index 6eb4cb3..18925df 100644
--- a/wsgitools/digest.py
+++ b/wsgitools/digest.py
@@ -23,25 +23,22 @@ except ImportError:
import random
sysrand = random.SystemRandom()
randbits = sysrand.getrandbits
- def compare_digest(a, b):
+ def compare_digest(a: str, b: str) -> bool:
return a == b
+import sys
+import typing
-from wsgitools.internal import bytes2str, str2bytes, textopen
+from wsgitools.internal import bytes2str, Environ, str2bytes, textopen, WsgiApp
from wsgitools.authentication import AuthenticationRequired, \
ProtocolViolation, AuthenticationMiddleware
-def md5hex(data):
- """
- @type data: str
- @rtype: str
- """
+def md5hex(data: str) -> str:
return hashlib.md5(str2bytes(data)).hexdigest()
-def gen_rand_str(bytesentropy=33):
+def gen_rand_str(bytesentropy: int = 33) -> str:
"""
Generates a string of random base64 characters.
@param bytesentropy: is the number of random 8bit values to be used
- @rtype: str
>>> gen_rand_str() != gen_rand_str()
True
@@ -53,7 +50,7 @@ def gen_rand_str(bytesentropy=33):
randstr = bytes2str(randbytes)
return randstr
-def parse_digest_response(data):
+def parse_digest_response(data: str) -> typing.Dict[str, str]:
"""internal
@raises ValueError:
@@ -118,13 +115,11 @@ def parse_digest_response(data):
value, data = data.split(',', 1)
result[key] = value
-def format_digest(mapping):
+def format_digest(mapping: typing.Dict[str, typing.Tuple[str, bool]]) -> str:
"""internal
- @type mapping: {str: (str, bool)}
@param mapping: a mapping of keys to values and a boolean that
determines whether the value needs quoting.
- @rtype: str
@note: the RFC specifies which values must be quoted and which must not be
quoted.
"""
@@ -150,38 +145,36 @@ class AbstractTokenGenerator:
L{AuthDigestMiddleware}.
@ivar realm: is a string according to RFC2617.
- @type realm: str
"""
- def __init__(self, realm):
- """
- @type realm: str
- """
+ realm: str
+ def __init__(self, realm: str):
assert isinstance(realm, str)
self.realm = realm
- def __call__(self, username, algo="md5"):
+ def __call__(
+ self, username: str, algo: str = "md5"
+ ) -> typing.Optional[str]:
"""Generates an authentication token from a username.
- @type username: str
- @type algo: str
@param algo: currently the only value supported by
L{AuthDigestMiddleware} is "md5"
- @rtype: str or None
@returns: a valid token or None to signal that authentication should
fail
"""
raise NotImplementedError
- def check_password(self, username, password, environ=None):
+ def check_password(
+ self,
+ username: str,
+ password: str,
+ environ: typing.Optional[Environ] = None,
+ ) -> bool:
"""
This function implements the interface for verifying passwords
used by L{BasicAuthMiddleware}. It works by computing a token
from the user and comparing it to the token returned by the
__call__ method.
- @type username: str
- @type password: str
@param environ: ignored
- @rtype: bool
"""
assert isinstance(username, str)
assert isinstance(password, str)
@@ -191,16 +184,26 @@ class AbstractTokenGenerator:
return False
return compare_digest(md5hex(token), expected)
+if sys.version_info >= (3, 11):
+ class TokenGenerator(typing.Protocol):
+ realm: str
+ def __call__(
+ self, username: str, algo: str = "md5"
+ ) -> typing.Optional[str]:
+ ...
+else:
+ TokenGenerator = typing.Callable[[str, str], typing.Optional[str]]
+
__all__.append("AuthTokenGenerator")
class AuthTokenGenerator(AbstractTokenGenerator):
"""Generates authentication tokens for L{AuthDigestMiddleware}. The
interface consists of beeing callable with a username and having a
realm attribute being a string."""
- def __init__(self, realm, getpass):
+ def __init__(
+ self, realm: str, getpass: typing.Callable[[str], typing.Optional[str]]
+ ):
"""
- @type realm: str
@param realm: is a string according to RFC2617.
- @type getpass: str -> (str or None)
@param getpass: this function is called with a username and password is
expected as result. C{None} may be used as an invalid password.
An example for getpass would be C{{username: password}.get}.
@@ -208,7 +211,9 @@ class AuthTokenGenerator(AbstractTokenGenerator):
AbstractTokenGenerator.__init__(self, realm)
self.getpass = getpass
- def __call__(self, username, algo="md5"):
+ def __call__(
+ self, username: str, algo: str = "md5"
+ ) -> typing.Optional[str]:
assert isinstance(username, str)
assert algo.lower() in ["md5", "md5-sess"]
password = self.getpass(username)
@@ -222,12 +227,13 @@ class HtdigestTokenGenerator(AbstractTokenGenerator):
"""Reads authentication tokens for L{AuthDigestMiddleware} from an
apache htdigest file.
"""
- def __init__(self, realm, htdigestfile, ignoreparseerrors=False):
+ users: typing.Dict[str, str]
+
+ def __init__(
+ self, realm: str, htdigestfile: str, ignoreparseerrors: bool = False
+ ):
"""
- @type realm: str
- @type htdigestfile: str
@param htdigestfile: path to the .htdigest file
- @type ignoreparseerrors: bool
@param ignoreparseerrors: passed to readhtdigest
@raises IOError:
@raises ValueError:
@@ -236,10 +242,10 @@ class HtdigestTokenGenerator(AbstractTokenGenerator):
self.users = {}
self.readhtdigest(htdigestfile, ignoreparseerrors)
- def readhtdigest(self, htdigestfile, ignoreparseerrors=False):
+ def readhtdigest(
+ self, htdigestfile: str, ignoreparseerrors: bool = False
+ ) -> None:
"""
- @type htdigestfile: str
- @type ignoreparseerrors: bool
@param ignoreparseerrors: do not raise ValueErrors for bad files
@raises IOError:
@raises ValueError:
@@ -260,7 +266,7 @@ class HtdigestTokenGenerator(AbstractTokenGenerator):
raise ValueError("duplicate user in htdigest file")
self.users[user] = token
- def __call__(self, user, algo="md5"):
+ def __call__(self, user: str, algo: str = "md5") -> typing.Optional[str]:
assert algo.lower() in ["md5", "md5-sess"]
return self.users.get(user)
@@ -269,7 +275,9 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator):
"""Behaves like L{HtdigestTokenGenerator}, checks the htdigest file
for changes on each invocation.
"""
- def __init__(self, realm, htdigestfile, ignoreparseerrors=False):
+ def __init__(
+ self, realm: str, htdigestfile: str, ignoreparseerrors: bool = False
+ ):
assert isinstance(htdigestfile, str)
# Need to stat the file before calling parent ctor to detect
# modifications.
@@ -282,7 +290,7 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator):
self.htdigestfile = htdigestfile
self.ignoreparseerrors = ignoreparseerrors
- def __call__(self, user, algo="md5"):
+ def __call__(self, user: str, algo: str = "md5") -> typing.Optional[str]:
# The interface does not permit raising exceptions, so all we can do is
# fail by returning None.
try:
@@ -301,36 +309,30 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator):
__all__.append("NonceStoreBase")
class NonceStoreBase:
"""Nonce storage interface."""
- def __init__(self):
+ def __init__(self) -> None:
pass
- def newnonce(self, ident=None):
+ def newnonce(self, ident: typing.Optional[str] = None) -> str:
"""
This method is to be overriden and should return new nonces.
- @type ident: str
@param ident: is an identifier to be associated with this nonce
- @rtype: str
"""
raise NotImplementedError
- def checknonce(self, nonce, count=1, ident=None):
+ def checknonce(
+ self, nonce: str, count: int = 1, ident: typing.Optional[str] = None
+ ) -> bool:
"""
This method is to be overridden and should do a check for whether the
given nonce is valid as being used count times.
- @type nonce: str
- @type count: int
@param count: indicates how often the nonce has been used (including
this check)
- @type ident: str
@param ident: it is also checked that the nonce was associated to this
identifier when given
- @rtype: bool
"""
raise NotImplementedError
-def format_time(seconds):
+def format_time(seconds: float) -> str:
"""
internal method formatting a unix time to a fixed-length string
- @type seconds: float
- @rtype: str
"""
# the overflow will happen about 2112
return "%013X" % int(seconds * 1000000)
@@ -356,13 +358,11 @@ class StatelessNonceStore(NonceStoreBase):
>>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash")
False
"""
- def __init__(self, maxage=300, secret=None):
+ def __init__(self, maxage: int = 300, secret: typing.Optional[str] = None):
"""
- @type maxage: int
@param maxage: is the number of seconds a nonce may be valid. Choosing a
large value may result in more memory usage whereas a smaller
value results in more requests. Defaults to 5 minutes.
- @type secret: str
@param secret: if not given, a secret is generated and is therefore
shared after forks. Knowing this secret permits creating nonces.
"""
@@ -373,12 +373,8 @@ class StatelessNonceStore(NonceStoreBase):
else:
self.server_secret = gen_rand_str()
- def newnonce(self, ident=None):
- """
- Generates a new nonce string.
- @type ident: None or str
- @rtype: str
- """
+ def newnonce(self, ident: typing.Optional[str] = None) -> str:
+ """Generates a new nonce string."""
nonce_time = format_time(time.time())
nonce_value = gen_rand_str()
token = "%s:%s:%s" % (nonce_time, nonce_value, self.server_secret)
@@ -387,14 +383,10 @@ class StatelessNonceStore(NonceStoreBase):
token = md5hex(token)
return "%s:%s:%s" % (nonce_time, nonce_value, token)
- def checknonce(self, nonce, count=1, ident=None):
- """
- Check whether the provided string is a nonce.
- @type nonce: str
- @type count: int
- @type ident: None or str
- @rtype: bool
- """
+ def checknonce(
+ self, nonce: str, count: int = 1, ident: typing.Optional[str] = None
+ ) -> bool:
+ """Check whether the provided string is a nonce."""
if count != 1:
return False
try:
@@ -429,13 +421,13 @@ class MemoryNonceStore(NonceStoreBase):
>>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash")
False
"""
- def __init__(self, maxage=300, maxuses=5):
+ nonces: typing.List[typing.Tuple[str, str, int]]
+
+ def __init__(self, maxage: int = 300, maxuses: int = 5):
"""
- @type maxage: int
@param maxage: is the number of seconds a nonce may be valid. Choosing a
large value may result in more memory usage whereas a smaller
value results in more requests. Defaults to 5 minutes.
- @type maxuses: int
@param maxuses: is the number of times a nonce may be used (with
different nc values). A value of 1 makes nonces usable exactly
once resulting in more requests. Defaults to 5.
@@ -447,18 +439,14 @@ class MemoryNonceStore(NonceStoreBase):
# as [(str (hex encoded), str, int)]
self.server_secret = gen_rand_str()
- def _cleanup(self):
+ def _cleanup(self) -> None:
"""internal methods cleaning list of valid nonces"""
old = format_time(time.time() - self.maxage)
while self.nonces and self.nonces[0][0] < old:
self.nonces.pop(0)
- def newnonce(self, ident=None):
- """
- Generates a new nonce string.
- @type ident: None or str
- @rtype: str
- """
+ def newnonce(self, ident: typing.Optional[str] = None) -> str:
+ """Generates a new nonce string."""
self._cleanup() # avoid growing self.nonces
nonce_time = format_time(time.time())
nonce_value = gen_rand_str()
@@ -469,14 +457,12 @@ class MemoryNonceStore(NonceStoreBase):
token = md5hex(token)
return "%s:%s:%s" % (nonce_time, nonce_value, token)
- def checknonce(self, nonce, count=1, ident=None):
+ def checknonce(
+ self, nonce: str, count: int = 1, ident: typing.Optional[str] = None
+ ) -> bool:
"""
Do a check for whether the provided string is a nonce and increase usage
count on returning True.
- @type nonce: str
- @type count: int
- @type ident: None or str
- @rtype: bool
"""
try:
nonce_time, nonce_value, nonce_hash = nonce.split(':')
@@ -522,19 +508,28 @@ class LazyDBAPI2Opener:
because this way each worker child opens a new database connection when
the first request is to be answered.
"""
- def __init__(self, function, *args, **kwargs):
+ _function: typing.Optional[typing.Callable[..., typing.Any]]
+ def __init__(
+ self,
+ function: typing.Callable[..., typing.Any],
+ *args,
+ **kwargs,
+ ):
"""
The database will be connected on the first method call. This is done
by calling the given function with the remaining parameters.
@param function: is the function that connects to the database
"""
self._function = function
- self._args = args
- self._kwargs = kwargs
+ self._args: typing.Optional[typing.Tuple[typing.Any, ...]] = args
+ self._kwargs: typing.Optional[typing.Dict[str, typing.Any]] = kwargs
self._dbhandle = None
- def _getdbhandle(self):
+ def _getdbhandle(self) -> typing.Any:
"""Returns an open database connection. Open if necessary."""
if self._dbhandle is None:
+ assert self._function is not None
+ assert self._args is not None
+ assert self._kwargs is not None
self._dbhandle = self._function(*self._args, **self._kwargs)
self._function = self._args = self._kwargs = None
return self._dbhandle
@@ -573,14 +568,14 @@ class DBAPI2NonceStore(NonceStoreBase):
>>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash")
False
"""
- def __init__(self, dbhandle, maxage=300, maxuses=5, table="nonces"):
+ def __init__(
+ self, dbhandle, maxage: int = 300, maxuses: int = 5, table="nonces"
+ ):
"""
@param dbhandle: is a dbapi2 connection
- @type maxage: int
@param maxage: is the number of seconds a nonce may be valid. Choosing a
large value may result in more memory usage whereas a smaller
value results in more requests. Defaults to 5 minutes.
- @type maxuses: int
@param maxuses: is the number of times a nonce may be used (with
different nc values). A value of 1 makes nonces usable exactly
once resulting in more requests. Defaults to 5.
@@ -592,16 +587,13 @@ class DBAPI2NonceStore(NonceStoreBase):
self.table = table
self.server_secret = gen_rand_str()
- def _cleanup(self, cur):
+ def _cleanup(self, cur) -> None:
"""internal methods cleaning list of valid nonces"""
old = format_time(time.time() - self.maxage)
cur.execute("DELETE FROM %s WHERE key < '%s:';" % (self.table, old))
- def newnonce(self, ident=None):
- """
- Generates a new nonce string.
- @rtype: str
- """
+ def newnonce(self, ident: typing.Optional[str] = None) -> str:
+ """Generates a new nonce string."""
nonce_time = format_time(time.time())
nonce_value = gen_rand_str()
dbkey = "%s:%s" % (nonce_time, nonce_value)
@@ -615,14 +607,12 @@ class DBAPI2NonceStore(NonceStoreBase):
token = md5hex(token)
return "%s:%s:%s" % (nonce_time, nonce_value, token)
- def checknonce(self, nonce, count=1, ident=None):
+ def checknonce(
+ self, nonce: str, count: int = 1, ident: typing.Optional[str] = None
+ ) -> bool:
"""
Do a check for whether the provided string is a nonce and increase usage
count on returning True.
- @type nonce: str
- @type count: int
- @type ident: str or None
- @rtype: bool
"""
try:
nonce_time, nonce_value, nonce_hash = nonce.split(':')
@@ -668,7 +658,7 @@ class DBAPI2NonceStore(NonceStoreBase):
self.dbhandle.commit()
return True
-def check_uri(credentials, environ):
+def check_uri(credentials: typing.Dict[str, str], environ: Environ) -> None:
"""internal method for verifying the uri credential
@raises AuthenticationRequired:
"""
@@ -706,19 +696,23 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
application."""
authorization_method = "digest"
algorithms = {"md5": md5hex}
- def __init__(self, app, gentoken, maxage=300, maxuses=5, store=None):
+ noncestore: NonceStoreBase
+ def __init__(
+ self,
+ app: WsgiApp,
+ gentoken: TokenGenerator,
+ maxage: int = 300,
+ maxuses: int = 5,
+ store: typing.Optional[NonceStoreBase] = None,
+ ):
"""
@param app: is the wsgi application to be served with authentication.
- @type gentoken: str -> (str or None)
@param gentoken: has to have the same functionality and interface as the
L{AuthTokenGenerator} class.
- @type maxage: int
@param maxage: deprecated, see L{MemoryNonceStore} or
L{StatelessNonceStore} and pass an instance to store
- @type maxuses: int
@param maxuses: deprecated, see L{MemoryNonceStore} and pass an
instance to store
- @type store: L{NonceStoreBase}
@param store: a nonce storage implementation object. Usage of this
parameter will override maxage and maxuses.
"""
@@ -731,7 +725,7 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
assert hasattr(store, "checknonce")
self.noncestore = store
- def authenticate(self, auth, environ):
+ def authenticate(self, auth: str, environ: Environ) -> Environ:
assert isinstance(auth, str)
try:
credentials = parse_digest_response(auth)
@@ -783,7 +777,9 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
return dict(user=credentials["username"],
outheaders=[("Authentication-Info", format_digest(digest))])
- def auth_response(self, credentials, reqmethod):
+ def auth_response(
+ self, credentials: typing.Dict[str, str], reqmethod: str
+ ) -> typing.Optional[str]:
"""internal method generating authentication tokens
@raises AuthenticationRequired:
"""
@@ -818,7 +814,9 @@ class AuthDigestMiddleware(AuthenticationMiddleware):
dig.insert(0, a1h)
return self.algorithms[algo](":".join(dig))
- def www_authenticate(self, exception):
+ def www_authenticate(
+ self, exception: AuthenticationRequired
+ ) -> typing.Tuple[str, str]:
digest = dict(nonce=(self.noncestore.newnonce(), True),
realm=(self.gentoken.realm, True),
algorithm=("MD5", False),