diff options
Diffstat (limited to 'wsgitools/scgi/forkpool.py')
-rw-r--r-- | wsgitools/scgi/forkpool.py | 95 |
1 files changed, 54 insertions, 41 deletions
diff --git a/wsgitools/scgi/forkpool.py b/wsgitools/scgi/forkpool.py index df8a92f..f864a6a 100644 --- a/wsgitools/scgi/forkpool.py +++ b/wsgitools/scgi/forkpool.py @@ -12,21 +12,24 @@ import select import sys import errno import signal +import typing -from wsgitools.internal import bytes2str, str2bytes +from wsgitools.internal import ( + bytes2str, HeaderList, OptExcInfo, str2bytes, WriteCallback, WsgiApp +) from wsgitools.scgi import _convert_environ, FileWrapper __all__ = [] class SocketFileWrapper: """Wraps a socket to a wsgi-compliant file-like object.""" - def __init__(self, sock, toread): + def __init__(self, sock: socket.socket, toread: int): """@param sock: is a C{socket.socket()}""" self.sock = sock self.buff = b"" self.toread = toread - def _recv(self, size=4096): + def _recv(self, size: int = 4096) -> bytes: """ internal method for receiving and counting incoming data @raises socket.error: @@ -44,7 +47,7 @@ class SocketFileWrapper: self.toread -= len(data) return data - def close(self): + def close(self) -> None: """Does not close the socket, because it might still be needed. It reads all data that should have been read as given by C{CONTENT_LENGTH}. """ @@ -55,7 +58,7 @@ class SocketFileWrapper: except socket.error: pass - def read(self, size=None): + def read(self, size: typing.Optional[int] = None) -> bytes: """ see pep333 @raises socket.error: @@ -92,7 +95,7 @@ class SocketFileWrapper: ret, self.buff = self.buff, b"" return ret - def readline(self, size=None): + def readline(self, size: typing.Optional[int] = None) -> bytes: """ see pep333 @raises socket.error: @@ -118,7 +121,7 @@ class SocketFileWrapper: return ret self.buff += data - def readlines(self): + def readlines(self) -> typing.Iterator[bytes]: """ see pep333 @raises socket.error: @@ -127,10 +130,10 @@ class SocketFileWrapper: while data: yield data data = self.readline() - def __iter__(self): + def __iter__(self) -> typing.Iterator[bytes]: """see pep333""" return self - def __next__(self): + def __next__(self) -> bytes: """ see pep333 @raises socket.error: @@ -139,9 +142,9 @@ class SocketFileWrapper: if not data: raise StopIteration return data - def flush(self): + def flush(self) -> None: """see pep333""" - def write(self, data): + def write(self, data: bytes) -> None: """see pep333""" assert isinstance(data, bytes) try: @@ -149,7 +152,7 @@ class SocketFileWrapper: except socket.error: # ignore all socket errors: there is no way to report return - def writelines(self, lines): + def writelines(self, lines: typing.List[bytes]) -> None: """see pep333""" for line in lines: self.write(line) @@ -161,49 +164,51 @@ class SCGIServer: class WorkerState: """state: 0 means idle and 1 means working. These values are also sent as strings '0' and '1' over the socket.""" - def __init__(self, pid, sock, state): - """ - @type pid: int - @type state: int - """ + def __init__(self, pid: int, sock: socket.socket, state: int): self.pid = pid self.sock = sock self.state = state - def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr, - minworkers=2, maxworkers=32, maxrequests=1000, config={}, - reusesocket=None, cpulimit=None, timelimit=None): + server: typing.Optional[socket.socket] + workers: typing.Dict[int, WorkerState] + sigpipe: typing.Optional[typing.Tuple[socket.socket, socket.socket]] + + def __init__( + self, + wsgiapp: WsgiApp, + port: int, + interface: str = "localhost", + error: typing.TextIO = sys.stderr, + minworkers: int = 2, + maxworkers: int = 32, + maxrequests: int = 1000, + config: typing.Dict[typing.Any, typing.Any] = {}, + reusesocket: typing.Optional[socket.socket] = None, + cpulimit: typing.Optional[typing.Tuple[int, int]] = None, + timelimit: typing.Optional[int] = None, + ): """ @param wsgiapp: is the WSGI application to be run. - @type port: int @param port: is the tcp port to listen on - @type interface: str @param interface: is the interface to bind to (default: C{"localhost"}) @param error: is a file-like object beeing passed as C{wsgi.errors} in environ - @type minworkers: int @param minworkers: is the number of worker processes to spawn - @type maxworkers: int @param maxworkers: is the maximum number of workers that can be spawned on demand - @type maxrequests: int @param maxrequests: is the number of requests a worker processes before dying - @type config: {} @param config: the environ dictionary is updated using these values for each request. - @type reusesocket: None or socket.socket @param reusesocket: If a socket is passed, do not create a socket. Instead use given socket as listen socket. The passed socket must be set up for accepting tcp connections (i.e. C{AF_INET}, C{SOCK_STREAM} with bind and listen called). - @type cpulimit: (int, int) @param cpulimit: a pair of soft and hard cpu time limit in seconds. This limit is installed for each worker using RLIMIT_CPU if resource limits are available to this platform. After reaching the soft limit workers will continue to process the current request and then cleanly terminate. - @type timelimit: int @param timelimit: The maximum number of wall clock seconds processing a request should take. If this is specified, an alarm timer is installed and the default action is to kill the worker. @@ -229,7 +234,7 @@ class SCGIServer: self.running = False self.ischild = False - def enable_sighandler(self, sig=signal.SIGTERM): + def enable_sighandler(self, sig: int = signal.SIGTERM) -> "SCGIServer": """ Changes the signal handler for the given signal to terminate the run() loop. @@ -239,7 +244,7 @@ class SCGIServer: signal.signal(sig, self.shutdownhandler) return self - def run(self): + def run(self) -> None: """ Serve the wsgi application. """ @@ -296,7 +301,7 @@ class SCGIServer: self.sigpipe = None self.killworkers() - def killworkers(self, sig=signal.SIGTERM): + def killworkers(self, sig: int = signal.SIGTERM) -> None: """ Kills all worker children. @param sig: is the signal used to kill the children @@ -307,7 +312,9 @@ class SCGIServer: os.kill(state.pid, sig) # TODO: handle working children with a timeout - def shutdownhandler(self, sig=None, stackframe=None): + def shutdownhandler( + self, sig: typing.Optional[int] = None, stackframe=None + ) -> None: """ Signal handler function for stopping the run() loop. It works by setting a variable that run() evaluates in each loop. As a signal @@ -319,10 +326,13 @@ class SCGIServer: if self.ischild: sys.exit() elif self.running: + assert self.sigpipe is not None self.running = False self.sigpipe[1].send(b' ') - def sigxcpuhandler(self, sig=None, stackframe=None): + def sigxcpuhandler( + self, sig: typing.Optional[int] = None, stackframe=None + ) -> None: """ Signal hanlder function for the SIGXCUP signal. It is sent to a worker when the soft RLIMIT_CPU is crossed. @@ -331,7 +341,7 @@ class SCGIServer: """ self.cpulimit = True - def spawnworker(self): + def spawnworker(self) -> None: """ internal! spawns a single worker """ @@ -366,11 +376,12 @@ class SCGIServer: else: raise RuntimeError("fork failed") - def work(self, worksock): + def work(self, worksock: socket.socket) -> None: """ internal! serves maxrequests times @raises socket.error: """ + assert self.server is not None for _ in range(self.maxrequests): (con, addr) = self.server.accept() # we cannot handle socket.errors here. @@ -384,7 +395,7 @@ class SCGIServer: if self.cpulimit: break - def process(self, con): + def process(self, con: socket.socket) -> None: """ internal! processes a single request on the connection con. """ @@ -439,9 +450,9 @@ class SCGIServer: # 0 -> False: set but unsent # 0 -> True: sent # 1 -> bytes of the complete header - response_head = [None, None] + response_head: typing.List[typing.Any] = [None, None] - def sendheaders(): + def sendheaders() -> None: assert response_head[0] is not None # headers set if response_head[0] != True: response_head[0] = True @@ -450,14 +461,16 @@ class SCGIServer: except socket.error: pass - def dumbsend(data): + def dumbsend(data: bytes) -> None: sendheaders() try: con.sendall(data) except socket.error: pass - def start_response(status, headers, exc_info=None): + def start_response( + status: str, headers: HeaderList, exc_info: OptExcInfo = None + ) -> WriteCallback: if exc_info and response_head[0]: try: raise exc_info[1].with_traceback(exc_info[2]) |