summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2007-04-17 18:38:13 +0200
committerHelmut Grohne <helmut@subdivi.de>2007-04-17 18:38:13 +0200
commitdb9d4aa502f88fa39de69142517b4ab5f1a0d5ab (patch)
treee20ee9f0a2aa398cdd243ce9f14f88fcaca57fe4
parent179420b66aa54a676dbd5ce77dff28688d6a9d1d (diff)
downloadwsgitools-db9d4aa502f88fa39de69142517b4ab5f1a0d5ab.tar.gz
added wsgitools.scgi.forkpool
wsgitools.scgi is now known as wsgitools.scgi.asynchronous
-rw-r--r--wsgitools/scgi/__init__.py0
-rw-r--r--wsgitools/scgi/asynchronous.py (renamed from wsgitools/scgi.py)0
-rw-r--r--wsgitools/scgi/forkpool.py225
3 files changed, 225 insertions, 0 deletions
diff --git a/wsgitools/scgi/__init__.py b/wsgitools/scgi/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wsgitools/scgi/__init__.py
diff --git a/wsgitools/scgi.py b/wsgitools/scgi/asynchronous.py
index 51349a0..51349a0 100644
--- a/wsgitools/scgi.py
+++ b/wsgitools/scgi/asynchronous.py
diff --git a/wsgitools/scgi/forkpool.py b/wsgitools/scgi/forkpool.py
new file mode 100644
index 0000000..1ff2bee
--- /dev/null
+++ b/wsgitools/scgi/forkpool.py
@@ -0,0 +1,225 @@
+import socket
+import os
+import select
+import sys
+
+class SocketFileWrapper:
+ """Wraps a socket to a wsgi-compliant file-like object."""
+ def __init__(self, sock):
+ """@param sock is a socket.socket()"""
+ self.sock = sock
+ self.buff = ""
+ def read(self, size=None):
+ """see pep333"""
+ if size is None:
+ try:
+ return self.buff + self.sock.recv()
+ except: raise # Yes, this is necessary.
+ else:
+ self.buff = ""
+ if size >= len(self.buff):
+ ret, self.buff = self.buff[:size], self.buff[size:]
+ return ret
+ try:
+ return self.buff + self.sock.recv(size)
+ except: raise # Yes, this is necessary.
+ else:
+ self.buff = ""
+ def readline(self):
+ """see pep333"""
+ while True:
+ try:
+ split = self.buff.index('\n') + 1
+ ret, self.buff = self.buff[:split], self.buff[split:]
+ return ret
+ except ValueError:
+ data = self.sock.recv(4096)
+ if not data:
+ ret, self.buff = self.buff, ""
+ return ret
+ self.buff += data
+ def readlines(self):
+ """see pep333"""
+ data = self.readline()
+ while data:
+ yield data
+ data = self.readline()
+ def __iter__(self):
+ """see pep333"""
+ return self
+ def next(self):
+ """see pep333"""
+ data = self.read(4096)
+ if not data:
+ raise StopIteration
+ return data
+ def flush(self):
+ """see pep333"""
+ pass
+ def write(self, data):
+ """see pep333"""
+ self.sock.send(data)
+ def writelines(self, lines):
+ """see pep333"""
+ map(self.write, lines)
+
+class SCGIServer:
+ """Usage: create an SCGIServer object and invoke the run method which will
+ then turn this process into an scgi server."""
+ class WorkerState:
+ """state: 0 means idle and 1 means working.
+ These values are also sent as strings '0' and '1' over the socket."""
+ def __init__(self, pid, sock, state):
+ self.pid = pid
+ self.sock = sock
+ self.state = state
+
+ def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr,
+ minworkers=2, maxworkers=32, maxrequests=1000):
+ """Parameters:
+ wsgiapp is the WSGI application to be run.
+ port is the tcp port to listen on
+ interface is the interface to bind to (default: "localhost")
+ error is a filelike object beeing passed as wsgi.error in environ
+ minworkers is the number of worker processes to spawn
+ maxworkers is the maximum number of workers that can be spawned on demand
+ maxrequests is the number of requests a worker processes before dying
+ """
+ self.wsgiapp = wsgiapp
+ self.port = port
+ self.interface = interface
+ self.minworkers = minworkers
+ self.maxworkers = maxworkers
+ self.maxrequests = maxrequests
+ self.error = error
+ self.server = None # becomes a socket
+ # maps filedescriptors to WorkerStates
+ self.workers = {}
+
+ def run(self):
+ """
+ Serve the wsgi application.
+ """
+ self.server = socket.socket()
+ self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.server.bind((self.interface, self.port))
+ self.server.listen(5)
+ while True:
+ while (len(self.workers) < self.minworkers or # less than min
+ (len(self.workers) < self.maxworkers and # less than max
+ not len([w for w in # no inctive
+ self.workers.values() if w.state == 0]))):
+ self.spawnworker()
+ rs, _, _ = select.select(self.workers.keys(), [], [])
+ for s in rs:
+ data = self.workers[s].sock.recv(1)
+ if data == '':
+ self.workers[s].sock.close()
+ del self.workers[s]
+ elif data in ('0', '1'):
+ self.workers[s].state = int(data)
+ else:
+ raise RuntimeError, "unexpected data from worker"
+ try:
+ pid = 1
+ while pid > 0:
+ pid, _ = os.waitpid(0, os.WNOHANG)
+ except OSError:
+ pass
+
+ def spawnworker(self):
+ """
+ internal! spawns a single worker
+ """
+ srvsock, worksock = socket.socketpair()
+
+ pid = os.fork()
+ if pid == 0:
+ # close unneeded sockets
+ srvsock.close()
+ for worker in self.workers.values():
+ worker.sock.close()
+ del self.workers
+
+ self.work(worksock)
+
+ sys.exit()
+ elif pid > 0:
+ # close unneeded sockets
+ worksock.close()
+
+ self.workers[srvsock.fileno()] = SCGIServer.\
+ WorkerState(pid, srvsock, 0)
+ else:
+ raise RuntimeError, "fork failed"
+
+ def work(self, worksock):
+ """
+ internal! serves maxrequests times
+ """
+ for _ in range(self.maxrequests):
+ (con, addr) = self.server.accept()
+ worksock.send('1') # tell server we're working
+ self.process(con)
+ worksock.send('0') # tell server we've finished
+
+ def process(self, con):
+ """
+ internal! processes a single request on the connection con.
+ """
+ # This is a little bit ugly:
+ # The server has to send the length of the request followed by a colon.
+ # We assume that 1. the colon is within the first seven bytes.
+ # 2. the packet isn't fragmented.
+ # Furthermore 1 implies that the request isn't longer than 999999 bytes.
+ # This method however works. :-)
+ data = con.recv(7)
+ if not ':' in data:
+ con.close()
+ return
+ length, data = data.split(':', 1)
+ if not length.isdigit(): # clear protocol violation
+ con.close()
+ return
+ length = long(length)
+
+ while len(data) != length + 1: # read one byte beyond
+ t = con.recv(length + 1 - len(data))
+ if not t: # request too short
+ con.close()
+ return
+ data += t
+
+ # netstrings!
+ data = data.split('\0')
+ # the byte beyond has to be a ','.
+ # and the number of netstrings excluding the final ',' has to be even
+ if data.pop() != ',' or len(data) % 2 != 0:
+ con.close()
+ return
+
+ environ = {}
+ while data:
+ key = data.pop(0)
+ value = data.pop(0)
+ environ[key] = value
+
+ def start_response(status, headers):
+ con.send('Status: %s\r\n%s\r\n\r\n' % (status,
+ '\r\n'.join(map("%s: %s".__mod__, headers))))
+ return con.send
+
+ environ.update({
+ "wsgi.version": (1, 0),
+ "wsgi.input": SocketFileWrapper(con),
+ "wsgi.errors": self.error,
+ "wsgi.url_scheme": "http", # TODO: this is wrong
+ "wsgi.multithread": False,
+ "wsgi.multiprocess": True,
+ "wsgi.run_once": False})
+ result = self.wsgiapp(environ, start_response)
+ for data in result:
+ con.send(data)
+ if hasattr(result, "close"):
+ result.close()
+ con.close()