import socket import os import select import sys import errno class SocketFileWrapper: """Wraps a socket to a wsgi-compliant file-like object.""" def __init__(self, sock, toread): """@param sock: is a socket.socket()""" self.sock = sock self.buff = "" self.toread = toread def _recv(self, size=None): """ internal method for receiving and counting incoming data @raise: socket.error """ if size is None: try: data = self.sock.recv() except socket.error, why: if why[0] in (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN): data = "" else: raise self.toread -= len(data) return data try: data = self.sock.recv(size) except socket.error, why: if why[0] in (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN): data = "" else: raise self.toread -= len(data) return data def close(self): """Does not close the socket, because it might still be needed. It reads all data that should have been read as given by CONTENT_LENGTH.""" try: while self.toread > 0: if self.toread > 4096: self._recv(4096) else: self._recv(self.toread) except socket.error: pass def read(self, size=None): """ see pep333 @raise: socket.error """ if size is None: retl = [] data = self.buff self.buff = "" while data: retl.append(data) try: data = self._recv() except socket.error: break return "".join(retl) while len(self.buff) < size: try: data = self._recv(min(4096, size - len(self.buff))) except socket.error: break self.buff += data if size <= len(self.buff): ret, self.buff = self.buff[:size], self.buff[size:] return ret ret, self.buff = self.buff, "" return ret def readline(self, size=None): """ see pep333 @raise: socket.error """ while True: try: split = self.buff.index('\n') + 1 if size is not None and split > size: split = size ret, self.buff = self.buff[:split], self.buff[split:] return ret except ValueError: if size is not None: if len(self.buff) < size: data = self._recv(size - len(self.buff)) else: ret, self.buff = self.buff[:size], self.buff[size:] return ret else: data = self._recv(4096) if not data: ret, self.buff = self.buff, "" return ret self.buff += data def readlines(self): """ see pep333 @raise: socket.error """ data = self.readline() while data: yield data data = self.readline() def __iter__(self): """see pep333""" return self def next(self): """ see pep333 @raise: socket.error """ data = self.read(4096) if not data: raise StopIteration return data def flush(self): """see pep333""" pass def write(self, data): """see pep333""" assert isinstance(data, str) try: self.sock.sendall(data) except socket.error: # ignore all socket errors: there is no way to report return def writelines(self, lines): """see pep333""" map(self.write, lines) class SCGIServer: """Usage: create an SCGIServer object and invoke the run method which will then turn this process into an scgi server.""" class WorkerState: """state: 0 means idle and 1 means working. These values are also sent as strings '0' and '1' over the socket.""" def __init__(self, pid, sock, state): self.pid = pid self.sock = sock self.state = state def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr, minworkers=2, maxworkers=32, maxrequests=1000): """ @param wsgiapp: is the WSGI application to be run. @type port: number @param port: is the tcp port to listen on @type interface: str @param interface: is the interface to bind to (default: "localhost") @param error: is a filelike object beeing passed as wsgi.error in environ @type minworkers: number @param minworkers: is the number of worker processes to spawn @type maxworkers: number @param maxworkers: is the maximum number of workers that can be spawned on demand @type maxrequests: number @param maxrequests: is the number of requests a worker processes before dying """ assert hasattr(error, "write") self.wsgiapp = wsgiapp self.port = port self.interface = interface self.minworkers = minworkers self.maxworkers = maxworkers self.maxrequests = maxrequests self.error = error self.server = None # becomes a socket # maps filedescriptors to WorkerStates self.workers = {} def run(self): """ Serve the wsgi application. """ self.server = socket.socket() self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind((self.interface, self.port)) self.server.listen(5) while True: while (len(self.workers) < self.minworkers or # less than min (len(self.workers) < self.maxworkers and # less than max not len([w for w in # no inctive self.workers.values() if w.state == 0]))): self.spawnworker() rs, _, _ = select.select(self.workers.keys(), [], []) for s in rs: try: data = self.workers[s].sock.recv(1) except socket.error: # we cannot handle errors here, so drop the connection. data = '' if data == '': self.workers[s].sock.close() del self.workers[s] elif data in ('0', '1'): self.workers[s].state = int(data) else: raise RuntimeError, "unexpected data from worker" try: pid = 1 while pid > 0: pid, _ = os.waitpid(0, os.WNOHANG) except OSError: pass def spawnworker(self): """ internal! spawns a single worker """ srvsock, worksock = socket.socketpair() pid = os.fork() if pid == 0: # close unneeded sockets srvsock.close() for worker in self.workers.values(): worker.sock.close() del self.workers try: self.work(worksock) except socket.error: pass sys.exit() elif pid > 0: # close unneeded sockets worksock.close() self.workers[srvsock.fileno()] = SCGIServer.\ WorkerState(pid, srvsock, 0) else: raise RuntimeError, "fork failed" def work(self, worksock): """ internal! serves maxrequests times @raise: socket.error """ for _ in range(self.maxrequests): (con, addr) = self.server.accept() # we cannot handle socket.errors here. worksock.sendall('1') # tell server we're working self.process(con) worksock.sendall('0') # tell server we've finished def process(self, con): """ internal! processes a single request on the connection con. """ # This is a little bit ugly: # The server has to send the length of the request followed by a colon. # We assume that 1. the colon is within the first seven bytes. # 2. the packet isn't fragmented. # Furthermore 1 implies that the request isn't longer than 999999 bytes. # This method however works. :-) try: data = con.recv(7) except socket.error: con.close() return if not ':' in data: con.close() return length, data = data.split(':', 1) if not length.isdigit(): # clear protocol violation con.close() return length = long(length) while len(data) != length + 1: # read one byte beyond try: t = con.recv(min(4096, length + 1 - len(data))) except socket.error: con.close() return if not t: # request too short con.close() return data += t # netstrings! data = data.split('\0') # the byte beyond has to be a ','. # and the number of netstrings excluding the final ',' has to be even if data.pop() != ',' or len(data) % 2 != 0: con.close() return environ = {} while data: key = data.pop(0) value = data.pop(0) environ[key] = value def dumbsend(data): try: con.sendall(data) except socket.error: pass def start_response(status, headers, exc_info=None): dumbsend('Status: %s\r\n%s\r\n\r\n' % (status, '\r\n'.join(map("%s: %s".__mod__, headers)))) return dumbsend sfw = SocketFileWrapper(con, long(environ["CONTENT_LENGTH"])) environ.update({ "wsgi.version": (1, 0), "wsgi.input": sfw, "wsgi.errors": self.error, "wsgi.url_scheme": "http", "wsgi.multithread": False, "wsgi.multiprocess": True, "wsgi.run_once": False}) if environ.get("HTTPS", "no").lower() in ('yes', 'y', '1'): environ["wsgi.url_scheme"] = "https" if "HTTP_CONTENT_TYPE" in environ: environ["CONTENT_TYPE"] = environ.pop("HTTP_CONTENT_TYPE") if "HTTP_CONTENT_LENGTH" in environ: del environ["HTTP_CONTENT_LENGTH"] # TODO: better way? result = self.wsgiapp(environ, start_response) assert hasattr(result, "__iter__") for data in result: assert isinstance(data, str) dumbsend(data) if hasattr(result, "close"): result.close() sfw.close() con.close()