""" The L{forkpool.SCGIServer} adapts a wsgi application to a scgi service. It works with multiple processes that are periodically cleaned up to prevent memory leaks having an impact to the system. """ try: import resource except ImportError: resource = None import socket import os import select import sys import errno import signal from wsgitools.scgi import _convert_environ, FileWrapper if sys.version_info[0] >= 3: def exc_info_for_raise(exc_info): return exc_info[0](exc_info[1]).with_traceback(exc_info[2]) else: def exc_info_for_raise(exc_info): return exc_info[0], exc_info[1], exc_info[2] __all__ = [] class SocketFileWrapper: """Wraps a socket to a wsgi-compliant file-like object.""" def __init__(self, sock, toread): """@param sock: is a C{socket.socket()}""" self.sock = sock self.buff = "" self.toread = toread def _recv(self, size=4096): """ internal method for receiving and counting incoming data @raise socket.error: """ toread = min(size, self.toread) if not toread: return "" try: data = self.sock.recv(toread) except socket.error, why: if why[0] in (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN): data = "" else: raise self.toread -= len(data) return data def close(self): """Does not close the socket, because it might still be needed. It reads all data that should have been read as given by C{CONTENT_LENGTH}. """ try: while self.toread > 0: if not self._recv(min(self.toread, 4096)): return except socket.error: pass def read(self, size=None): """ see pep333 @raise socket.error: """ if size is None: retl = [] data = self.buff self.buff = "" while True: retl.append(data) try: data = self._recv() except socket.error: break if not data: break return "".join(retl) datalist = [self.buff] datalen = len(self.buff) while datalen < size: try: data = self._recv(min(4096, size - datalen)) except socket.error: break if not data: break datalist.append(data) datalen += len(data) self.buff = "".join(datalist) if size <= len(self.buff): ret, self.buff = self.buff[:size], self.buff[size:] return ret ret, self.buff = self.buff, "" return ret def readline(self, size=None): """ see pep333 @raise socket.error: """ while True: try: split = self.buff.index('\n') + 1 if size is not None and split > size: split = size ret, self.buff = self.buff[:split], self.buff[split:] return ret except ValueError: if size is not None: if len(self.buff) < size: data = self._recv(size - len(self.buff)) else: ret, self.buff = self.buff[:size], self.buff[size:] return ret else: data = self._recv(4096) if not data: ret, self.buff = self.buff, "" return ret self.buff += data def readlines(self): """ see pep333 @raise socket.error: """ data = self.readline() while data: yield data data = self.readline() def __iter__(self): """see pep333""" return self def next(self): """ see pep333 @raise socket.error: """ data = self.read(4096) if not data: raise StopIteration return data def flush(self): """see pep333""" pass def write(self, data): """see pep333""" assert isinstance(data, str) try: self.sock.sendall(data) except socket.error: # ignore all socket errors: there is no way to report return def writelines(self, lines): """see pep333""" for line in lines: self.write(line) __all__.append("SCGIServer") class SCGIServer: """Usage: create an L{SCGIServer} object and invoke the run method which will then turn this process into an scgi server.""" class WorkerState: """state: 0 means idle and 1 means working. These values are also sent as strings '0' and '1' over the socket.""" def __init__(self, pid, sock, state): """ @type pid: int @type state: int """ self.pid = pid self.sock = sock self.state = state def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr, minworkers=2, maxworkers=32, maxrequests=1000, config={}, reusesocket=None, cpulimit=None, timelimit=None): """ @param wsgiapp: is the WSGI application to be run. @type port: int @param port: is the tcp port to listen on @type interface: str @param interface: is the interface to bind to (default: C{"localhost"}) @param error: is a file-like object beeing passed as C{wsgi.errors} in environ @type minworkers: int @param minworkers: is the number of worker processes to spawn @type maxworkers: int @param maxworkers: is the maximum number of workers that can be spawned on demand @type maxrequests: int @param maxrequests: is the number of requests a worker processes before dying @type config: {} @param config: the environ dictionary is updated using these values for each request. @type reusesocket: None or socket.socket @param reusesocket: If a socket is passed, do not create a socket. Instead use given socket as listen socket. The passed socket must be set up for accepting tcp connections (i.e. C{AF_INET}, C{SOCK_STREAM} with bind and listen called). @type cpulimit: (int, int) @param cpulimit: a pair of soft and hard cpu time limit in seconds. This limit is installed for each worker using RLIMIT_CPU if resource limits are available to this platform. After reaching the soft limit workers will continue to process the current request and then cleanly terminate. @type timelimit: int @param timelimit: The maximum number of wall clock seconds processing a request should take. If this is specified, an alarm timer is installed and the default action is to kill the worker. """ assert hasattr(error, "write") self.wsgiapp = wsgiapp self.bind_address = (interface, port) self.minworkers = minworkers self.maxworkers = maxworkers self.maxrequests = maxrequests self.config = config.copy() self.config["wsgi.errors"] = error self.reusesocket = reusesocket # cpulimit changes meaning: # master: None or a tuple denoting the limit to be configured. # worker: boolean denoting whether the limit is reached. self.cpulimit = cpulimit self.timelimit = timelimit self.server = None # becomes a socket # maps filedescriptors to WorkerStates self.workers = {} self.running = False self.ischild = False def enable_sighandler(self, sig=signal.SIGTERM): """ Changes the signal handler for the given signal to terminate the run() loop. @param sig: is the signal to handle @returns: self """ signal.signal(sig, self.shutdownhandler) return self def run(self): """ Serve the wsgi application. """ if self.reusesocket is None: self.server = socket.socket() self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind(self.bind_address) self.server.listen(5) else: self.server = self.reusesocket self.running = True while self.running: while (len(self.workers) < self.minworkers or # less than min (len(self.workers) < self.maxworkers and # less than max not len([w for w in # no inactive self.workers.values() if w.state == 0]))): self.spawnworker() try: rs, _, _ = select.select(self.workers.keys(), [], []) except select.error, e: if e[0] != errno.EINTR: raise rs = [] for s in rs: try: data = self.workers[s].sock.recv(1) except socket.error: # we cannot handle errors here, so drop the connection. data = '' if data == '': self.workers[s].sock.close() del self.workers[s] elif data in ('0', '1'): self.workers[s].state = int(data) else: raise RuntimeError("unexpected data from worker") try: pid = 1 while pid > 0: pid, _ = os.waitpid(0, os.WNOHANG) except OSError: pass if self.reusesocket is None: self.server.close() self.server = None self.killworkers() def killworkers(self, sig=signal.SIGTERM): """ Kills all worker children. @param sig: is the signal used to kill the children """ while self.workers: _, state = self.workers.popitem() state.sock.close() os.kill(state.pid, sig) # TODO: handle working children with a timeout def shutdownhandler(self, sig=None, stackframe=None): """ Signal handler function for stopping the run() loop. It works by setting a variable that run() evaluates in each loop. As a signal interrupts accept the loop is terminated, the accepting socket is closed and the workers are killed. @param sig: ignored for usage with signal.signal @param stackframe: ignored for usage with signal.signal """ if self.ischild: sys.exit() else: self.running = False def sigxcpuhandler(self, sig=None, stackframe=None): """ Signal hanlder function for the SIGXCUP signal. It is sent to a worker when the soft RLIMIT_CPU is crossed. @param sig: ignored for usage with signal.signal @param stackframe: ignored for usage with signal.signal """ self.cpulimit = True def spawnworker(self): """ internal! spawns a single worker """ srvsock, worksock = socket.socketpair() pid = os.fork() if pid == 0: self.ischild = True # close unneeded sockets srvsock.close() for worker in self.workers.values(): worker.sock.close() del self.workers if self.cpulimit and resource: signal.signal(signal.SIGXCPU, self.sigxcpuhandler) resource.setrlimit(resource.RLIMIT_CPU, self.cpulimit) self.cpulimit = False try: self.work(worksock) except socket.error: pass sys.exit() elif pid > 0: # close unneeded sockets worksock.close() self.workers[srvsock.fileno()] = SCGIServer.\ WorkerState(pid, srvsock, 0) else: raise RuntimeError("fork failed") def work(self, worksock): """ internal! serves maxrequests times @raise socket.error: """ for _ in range(self.maxrequests): (con, addr) = self.server.accept() # we cannot handle socket.errors here. worksock.sendall('1') # tell server we're working if self.timelimit: signal.alarm(self.timelimit) self.process(con) if self.timelimit: signal.alarm(0) worksock.sendall('0') # tell server we've finished if self.cpulimit: break def process(self, con): """ internal! processes a single request on the connection con. """ # This is a little bit ugly: # The server has to send the length of the request followed by a colon. # We assume that 1. the colon is within the first seven bytes. # 2. the packet isn't fragmented. # Furthermore 1 implies that the request isn't longer than 999999 bytes. # This method however works. :-) try: data = con.recv(7) except socket.error: con.close() return if not ':' in data: con.close() return length, data = data.split(':', 1) if not length.isdigit(): # clear protocol violation con.close() return length = int(length) while len(data) != length + 1: # read one byte beyond try: t = con.recv(min(4096, length + 1 - len(data))) except socket.error: con.close() return if not t: # request too short con.close() return data += t # netstrings! data = data.split('\0') # the byte beyond has to be a ','. # and the number of netstrings excluding the final ',' has to be even if data.pop() != ',' or len(data) % 2 != 0: con.close() return environ = self.config.copy() while data: key = data.pop(0) value = data.pop(0) environ[key] = value # elements: # 0 -> None: no headers set # 0 -> False: set but unsent # 0 -> True: sent # 1 -> status string # 2 -> header list response_head = [None, None, None] def sendheaders(): assert response_head[0] is not None # headers set if response_head[0] != True: response_head[0] = True try: con.sendall('Status: %s\r\n%s\r\n\r\n' % (response_head[1], '\r\n'.join(map("%s: %s".__mod__, response_head[2])))) except socket.error: pass def dumbsend(data): sendheaders() try: con.sendall(data) except socket.error: pass def start_response(status, headers, exc_info=None): if exc_info and response_head[0]: try: raise exc_info_for_raise(exc_info) finally: exc_info = None assert not response_head[0] # unset or not sent response_head[0] = False # set but nothing sent response_head[1] = status response_head[2] = headers return dumbsend if not environ.get("CONTENT_LENGTH", "bad").isdigit(): con.close() return _convert_environ(environ, multiprocess=True) sfw = SocketFileWrapper(con, int(environ["CONTENT_LENGTH"])) environ["wsgi.input"] = sfw result = self.wsgiapp(environ, start_response) assert hasattr(result, "__iter__") if isinstance(result, FileWrapper) and result.can_transfer(): sendheaders() sent = 1 while sent > 0: sent = result.transfer(con) else: result_iter = iter(result) for data in result_iter: assert response_head[0] is not None assert isinstance(data, str) dumbsend(data) if response_head[0] != True: sendheaders() if hasattr(result, "close"): result.close() sfw.close() con.close()