diff options
Diffstat (limited to 'wsgitools/digest.py')
-rw-r--r-- | wsgitools/digest.py | 220 |
1 files changed, 109 insertions, 111 deletions
diff --git a/wsgitools/digest.py b/wsgitools/digest.py index 6eb4cb3..18925df 100644 --- a/wsgitools/digest.py +++ b/wsgitools/digest.py @@ -23,25 +23,22 @@ except ImportError: import random sysrand = random.SystemRandom() randbits = sysrand.getrandbits - def compare_digest(a, b): + def compare_digest(a: str, b: str) -> bool: return a == b +import sys +import typing -from wsgitools.internal import bytes2str, str2bytes, textopen +from wsgitools.internal import bytes2str, Environ, str2bytes, textopen, WsgiApp from wsgitools.authentication import AuthenticationRequired, \ ProtocolViolation, AuthenticationMiddleware -def md5hex(data): - """ - @type data: str - @rtype: str - """ +def md5hex(data: str) -> str: return hashlib.md5(str2bytes(data)).hexdigest() -def gen_rand_str(bytesentropy=33): +def gen_rand_str(bytesentropy: int = 33) -> str: """ Generates a string of random base64 characters. @param bytesentropy: is the number of random 8bit values to be used - @rtype: str >>> gen_rand_str() != gen_rand_str() True @@ -53,7 +50,7 @@ def gen_rand_str(bytesentropy=33): randstr = bytes2str(randbytes) return randstr -def parse_digest_response(data): +def parse_digest_response(data: str) -> typing.Dict[str, str]: """internal @raises ValueError: @@ -118,13 +115,11 @@ def parse_digest_response(data): value, data = data.split(',', 1) result[key] = value -def format_digest(mapping): +def format_digest(mapping: typing.Dict[str, typing.Tuple[str, bool]]) -> str: """internal - @type mapping: {str: (str, bool)} @param mapping: a mapping of keys to values and a boolean that determines whether the value needs quoting. - @rtype: str @note: the RFC specifies which values must be quoted and which must not be quoted. """ @@ -150,38 +145,36 @@ class AbstractTokenGenerator: L{AuthDigestMiddleware}. @ivar realm: is a string according to RFC2617. - @type realm: str """ - def __init__(self, realm): - """ - @type realm: str - """ + realm: str + def __init__(self, realm: str): assert isinstance(realm, str) self.realm = realm - def __call__(self, username, algo="md5"): + def __call__( + self, username: str, algo: str = "md5" + ) -> typing.Optional[str]: """Generates an authentication token from a username. - @type username: str - @type algo: str @param algo: currently the only value supported by L{AuthDigestMiddleware} is "md5" - @rtype: str or None @returns: a valid token or None to signal that authentication should fail """ raise NotImplementedError - def check_password(self, username, password, environ=None): + def check_password( + self, + username: str, + password: str, + environ: typing.Optional[Environ] = None, + ) -> bool: """ This function implements the interface for verifying passwords used by L{BasicAuthMiddleware}. It works by computing a token from the user and comparing it to the token returned by the __call__ method. - @type username: str - @type password: str @param environ: ignored - @rtype: bool """ assert isinstance(username, str) assert isinstance(password, str) @@ -191,16 +184,26 @@ class AbstractTokenGenerator: return False return compare_digest(md5hex(token), expected) +if sys.version_info >= (3, 11): + class TokenGenerator(typing.Protocol): + realm: str + def __call__( + self, username: str, algo: str = "md5" + ) -> typing.Optional[str]: + ... +else: + TokenGenerator = typing.Callable[[str, str], typing.Optional[str]] + __all__.append("AuthTokenGenerator") class AuthTokenGenerator(AbstractTokenGenerator): """Generates authentication tokens for L{AuthDigestMiddleware}. The interface consists of beeing callable with a username and having a realm attribute being a string.""" - def __init__(self, realm, getpass): + def __init__( + self, realm: str, getpass: typing.Callable[[str], typing.Optional[str]] + ): """ - @type realm: str @param realm: is a string according to RFC2617. - @type getpass: str -> (str or None) @param getpass: this function is called with a username and password is expected as result. C{None} may be used as an invalid password. An example for getpass would be C{{username: password}.get}. @@ -208,7 +211,9 @@ class AuthTokenGenerator(AbstractTokenGenerator): AbstractTokenGenerator.__init__(self, realm) self.getpass = getpass - def __call__(self, username, algo="md5"): + def __call__( + self, username: str, algo: str = "md5" + ) -> typing.Optional[str]: assert isinstance(username, str) assert algo.lower() in ["md5", "md5-sess"] password = self.getpass(username) @@ -222,12 +227,13 @@ class HtdigestTokenGenerator(AbstractTokenGenerator): """Reads authentication tokens for L{AuthDigestMiddleware} from an apache htdigest file. """ - def __init__(self, realm, htdigestfile, ignoreparseerrors=False): + users: typing.Dict[str, str] + + def __init__( + self, realm: str, htdigestfile: str, ignoreparseerrors: bool = False + ): """ - @type realm: str - @type htdigestfile: str @param htdigestfile: path to the .htdigest file - @type ignoreparseerrors: bool @param ignoreparseerrors: passed to readhtdigest @raises IOError: @raises ValueError: @@ -236,10 +242,10 @@ class HtdigestTokenGenerator(AbstractTokenGenerator): self.users = {} self.readhtdigest(htdigestfile, ignoreparseerrors) - def readhtdigest(self, htdigestfile, ignoreparseerrors=False): + def readhtdigest( + self, htdigestfile: str, ignoreparseerrors: bool = False + ) -> None: """ - @type htdigestfile: str - @type ignoreparseerrors: bool @param ignoreparseerrors: do not raise ValueErrors for bad files @raises IOError: @raises ValueError: @@ -260,7 +266,7 @@ class HtdigestTokenGenerator(AbstractTokenGenerator): raise ValueError("duplicate user in htdigest file") self.users[user] = token - def __call__(self, user, algo="md5"): + def __call__(self, user: str, algo: str = "md5") -> typing.Optional[str]: assert algo.lower() in ["md5", "md5-sess"] return self.users.get(user) @@ -269,7 +275,9 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator): """Behaves like L{HtdigestTokenGenerator}, checks the htdigest file for changes on each invocation. """ - def __init__(self, realm, htdigestfile, ignoreparseerrors=False): + def __init__( + self, realm: str, htdigestfile: str, ignoreparseerrors: bool = False + ): assert isinstance(htdigestfile, str) # Need to stat the file before calling parent ctor to detect # modifications. @@ -282,7 +290,7 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator): self.htdigestfile = htdigestfile self.ignoreparseerrors = ignoreparseerrors - def __call__(self, user, algo="md5"): + def __call__(self, user: str, algo: str = "md5") -> typing.Optional[str]: # The interface does not permit raising exceptions, so all we can do is # fail by returning None. try: @@ -301,36 +309,30 @@ class UpdatingHtdigestTokenGenerator(HtdigestTokenGenerator): __all__.append("NonceStoreBase") class NonceStoreBase: """Nonce storage interface.""" - def __init__(self): + def __init__(self) -> None: pass - def newnonce(self, ident=None): + def newnonce(self, ident: typing.Optional[str] = None) -> str: """ This method is to be overriden and should return new nonces. - @type ident: str @param ident: is an identifier to be associated with this nonce - @rtype: str """ raise NotImplementedError - def checknonce(self, nonce, count=1, ident=None): + def checknonce( + self, nonce: str, count: int = 1, ident: typing.Optional[str] = None + ) -> bool: """ This method is to be overridden and should do a check for whether the given nonce is valid as being used count times. - @type nonce: str - @type count: int @param count: indicates how often the nonce has been used (including this check) - @type ident: str @param ident: it is also checked that the nonce was associated to this identifier when given - @rtype: bool """ raise NotImplementedError -def format_time(seconds): +def format_time(seconds: float) -> str: """ internal method formatting a unix time to a fixed-length string - @type seconds: float - @rtype: str """ # the overflow will happen about 2112 return "%013X" % int(seconds * 1000000) @@ -356,13 +358,11 @@ class StatelessNonceStore(NonceStoreBase): >>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash") False """ - def __init__(self, maxage=300, secret=None): + def __init__(self, maxage: int = 300, secret: typing.Optional[str] = None): """ - @type maxage: int @param maxage: is the number of seconds a nonce may be valid. Choosing a large value may result in more memory usage whereas a smaller value results in more requests. Defaults to 5 minutes. - @type secret: str @param secret: if not given, a secret is generated and is therefore shared after forks. Knowing this secret permits creating nonces. """ @@ -373,12 +373,8 @@ class StatelessNonceStore(NonceStoreBase): else: self.server_secret = gen_rand_str() - def newnonce(self, ident=None): - """ - Generates a new nonce string. - @type ident: None or str - @rtype: str - """ + def newnonce(self, ident: typing.Optional[str] = None) -> str: + """Generates a new nonce string.""" nonce_time = format_time(time.time()) nonce_value = gen_rand_str() token = "%s:%s:%s" % (nonce_time, nonce_value, self.server_secret) @@ -387,14 +383,10 @@ class StatelessNonceStore(NonceStoreBase): token = md5hex(token) return "%s:%s:%s" % (nonce_time, nonce_value, token) - def checknonce(self, nonce, count=1, ident=None): - """ - Check whether the provided string is a nonce. - @type nonce: str - @type count: int - @type ident: None or str - @rtype: bool - """ + def checknonce( + self, nonce: str, count: int = 1, ident: typing.Optional[str] = None + ) -> bool: + """Check whether the provided string is a nonce.""" if count != 1: return False try: @@ -429,13 +421,13 @@ class MemoryNonceStore(NonceStoreBase): >>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash") False """ - def __init__(self, maxage=300, maxuses=5): + nonces: typing.List[typing.Tuple[str, str, int]] + + def __init__(self, maxage: int = 300, maxuses: int = 5): """ - @type maxage: int @param maxage: is the number of seconds a nonce may be valid. Choosing a large value may result in more memory usage whereas a smaller value results in more requests. Defaults to 5 minutes. - @type maxuses: int @param maxuses: is the number of times a nonce may be used (with different nc values). A value of 1 makes nonces usable exactly once resulting in more requests. Defaults to 5. @@ -447,18 +439,14 @@ class MemoryNonceStore(NonceStoreBase): # as [(str (hex encoded), str, int)] self.server_secret = gen_rand_str() - def _cleanup(self): + def _cleanup(self) -> None: """internal methods cleaning list of valid nonces""" old = format_time(time.time() - self.maxage) while self.nonces and self.nonces[0][0] < old: self.nonces.pop(0) - def newnonce(self, ident=None): - """ - Generates a new nonce string. - @type ident: None or str - @rtype: str - """ + def newnonce(self, ident: typing.Optional[str] = None) -> str: + """Generates a new nonce string.""" self._cleanup() # avoid growing self.nonces nonce_time = format_time(time.time()) nonce_value = gen_rand_str() @@ -469,14 +457,12 @@ class MemoryNonceStore(NonceStoreBase): token = md5hex(token) return "%s:%s:%s" % (nonce_time, nonce_value, token) - def checknonce(self, nonce, count=1, ident=None): + def checknonce( + self, nonce: str, count: int = 1, ident: typing.Optional[str] = None + ) -> bool: """ Do a check for whether the provided string is a nonce and increase usage count on returning True. - @type nonce: str - @type count: int - @type ident: None or str - @rtype: bool """ try: nonce_time, nonce_value, nonce_hash = nonce.split(':') @@ -522,19 +508,28 @@ class LazyDBAPI2Opener: because this way each worker child opens a new database connection when the first request is to be answered. """ - def __init__(self, function, *args, **kwargs): + _function: typing.Optional[typing.Callable[..., typing.Any]] + def __init__( + self, + function: typing.Callable[..., typing.Any], + *args, + **kwargs, + ): """ The database will be connected on the first method call. This is done by calling the given function with the remaining parameters. @param function: is the function that connects to the database """ self._function = function - self._args = args - self._kwargs = kwargs + self._args: typing.Optional[typing.Tuple[typing.Any, ...]] = args + self._kwargs: typing.Optional[typing.Dict[str, typing.Any]] = kwargs self._dbhandle = None - def _getdbhandle(self): + def _getdbhandle(self) -> typing.Any: """Returns an open database connection. Open if necessary.""" if self._dbhandle is None: + assert self._function is not None + assert self._args is not None + assert self._kwargs is not None self._dbhandle = self._function(*self._args, **self._kwargs) self._function = self._args = self._kwargs = None return self._dbhandle @@ -573,14 +568,14 @@ class DBAPI2NonceStore(NonceStoreBase): >>> s.checknonce(n.rsplit(':', 1)[0] + "bad hash") False """ - def __init__(self, dbhandle, maxage=300, maxuses=5, table="nonces"): + def __init__( + self, dbhandle, maxage: int = 300, maxuses: int = 5, table="nonces" + ): """ @param dbhandle: is a dbapi2 connection - @type maxage: int @param maxage: is the number of seconds a nonce may be valid. Choosing a large value may result in more memory usage whereas a smaller value results in more requests. Defaults to 5 minutes. - @type maxuses: int @param maxuses: is the number of times a nonce may be used (with different nc values). A value of 1 makes nonces usable exactly once resulting in more requests. Defaults to 5. @@ -592,16 +587,13 @@ class DBAPI2NonceStore(NonceStoreBase): self.table = table self.server_secret = gen_rand_str() - def _cleanup(self, cur): + def _cleanup(self, cur) -> None: """internal methods cleaning list of valid nonces""" old = format_time(time.time() - self.maxage) cur.execute("DELETE FROM %s WHERE key < '%s:';" % (self.table, old)) - def newnonce(self, ident=None): - """ - Generates a new nonce string. - @rtype: str - """ + def newnonce(self, ident: typing.Optional[str] = None) -> str: + """Generates a new nonce string.""" nonce_time = format_time(time.time()) nonce_value = gen_rand_str() dbkey = "%s:%s" % (nonce_time, nonce_value) @@ -615,14 +607,12 @@ class DBAPI2NonceStore(NonceStoreBase): token = md5hex(token) return "%s:%s:%s" % (nonce_time, nonce_value, token) - def checknonce(self, nonce, count=1, ident=None): + def checknonce( + self, nonce: str, count: int = 1, ident: typing.Optional[str] = None + ) -> bool: """ Do a check for whether the provided string is a nonce and increase usage count on returning True. - @type nonce: str - @type count: int - @type ident: str or None - @rtype: bool """ try: nonce_time, nonce_value, nonce_hash = nonce.split(':') @@ -668,7 +658,7 @@ class DBAPI2NonceStore(NonceStoreBase): self.dbhandle.commit() return True -def check_uri(credentials, environ): +def check_uri(credentials: typing.Dict[str, str], environ: Environ) -> None: """internal method for verifying the uri credential @raises AuthenticationRequired: """ @@ -706,19 +696,23 @@ class AuthDigestMiddleware(AuthenticationMiddleware): application.""" authorization_method = "digest" algorithms = {"md5": md5hex} - def __init__(self, app, gentoken, maxage=300, maxuses=5, store=None): + noncestore: NonceStoreBase + def __init__( + self, + app: WsgiApp, + gentoken: TokenGenerator, + maxage: int = 300, + maxuses: int = 5, + store: typing.Optional[NonceStoreBase] = None, + ): """ @param app: is the wsgi application to be served with authentication. - @type gentoken: str -> (str or None) @param gentoken: has to have the same functionality and interface as the L{AuthTokenGenerator} class. - @type maxage: int @param maxage: deprecated, see L{MemoryNonceStore} or L{StatelessNonceStore} and pass an instance to store - @type maxuses: int @param maxuses: deprecated, see L{MemoryNonceStore} and pass an instance to store - @type store: L{NonceStoreBase} @param store: a nonce storage implementation object. Usage of this parameter will override maxage and maxuses. """ @@ -731,7 +725,7 @@ class AuthDigestMiddleware(AuthenticationMiddleware): assert hasattr(store, "checknonce") self.noncestore = store - def authenticate(self, auth, environ): + def authenticate(self, auth: str, environ: Environ) -> Environ: assert isinstance(auth, str) try: credentials = parse_digest_response(auth) @@ -783,7 +777,9 @@ class AuthDigestMiddleware(AuthenticationMiddleware): return dict(user=credentials["username"], outheaders=[("Authentication-Info", format_digest(digest))]) - def auth_response(self, credentials, reqmethod): + def auth_response( + self, credentials: typing.Dict[str, str], reqmethod: str + ) -> typing.Optional[str]: """internal method generating authentication tokens @raises AuthenticationRequired: """ @@ -818,7 +814,9 @@ class AuthDigestMiddleware(AuthenticationMiddleware): dig.insert(0, a1h) return self.algorithms[algo](":".join(dig)) - def www_authenticate(self, exception): + def www_authenticate( + self, exception: AuthenticationRequired + ) -> typing.Tuple[str, str]: digest = dict(nonce=(self.noncestore.newnonce(), True), realm=(self.gentoken.realm, True), algorithm=("MD5", False), |