From db9d4aa502f88fa39de69142517b4ab5f1a0d5ab Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Tue, 17 Apr 2007 18:38:13 +0200 Subject: added wsgitools.scgi.forkpool wsgitools.scgi is now known as wsgitools.scgi.asynchronous --- wsgitools/scgi.py | 201 ------------------------------------ wsgitools/scgi/__init__.py | 0 wsgitools/scgi/asynchronous.py | 201 ++++++++++++++++++++++++++++++++++++ wsgitools/scgi/forkpool.py | 225 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 426 insertions(+), 201 deletions(-) delete mode 100644 wsgitools/scgi.py create mode 100644 wsgitools/scgi/__init__.py create mode 100644 wsgitools/scgi/asynchronous.py create mode 100644 wsgitools/scgi/forkpool.py diff --git a/wsgitools/scgi.py b/wsgitools/scgi.py deleted file mode 100644 index 51349a0..0000000 --- a/wsgitools/scgi.py +++ /dev/null @@ -1,201 +0,0 @@ -__all__ = [] - -import asyncore -import socket -import sys -try: - import cStringIO as StringIO -except ImportError: - import StringIO - -class SCGIConnection(asyncore.dispatcher): - """SCGI connection class used by WSGISCGIServer.""" - # maximum request size - MAX_REQUEST_SIZE = 65536 - # maximum post size - MAX_POST_SIZE = 8 << 20 - # read and write size - BLOCK_SIZE = 4096 - # connection states - NEW = 0*4 | 1 # connection established, waiting for request - HEADER = 1*4 | 1 # the request length was received, waiting for the rest - BODY = 2*4 | 1 # the request header was received, waiting for the body - REQ = 3*4 | 2 # request received, sending response - def __init__(self, server, connection, addr): - self.server = server # WSGISCGIServer instance - self.addr = addr # scgi client address - self.state = SCGIConnection.NEW # internal state - self.environ = {} # environment passed to wsgi app - self.reqlen = -1 # request length used in two different meanings - self.inbuff = "" # input buffer - self.outbuff = "" # output buffer - self.wsgihandler = None # wsgi application iterator - self.outheaders = () # headers to be sent - # () -> unset, (..,..) -> set, True -> sent - self.body = StringIO.StringIO() # request body - asyncore.dispatcher.__init__(self, connection) - - def _wsgi_headers(self): - return {"wsgi.version": (1, 0), - "wsgi.input": self.body, - "wsgi.errors": self.server.error, - "wsgi.url_scheme": "http", # TODO: this is wrong - "wsgi.multithread": False, - "wsgi.multiprocess": False, - "wsgi.run_once": False} - - def _try_send_headers(self): - if self.outheaders != True: - assert not self.outbuff - status, headers = self.outheaders - headdata = "".join(map("%s: %s\r\n".__mod__, headers)) - self.outbuff = "Status: %s\r\n%s\r\n" % (status, headdata) - self.outheaders = True - - def _wsgi_write(self, data): - assert self.state >= SCGIConnection.REQ - if data: - self._try_send_headers() - self.outbuff += data - - def readable(self): - """asyncore interface""" - return self.state & 1 == 1 - - def writable(self): - """asyncore interface""" - return self.state & 2 == 2 - - def handle_read(self): - """asyncore interface""" - data = self.recv(self.BLOCK_SIZE) - self.inbuff += data - if self.state == SCGIConnection.NEW: - if ':' in self.inbuff: - reqlen, self.inbuff = self.inbuff.split(':', 1) - if not reqlen.isdigit(): - self.close() - return # invalid request format - reqlen = long(reqlen) - if reqlen > self.MAX_REQUEST_SIZE: - self.close() - return # request too long - self.reqlen = reqlen - self.state = SCGIConnection.HEADER - elif len(self.inbuff) > self.MAX_REQUEST_SIZE: - self.close() - return # request too long - - if self.state == SCGIConnection.HEADER: - buff = self.inbuff[:self.reqlen] - remainder = self.inbuff[self.reqlen:] - - while buff.count('\0') >= 2: - key, value, buff = buff.split('\0', 2) - self.environ[key] = value - self.reqlen -= len(key) + len(value) + 2 - - self.inbuff = buff + remainder - - if self.reqlen == 0: - if self.inbuff.startswith(','): - self.inbuff = self.inbuff[1:] - self.reqlen = long(self.environ["CONTENT_LENGTH"]) - if self.reqlen > self.MAX_POST_SIZE: - self.close() - return - self.state = SCGIConnection.BODY - else: - self.close() - return # protocol violation - - if self.state == SCGIConnection.BODY: - if len(self.inbuff) >= self.reqlen: - self.body.write(self.inbuff[:self.reqlen]) - self.body.seek(0) - self.inbuff = "" - self.reqlen = 0 - self.environ.update(self._wsgi_headers()) - if "HTTP_CONTENT_TYPE" in self.environ: - self.environ["CONTENT_TYPE"] = \ - self.environ.pop("HTTP_CONTENT_TYPE") - if "HTTP_CONTENT_LENGTH" in self.environ: - del self.environ["HTTP_CONTENT_LENGTH"] # TODO: better way? - self.wsgihandler = iter(self.server.wsgiapp( - self.environ, self.start_response)) - self.state = SCGIConnection.REQ - else: - self.body.write(self.inbuff) - self.reqlen -= len(self.inbuff) - self.inbuff = "" - - def start_response(self, status, headers, exc_info=None): - if exc_info: - if self.outheaders == True: - try: - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None - assert self.outheaders != True # unsent - self.outheaders = (status, headers) - return self._wsgi_write - - def handle_write(self): - """asyncore interface""" - assert self.state >= SCGIConnection.REQ - if len(self.outbuff) < self.BLOCK_SIZE: - for data in self.wsgihandler: - self.outbuff += data - if len(self.outbuff) >= self.BLOCK_SIZE: - self._try_send_headers() - break - if len(self.outbuff) == 0: - if hasattr(self.wsgihandler, "close"): - self.wsgihandler.close() - self.close() - return - try: - sentbytes = self.send(self.outbuff[:self.BLOCK_SIZE]) - except socket.error: - if hasattr(self.wsgihandler, "close"): - self.wsgihandler.close() - self.close() - return - self.outbuff = self.outbuff[sentbytes:] - - def handle_close(self): - """asyncore interface""" - self.close() - -__all__.append("SCGIServer") -class SCGIServer(asyncore.dispatcher): - """SCGI Server for WSGI applications. It does not use multiple processes or - multiple threads.""" - def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr): - """wsgiapp is the wsgi application to be run. - port is an int representing the TCP port number to be used. - interface is a string specifying the network interface to bind which - defaults to "localhost" making the server inaccessible over - network. - error is a file-like object being passed as wsgi.error in the environ - parameter defaulting to stderr.""" - asyncore.dispatcher.__init__(self) - self.wsgiapp = wsgiapp - self.error = error - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind((interface, port)) - self.listen(5) - - def handle_accept(self): - """asyncore interface""" - ret = self.accept() - if ret is not None: - conn, addr = ret - SCGIConnection(self, conn, addr) - - def run(self): - """Runs the server. It will not return and you can invoke - asyncore.loop() instead achieving the same effect.""" - asyncore.loop() - diff --git a/wsgitools/scgi/__init__.py b/wsgitools/scgi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/wsgitools/scgi/asynchronous.py b/wsgitools/scgi/asynchronous.py new file mode 100644 index 0000000..51349a0 --- /dev/null +++ b/wsgitools/scgi/asynchronous.py @@ -0,0 +1,201 @@ +__all__ = [] + +import asyncore +import socket +import sys +try: + import cStringIO as StringIO +except ImportError: + import StringIO + +class SCGIConnection(asyncore.dispatcher): + """SCGI connection class used by WSGISCGIServer.""" + # maximum request size + MAX_REQUEST_SIZE = 65536 + # maximum post size + MAX_POST_SIZE = 8 << 20 + # read and write size + BLOCK_SIZE = 4096 + # connection states + NEW = 0*4 | 1 # connection established, waiting for request + HEADER = 1*4 | 1 # the request length was received, waiting for the rest + BODY = 2*4 | 1 # the request header was received, waiting for the body + REQ = 3*4 | 2 # request received, sending response + def __init__(self, server, connection, addr): + self.server = server # WSGISCGIServer instance + self.addr = addr # scgi client address + self.state = SCGIConnection.NEW # internal state + self.environ = {} # environment passed to wsgi app + self.reqlen = -1 # request length used in two different meanings + self.inbuff = "" # input buffer + self.outbuff = "" # output buffer + self.wsgihandler = None # wsgi application iterator + self.outheaders = () # headers to be sent + # () -> unset, (..,..) -> set, True -> sent + self.body = StringIO.StringIO() # request body + asyncore.dispatcher.__init__(self, connection) + + def _wsgi_headers(self): + return {"wsgi.version": (1, 0), + "wsgi.input": self.body, + "wsgi.errors": self.server.error, + "wsgi.url_scheme": "http", # TODO: this is wrong + "wsgi.multithread": False, + "wsgi.multiprocess": False, + "wsgi.run_once": False} + + def _try_send_headers(self): + if self.outheaders != True: + assert not self.outbuff + status, headers = self.outheaders + headdata = "".join(map("%s: %s\r\n".__mod__, headers)) + self.outbuff = "Status: %s\r\n%s\r\n" % (status, headdata) + self.outheaders = True + + def _wsgi_write(self, data): + assert self.state >= SCGIConnection.REQ + if data: + self._try_send_headers() + self.outbuff += data + + def readable(self): + """asyncore interface""" + return self.state & 1 == 1 + + def writable(self): + """asyncore interface""" + return self.state & 2 == 2 + + def handle_read(self): + """asyncore interface""" + data = self.recv(self.BLOCK_SIZE) + self.inbuff += data + if self.state == SCGIConnection.NEW: + if ':' in self.inbuff: + reqlen, self.inbuff = self.inbuff.split(':', 1) + if not reqlen.isdigit(): + self.close() + return # invalid request format + reqlen = long(reqlen) + if reqlen > self.MAX_REQUEST_SIZE: + self.close() + return # request too long + self.reqlen = reqlen + self.state = SCGIConnection.HEADER + elif len(self.inbuff) > self.MAX_REQUEST_SIZE: + self.close() + return # request too long + + if self.state == SCGIConnection.HEADER: + buff = self.inbuff[:self.reqlen] + remainder = self.inbuff[self.reqlen:] + + while buff.count('\0') >= 2: + key, value, buff = buff.split('\0', 2) + self.environ[key] = value + self.reqlen -= len(key) + len(value) + 2 + + self.inbuff = buff + remainder + + if self.reqlen == 0: + if self.inbuff.startswith(','): + self.inbuff = self.inbuff[1:] + self.reqlen = long(self.environ["CONTENT_LENGTH"]) + if self.reqlen > self.MAX_POST_SIZE: + self.close() + return + self.state = SCGIConnection.BODY + else: + self.close() + return # protocol violation + + if self.state == SCGIConnection.BODY: + if len(self.inbuff) >= self.reqlen: + self.body.write(self.inbuff[:self.reqlen]) + self.body.seek(0) + self.inbuff = "" + self.reqlen = 0 + self.environ.update(self._wsgi_headers()) + if "HTTP_CONTENT_TYPE" in self.environ: + self.environ["CONTENT_TYPE"] = \ + self.environ.pop("HTTP_CONTENT_TYPE") + if "HTTP_CONTENT_LENGTH" in self.environ: + del self.environ["HTTP_CONTENT_LENGTH"] # TODO: better way? + self.wsgihandler = iter(self.server.wsgiapp( + self.environ, self.start_response)) + self.state = SCGIConnection.REQ + else: + self.body.write(self.inbuff) + self.reqlen -= len(self.inbuff) + self.inbuff = "" + + def start_response(self, status, headers, exc_info=None): + if exc_info: + if self.outheaders == True: + try: + raise exc_info[0], exc_info[1], exc_info[2] + finally: + exc_info = None + assert self.outheaders != True # unsent + self.outheaders = (status, headers) + return self._wsgi_write + + def handle_write(self): + """asyncore interface""" + assert self.state >= SCGIConnection.REQ + if len(self.outbuff) < self.BLOCK_SIZE: + for data in self.wsgihandler: + self.outbuff += data + if len(self.outbuff) >= self.BLOCK_SIZE: + self._try_send_headers() + break + if len(self.outbuff) == 0: + if hasattr(self.wsgihandler, "close"): + self.wsgihandler.close() + self.close() + return + try: + sentbytes = self.send(self.outbuff[:self.BLOCK_SIZE]) + except socket.error: + if hasattr(self.wsgihandler, "close"): + self.wsgihandler.close() + self.close() + return + self.outbuff = self.outbuff[sentbytes:] + + def handle_close(self): + """asyncore interface""" + self.close() + +__all__.append("SCGIServer") +class SCGIServer(asyncore.dispatcher): + """SCGI Server for WSGI applications. It does not use multiple processes or + multiple threads.""" + def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr): + """wsgiapp is the wsgi application to be run. + port is an int representing the TCP port number to be used. + interface is a string specifying the network interface to bind which + defaults to "localhost" making the server inaccessible over + network. + error is a file-like object being passed as wsgi.error in the environ + parameter defaulting to stderr.""" + asyncore.dispatcher.__init__(self) + self.wsgiapp = wsgiapp + self.error = error + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((interface, port)) + self.listen(5) + + def handle_accept(self): + """asyncore interface""" + ret = self.accept() + if ret is not None: + conn, addr = ret + SCGIConnection(self, conn, addr) + + def run(self): + """Runs the server. It will not return and you can invoke + asyncore.loop() instead achieving the same effect.""" + asyncore.loop() + diff --git a/wsgitools/scgi/forkpool.py b/wsgitools/scgi/forkpool.py new file mode 100644 index 0000000..1ff2bee --- /dev/null +++ b/wsgitools/scgi/forkpool.py @@ -0,0 +1,225 @@ +import socket +import os +import select +import sys + +class SocketFileWrapper: + """Wraps a socket to a wsgi-compliant file-like object.""" + def __init__(self, sock): + """@param sock is a socket.socket()""" + self.sock = sock + self.buff = "" + def read(self, size=None): + """see pep333""" + if size is None: + try: + return self.buff + self.sock.recv() + except: raise # Yes, this is necessary. + else: + self.buff = "" + if size >= len(self.buff): + ret, self.buff = self.buff[:size], self.buff[size:] + return ret + try: + return self.buff + self.sock.recv(size) + except: raise # Yes, this is necessary. + else: + self.buff = "" + def readline(self): + """see pep333""" + while True: + try: + split = self.buff.index('\n') + 1 + ret, self.buff = self.buff[:split], self.buff[split:] + return ret + except ValueError: + data = self.sock.recv(4096) + if not data: + ret, self.buff = self.buff, "" + return ret + self.buff += data + def readlines(self): + """see pep333""" + data = self.readline() + while data: + yield data + data = self.readline() + def __iter__(self): + """see pep333""" + return self + def next(self): + """see pep333""" + data = self.read(4096) + if not data: + raise StopIteration + return data + def flush(self): + """see pep333""" + pass + def write(self, data): + """see pep333""" + self.sock.send(data) + def writelines(self, lines): + """see pep333""" + map(self.write, lines) + +class SCGIServer: + """Usage: create an SCGIServer object and invoke the run method which will + then turn this process into an scgi server.""" + class WorkerState: + """state: 0 means idle and 1 means working. + These values are also sent as strings '0' and '1' over the socket.""" + def __init__(self, pid, sock, state): + self.pid = pid + self.sock = sock + self.state = state + + def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr, + minworkers=2, maxworkers=32, maxrequests=1000): + """Parameters: + wsgiapp is the WSGI application to be run. + port is the tcp port to listen on + interface is the interface to bind to (default: "localhost") + error is a filelike object beeing passed as wsgi.error in environ + minworkers is the number of worker processes to spawn + maxworkers is the maximum number of workers that can be spawned on demand + maxrequests is the number of requests a worker processes before dying + """ + self.wsgiapp = wsgiapp + self.port = port + self.interface = interface + self.minworkers = minworkers + self.maxworkers = maxworkers + self.maxrequests = maxrequests + self.error = error + self.server = None # becomes a socket + # maps filedescriptors to WorkerStates + self.workers = {} + + def run(self): + """ + Serve the wsgi application. + """ + self.server = socket.socket() + self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server.bind((self.interface, self.port)) + self.server.listen(5) + while True: + while (len(self.workers) < self.minworkers or # less than min + (len(self.workers) < self.maxworkers and # less than max + not len([w for w in # no inctive + self.workers.values() if w.state == 0]))): + self.spawnworker() + rs, _, _ = select.select(self.workers.keys(), [], []) + for s in rs: + data = self.workers[s].sock.recv(1) + if data == '': + self.workers[s].sock.close() + del self.workers[s] + elif data in ('0', '1'): + self.workers[s].state = int(data) + else: + raise RuntimeError, "unexpected data from worker" + try: + pid = 1 + while pid > 0: + pid, _ = os.waitpid(0, os.WNOHANG) + except OSError: + pass + + def spawnworker(self): + """ + internal! spawns a single worker + """ + srvsock, worksock = socket.socketpair() + + pid = os.fork() + if pid == 0: + # close unneeded sockets + srvsock.close() + for worker in self.workers.values(): + worker.sock.close() + del self.workers + + self.work(worksock) + + sys.exit() + elif pid > 0: + # close unneeded sockets + worksock.close() + + self.workers[srvsock.fileno()] = SCGIServer.\ + WorkerState(pid, srvsock, 0) + else: + raise RuntimeError, "fork failed" + + def work(self, worksock): + """ + internal! serves maxrequests times + """ + for _ in range(self.maxrequests): + (con, addr) = self.server.accept() + worksock.send('1') # tell server we're working + self.process(con) + worksock.send('0') # tell server we've finished + + def process(self, con): + """ + internal! processes a single request on the connection con. + """ + # This is a little bit ugly: + # The server has to send the length of the request followed by a colon. + # We assume that 1. the colon is within the first seven bytes. + # 2. the packet isn't fragmented. + # Furthermore 1 implies that the request isn't longer than 999999 bytes. + # This method however works. :-) + data = con.recv(7) + if not ':' in data: + con.close() + return + length, data = data.split(':', 1) + if not length.isdigit(): # clear protocol violation + con.close() + return + length = long(length) + + while len(data) != length + 1: # read one byte beyond + t = con.recv(length + 1 - len(data)) + if not t: # request too short + con.close() + return + data += t + + # netstrings! + data = data.split('\0') + # the byte beyond has to be a ','. + # and the number of netstrings excluding the final ',' has to be even + if data.pop() != ',' or len(data) % 2 != 0: + con.close() + return + + environ = {} + while data: + key = data.pop(0) + value = data.pop(0) + environ[key] = value + + def start_response(status, headers): + con.send('Status: %s\r\n%s\r\n\r\n' % (status, + '\r\n'.join(map("%s: %s".__mod__, headers)))) + return con.send + + environ.update({ + "wsgi.version": (1, 0), + "wsgi.input": SocketFileWrapper(con), + "wsgi.errors": self.error, + "wsgi.url_scheme": "http", # TODO: this is wrong + "wsgi.multithread": False, + "wsgi.multiprocess": True, + "wsgi.run_once": False}) + result = self.wsgiapp(environ, start_response) + for data in result: + con.send(data) + if hasattr(result, "close"): + result.close() + con.close() -- cgit v1.2.3