""" The L{forkpool.SCGIServer} adapts a wsgi application to a scgi service. It works with multiple processes that are periodically cleaned up to prevent memory leaks having an impact to the system. """ import socket import os import select import sys import errno class SocketFileWrapper: """Wraps a socket to a wsgi-compliant file-like object.""" def __init__(self, sock, toread): """@param sock: is a C{socket.socket()}""" self.sock = sock self.buff = "" self.toread = toread def _recv(self, size=None): """ internal method for receiving and counting incoming data @raise socket.error: """ if size is None: try: data = self.sock.recv() except socket.error, why: if why[0] in (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN): data = "" else: raise self.toread -= len(data) return data try: data = self.sock.recv(size) except socket.error, why: if why[0] in (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN): data = "" else: raise self.toread -= len(data) return data def close(self): """Does not close the socket, because it might still be needed. It reads all data that should have been read as given by C{CONTENT_LENGTH}. """ try: while self.toread > 0: if self.toread > 4096: self._recv(4096) else: self._recv(self.toread) except socket.error: pass def read(self, size=None): """ see pep333 @raise socket.error: """ if size is None: retl = [] data = self.buff self.buff = "" while data: retl.append(data) try: data = self._recv() except socket.error: break return "".join(retl) datalist = [self.buff] datalen = len(self.buff) while datalen < size: try: data = self._recv(min(4096, size - datalen)) except socket.error: break if not data: break datalist.append(data) datalen += len(data) self.buff = "".join(datalist) if size <= len(self.buff): ret, self.buff = self.buff[:size], self.buff[size:] return ret ret, self.buff = self.buff, "" return ret def readline(self, size=None): """ see pep333 @raise socket.error: """ while True: try: split = self.buff.index('\n') + 1 if size is not None and split > size: split = size ret, self.buff = self.buff[:split], self.buff[split:] return ret except ValueError: if size is not None: if len(self.buff) < size: data = self._recv(size - len(self.buff)) else: ret, self.buff = self.buff[:size], self.buff[size:] return ret else: data = self._recv(4096) if not data: ret, self.buff = self.buff, "" return ret self.buff += data def readlines(self): """ see pep333 @raise socket.error: """ data = self.readline() while data: yield data data = self.readline() def __iter__(self): """see pep333""" return self def next(self): """ see pep333 @raise socket.error: """ data = self.read(4096) if not data: raise StopIteration return data def flush(self): """see pep333""" pass def write(self, data): """see pep333""" assert isinstance(data, str) try: self.sock.sendall(data) except socket.error: # ignore all socket errors: there is no way to report return def writelines(self, lines): """see pep333""" map(self.write, lines) class SCGIServer: """Usage: create an L{SCGIServer} object and invoke the run method which will then turn this process into an scgi server.""" class WorkerState: """state: 0 means idle and 1 means working. These values are also sent as strings '0' and '1' over the socket.""" def __init__(self, pid, sock, state): self.pid = pid self.sock = sock self.state = state def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr, minworkers=2, maxworkers=32, maxrequests=1000): """ @param wsgiapp: is the WSGI application to be run. @type port: int @param port: is the tcp port to listen on @type interface: str @param interface: is the interface to bind to (default: C{"localhost"}) @param error: is a file-like object beeing passed as C{wsgi.error} in environ @type minworkers: int @param minworkers: is the number of worker processes to spawn @type maxworkers: int @param maxworkers: is the maximum number of workers that can be spawned on demand @type maxrequests: int @param maxrequests: is the number of requests a worker processes before dying """ assert hasattr(error, "write") self.wsgiapp = wsgiapp self.port = port self.interface = interface self.minworkers = minworkers self.maxworkers = maxworkers self.maxrequests = maxrequests self.error = error self.server = None # becomes a socket # maps filedescriptors to WorkerStates self.workers = {} def run(self): """ Serve the wsgi application. """ self.server = socket.socket() self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind((self.interface, self.port)) self.server.listen(5) while True: while (len(self.workers) < self.minworkers or # less than min (len(self.workers) < self.maxworkers and # less than max not len([w for w in # no inactive self.workers.values() if w.state == 0]))): self.spawnworker() rs, _, _ = select.select(self.workers.keys(), [], []) for s in rs: try: data = self.workers[s].sock.recv(1) except socket.error: # we cannot handle errors here, so drop the connection. data = '' if data == '': self.workers[s].sock.close() del self.workers[s] elif data in ('0', '1'): self.workers[s].state = int(data) else: raise RuntimeError, "unexpected data from worker" try: pid = 1 while pid > 0: pid, _ = os.waitpid(0, os.WNOHANG) except OSError: pass def spawnworker(self): """ internal! spawns a single worker """ srvsock, worksock = socket.socketpair() pid = os.fork() if pid == 0: # close unneeded sockets srvsock.close() for worker in self.workers.values(): worker.sock.close() del self.workers try: self.work(worksock) except socket.error: pass sys.exit() elif pid > 0: # close unneeded sockets worksock.close() self.workers[srvsock.fileno()] = SCGIServer.\ WorkerState(pid, srvsock, 0) else: raise RuntimeError, "fork failed" def work(self, worksock): """ internal! serves maxrequests times @raise socket.error: """ for _ in range(self.maxrequests): (con, addr) = self.server.accept() # we cannot handle socket.errors here. worksock.sendall('1') # tell server we're working self.process(con) worksock.sendall('0') # tell server we've finished def process(self, con): """ internal! processes a single request on the connection con. """ # This is a little bit ugly: # The server has to send the length of the request followed by a colon. # We assume that 1. the colon is within the first seven bytes. # 2. the packet isn't fragmented. # Furthermore 1 implies that the request isn't longer than 999999 bytes. # This method however works. :-) try: data = con.recv(7) except socket.error: con.close() return if not ':' in data: con.close() return length, data = data.split(':', 1) if not length.isdigit(): # clear protocol violation con.close() return length = long(length) while len(data) != length + 1: # read one byte beyond try: t = con.recv(min(4096, length + 1 - len(data))) except socket.error: con.close() return if not t: # request too short con.close() return data += t # netstrings! data = data.split('\0') # the byte beyond has to be a ','. # and the number of netstrings excluding the final ',' has to be even if data.pop() != ',' or len(data) % 2 != 0: con.close() return environ = {} while data: key = data.pop(0) value = data.pop(0) environ[key] = value def dumbsend(data): try: con.sendall(data) except socket.error: pass def start_response(status, headers, exc_info=None): dumbsend('Status: %s\r\n%s\r\n\r\n' % (status, '\r\n'.join(map("%s: %s".__mod__, headers)))) return dumbsend if not environ.get("CONTENT_LENGTH", "bad").isdigit(): con.close() return sfw = SocketFileWrapper(con, long(environ["CONTENT_LENGTH"])) environ.update({ "wsgi.version": (1, 0), "wsgi.input": sfw, "wsgi.errors": self.error, "wsgi.url_scheme": "http", "wsgi.multithread": False, "wsgi.multiprocess": True, "wsgi.run_once": False}) if environ.get("HTTPS", "no").lower() in ('yes', 'y', '1'): environ["wsgi.url_scheme"] = "https" if "HTTP_CONTENT_TYPE" in environ: environ["CONTENT_TYPE"] = environ.pop("HTTP_CONTENT_TYPE") if "HTTP_CONTENT_LENGTH" in environ: del environ["HTTP_CONTENT_LENGTH"] # TODO: better way? result = self.wsgiapp(environ, start_response) assert hasattr(result, "__iter__") for data in result: assert isinstance(data, str) dumbsend(data) if hasattr(result, "close"): result.close() sfw.close() con.close()