From 863e29c4d2ed76ddf35a0bc3b66abe7423125362 Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Sat, 17 Mar 2012 22:14:47 +0100 Subject: sendfile support When a sendfile library is available, expose it via wsgi.file_wrapper. This support spans both asynchronous and forkpool. --- wsgitools/scgi/__init__.py | 57 +++++++++++++++++++++++++++++ wsgitools/scgi/asynchronous.py | 81 ++++++++++++++++++++++++++++-------------- wsgitools/scgi/forkpool.py | 20 +++++++---- 3 files changed, 124 insertions(+), 34 deletions(-) diff --git a/wsgitools/scgi/__init__.py b/wsgitools/scgi/__init__.py index 20f6625..cbe7a80 100644 --- a/wsgitools/scgi/__init__.py +++ b/wsgitools/scgi/__init__.py @@ -1,3 +1,58 @@ +__all__ = [] + +try: + import sendfile +except ImportError: + have_sendfile = False +else: + have_sendfile = True + +class FileWrapper: + """ + @ivar offset: Initially 0. Becomes -1 when reading using next and + becomes positive when reading using next. In the latter case it + counts the number of bytes sent. It also ensures that next and + transfer are never mixed. + """ + def __init__(self, filelike, blksize=8192): + self.filelike = filelike + self.blksize = blksize + self.offset = 0 + if hasattr(filelike, "close"): + self.close = filelike.close + + def can_transfer(self): + return have_sendfile and hasattr(self.filelike, "fileno") and \ + self.offset >= 0 + + def transfer(self, sock, blksize=None): + assert self.offset >= 0 + if blksize is None: + blksize = self.blksize + else: + blksize = min(self.blksize, blksize) + try: + sent = sendfile.sendfile(sock.fileno(), self.filelike.fileno(), + self.offset, blksize) + except OSError: + return -1 + # There are two different sendfile libraries. Yeah! + if isinstance(sent, tuple): + sent = sent[1] + self.offset += sent + return sent + + def __iter__(self): + return self + + def next(self): + assert self.offset <= 0 + self.offset = -1 + data = self.filelike.read(self.blksize) + if data: + return data + raise StopIteration + def _convert_environ(environ, multithread=False, multiprocess=False, run_once=False): environ.update({ @@ -13,3 +68,5 @@ def _convert_environ(environ, multithread=False, multiprocess=False, except KeyError: pass environ.pop("HTTP_CONTENT_LENGTH", None) # TODO: better way? + if have_sendfile: + environ["wsgi.file_wrapper"] = FileWrapper diff --git a/wsgitools/scgi/asynchronous.py b/wsgitools/scgi/asynchronous.py index 4ecb00a..386e1d0 100644 --- a/wsgitools/scgi/asynchronous.py +++ b/wsgitools/scgi/asynchronous.py @@ -11,7 +11,7 @@ except ImportError: import StringIO as io import errno -from wsgitools.scgi import _convert_environ +from wsgitools.scgi import _convert_environ, FileWrapper if sys.version_info[0] >= 3: def exc_info_for_raise(exc_info): @@ -25,8 +25,11 @@ class SCGIConnection(asyncore.dispatcher): # connection states NEW = 0*4 | 1 # connection established, waiting for request HEADER = 1*4 | 1 # the request length was received, waiting for the rest - BODY = 2*4 | 1 # the request header was received, waiting for the body - REQ = 3*4 | 2 # request received, sending response + BODY = 2*4 | 1 # the request header was received, waiting for the body, + # to RESP or RESPH + RESP = 3*4 | 2 # sending response, end state + RESPH = 4*4 | 2 # buffered response headers, sending headers only, to TRANS + TRANS = 5*4 | 2 # transferring using FileWrapper, end state def __init__(self, server, connection, addr, maxrequestsize=65536, maxpostsize=8<<20, blocksize=4096, config={}): asyncore.dispatcher.__init__(self, connection) @@ -41,7 +44,8 @@ class SCGIConnection(asyncore.dispatcher): self.reqlen = -1 # request length used in two different meanings self.inbuff = "" # input buffer self.outbuff = "" # output buffer - self.wsgihandler = None # wsgi application iterator + self.wsgihandler = None # wsgi application + self.wsgiiterator = None # wsgi application iterator self.outheaders = () # headers to be sent # () -> unset, (..,..) -> set, True -> sent self.body = io.StringIO() # request body @@ -55,7 +59,8 @@ class SCGIConnection(asyncore.dispatcher): self.outheaders = True def _wsgi_write(self, data): - assert self.state >= SCGIConnection.REQ + assert self.state >= SCGIConnection.RESP + assert self.state < SCGIConnection.TRANS assert isinstance(data, str) if data: self._try_send_headers() @@ -124,9 +129,15 @@ class SCGIConnection(asyncore.dispatcher): _convert_environ(self.environ) self.environ["wsgi.input"] = self.body self.environ["wsgi.errors"] = self.server.error - self.wsgihandler = iter(self.server.wsgiapp( - self.environ, self.start_response)) - self.state = SCGIConnection.REQ + self.wsgihandler = self.server.wsgiapp(self.environ, + self.start_response) + if isinstance(self.wsgihandler, FileWrapper) and \ + self.wsgihandler.can_transfer(): + self._try_send_headers() + self.state = SCGIConnection.RESPH + else: + self.wsgiiterator = iter(self.wsgihandler) + self.state = SCGIConnection.RESP else: self.body.write(self.inbuff) self.reqlen -= len(self.inbuff) @@ -145,29 +156,45 @@ class SCGIConnection(asyncore.dispatcher): self.outheaders = (status, headers) return self._wsgi_write - def handle_write(self): - """C{asyncore} interface""" - assert self.state >= SCGIConnection.REQ - if len(self.outbuff) < self.blocksize: - self._try_send_headers() - for data in self.wsgihandler: - assert isinstance(data, str) - if data: - self.outbuff += data - break - if len(self.outbuff) == 0: - if hasattr(self.wsgihandler, "close"): - self.wsgihandler.close() - self.close() - return + def send_buff(self): try: sentbytes = self.send(self.outbuff[:self.blocksize]) except socket.error: - if hasattr(self.wsgihandler, "close"): - self.wsgihandler.close() self.close() - return - self.outbuff = self.outbuff[sentbytes:] + else: + self.outbuff = self.outbuff[sentbytes:] + + def handle_write(self): + """C{asyncore} interface""" + if self.state == SCGIConnection.RESP: + if len(self.outbuff) < self.blocksize: + self._try_send_headers() + for data in self.wsgiiterator: + assert isinstance(data, str) + if data: + self.outbuff += data + break + if len(self.outbuff) == 0: + self.close() + return + self.send_buff() + elif self.state == SCGIConnection.RESPH: + assert len(self.outbuff) > 0 + self.send_buff() + if not self.outbuff: + self.state = SCGIConnection.TRANS + else: + assert self.state == SCGIConnection.TRANS + assert self.wsgihandler.can_transfer() + sent = self.wsgihandler.transfer(self.socket, self.blocksize) + if sent <= 0: + self.close() + + def close(self): + # None doesn't have a close attribute + if hasattr(self.wsgihandler, "close"): + self.wsgihandler.close() + asyncore.dispatcher.close(self) def handle_close(self): """C{asyncore} interface""" diff --git a/wsgitools/scgi/forkpool.py b/wsgitools/scgi/forkpool.py index aa316d1..cdd50f0 100644 --- a/wsgitools/scgi/forkpool.py +++ b/wsgitools/scgi/forkpool.py @@ -12,7 +12,7 @@ import sys import errno import signal -from wsgitools.scgi import _convert_environ +from wsgitools.scgi import _convert_environ, FileWrapper if sys.version_info[0] >= 3: def exc_info_for_raise(exc_info): @@ -444,13 +444,19 @@ class SCGIServer: result = self.wsgiapp(environ, start_response) assert hasattr(result, "__iter__") - assert response_head[0] is not None - result_iter = iter(result) - for data in result_iter: - assert isinstance(data, str) - dumbsend(data) - if response_head[0] != True: + if isinstance(result, FileWrapper) and result.can_transfer(): sendheaders() + sent = 1 + while sent > 0: + sent = result.transfer(con) + else: + assert response_head[0] is not None + result_iter = iter(result) + for data in result_iter: + assert isinstance(data, str) + dumbsend(data) + if response_head[0] != True: + sendheaders() if hasattr(result, "close"): result.close() sfw.close() -- cgit v1.2.3