summaryrefslogtreecommitdiff
path: root/wsgitools/scgi
diff options
context:
space:
mode:
Diffstat (limited to 'wsgitools/scgi')
-rw-r--r--wsgitools/scgi/__init__.py25
-rw-r--r--wsgitools/scgi/asynchronous.py98
-rw-r--r--wsgitools/scgi/forkpool.py95
3 files changed, 142 insertions, 76 deletions
diff --git a/wsgitools/scgi/__init__.py b/wsgitools/scgi/__init__.py
index e2a68c2..677e3b5 100644
--- a/wsgitools/scgi/__init__.py
+++ b/wsgitools/scgi/__init__.py
@@ -1,5 +1,8 @@
__all__ = []
+import socket
+import typing
+
try:
import sendfile
except ImportError:
@@ -7,6 +10,8 @@ except ImportError:
else:
have_sendfile = True
+from wsgitools.internal import Environ
+
class FileWrapper:
"""
@ivar offset: Initially 0. Becomes -1 when reading using next and
@@ -14,18 +19,20 @@ class FileWrapper:
counts the number of bytes sent. It also ensures that next and
transfer are never mixed.
"""
- def __init__(self, filelike, blksize=8192):
+ def __init__(self, filelike, blksize: int = 8192):
self.filelike = filelike
self.blksize = blksize
self.offset = 0
if hasattr(filelike, "close"):
self.close = filelike.close
- def can_transfer(self):
+ def can_transfer(self) -> bool:
return have_sendfile and hasattr(self.filelike, "fileno") and \
self.offset >= 0
- def transfer(self, sock, blksize=None):
+ def transfer(
+ self, sock: socket.socket, blksize: typing.Optional[int] = None
+ ) -> int:
assert self.offset >= 0
if blksize is None:
blksize = self.blksize
@@ -42,10 +49,10 @@ class FileWrapper:
self.offset += sent
return sent
- def __iter__(self):
+ def __iter__(self) -> typing.Iterator[bytes]:
return self
- def __next__(self):
+ def __next__(self) -> bytes:
assert self.offset <= 0
self.offset = -1
data = self.filelike.read(self.blksize)
@@ -53,8 +60,12 @@ class FileWrapper:
return data
raise StopIteration
-def _convert_environ(environ, multithread=False, multiprocess=False,
- run_once=False):
+def _convert_environ(
+ environ: Environ,
+ multithread: bool = False,
+ multiprocess: bool = False,
+ run_once: bool = False,
+) -> None:
environ.update({
"wsgi.version": (1, 0),
"wsgi.url_scheme": "http",
diff --git a/wsgitools/scgi/asynchronous.py b/wsgitools/scgi/asynchronous.py
index 61bbc6b..264e43e 100644
--- a/wsgitools/scgi/asynchronous.py
+++ b/wsgitools/scgi/asynchronous.py
@@ -5,10 +5,24 @@ import io
import socket
import sys
import errno
+import typing
-from wsgitools.internal import bytes2str, str2bytes
+from wsgitools.internal import (
+ bytes2str,
+ Environ,
+ HeaderList,
+ OptExcInfo,
+ str2bytes,
+ WriteCallback,
+ WsgiApp,
+)
from wsgitools.scgi import _convert_environ, FileWrapper
+if sys.version_info >= (3, 8):
+ LiteralTrue = typing.Literal[True]
+else:
+ LiteralTrue = bool
+
class SCGIConnection(asyncore.dispatcher):
"""SCGI connection class used by L{SCGIServer}."""
# connection states
@@ -19,11 +33,27 @@ class SCGIConnection(asyncore.dispatcher):
RESP = 3*4 | 2 # sending response, end state
RESPH = 4*4 | 2 # buffered response headers, sending headers only, to TRANS
TRANS = 5*4 | 2 # transferring using FileWrapper, end state
- def __init__(self, server, connection, maxrequestsize=65536,
- maxpostsize=8<<20, blocksize=4096, config={}):
+
+ outheaders: typing.Union[
+ typing.Tuple[()], # unset
+ typing.Tuple[str, HeaderList], # headers
+ LiteralTrue # sent
+ ]
+ wsgihandler: typing.Optional[typing.Iterable[bytes]]
+ wsgiiterator: typing.Optional[typing.Iterator[bytes]]
+
+ def __init__(
+ self,
+ server: "SCGIServer",
+ connection: socket.socket,
+ maxrequestsize: int = 65536,
+ maxpostsize: int = 8<<20,
+ blocksize: int = 4096,
+ config: Environ = {},
+ ):
asyncore.dispatcher.__init__(self, connection)
- self.server = server # WSGISCGIServer instance
+ self.server = server
self.maxrequestsize = maxrequestsize
self.maxpostsize = maxpostsize
self.blocksize = blocksize
@@ -35,10 +65,9 @@ class SCGIConnection(asyncore.dispatcher):
self.wsgihandler = None # wsgi application
self.wsgiiterator = None # wsgi application iterator
self.outheaders = () # headers to be sent
- # () -> unset, (..,..) -> set, True -> sent
self.body = io.BytesIO() # request body
- def _try_send_headers(self):
+ def _try_send_headers(self) -> None:
if self.outheaders != True:
assert not self.outbuff
status, headers = self.outheaders
@@ -47,7 +76,7 @@ class SCGIConnection(asyncore.dispatcher):
self.outbuff = str2bytes(headdata)
self.outheaders = True
- def _wsgi_write(self, data):
+ def _wsgi_write(self, data: bytes) -> None:
assert self.state >= SCGIConnection.RESP
assert self.state < SCGIConnection.TRANS
assert isinstance(data, bytes)
@@ -55,15 +84,15 @@ class SCGIConnection(asyncore.dispatcher):
self._try_send_headers()
self.outbuff += data
- def readable(self):
+ def readable(self) -> bool:
"""C{asyncore} interface"""
return self.state & 1 == 1
- def writable(self):
+ def writable(self) -> bool:
"""C{asyncore} interface"""
return self.state & 2 == 2
- def handle_read(self):
+ def handle_read(self) -> None:
"""C{asyncore} interface"""
data = self.recv(self.blocksize)
self.inbuff += data
@@ -122,6 +151,7 @@ class SCGIConnection(asyncore.dispatcher):
self.environ["wsgi.errors"] = self.server.error
self.wsgihandler = self.server.wsgiapp(self.environ,
self.start_response)
+ assert self.wsgihandler is not None
if isinstance(self.wsgihandler, FileWrapper) and \
self.wsgihandler.can_transfer():
self._try_send_headers()
@@ -134,7 +164,12 @@ class SCGIConnection(asyncore.dispatcher):
self.reqlen -= len(self.inbuff)
self.inbuff = b""
- def start_response(self, status, headers, exc_info=None):
+ def start_response(
+ self,
+ status: str,
+ headers: HeaderList,
+ exc_info: OptExcInfo = None,
+ ) -> WriteCallback:
assert isinstance(status, str)
assert isinstance(headers, list)
if exc_info:
@@ -147,7 +182,7 @@ class SCGIConnection(asyncore.dispatcher):
self.outheaders = (status, headers)
return self._wsgi_write
- def send_buff(self):
+ def send_buff(self) -> None:
try:
sentbytes = self.send(self.outbuff[:self.blocksize])
except socket.error:
@@ -155,11 +190,12 @@ class SCGIConnection(asyncore.dispatcher):
else:
self.outbuff = self.outbuff[sentbytes:]
- def handle_write(self):
+ def handle_write(self) -> None:
"""C{asyncore} interface"""
if self.state == SCGIConnection.RESP:
if len(self.outbuff) < self.blocksize:
self._try_send_headers()
+ assert self.wsgiiterator is not None
for data in self.wsgiiterator:
assert isinstance(data, bytes)
if data:
@@ -171,23 +207,26 @@ class SCGIConnection(asyncore.dispatcher):
self.send_buff()
elif self.state == SCGIConnection.RESPH:
assert len(self.outbuff) > 0
+ assert isinstance(self.wsgihandler, FileWrapper)
self.send_buff()
if not self.outbuff:
self.state = SCGIConnection.TRANS
else:
assert self.state == SCGIConnection.TRANS
+ assert isinstance(self.wsgihandler, FileWrapper)
assert self.wsgihandler.can_transfer()
sent = self.wsgihandler.transfer(self.socket, self.blocksize)
if sent <= 0:
self.close()
- def close(self):
+ def close(self) -> None:
# None doesn't have a close attribute
if hasattr(self.wsgihandler, "close"):
+ assert self.wsgihandler is not None
self.wsgihandler.close()
asyncore.dispatcher.close(self)
- def handle_close(self):
+ def handle_close(self) -> None:
"""C{asyncore} interface"""
self.close()
@@ -195,33 +234,36 @@ __all__.append("SCGIServer")
class SCGIServer(asyncore.dispatcher):
"""SCGI Server for WSGI applications. It does not use multiple processes or
multiple threads."""
- def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr,
- maxrequestsize=None, maxpostsize=None, blocksize=None,
- config={}, reusesocket=None):
+
+ def __init__(
+ self,
+ wsgiapp: WsgiApp,
+ port: int,
+ interface: str = "localhost",
+ error: typing.TextIO = sys.stderr,
+ maxrequestsize: typing.Optional[int] = None,
+ maxpostsize: typing.Optional[int] = None,
+ blocksize: typing.Optional[int] = None,
+ config: Environ = {},
+ reusesocket: typing.Optional[socket.socket] = None
+ ):
"""
@param wsgiapp: is the wsgi application to be run.
- @type port: int
@param port: is an int representing the TCP port number to be used.
- @type interface: str
@param interface: is a string specifying the network interface to bind
which defaults to C{"localhost"} making the server inaccessible
over network.
@param error: is a file-like object being passed as C{wsgi.error} in the
environ parameter defaulting to stderr.
- @type maxrequestsize: int
@param maxrequestsize: limit the size of request blocks in scgi
connections. Connections are dropped when this limit is hit.
- @type maxpostsize: int
@param maxpostsize: limit the size of post bodies that may be processed
by this instance. Connections are dropped when this limit is
hit.
- @type blocksize: int
@param blocksize: is amount of data to read or write from or to the
network at once
- @type config: {}
@param config: the environ dictionary is updated using these values for
each request.
- @type reusesocket: None or socket.socket
@param reusesocket: If a socket is passed, do not create a socket.
Instead use given socket as listen socket. The passed socket
must be set up for accepting tcp connections (i.e. C{AF_INET},
@@ -234,7 +276,7 @@ class SCGIServer(asyncore.dispatcher):
self.wsgiapp = wsgiapp
self.error = error
- self.conf = {}
+ self.conf: Environ = {}
if maxrequestsize is not None:
self.conf["maxrequestsize"] = maxrequestsize
if maxpostsize is not None:
@@ -251,7 +293,7 @@ class SCGIServer(asyncore.dispatcher):
else:
self.accepting = True
- def handle_accept(self):
+ def handle_accept(self) -> None:
"""asyncore interface"""
try:
ret = self.accept()
@@ -264,7 +306,7 @@ class SCGIServer(asyncore.dispatcher):
conn, _ = ret
SCGIConnection(self, conn, **self.conf)
- def run(self):
+ def run(self) -> None:
"""Runs the server. It will not return and you can invoke
C{asyncore.loop()} instead achieving the same effect."""
asyncore.loop()
diff --git a/wsgitools/scgi/forkpool.py b/wsgitools/scgi/forkpool.py
index df8a92f..f864a6a 100644
--- a/wsgitools/scgi/forkpool.py
+++ b/wsgitools/scgi/forkpool.py
@@ -12,21 +12,24 @@ import select
import sys
import errno
import signal
+import typing
-from wsgitools.internal import bytes2str, str2bytes
+from wsgitools.internal import (
+ bytes2str, HeaderList, OptExcInfo, str2bytes, WriteCallback, WsgiApp
+)
from wsgitools.scgi import _convert_environ, FileWrapper
__all__ = []
class SocketFileWrapper:
"""Wraps a socket to a wsgi-compliant file-like object."""
- def __init__(self, sock, toread):
+ def __init__(self, sock: socket.socket, toread: int):
"""@param sock: is a C{socket.socket()}"""
self.sock = sock
self.buff = b""
self.toread = toread
- def _recv(self, size=4096):
+ def _recv(self, size: int = 4096) -> bytes:
"""
internal method for receiving and counting incoming data
@raises socket.error:
@@ -44,7 +47,7 @@ class SocketFileWrapper:
self.toread -= len(data)
return data
- def close(self):
+ def close(self) -> None:
"""Does not close the socket, because it might still be needed. It
reads all data that should have been read as given by C{CONTENT_LENGTH}.
"""
@@ -55,7 +58,7 @@ class SocketFileWrapper:
except socket.error:
pass
- def read(self, size=None):
+ def read(self, size: typing.Optional[int] = None) -> bytes:
"""
see pep333
@raises socket.error:
@@ -92,7 +95,7 @@ class SocketFileWrapper:
ret, self.buff = self.buff, b""
return ret
- def readline(self, size=None):
+ def readline(self, size: typing.Optional[int] = None) -> bytes:
"""
see pep333
@raises socket.error:
@@ -118,7 +121,7 @@ class SocketFileWrapper:
return ret
self.buff += data
- def readlines(self):
+ def readlines(self) -> typing.Iterator[bytes]:
"""
see pep333
@raises socket.error:
@@ -127,10 +130,10 @@ class SocketFileWrapper:
while data:
yield data
data = self.readline()
- def __iter__(self):
+ def __iter__(self) -> typing.Iterator[bytes]:
"""see pep333"""
return self
- def __next__(self):
+ def __next__(self) -> bytes:
"""
see pep333
@raises socket.error:
@@ -139,9 +142,9 @@ class SocketFileWrapper:
if not data:
raise StopIteration
return data
- def flush(self):
+ def flush(self) -> None:
"""see pep333"""
- def write(self, data):
+ def write(self, data: bytes) -> None:
"""see pep333"""
assert isinstance(data, bytes)
try:
@@ -149,7 +152,7 @@ class SocketFileWrapper:
except socket.error:
# ignore all socket errors: there is no way to report
return
- def writelines(self, lines):
+ def writelines(self, lines: typing.List[bytes]) -> None:
"""see pep333"""
for line in lines:
self.write(line)
@@ -161,49 +164,51 @@ class SCGIServer:
class WorkerState:
"""state: 0 means idle and 1 means working.
These values are also sent as strings '0' and '1' over the socket."""
- def __init__(self, pid, sock, state):
- """
- @type pid: int
- @type state: int
- """
+ def __init__(self, pid: int, sock: socket.socket, state: int):
self.pid = pid
self.sock = sock
self.state = state
- def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr,
- minworkers=2, maxworkers=32, maxrequests=1000, config={},
- reusesocket=None, cpulimit=None, timelimit=None):
+ server: typing.Optional[socket.socket]
+ workers: typing.Dict[int, WorkerState]
+ sigpipe: typing.Optional[typing.Tuple[socket.socket, socket.socket]]
+
+ def __init__(
+ self,
+ wsgiapp: WsgiApp,
+ port: int,
+ interface: str = "localhost",
+ error: typing.TextIO = sys.stderr,
+ minworkers: int = 2,
+ maxworkers: int = 32,
+ maxrequests: int = 1000,
+ config: typing.Dict[typing.Any, typing.Any] = {},
+ reusesocket: typing.Optional[socket.socket] = None,
+ cpulimit: typing.Optional[typing.Tuple[int, int]] = None,
+ timelimit: typing.Optional[int] = None,
+ ):
"""
@param wsgiapp: is the WSGI application to be run.
- @type port: int
@param port: is the tcp port to listen on
- @type interface: str
@param interface: is the interface to bind to (default: C{"localhost"})
@param error: is a file-like object beeing passed as C{wsgi.errors} in
environ
- @type minworkers: int
@param minworkers: is the number of worker processes to spawn
- @type maxworkers: int
@param maxworkers: is the maximum number of workers that can be spawned
on demand
- @type maxrequests: int
@param maxrequests: is the number of requests a worker processes before
dying
- @type config: {}
@param config: the environ dictionary is updated using these values for
each request.
- @type reusesocket: None or socket.socket
@param reusesocket: If a socket is passed, do not create a socket.
Instead use given socket as listen socket. The passed socket
must be set up for accepting tcp connections (i.e. C{AF_INET},
C{SOCK_STREAM} with bind and listen called).
- @type cpulimit: (int, int)
@param cpulimit: a pair of soft and hard cpu time limit in seconds.
This limit is installed for each worker using RLIMIT_CPU if
resource limits are available to this platform. After reaching
the soft limit workers will continue to process the current
request and then cleanly terminate.
- @type timelimit: int
@param timelimit: The maximum number of wall clock seconds processing
a request should take. If this is specified, an alarm timer is
installed and the default action is to kill the worker.
@@ -229,7 +234,7 @@ class SCGIServer:
self.running = False
self.ischild = False
- def enable_sighandler(self, sig=signal.SIGTERM):
+ def enable_sighandler(self, sig: int = signal.SIGTERM) -> "SCGIServer":
"""
Changes the signal handler for the given signal to terminate the run()
loop.
@@ -239,7 +244,7 @@ class SCGIServer:
signal.signal(sig, self.shutdownhandler)
return self
- def run(self):
+ def run(self) -> None:
"""
Serve the wsgi application.
"""
@@ -296,7 +301,7 @@ class SCGIServer:
self.sigpipe = None
self.killworkers()
- def killworkers(self, sig=signal.SIGTERM):
+ def killworkers(self, sig: int = signal.SIGTERM) -> None:
"""
Kills all worker children.
@param sig: is the signal used to kill the children
@@ -307,7 +312,9 @@ class SCGIServer:
os.kill(state.pid, sig)
# TODO: handle working children with a timeout
- def shutdownhandler(self, sig=None, stackframe=None):
+ def shutdownhandler(
+ self, sig: typing.Optional[int] = None, stackframe=None
+ ) -> None:
"""
Signal handler function for stopping the run() loop. It works by
setting a variable that run() evaluates in each loop. As a signal
@@ -319,10 +326,13 @@ class SCGIServer:
if self.ischild:
sys.exit()
elif self.running:
+ assert self.sigpipe is not None
self.running = False
self.sigpipe[1].send(b' ')
- def sigxcpuhandler(self, sig=None, stackframe=None):
+ def sigxcpuhandler(
+ self, sig: typing.Optional[int] = None, stackframe=None
+ ) -> None:
"""
Signal hanlder function for the SIGXCUP signal. It is sent to a
worker when the soft RLIMIT_CPU is crossed.
@@ -331,7 +341,7 @@ class SCGIServer:
"""
self.cpulimit = True
- def spawnworker(self):
+ def spawnworker(self) -> None:
"""
internal! spawns a single worker
"""
@@ -366,11 +376,12 @@ class SCGIServer:
else:
raise RuntimeError("fork failed")
- def work(self, worksock):
+ def work(self, worksock: socket.socket) -> None:
"""
internal! serves maxrequests times
@raises socket.error:
"""
+ assert self.server is not None
for _ in range(self.maxrequests):
(con, addr) = self.server.accept()
# we cannot handle socket.errors here.
@@ -384,7 +395,7 @@ class SCGIServer:
if self.cpulimit:
break
- def process(self, con):
+ def process(self, con: socket.socket) -> None:
"""
internal! processes a single request on the connection con.
"""
@@ -439,9 +450,9 @@ class SCGIServer:
# 0 -> False: set but unsent
# 0 -> True: sent
# 1 -> bytes of the complete header
- response_head = [None, None]
+ response_head: typing.List[typing.Any] = [None, None]
- def sendheaders():
+ def sendheaders() -> None:
assert response_head[0] is not None # headers set
if response_head[0] != True:
response_head[0] = True
@@ -450,14 +461,16 @@ class SCGIServer:
except socket.error:
pass
- def dumbsend(data):
+ def dumbsend(data: bytes) -> None:
sendheaders()
try:
con.sendall(data)
except socket.error:
pass
- def start_response(status, headers, exc_info=None):
+ def start_response(
+ status: str, headers: HeaderList, exc_info: OptExcInfo = None
+ ) -> WriteCallback:
if exc_info and response_head[0]:
try:
raise exc_info[1].with_traceback(exc_info[2])