diff options
Diffstat (limited to 'wsgitools/scgi')
-rw-r--r-- | wsgitools/scgi/__init__.py | 25 | ||||
-rw-r--r-- | wsgitools/scgi/asynchronous.py | 98 | ||||
-rw-r--r-- | wsgitools/scgi/forkpool.py | 95 |
3 files changed, 142 insertions, 76 deletions
diff --git a/wsgitools/scgi/__init__.py b/wsgitools/scgi/__init__.py index e2a68c2..677e3b5 100644 --- a/wsgitools/scgi/__init__.py +++ b/wsgitools/scgi/__init__.py @@ -1,5 +1,8 @@ __all__ = [] +import socket +import typing + try: import sendfile except ImportError: @@ -7,6 +10,8 @@ except ImportError: else: have_sendfile = True +from wsgitools.internal import Environ + class FileWrapper: """ @ivar offset: Initially 0. Becomes -1 when reading using next and @@ -14,18 +19,20 @@ class FileWrapper: counts the number of bytes sent. It also ensures that next and transfer are never mixed. """ - def __init__(self, filelike, blksize=8192): + def __init__(self, filelike, blksize: int = 8192): self.filelike = filelike self.blksize = blksize self.offset = 0 if hasattr(filelike, "close"): self.close = filelike.close - def can_transfer(self): + def can_transfer(self) -> bool: return have_sendfile and hasattr(self.filelike, "fileno") and \ self.offset >= 0 - def transfer(self, sock, blksize=None): + def transfer( + self, sock: socket.socket, blksize: typing.Optional[int] = None + ) -> int: assert self.offset >= 0 if blksize is None: blksize = self.blksize @@ -42,10 +49,10 @@ class FileWrapper: self.offset += sent return sent - def __iter__(self): + def __iter__(self) -> typing.Iterator[bytes]: return self - def __next__(self): + def __next__(self) -> bytes: assert self.offset <= 0 self.offset = -1 data = self.filelike.read(self.blksize) @@ -53,8 +60,12 @@ class FileWrapper: return data raise StopIteration -def _convert_environ(environ, multithread=False, multiprocess=False, - run_once=False): +def _convert_environ( + environ: Environ, + multithread: bool = False, + multiprocess: bool = False, + run_once: bool = False, +) -> None: environ.update({ "wsgi.version": (1, 0), "wsgi.url_scheme": "http", diff --git a/wsgitools/scgi/asynchronous.py b/wsgitools/scgi/asynchronous.py index 61bbc6b..264e43e 100644 --- a/wsgitools/scgi/asynchronous.py +++ b/wsgitools/scgi/asynchronous.py @@ -5,10 +5,24 @@ import io import socket import sys import errno +import typing -from wsgitools.internal import bytes2str, str2bytes +from wsgitools.internal import ( + bytes2str, + Environ, + HeaderList, + OptExcInfo, + str2bytes, + WriteCallback, + WsgiApp, +) from wsgitools.scgi import _convert_environ, FileWrapper +if sys.version_info >= (3, 8): + LiteralTrue = typing.Literal[True] +else: + LiteralTrue = bool + class SCGIConnection(asyncore.dispatcher): """SCGI connection class used by L{SCGIServer}.""" # connection states @@ -19,11 +33,27 @@ class SCGIConnection(asyncore.dispatcher): RESP = 3*4 | 2 # sending response, end state RESPH = 4*4 | 2 # buffered response headers, sending headers only, to TRANS TRANS = 5*4 | 2 # transferring using FileWrapper, end state - def __init__(self, server, connection, maxrequestsize=65536, - maxpostsize=8<<20, blocksize=4096, config={}): + + outheaders: typing.Union[ + typing.Tuple[()], # unset + typing.Tuple[str, HeaderList], # headers + LiteralTrue # sent + ] + wsgihandler: typing.Optional[typing.Iterable[bytes]] + wsgiiterator: typing.Optional[typing.Iterator[bytes]] + + def __init__( + self, + server: "SCGIServer", + connection: socket.socket, + maxrequestsize: int = 65536, + maxpostsize: int = 8<<20, + blocksize: int = 4096, + config: Environ = {}, + ): asyncore.dispatcher.__init__(self, connection) - self.server = server # WSGISCGIServer instance + self.server = server self.maxrequestsize = maxrequestsize self.maxpostsize = maxpostsize self.blocksize = blocksize @@ -35,10 +65,9 @@ class SCGIConnection(asyncore.dispatcher): self.wsgihandler = None # wsgi application self.wsgiiterator = None # wsgi application iterator self.outheaders = () # headers to be sent - # () -> unset, (..,..) -> set, True -> sent self.body = io.BytesIO() # request body - def _try_send_headers(self): + def _try_send_headers(self) -> None: if self.outheaders != True: assert not self.outbuff status, headers = self.outheaders @@ -47,7 +76,7 @@ class SCGIConnection(asyncore.dispatcher): self.outbuff = str2bytes(headdata) self.outheaders = True - def _wsgi_write(self, data): + def _wsgi_write(self, data: bytes) -> None: assert self.state >= SCGIConnection.RESP assert self.state < SCGIConnection.TRANS assert isinstance(data, bytes) @@ -55,15 +84,15 @@ class SCGIConnection(asyncore.dispatcher): self._try_send_headers() self.outbuff += data - def readable(self): + def readable(self) -> bool: """C{asyncore} interface""" return self.state & 1 == 1 - def writable(self): + def writable(self) -> bool: """C{asyncore} interface""" return self.state & 2 == 2 - def handle_read(self): + def handle_read(self) -> None: """C{asyncore} interface""" data = self.recv(self.blocksize) self.inbuff += data @@ -122,6 +151,7 @@ class SCGIConnection(asyncore.dispatcher): self.environ["wsgi.errors"] = self.server.error self.wsgihandler = self.server.wsgiapp(self.environ, self.start_response) + assert self.wsgihandler is not None if isinstance(self.wsgihandler, FileWrapper) and \ self.wsgihandler.can_transfer(): self._try_send_headers() @@ -134,7 +164,12 @@ class SCGIConnection(asyncore.dispatcher): self.reqlen -= len(self.inbuff) self.inbuff = b"" - def start_response(self, status, headers, exc_info=None): + def start_response( + self, + status: str, + headers: HeaderList, + exc_info: OptExcInfo = None, + ) -> WriteCallback: assert isinstance(status, str) assert isinstance(headers, list) if exc_info: @@ -147,7 +182,7 @@ class SCGIConnection(asyncore.dispatcher): self.outheaders = (status, headers) return self._wsgi_write - def send_buff(self): + def send_buff(self) -> None: try: sentbytes = self.send(self.outbuff[:self.blocksize]) except socket.error: @@ -155,11 +190,12 @@ class SCGIConnection(asyncore.dispatcher): else: self.outbuff = self.outbuff[sentbytes:] - def handle_write(self): + def handle_write(self) -> None: """C{asyncore} interface""" if self.state == SCGIConnection.RESP: if len(self.outbuff) < self.blocksize: self._try_send_headers() + assert self.wsgiiterator is not None for data in self.wsgiiterator: assert isinstance(data, bytes) if data: @@ -171,23 +207,26 @@ class SCGIConnection(asyncore.dispatcher): self.send_buff() elif self.state == SCGIConnection.RESPH: assert len(self.outbuff) > 0 + assert isinstance(self.wsgihandler, FileWrapper) self.send_buff() if not self.outbuff: self.state = SCGIConnection.TRANS else: assert self.state == SCGIConnection.TRANS + assert isinstance(self.wsgihandler, FileWrapper) assert self.wsgihandler.can_transfer() sent = self.wsgihandler.transfer(self.socket, self.blocksize) if sent <= 0: self.close() - def close(self): + def close(self) -> None: # None doesn't have a close attribute if hasattr(self.wsgihandler, "close"): + assert self.wsgihandler is not None self.wsgihandler.close() asyncore.dispatcher.close(self) - def handle_close(self): + def handle_close(self) -> None: """C{asyncore} interface""" self.close() @@ -195,33 +234,36 @@ __all__.append("SCGIServer") class SCGIServer(asyncore.dispatcher): """SCGI Server for WSGI applications. It does not use multiple processes or multiple threads.""" - def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr, - maxrequestsize=None, maxpostsize=None, blocksize=None, - config={}, reusesocket=None): + + def __init__( + self, + wsgiapp: WsgiApp, + port: int, + interface: str = "localhost", + error: typing.TextIO = sys.stderr, + maxrequestsize: typing.Optional[int] = None, + maxpostsize: typing.Optional[int] = None, + blocksize: typing.Optional[int] = None, + config: Environ = {}, + reusesocket: typing.Optional[socket.socket] = None + ): """ @param wsgiapp: is the wsgi application to be run. - @type port: int @param port: is an int representing the TCP port number to be used. - @type interface: str @param interface: is a string specifying the network interface to bind which defaults to C{"localhost"} making the server inaccessible over network. @param error: is a file-like object being passed as C{wsgi.error} in the environ parameter defaulting to stderr. - @type maxrequestsize: int @param maxrequestsize: limit the size of request blocks in scgi connections. Connections are dropped when this limit is hit. - @type maxpostsize: int @param maxpostsize: limit the size of post bodies that may be processed by this instance. Connections are dropped when this limit is hit. - @type blocksize: int @param blocksize: is amount of data to read or write from or to the network at once - @type config: {} @param config: the environ dictionary is updated using these values for each request. - @type reusesocket: None or socket.socket @param reusesocket: If a socket is passed, do not create a socket. Instead use given socket as listen socket. The passed socket must be set up for accepting tcp connections (i.e. C{AF_INET}, @@ -234,7 +276,7 @@ class SCGIServer(asyncore.dispatcher): self.wsgiapp = wsgiapp self.error = error - self.conf = {} + self.conf: Environ = {} if maxrequestsize is not None: self.conf["maxrequestsize"] = maxrequestsize if maxpostsize is not None: @@ -251,7 +293,7 @@ class SCGIServer(asyncore.dispatcher): else: self.accepting = True - def handle_accept(self): + def handle_accept(self) -> None: """asyncore interface""" try: ret = self.accept() @@ -264,7 +306,7 @@ class SCGIServer(asyncore.dispatcher): conn, _ = ret SCGIConnection(self, conn, **self.conf) - def run(self): + def run(self) -> None: """Runs the server. It will not return and you can invoke C{asyncore.loop()} instead achieving the same effect.""" asyncore.loop() diff --git a/wsgitools/scgi/forkpool.py b/wsgitools/scgi/forkpool.py index df8a92f..f864a6a 100644 --- a/wsgitools/scgi/forkpool.py +++ b/wsgitools/scgi/forkpool.py @@ -12,21 +12,24 @@ import select import sys import errno import signal +import typing -from wsgitools.internal import bytes2str, str2bytes +from wsgitools.internal import ( + bytes2str, HeaderList, OptExcInfo, str2bytes, WriteCallback, WsgiApp +) from wsgitools.scgi import _convert_environ, FileWrapper __all__ = [] class SocketFileWrapper: """Wraps a socket to a wsgi-compliant file-like object.""" - def __init__(self, sock, toread): + def __init__(self, sock: socket.socket, toread: int): """@param sock: is a C{socket.socket()}""" self.sock = sock self.buff = b"" self.toread = toread - def _recv(self, size=4096): + def _recv(self, size: int = 4096) -> bytes: """ internal method for receiving and counting incoming data @raises socket.error: @@ -44,7 +47,7 @@ class SocketFileWrapper: self.toread -= len(data) return data - def close(self): + def close(self) -> None: """Does not close the socket, because it might still be needed. It reads all data that should have been read as given by C{CONTENT_LENGTH}. """ @@ -55,7 +58,7 @@ class SocketFileWrapper: except socket.error: pass - def read(self, size=None): + def read(self, size: typing.Optional[int] = None) -> bytes: """ see pep333 @raises socket.error: @@ -92,7 +95,7 @@ class SocketFileWrapper: ret, self.buff = self.buff, b"" return ret - def readline(self, size=None): + def readline(self, size: typing.Optional[int] = None) -> bytes: """ see pep333 @raises socket.error: @@ -118,7 +121,7 @@ class SocketFileWrapper: return ret self.buff += data - def readlines(self): + def readlines(self) -> typing.Iterator[bytes]: """ see pep333 @raises socket.error: @@ -127,10 +130,10 @@ class SocketFileWrapper: while data: yield data data = self.readline() - def __iter__(self): + def __iter__(self) -> typing.Iterator[bytes]: """see pep333""" return self - def __next__(self): + def __next__(self) -> bytes: """ see pep333 @raises socket.error: @@ -139,9 +142,9 @@ class SocketFileWrapper: if not data: raise StopIteration return data - def flush(self): + def flush(self) -> None: """see pep333""" - def write(self, data): + def write(self, data: bytes) -> None: """see pep333""" assert isinstance(data, bytes) try: @@ -149,7 +152,7 @@ class SocketFileWrapper: except socket.error: # ignore all socket errors: there is no way to report return - def writelines(self, lines): + def writelines(self, lines: typing.List[bytes]) -> None: """see pep333""" for line in lines: self.write(line) @@ -161,49 +164,51 @@ class SCGIServer: class WorkerState: """state: 0 means idle and 1 means working. These values are also sent as strings '0' and '1' over the socket.""" - def __init__(self, pid, sock, state): - """ - @type pid: int - @type state: int - """ + def __init__(self, pid: int, sock: socket.socket, state: int): self.pid = pid self.sock = sock self.state = state - def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr, - minworkers=2, maxworkers=32, maxrequests=1000, config={}, - reusesocket=None, cpulimit=None, timelimit=None): + server: typing.Optional[socket.socket] + workers: typing.Dict[int, WorkerState] + sigpipe: typing.Optional[typing.Tuple[socket.socket, socket.socket]] + + def __init__( + self, + wsgiapp: WsgiApp, + port: int, + interface: str = "localhost", + error: typing.TextIO = sys.stderr, + minworkers: int = 2, + maxworkers: int = 32, + maxrequests: int = 1000, + config: typing.Dict[typing.Any, typing.Any] = {}, + reusesocket: typing.Optional[socket.socket] = None, + cpulimit: typing.Optional[typing.Tuple[int, int]] = None, + timelimit: typing.Optional[int] = None, + ): """ @param wsgiapp: is the WSGI application to be run. - @type port: int @param port: is the tcp port to listen on - @type interface: str @param interface: is the interface to bind to (default: C{"localhost"}) @param error: is a file-like object beeing passed as C{wsgi.errors} in environ - @type minworkers: int @param minworkers: is the number of worker processes to spawn - @type maxworkers: int @param maxworkers: is the maximum number of workers that can be spawned on demand - @type maxrequests: int @param maxrequests: is the number of requests a worker processes before dying - @type config: {} @param config: the environ dictionary is updated using these values for each request. - @type reusesocket: None or socket.socket @param reusesocket: If a socket is passed, do not create a socket. Instead use given socket as listen socket. The passed socket must be set up for accepting tcp connections (i.e. C{AF_INET}, C{SOCK_STREAM} with bind and listen called). - @type cpulimit: (int, int) @param cpulimit: a pair of soft and hard cpu time limit in seconds. This limit is installed for each worker using RLIMIT_CPU if resource limits are available to this platform. After reaching the soft limit workers will continue to process the current request and then cleanly terminate. - @type timelimit: int @param timelimit: The maximum number of wall clock seconds processing a request should take. If this is specified, an alarm timer is installed and the default action is to kill the worker. @@ -229,7 +234,7 @@ class SCGIServer: self.running = False self.ischild = False - def enable_sighandler(self, sig=signal.SIGTERM): + def enable_sighandler(self, sig: int = signal.SIGTERM) -> "SCGIServer": """ Changes the signal handler for the given signal to terminate the run() loop. @@ -239,7 +244,7 @@ class SCGIServer: signal.signal(sig, self.shutdownhandler) return self - def run(self): + def run(self) -> None: """ Serve the wsgi application. """ @@ -296,7 +301,7 @@ class SCGIServer: self.sigpipe = None self.killworkers() - def killworkers(self, sig=signal.SIGTERM): + def killworkers(self, sig: int = signal.SIGTERM) -> None: """ Kills all worker children. @param sig: is the signal used to kill the children @@ -307,7 +312,9 @@ class SCGIServer: os.kill(state.pid, sig) # TODO: handle working children with a timeout - def shutdownhandler(self, sig=None, stackframe=None): + def shutdownhandler( + self, sig: typing.Optional[int] = None, stackframe=None + ) -> None: """ Signal handler function for stopping the run() loop. It works by setting a variable that run() evaluates in each loop. As a signal @@ -319,10 +326,13 @@ class SCGIServer: if self.ischild: sys.exit() elif self.running: + assert self.sigpipe is not None self.running = False self.sigpipe[1].send(b' ') - def sigxcpuhandler(self, sig=None, stackframe=None): + def sigxcpuhandler( + self, sig: typing.Optional[int] = None, stackframe=None + ) -> None: """ Signal hanlder function for the SIGXCUP signal. It is sent to a worker when the soft RLIMIT_CPU is crossed. @@ -331,7 +341,7 @@ class SCGIServer: """ self.cpulimit = True - def spawnworker(self): + def spawnworker(self) -> None: """ internal! spawns a single worker """ @@ -366,11 +376,12 @@ class SCGIServer: else: raise RuntimeError("fork failed") - def work(self, worksock): + def work(self, worksock: socket.socket) -> None: """ internal! serves maxrequests times @raises socket.error: """ + assert self.server is not None for _ in range(self.maxrequests): (con, addr) = self.server.accept() # we cannot handle socket.errors here. @@ -384,7 +395,7 @@ class SCGIServer: if self.cpulimit: break - def process(self, con): + def process(self, con: socket.socket) -> None: """ internal! processes a single request on the connection con. """ @@ -439,9 +450,9 @@ class SCGIServer: # 0 -> False: set but unsent # 0 -> True: sent # 1 -> bytes of the complete header - response_head = [None, None] + response_head: typing.List[typing.Any] = [None, None] - def sendheaders(): + def sendheaders() -> None: assert response_head[0] is not None # headers set if response_head[0] != True: response_head[0] = True @@ -450,14 +461,16 @@ class SCGIServer: except socket.error: pass - def dumbsend(data): + def dumbsend(data: bytes) -> None: sendheaders() try: con.sendall(data) except socket.error: pass - def start_response(status, headers, exc_info=None): + def start_response( + status: str, headers: HeaderList, exc_info: OptExcInfo = None + ) -> WriteCallback: if exc_info and response_head[0]: try: raise exc_info[1].with_traceback(exc_info[2]) |