summaryrefslogtreecommitdiff
path: root/wsgitools/scgi/forkpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'wsgitools/scgi/forkpool.py')
-rw-r--r--wsgitools/scgi/forkpool.py95
1 files changed, 54 insertions, 41 deletions
diff --git a/wsgitools/scgi/forkpool.py b/wsgitools/scgi/forkpool.py
index df8a92f..f864a6a 100644
--- a/wsgitools/scgi/forkpool.py
+++ b/wsgitools/scgi/forkpool.py
@@ -12,21 +12,24 @@ import select
import sys
import errno
import signal
+import typing
-from wsgitools.internal import bytes2str, str2bytes
+from wsgitools.internal import (
+ bytes2str, HeaderList, OptExcInfo, str2bytes, WriteCallback, WsgiApp
+)
from wsgitools.scgi import _convert_environ, FileWrapper
__all__ = []
class SocketFileWrapper:
"""Wraps a socket to a wsgi-compliant file-like object."""
- def __init__(self, sock, toread):
+ def __init__(self, sock: socket.socket, toread: int):
"""@param sock: is a C{socket.socket()}"""
self.sock = sock
self.buff = b""
self.toread = toread
- def _recv(self, size=4096):
+ def _recv(self, size: int = 4096) -> bytes:
"""
internal method for receiving and counting incoming data
@raises socket.error:
@@ -44,7 +47,7 @@ class SocketFileWrapper:
self.toread -= len(data)
return data
- def close(self):
+ def close(self) -> None:
"""Does not close the socket, because it might still be needed. It
reads all data that should have been read as given by C{CONTENT_LENGTH}.
"""
@@ -55,7 +58,7 @@ class SocketFileWrapper:
except socket.error:
pass
- def read(self, size=None):
+ def read(self, size: typing.Optional[int] = None) -> bytes:
"""
see pep333
@raises socket.error:
@@ -92,7 +95,7 @@ class SocketFileWrapper:
ret, self.buff = self.buff, b""
return ret
- def readline(self, size=None):
+ def readline(self, size: typing.Optional[int] = None) -> bytes:
"""
see pep333
@raises socket.error:
@@ -118,7 +121,7 @@ class SocketFileWrapper:
return ret
self.buff += data
- def readlines(self):
+ def readlines(self) -> typing.Iterator[bytes]:
"""
see pep333
@raises socket.error:
@@ -127,10 +130,10 @@ class SocketFileWrapper:
while data:
yield data
data = self.readline()
- def __iter__(self):
+ def __iter__(self) -> typing.Iterator[bytes]:
"""see pep333"""
return self
- def __next__(self):
+ def __next__(self) -> bytes:
"""
see pep333
@raises socket.error:
@@ -139,9 +142,9 @@ class SocketFileWrapper:
if not data:
raise StopIteration
return data
- def flush(self):
+ def flush(self) -> None:
"""see pep333"""
- def write(self, data):
+ def write(self, data: bytes) -> None:
"""see pep333"""
assert isinstance(data, bytes)
try:
@@ -149,7 +152,7 @@ class SocketFileWrapper:
except socket.error:
# ignore all socket errors: there is no way to report
return
- def writelines(self, lines):
+ def writelines(self, lines: typing.List[bytes]) -> None:
"""see pep333"""
for line in lines:
self.write(line)
@@ -161,49 +164,51 @@ class SCGIServer:
class WorkerState:
"""state: 0 means idle and 1 means working.
These values are also sent as strings '0' and '1' over the socket."""
- def __init__(self, pid, sock, state):
- """
- @type pid: int
- @type state: int
- """
+ def __init__(self, pid: int, sock: socket.socket, state: int):
self.pid = pid
self.sock = sock
self.state = state
- def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr,
- minworkers=2, maxworkers=32, maxrequests=1000, config={},
- reusesocket=None, cpulimit=None, timelimit=None):
+ server: typing.Optional[socket.socket]
+ workers: typing.Dict[int, WorkerState]
+ sigpipe: typing.Optional[typing.Tuple[socket.socket, socket.socket]]
+
+ def __init__(
+ self,
+ wsgiapp: WsgiApp,
+ port: int,
+ interface: str = "localhost",
+ error: typing.TextIO = sys.stderr,
+ minworkers: int = 2,
+ maxworkers: int = 32,
+ maxrequests: int = 1000,
+ config: typing.Dict[typing.Any, typing.Any] = {},
+ reusesocket: typing.Optional[socket.socket] = None,
+ cpulimit: typing.Optional[typing.Tuple[int, int]] = None,
+ timelimit: typing.Optional[int] = None,
+ ):
"""
@param wsgiapp: is the WSGI application to be run.
- @type port: int
@param port: is the tcp port to listen on
- @type interface: str
@param interface: is the interface to bind to (default: C{"localhost"})
@param error: is a file-like object beeing passed as C{wsgi.errors} in
environ
- @type minworkers: int
@param minworkers: is the number of worker processes to spawn
- @type maxworkers: int
@param maxworkers: is the maximum number of workers that can be spawned
on demand
- @type maxrequests: int
@param maxrequests: is the number of requests a worker processes before
dying
- @type config: {}
@param config: the environ dictionary is updated using these values for
each request.
- @type reusesocket: None or socket.socket
@param reusesocket: If a socket is passed, do not create a socket.
Instead use given socket as listen socket. The passed socket
must be set up for accepting tcp connections (i.e. C{AF_INET},
C{SOCK_STREAM} with bind and listen called).
- @type cpulimit: (int, int)
@param cpulimit: a pair of soft and hard cpu time limit in seconds.
This limit is installed for each worker using RLIMIT_CPU if
resource limits are available to this platform. After reaching
the soft limit workers will continue to process the current
request and then cleanly terminate.
- @type timelimit: int
@param timelimit: The maximum number of wall clock seconds processing
a request should take. If this is specified, an alarm timer is
installed and the default action is to kill the worker.
@@ -229,7 +234,7 @@ class SCGIServer:
self.running = False
self.ischild = False
- def enable_sighandler(self, sig=signal.SIGTERM):
+ def enable_sighandler(self, sig: int = signal.SIGTERM) -> "SCGIServer":
"""
Changes the signal handler for the given signal to terminate the run()
loop.
@@ -239,7 +244,7 @@ class SCGIServer:
signal.signal(sig, self.shutdownhandler)
return self
- def run(self):
+ def run(self) -> None:
"""
Serve the wsgi application.
"""
@@ -296,7 +301,7 @@ class SCGIServer:
self.sigpipe = None
self.killworkers()
- def killworkers(self, sig=signal.SIGTERM):
+ def killworkers(self, sig: int = signal.SIGTERM) -> None:
"""
Kills all worker children.
@param sig: is the signal used to kill the children
@@ -307,7 +312,9 @@ class SCGIServer:
os.kill(state.pid, sig)
# TODO: handle working children with a timeout
- def shutdownhandler(self, sig=None, stackframe=None):
+ def shutdownhandler(
+ self, sig: typing.Optional[int] = None, stackframe=None
+ ) -> None:
"""
Signal handler function for stopping the run() loop. It works by
setting a variable that run() evaluates in each loop. As a signal
@@ -319,10 +326,13 @@ class SCGIServer:
if self.ischild:
sys.exit()
elif self.running:
+ assert self.sigpipe is not None
self.running = False
self.sigpipe[1].send(b' ')
- def sigxcpuhandler(self, sig=None, stackframe=None):
+ def sigxcpuhandler(
+ self, sig: typing.Optional[int] = None, stackframe=None
+ ) -> None:
"""
Signal hanlder function for the SIGXCUP signal. It is sent to a
worker when the soft RLIMIT_CPU is crossed.
@@ -331,7 +341,7 @@ class SCGIServer:
"""
self.cpulimit = True
- def spawnworker(self):
+ def spawnworker(self) -> None:
"""
internal! spawns a single worker
"""
@@ -366,11 +376,12 @@ class SCGIServer:
else:
raise RuntimeError("fork failed")
- def work(self, worksock):
+ def work(self, worksock: socket.socket) -> None:
"""
internal! serves maxrequests times
@raises socket.error:
"""
+ assert self.server is not None
for _ in range(self.maxrequests):
(con, addr) = self.server.accept()
# we cannot handle socket.errors here.
@@ -384,7 +395,7 @@ class SCGIServer:
if self.cpulimit:
break
- def process(self, con):
+ def process(self, con: socket.socket) -> None:
"""
internal! processes a single request on the connection con.
"""
@@ -439,9 +450,9 @@ class SCGIServer:
# 0 -> False: set but unsent
# 0 -> True: sent
# 1 -> bytes of the complete header
- response_head = [None, None]
+ response_head: typing.List[typing.Any] = [None, None]
- def sendheaders():
+ def sendheaders() -> None:
assert response_head[0] is not None # headers set
if response_head[0] != True:
response_head[0] = True
@@ -450,14 +461,16 @@ class SCGIServer:
except socket.error:
pass
- def dumbsend(data):
+ def dumbsend(data: bytes) -> None:
sendheaders()
try:
con.sendall(data)
except socket.error:
pass
- def start_response(status, headers, exc_info=None):
+ def start_response(
+ status: str, headers: HeaderList, exc_info: OptExcInfo = None
+ ) -> WriteCallback:
if exc_info and response_head[0]:
try:
raise exc_info[1].with_traceback(exc_info[2])