From a41066b413489b407b9d99174af697563ad680b9 Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Mon, 13 Apr 2020 21:30:34 +0200 Subject: add type hints to all of the code In order to use type hint syntax, we need to bump the minimum Python version to 3.7 and some of the features such as Literal and Protocol are opted in when a sufficiently recent Python is available. This does not make all of the code pass type checking with mypy. A number of typing issues remain, but the output of mypy becomes something one can read through. In adding type hints, a lot of epydoc @type annotations are removed as redundant. This update also adopts black-style line breaking. --- wsgitools/digest.py | 220 ++++++++++++++++++++++++++-------------------------- 1 file changed, 109 insertions(+), 111 deletions(-) (limited to 'wsgitools/digest.py') diff --git a/wsgitools/digest.py b/wsgitools/digest.py index 6eb4cb3..18925df 100644 --- a/wsgitools/digest.py +++ b/wsgitools/digest.py @@ -23,25 +23,22 @@ except ImportError: import random sysrand = random.SystemRandom() randbits = sysrand.getrandbits - def compare_digest(a, b): + def compare_digest(a: str, b: str) -> bool: return a == b +import sys +import typing -from wsgitools.internal import bytes2str, str2bytes, textopen +from wsgitools.internal import bytes2str, Environ, str2bytes, textopen, WsgiApp from wsgitools.authentication import AuthenticationRequired, \ ProtocolViolation, AuthenticationMiddleware -def md5hex(data): - """ - @type data: str - @rtype: str - """ +def md5hex(data: str) -> str: return hashlib.md5(str2bytes(data)).hexdigest() -def gen_rand_str(bytesentropy=33): +def gen_rand_str(bytesentropy: int = 33) -> str: """ Generates a string of random base64 characters. @param bytesentropy: is the number of random 8bit values to be used - @rtype: str >>> gen_rand_str() != gen_rand_str() True @@ -53,7 +50,7 @@ def gen_rand_str(bytesentropy=33): randstr = bytes2str(randbytes) return randstr -def parse_digest_response(data): +def parse_digest_response(data: str) -> typing.Dict[str, str]: """internal @raises ValueError: @@ -118,13 +115,11 @@ def parse_digest_response(data): value, data = data.split(',', 1) result[key] = value -def format_digest(mapping): +def format_digest(mapping: typing.Dict[str, typing.Tuple[str, bool]]) -> str: """internal - @type mapping: {str: (str, bool)} @param mapping: a mapping of keys to values and a boolean that determines whether the value needs quoting. - @rtype: str @note: the RFC specifies which values must be quoted and which must not be quoted. """ @@ -150,38 +145,36 @@ class AbstractTokenGenerator: L{AuthDigestMiddleware}. @ivar realm: is a string according to RFC2617. - @type realm: str """ - def __init__(self, realm): - """ - @type realm: str - """ + realm: str + def __init__(self, realm: str): assert isinstance(realm, str) self.realm = realm - def __call__(self, username, algo="md5"): + def __call__( + self, username: str, algo: str = "md5" + ) -> typing.Optional[str]: """Generates an authentication token from a username. - @type username: str - @type algo: str @param algo: currently the only value supported by L{AuthDigestMiddleware} is "md5" - @rtype: str or None @returns: a valid token or None to signal that authentication should fail """ raise NotImplementedError - def check_password(self, username, password, environ=None): + def check_password( + self, + username: str, + password: str, + environ: typing.Optional[Environ] = None, + ) -> bool: """ This function implements the interface for verifying passwords used by L{BasicAuthMiddleware}. It works by computing a token from the user and comparing it to the token returned by the __call__ method. - @type username: str - @type password: str @param environ: ignored - @rtype: bool """ assert isinstance(username, str) assert isinstance(password, str) @@ -191,16 +184,26 @@ class AbstractTokenGenerator: return False return compare_digest(md5hex(token), expected) +if sys.version_info >= (3, 11): + class TokenGenerator(typing.Protocol): + realm: str + def __call__( + self, username: str, algo: str = "md5" + ) -> typing.Optional[str]: + ... +else: + TokenGenerator = typing.Callable[[str, str], typing.Optional[str]] + __all__.append("AuthTokenGenerator") class AuthTokenGenerator(AbstractTokenGenerator): """Generates authentication tokens for L{AuthDigestMiddleware}. The interface consists of beeing callable with a username and having a realm attribute being a string.""" - def __init__(self, realm, getpass): + def __init__( + self, realm: str, getpass: typing.Callable[[str], typing.Optional[str]] + ): """ - @type realm: str @param realm: is a string according to RFC2617. - @type getpass: str -> (str or None) @param getpass: this function is called with a username and password is expected as result. C{None} may be used as an invalid password. An example for getpass would be C{{username: password}.get}. @@ -208,7 +211,9 @@ class AuthTokenGenerator(AbstractTokenGenerator): AbstractTokenGenerator.__init__(self, realm) self.getpass = getpass - def __call__(self, username, algo="md5"): + def __call__( + self, username: str, algo: str = "md5" + ) -> typing.Optional[str]: assert isinstance(username, str) assert algo.lower() in ["md5", "md5-sess"] password = self.getpass(username) @@ -222,12 +227,13 @@ class HtdigestTokenGenerator(AbstractTokenGenerator): """Reads authentication tokens for L{AuthDigestMiddleware} from an apache htdigest file. """ - def __init__(self, realm, htdigestfile, ignoreparseerrors=False): + users: typing.Dict[str, str] + + def __init__( + self, realm: str, htdigestfile: str, ignoreparseerrors: bool = False + ): """ - @type realm: str - @type htdigestfile: str @param htdigestfile: path to the .htdigest file - @type ignoreparseerrors: bool @param ignoreparseerrors: passed to readhtdigest @raises IOError: @raises ValueError: @@ -236,10 +242,10 @@ class HtdigestTokenGenerator(AbstractTokenGenerator): self.users = {} self.readhtdigest(htdigestfile, ignoreparseerrors) - def readhtdigest(self, htdigestfile, ignoreparseerrors=False): + def readhtdigest( + self, htdigestfile: str, ignoreparseerrors: bool = False + ) -> None: """ - @type htdigestfile: str - @type ignoreparseerrors: bool @param ignoreparseerrors: do not raise ValueErrors for bad files @raises IOError: @raises ValueError: @@ -260,7 +266,7 @@ class HtdigestTokenGenerator(AbstractTokenGenerator): raise ValueError("duplicate user in htdigest file") self.users[user] = token - def __call__(self, user, algo="md5"): + def __call__(self, user: str, algo: str = "md5") -> typing.Optional[str]: assert algo.lower() in ["md5", "md5-sess"] return self.users.get(user) @@ -269,7 +275,9 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator): """Behaves like L{HtdigestTokenGenerator}, checks the htdigest file for changes on each invocation. """ - def __init__(self, realm, htdigestfile, ignoreparseerrors=False): + def __init__( + self, realm: str, htdigestfile: str, ignoreparseerrors: bool = False + ): assert isinstance(htdigestfile, str) # Need to stat the file before calling parent ctor to detect # modifications. @@ -282,7 +290,7 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator): self.htdigestfile = htdigestfile self.ignoreparseerrors = ignoreparseerrors - def __call__(self, user, algo="md5"): + def __call__(self, user: str, algo: str = "md5") -> typing.Optional[str]: # The interface does not permit raising exceptions, so all we can do is # fail by returning None. try: @@ -301,36 +309,30 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator): __all__.append("NonceStoreBase") class NonceStoreBase: """Nonce storage interface.""" - def __init__(self): + def __init__(self) -> None: pass - def newnonce(self, ident=None): + def newnonce(self, ident: typing.Optional[str] = None) -> str: """ This method is to be overriden and should return new nonces. - @type ident: str @param ident: is an identifier to be associated with this nonce - @rtype: str """ raise NotImplementedError - def checknonce(self, nonce, count=1, ident=None): + def checknonce( + self, nonce: str, count: int = 1, ident: typing.Optional[str] = None + ) -> bool: """ This method is to be overridden and should do a check for whether the given nonce is valid as being used count times. - @type nonce: str - @type count: int @param count: indicates how often the nonce has been used (including this check) - @type ident: str @param ident: it is also checked that the nonce was associated to this identifier when given - @rtype: bool """ raise NotImplementedError -def format_time(seconds): +def format_time(seconds: float) -> str: """ internal method formatting a unix time to a fixed-length string - @type seconds: float - @rtype: str """ # the overflow will happen about 2112 return "%013X" % int(seconds * 1000000) @@ -356,13 +358,11 @@ class StatelessNonceStore(NonceStoreBase): >>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash") False """ - def __init__(self, maxage=300, secret=None): + def __init__(self, maxage: int = 300, secret: typing.Optional[str] = None): """ - @type maxage: int @param maxage: is the number of seconds a nonce may be valid. Choosing a large value may result in more memory usage whereas a smaller value results in more requests. Defaults to 5 minutes. - @type secret: str @param secret: if not given, a secret is generated and is therefore shared after forks. Knowing this secret permits creating nonces. """ @@ -373,12 +373,8 @@ class StatelessNonceStore(NonceStoreBase): else: self.server_secret = gen_rand_str() - def newnonce(self, ident=None): - """ - Generates a new nonce string. - @type ident: None or str - @rtype: str - """ + def newnonce(self, ident: typing.Optional[str] = None) -> str: + """Generates a new nonce string.""" nonce_time = format_time(time.time()) nonce_value = gen_rand_str() token = "%s:%s:%s" % (nonce_time, nonce_value, self.server_secret) @@ -387,14 +383,10 @@ class StatelessNonceStore(NonceStoreBase): token = md5hex(token) return "%s:%s:%s" % (nonce_time, nonce_value, token) - def checknonce(self, nonce, count=1, ident=None): - """ - Check whether the provided string is a nonce. - @type nonce: str - @type count: int - @type ident: None or str - @rtype: bool - """ + def checknonce( + self, nonce: str, count: int = 1, ident: typing.Optional[str] = None + ) -> bool: + """Check whether the provided string is a nonce.""" if count != 1: return False try: @@ -429,13 +421,13 @@ class MemoryNonceStore(NonceStoreBase): >>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash") False """ - def __init__(self, maxage=300, maxuses=5): + nonces: typing.List[typing.Tuple[str, str, int]] + + def __init__(self, maxage: int = 300, maxuses: int = 5): """ - @type maxage: int @param maxage: is the number of seconds a nonce may be valid. Choosing a large value may result in more memory usage whereas a smaller value results in more requests. Defaults to 5 minutes. - @type maxuses: int @param maxuses: is the number of times a nonce may be used (with different nc values). A value of 1 makes nonces usable exactly once resulting in more requests. Defaults to 5. @@ -447,18 +439,14 @@ class MemoryNonceStore(NonceStoreBase): # as [(str (hex encoded), str, int)] self.server_secret = gen_rand_str() - def _cleanup(self): + def _cleanup(self) -> None: """internal methods cleaning list of valid nonces""" old = format_time(time.time() - self.maxage) while self.nonces and self.nonces[0][0] < old: self.nonces.pop(0) - def newnonce(self, ident=None): - """ - Generates a new nonce string. - @type ident: None or str - @rtype: str - """ + def newnonce(self, ident: typing.Optional[str] = None) -> str: + """Generates a new nonce string.""" self._cleanup() # avoid growing self.nonces nonce_time = format_time(time.time()) nonce_value = gen_rand_str() @@ -469,14 +457,12 @@ class MemoryNonceStore(NonceStoreBase): token = md5hex(token) return "%s:%s:%s" % (nonce_time, nonce_value, token) - def checknonce(self, nonce, count=1, ident=None): + def checknonce( + self, nonce: str, count: int = 1, ident: typing.Optional[str] = None + ) -> bool: """ Do a check for whether the provided string is a nonce and increase usage count on returning True. - @type nonce: str - @type count: int - @type ident: None or str - @rtype: bool """ try: nonce_time, nonce_value, nonce_hash = nonce.split(':') @@ -522,19 +508,28 @@ class LazyDBAPI2Opener: because this way each worker child opens a new database connection when the first request is to be answered. """ - def __init__(self, function, *args, **kwargs): + _function: typing.Optional[typing.Callable[..., typing.Any]] + def __init__( + self, + function: typing.Callable[..., typing.Any], + *args, + **kwargs, + ): """ The database will be connected on the first method call. This is done by calling the given function with the remaining parameters. @param function: is the function that connects to the database """ self._function = function - self._args = args - self._kwargs = kwargs + self._args: typing.Optional[typing.Tuple[typing.Any, ...]] = args + self._kwargs: typing.Optional[typing.Dict[str, typing.Any]] = kwargs self._dbhandle = None - def _getdbhandle(self): + def _getdbhandle(self) -> typing.Any: """Returns an open database connection. Open if necessary.""" if self._dbhandle is None: + assert self._function is not None + assert self._args is not None + assert self._kwargs is not None self._dbhandle = self._function(*self._args, **self._kwargs) self._function = self._args = self._kwargs = None return self._dbhandle @@ -573,14 +568,14 @@ class DBAPI2NonceStore(NonceStoreBase): >>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash") False """ - def __init__(self, dbhandle, maxage=300, maxuses=5, table="nonces"): + def __init__( + self, dbhandle, maxage: int = 300, maxuses: int = 5, table="nonces" + ): """ @param dbhandle: is a dbapi2 connection - @type maxage: int @param maxage: is the number of seconds a nonce may be valid. Choosing a large value may result in more memory usage whereas a smaller value results in more requests. Defaults to 5 minutes. - @type maxuses: int @param maxuses: is the number of times a nonce may be used (with different nc values). A value of 1 makes nonces usable exactly once resulting in more requests. Defaults to 5. @@ -592,16 +587,13 @@ class DBAPI2NonceStore(NonceStoreBase): self.table = table self.server_secret = gen_rand_str() - def _cleanup(self, cur): + def _cleanup(self, cur) -> None: """internal methods cleaning list of valid nonces""" old = format_time(time.time() - self.maxage) cur.execute("DELETE FROM %s WHERE key < '%s:';" % (self.table, old)) - def newnonce(self, ident=None): - """ - Generates a new nonce string. - @rtype: str - """ + def newnonce(self, ident: typing.Optional[str] = None) -> str: + """Generates a new nonce string.""" nonce_time = format_time(time.time()) nonce_value = gen_rand_str() dbkey = "%s:%s" % (nonce_time, nonce_value) @@ -615,14 +607,12 @@ class DBAPI2NonceStore(NonceStoreBase): token = md5hex(token) return "%s:%s:%s" % (nonce_time, nonce_value, token) - def checknonce(self, nonce, count=1, ident=None): + def checknonce( + self, nonce: str, count: int = 1, ident: typing.Optional[str] = None + ) -> bool: """ Do a check for whether the provided string is a nonce and increase usage count on returning True. - @type nonce: str - @type count: int - @type ident: str or None - @rtype: bool """ try: nonce_time, nonce_value, nonce_hash = nonce.split(':') @@ -668,7 +658,7 @@ class DBAPI2NonceStore(NonceStoreBase): self.dbhandle.commit() return True -def check_uri(credentials, environ): +def check_uri(credentials: typing.Dict[str, str], environ: Environ) -> None: """internal method for verifying the uri credential @raises AuthenticationRequired: """ @@ -706,19 +696,23 @@ class AuthDigestMiddleware(AuthenticationMiddleware): application.""" authorization_method = "digest" algorithms = {"md5": md5hex} - def __init__(self, app, gentoken, maxage=300, maxuses=5, store=None): + noncestore: NonceStoreBase + def __init__( + self, + app: WsgiApp, + gentoken: TokenGenerator, + maxage: int = 300, + maxuses: int = 5, + store: typing.Optional[NonceStoreBase] = None, + ): """ @param app: is the wsgi application to be served with authentication. - @type gentoken: str -> (str or None) @param gentoken: has to have the same functionality and interface as the L{AuthTokenGenerator} class. - @type maxage: int @param maxage: deprecated, see L{MemoryNonceStore} or L{StatelessNonceStore} and pass an instance to store - @type maxuses: int @param maxuses: deprecated, see L{MemoryNonceStore} and pass an instance to store - @type store: L{NonceStoreBase} @param store: a nonce storage implementation object. Usage of this parameter will override maxage and maxuses. """ @@ -731,7 +725,7 @@ class AuthDigestMiddleware(AuthenticationMiddleware): assert hasattr(store, "checknonce") self.noncestore = store - def authenticate(self, auth, environ): + def authenticate(self, auth: str, environ: Environ) -> Environ: assert isinstance(auth, str) try: credentials = parse_digest_response(auth) @@ -783,7 +777,9 @@ class AuthDigestMiddleware(AuthenticationMiddleware): return dict(user=credentials["username"], outheaders=[("Authentication-Info", format_digest(digest))]) - def auth_response(self, credentials, reqmethod): + def auth_response( + self, credentials: typing.Dict[str, str], reqmethod: str + ) -> typing.Optional[str]: """internal method generating authentication tokens @raises AuthenticationRequired: """ @@ -818,7 +814,9 @@ class AuthDigestMiddleware(AuthenticationMiddleware): dig.insert(0, a1h) return self.algorithms[algo](":".join(dig)) - def www_authenticate(self, exception): + def www_authenticate( + self, exception: AuthenticationRequired + ) -> typing.Tuple[str, str]: digest = dict(nonce=(self.noncestore.newnonce(), True), realm=(self.gentoken.realm, True), algorithm=("MD5", False), -- cgit v1.2.3