summaryrefslogtreecommitdiff
path: root/wsgitools/scgi
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2007-04-17 18:38:13 +0200
committerHelmut Grohne <helmut@subdivi.de>2007-04-17 18:38:13 +0200
commitdb9d4aa502f88fa39de69142517b4ab5f1a0d5ab (patch)
treee20ee9f0a2aa398cdd243ce9f14f88fcaca57fe4 /wsgitools/scgi
parent179420b66aa54a676dbd5ce77dff28688d6a9d1d (diff)
downloadwsgitools-db9d4aa502f88fa39de69142517b4ab5f1a0d5ab.tar.gz
added wsgitools.scgi.forkpool
wsgitools.scgi is now known as wsgitools.scgi.asynchronous
Diffstat (limited to 'wsgitools/scgi')
-rw-r--r--wsgitools/scgi/__init__.py0
-rw-r--r--wsgitools/scgi/asynchronous.py201
-rw-r--r--wsgitools/scgi/forkpool.py225
3 files changed, 426 insertions, 0 deletions
diff --git a/wsgitools/scgi/__init__.py b/wsgitools/scgi/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wsgitools/scgi/__init__.py
diff --git a/wsgitools/scgi/asynchronous.py b/wsgitools/scgi/asynchronous.py
new file mode 100644
index 0000000..51349a0
--- /dev/null
+++ b/wsgitools/scgi/asynchronous.py
@@ -0,0 +1,201 @@
+__all__ = []
+
+import asyncore
+import socket
+import sys
+try:
+ import cStringIO as StringIO
+except ImportError:
+ import StringIO
+
+class SCGIConnection(asyncore.dispatcher):
+ """SCGI connection class used by WSGISCGIServer."""
+ # maximum request size
+ MAX_REQUEST_SIZE = 65536
+ # maximum post size
+ MAX_POST_SIZE = 8 << 20
+ # read and write size
+ BLOCK_SIZE = 4096
+ # connection states
+ NEW = 0*4 | 1 # connection established, waiting for request
+ HEADER = 1*4 | 1 # the request length was received, waiting for the rest
+ BODY = 2*4 | 1 # the request header was received, waiting for the body
+ REQ = 3*4 | 2 # request received, sending response
+ def __init__(self, server, connection, addr):
+ self.server = server # WSGISCGIServer instance
+ self.addr = addr # scgi client address
+ self.state = SCGIConnection.NEW # internal state
+ self.environ = {} # environment passed to wsgi app
+ self.reqlen = -1 # request length used in two different meanings
+ self.inbuff = "" # input buffer
+ self.outbuff = "" # output buffer
+ self.wsgihandler = None # wsgi application iterator
+ self.outheaders = () # headers to be sent
+ # () -> unset, (..,..) -> set, True -> sent
+ self.body = StringIO.StringIO() # request body
+ asyncore.dispatcher.__init__(self, connection)
+
+ def _wsgi_headers(self):
+ return {"wsgi.version": (1, 0),
+ "wsgi.input": self.body,
+ "wsgi.errors": self.server.error,
+ "wsgi.url_scheme": "http", # TODO: this is wrong
+ "wsgi.multithread": False,
+ "wsgi.multiprocess": False,
+ "wsgi.run_once": False}
+
+ def _try_send_headers(self):
+ if self.outheaders != True:
+ assert not self.outbuff
+ status, headers = self.outheaders
+ headdata = "".join(map("%s: %s\r\n".__mod__, headers))
+ self.outbuff = "Status: %s\r\n%s\r\n" % (status, headdata)
+ self.outheaders = True
+
+ def _wsgi_write(self, data):
+ assert self.state >= SCGIConnection.REQ
+ if data:
+ self._try_send_headers()
+ self.outbuff += data
+
+ def readable(self):
+ """asyncore interface"""
+ return self.state & 1 == 1
+
+ def writable(self):
+ """asyncore interface"""
+ return self.state & 2 == 2
+
+ def handle_read(self):
+ """asyncore interface"""
+ data = self.recv(self.BLOCK_SIZE)
+ self.inbuff += data
+ if self.state == SCGIConnection.NEW:
+ if ':' in self.inbuff:
+ reqlen, self.inbuff = self.inbuff.split(':', 1)
+ if not reqlen.isdigit():
+ self.close()
+ return # invalid request format
+ reqlen = long(reqlen)
+ if reqlen > self.MAX_REQUEST_SIZE:
+ self.close()
+ return # request too long
+ self.reqlen = reqlen
+ self.state = SCGIConnection.HEADER
+ elif len(self.inbuff) > self.MAX_REQUEST_SIZE:
+ self.close()
+ return # request too long
+
+ if self.state == SCGIConnection.HEADER:
+ buff = self.inbuff[:self.reqlen]
+ remainder = self.inbuff[self.reqlen:]
+
+ while buff.count('\0') >= 2:
+ key, value, buff = buff.split('\0', 2)
+ self.environ[key] = value
+ self.reqlen -= len(key) + len(value) + 2
+
+ self.inbuff = buff + remainder
+
+ if self.reqlen == 0:
+ if self.inbuff.startswith(','):
+ self.inbuff = self.inbuff[1:]
+ self.reqlen = long(self.environ["CONTENT_LENGTH"])
+ if self.reqlen > self.MAX_POST_SIZE:
+ self.close()
+ return
+ self.state = SCGIConnection.BODY
+ else:
+ self.close()
+ return # protocol violation
+
+ if self.state == SCGIConnection.BODY:
+ if len(self.inbuff) >= self.reqlen:
+ self.body.write(self.inbuff[:self.reqlen])
+ self.body.seek(0)
+ self.inbuff = ""
+ self.reqlen = 0
+ self.environ.update(self._wsgi_headers())
+ if "HTTP_CONTENT_TYPE" in self.environ:
+ self.environ["CONTENT_TYPE"] = \
+ self.environ.pop("HTTP_CONTENT_TYPE")
+ if "HTTP_CONTENT_LENGTH" in self.environ:
+ del self.environ["HTTP_CONTENT_LENGTH"] # TODO: better way?
+ self.wsgihandler = iter(self.server.wsgiapp(
+ self.environ, self.start_response))
+ self.state = SCGIConnection.REQ
+ else:
+ self.body.write(self.inbuff)
+ self.reqlen -= len(self.inbuff)
+ self.inbuff = ""
+
+ def start_response(self, status, headers, exc_info=None):
+ if exc_info:
+ if self.outheaders == True:
+ try:
+ raise exc_info[0], exc_info[1], exc_info[2]
+ finally:
+ exc_info = None
+ assert self.outheaders != True # unsent
+ self.outheaders = (status, headers)
+ return self._wsgi_write
+
+ def handle_write(self):
+ """asyncore interface"""
+ assert self.state >= SCGIConnection.REQ
+ if len(self.outbuff) < self.BLOCK_SIZE:
+ for data in self.wsgihandler:
+ self.outbuff += data
+ if len(self.outbuff) >= self.BLOCK_SIZE:
+ self._try_send_headers()
+ break
+ if len(self.outbuff) == 0:
+ if hasattr(self.wsgihandler, "close"):
+ self.wsgihandler.close()
+ self.close()
+ return
+ try:
+ sentbytes = self.send(self.outbuff[:self.BLOCK_SIZE])
+ except socket.error:
+ if hasattr(self.wsgihandler, "close"):
+ self.wsgihandler.close()
+ self.close()
+ return
+ self.outbuff = self.outbuff[sentbytes:]
+
+ def handle_close(self):
+ """asyncore interface"""
+ self.close()
+
+__all__.append("SCGIServer")
+class SCGIServer(asyncore.dispatcher):
+ """SCGI Server for WSGI applications. It does not use multiple processes or
+ multiple threads."""
+ def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr):
+ """wsgiapp is the wsgi application to be run.
+ port is an int representing the TCP port number to be used.
+ interface is a string specifying the network interface to bind which
+ defaults to "localhost" making the server inaccessible over
+ network.
+ error is a file-like object being passed as wsgi.error in the environ
+ parameter defaulting to stderr."""
+ asyncore.dispatcher.__init__(self)
+ self.wsgiapp = wsgiapp
+ self.error = error
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.bind((interface, port))
+ self.listen(5)
+
+ def handle_accept(self):
+ """asyncore interface"""
+ ret = self.accept()
+ if ret is not None:
+ conn, addr = ret
+ SCGIConnection(self, conn, addr)
+
+ def run(self):
+ """Runs the server. It will not return and you can invoke
+ asyncore.loop() instead achieving the same effect."""
+ asyncore.loop()
+
diff --git a/wsgitools/scgi/forkpool.py b/wsgitools/scgi/forkpool.py
new file mode 100644
index 0000000..1ff2bee
--- /dev/null
+++ b/wsgitools/scgi/forkpool.py
@@ -0,0 +1,225 @@
+import socket
+import os
+import select
+import sys
+
+class SocketFileWrapper:
+ """Wraps a socket to a wsgi-compliant file-like object."""
+ def __init__(self, sock):
+ """@param sock is a socket.socket()"""
+ self.sock = sock
+ self.buff = ""
+ def read(self, size=None):
+ """see pep333"""
+ if size is None:
+ try:
+ return self.buff + self.sock.recv()
+ except: raise # Yes, this is necessary.
+ else:
+ self.buff = ""
+ if size >= len(self.buff):
+ ret, self.buff = self.buff[:size], self.buff[size:]
+ return ret
+ try:
+ return self.buff + self.sock.recv(size)
+ except: raise # Yes, this is necessary.
+ else:
+ self.buff = ""
+ def readline(self):
+ """see pep333"""
+ while True:
+ try:
+ split = self.buff.index('\n') + 1
+ ret, self.buff = self.buff[:split], self.buff[split:]
+ return ret
+ except ValueError:
+ data = self.sock.recv(4096)
+ if not data:
+ ret, self.buff = self.buff, ""
+ return ret
+ self.buff += data
+ def readlines(self):
+ """see pep333"""
+ data = self.readline()
+ while data:
+ yield data
+ data = self.readline()
+ def __iter__(self):
+ """see pep333"""
+ return self
+ def next(self):
+ """see pep333"""
+ data = self.read(4096)
+ if not data:
+ raise StopIteration
+ return data
+ def flush(self):
+ """see pep333"""
+ pass
+ def write(self, data):
+ """see pep333"""
+ self.sock.send(data)
+ def writelines(self, lines):
+ """see pep333"""
+ map(self.write, lines)
+
+class SCGIServer:
+ """Usage: create an SCGIServer object and invoke the run method which will
+ then turn this process into an scgi server."""
+ class WorkerState:
+ """state: 0 means idle and 1 means working.
+ These values are also sent as strings '0' and '1' over the socket."""
+ def __init__(self, pid, sock, state):
+ self.pid = pid
+ self.sock = sock
+ self.state = state
+
+ def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr,
+ minworkers=2, maxworkers=32, maxrequests=1000):
+ """Parameters:
+ wsgiapp is the WSGI application to be run.
+ port is the tcp port to listen on
+ interface is the interface to bind to (default: "localhost")
+ error is a filelike object beeing passed as wsgi.error in environ
+ minworkers is the number of worker processes to spawn
+ maxworkers is the maximum number of workers that can be spawned on demand
+ maxrequests is the number of requests a worker processes before dying
+ """
+ self.wsgiapp = wsgiapp
+ self.port = port
+ self.interface = interface
+ self.minworkers = minworkers
+ self.maxworkers = maxworkers
+ self.maxrequests = maxrequests
+ self.error = error
+ self.server = None # becomes a socket
+ # maps filedescriptors to WorkerStates
+ self.workers = {}
+
+ def run(self):
+ """
+ Serve the wsgi application.
+ """
+ self.server = socket.socket()
+ self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.server.bind((self.interface, self.port))
+ self.server.listen(5)
+ while True:
+ while (len(self.workers) < self.minworkers or # less than min
+ (len(self.workers) < self.maxworkers and # less than max
+ not len([w for w in # no inctive
+ self.workers.values() if w.state == 0]))):
+ self.spawnworker()
+ rs, _, _ = select.select(self.workers.keys(), [], [])
+ for s in rs:
+ data = self.workers[s].sock.recv(1)
+ if data == '':
+ self.workers[s].sock.close()
+ del self.workers[s]
+ elif data in ('0', '1'):
+ self.workers[s].state = int(data)
+ else:
+ raise RuntimeError, "unexpected data from worker"
+ try:
+ pid = 1
+ while pid > 0:
+ pid, _ = os.waitpid(0, os.WNOHANG)
+ except OSError:
+ pass
+
+ def spawnworker(self):
+ """
+ internal! spawns a single worker
+ """
+ srvsock, worksock = socket.socketpair()
+
+ pid = os.fork()
+ if pid == 0:
+ # close unneeded sockets
+ srvsock.close()
+ for worker in self.workers.values():
+ worker.sock.close()
+ del self.workers
+
+ self.work(worksock)
+
+ sys.exit()
+ elif pid > 0:
+ # close unneeded sockets
+ worksock.close()
+
+ self.workers[srvsock.fileno()] = SCGIServer.\
+ WorkerState(pid, srvsock, 0)
+ else:
+ raise RuntimeError, "fork failed"
+
+ def work(self, worksock):
+ """
+ internal! serves maxrequests times
+ """
+ for _ in range(self.maxrequests):
+ (con, addr) = self.server.accept()
+ worksock.send('1') # tell server we're working
+ self.process(con)
+ worksock.send('0') # tell server we've finished
+
+ def process(self, con):
+ """
+ internal! processes a single request on the connection con.
+ """
+ # This is a little bit ugly:
+ # The server has to send the length of the request followed by a colon.
+ # We assume that 1. the colon is within the first seven bytes.
+ # 2. the packet isn't fragmented.
+ # Furthermore 1 implies that the request isn't longer than 999999 bytes.
+ # This method however works. :-)
+ data = con.recv(7)
+ if not ':' in data:
+ con.close()
+ return
+ length, data = data.split(':', 1)
+ if not length.isdigit(): # clear protocol violation
+ con.close()
+ return
+ length = long(length)
+
+ while len(data) != length + 1: # read one byte beyond
+ t = con.recv(length + 1 - len(data))
+ if not t: # request too short
+ con.close()
+ return
+ data += t
+
+ # netstrings!
+ data = data.split('\0')
+ # the byte beyond has to be a ','.
+ # and the number of netstrings excluding the final ',' has to be even
+ if data.pop() != ',' or len(data) % 2 != 0:
+ con.close()
+ return
+
+ environ = {}
+ while data:
+ key = data.pop(0)
+ value = data.pop(0)
+ environ[key] = value
+
+ def start_response(status, headers):
+ con.send('Status: %s\r\n%s\r\n\r\n' % (status,
+ '\r\n'.join(map("%s: %s".__mod__, headers))))
+ return con.send
+
+ environ.update({
+ "wsgi.version": (1, 0),
+ "wsgi.input": SocketFileWrapper(con),
+ "wsgi.errors": self.error,
+ "wsgi.url_scheme": "http", # TODO: this is wrong
+ "wsgi.multithread": False,
+ "wsgi.multiprocess": True,
+ "wsgi.run_once": False})
+ result = self.wsgiapp(environ, start_response)
+ for data in result:
+ con.send(data)
+ if hasattr(result, "close"):
+ result.close()
+ con.close()