summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2012-03-17 22:14:47 +0100
committerHelmut Grohne <helmut@subdivi.de>2012-03-17 22:14:47 +0100
commit863e29c4d2ed76ddf35a0bc3b66abe7423125362 (patch)
treee3c5008f31574a5e84b159b660f38e17d53dfa1e
parent31aef51e3badfd2fdafdc6c15b3a58fcb04607fd (diff)
downloadwsgitools-863e29c4d2ed76ddf35a0bc3b66abe7423125362.tar.gz
sendfile support
When a sendfile library is available, expose it via wsgi.file_wrapper. This support spans both asynchronous and forkpool.
-rw-r--r--wsgitools/scgi/__init__.py57
-rw-r--r--wsgitools/scgi/asynchronous.py81
-rw-r--r--wsgitools/scgi/forkpool.py20
3 files changed, 124 insertions, 34 deletions
diff --git a/wsgitools/scgi/__init__.py b/wsgitools/scgi/__init__.py
index 20f6625..cbe7a80 100644
--- a/wsgitools/scgi/__init__.py
+++ b/wsgitools/scgi/__init__.py
@@ -1,3 +1,58 @@
+__all__ = []
+
+try:
+ import sendfile
+except ImportError:
+ have_sendfile = False
+else:
+ have_sendfile = True
+
+class FileWrapper:
+ """
+ @ivar offset: Initially 0. Becomes -1 when reading using next and
+ becomes positive when reading using next. In the latter case it
+ counts the number of bytes sent. It also ensures that next and
+ transfer are never mixed.
+ """
+ def __init__(self, filelike, blksize=8192):
+ self.filelike = filelike
+ self.blksize = blksize
+ self.offset = 0
+ if hasattr(filelike, "close"):
+ self.close = filelike.close
+
+ def can_transfer(self):
+ return have_sendfile and hasattr(self.filelike, "fileno") and \
+ self.offset >= 0
+
+ def transfer(self, sock, blksize=None):
+ assert self.offset >= 0
+ if blksize is None:
+ blksize = self.blksize
+ else:
+ blksize = min(self.blksize, blksize)
+ try:
+ sent = sendfile.sendfile(sock.fileno(), self.filelike.fileno(),
+ self.offset, blksize)
+ except OSError:
+ return -1
+ # There are two different sendfile libraries. Yeah!
+ if isinstance(sent, tuple):
+ sent = sent[1]
+ self.offset += sent
+ return sent
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ assert self.offset <= 0
+ self.offset = -1
+ data = self.filelike.read(self.blksize)
+ if data:
+ return data
+ raise StopIteration
+
def _convert_environ(environ, multithread=False, multiprocess=False,
run_once=False):
environ.update({
@@ -13,3 +68,5 @@ def _convert_environ(environ, multithread=False, multiprocess=False,
except KeyError:
pass
environ.pop("HTTP_CONTENT_LENGTH", None) # TODO: better way?
+ if have_sendfile:
+ environ["wsgi.file_wrapper"] = FileWrapper
diff --git a/wsgitools/scgi/asynchronous.py b/wsgitools/scgi/asynchronous.py
index 4ecb00a..386e1d0 100644
--- a/wsgitools/scgi/asynchronous.py
+++ b/wsgitools/scgi/asynchronous.py
@@ -11,7 +11,7 @@ except ImportError:
import StringIO as io
import errno
-from wsgitools.scgi import _convert_environ
+from wsgitools.scgi import _convert_environ, FileWrapper
if sys.version_info[0] >= 3:
def exc_info_for_raise(exc_info):
@@ -25,8 +25,11 @@ class SCGIConnection(asyncore.dispatcher):
# connection states
NEW = 0*4 | 1 # connection established, waiting for request
HEADER = 1*4 | 1 # the request length was received, waiting for the rest
- BODY = 2*4 | 1 # the request header was received, waiting for the body
- REQ = 3*4 | 2 # request received, sending response
+ BODY = 2*4 | 1 # the request header was received, waiting for the body,
+ # to RESP or RESPH
+ RESP = 3*4 | 2 # sending response, end state
+ RESPH = 4*4 | 2 # buffered response headers, sending headers only, to TRANS
+ TRANS = 5*4 | 2 # transferring using FileWrapper, end state
def __init__(self, server, connection, addr, maxrequestsize=65536,
maxpostsize=8<<20, blocksize=4096, config={}):
asyncore.dispatcher.__init__(self, connection)
@@ -41,7 +44,8 @@ class SCGIConnection(asyncore.dispatcher):
self.reqlen = -1 # request length used in two different meanings
self.inbuff = "" # input buffer
self.outbuff = "" # output buffer
- self.wsgihandler = None # wsgi application iterator
+ self.wsgihandler = None # wsgi application
+ self.wsgiiterator = None # wsgi application iterator
self.outheaders = () # headers to be sent
# () -> unset, (..,..) -> set, True -> sent
self.body = io.StringIO() # request body
@@ -55,7 +59,8 @@ class SCGIConnection(asyncore.dispatcher):
self.outheaders = True
def _wsgi_write(self, data):
- assert self.state >= SCGIConnection.REQ
+ assert self.state >= SCGIConnection.RESP
+ assert self.state < SCGIConnection.TRANS
assert isinstance(data, str)
if data:
self._try_send_headers()
@@ -124,9 +129,15 @@ class SCGIConnection(asyncore.dispatcher):
_convert_environ(self.environ)
self.environ["wsgi.input"] = self.body
self.environ["wsgi.errors"] = self.server.error
- self.wsgihandler = iter(self.server.wsgiapp(
- self.environ, self.start_response))
- self.state = SCGIConnection.REQ
+ self.wsgihandler = self.server.wsgiapp(self.environ,
+ self.start_response)
+ if isinstance(self.wsgihandler, FileWrapper) and \
+ self.wsgihandler.can_transfer():
+ self._try_send_headers()
+ self.state = SCGIConnection.RESPH
+ else:
+ self.wsgiiterator = iter(self.wsgihandler)
+ self.state = SCGIConnection.RESP
else:
self.body.write(self.inbuff)
self.reqlen -= len(self.inbuff)
@@ -145,29 +156,45 @@ class SCGIConnection(asyncore.dispatcher):
self.outheaders = (status, headers)
return self._wsgi_write
- def handle_write(self):
- """C{asyncore} interface"""
- assert self.state >= SCGIConnection.REQ
- if len(self.outbuff) < self.blocksize:
- self._try_send_headers()
- for data in self.wsgihandler:
- assert isinstance(data, str)
- if data:
- self.outbuff += data
- break
- if len(self.outbuff) == 0:
- if hasattr(self.wsgihandler, "close"):
- self.wsgihandler.close()
- self.close()
- return
+ def send_buff(self):
try:
sentbytes = self.send(self.outbuff[:self.blocksize])
except socket.error:
- if hasattr(self.wsgihandler, "close"):
- self.wsgihandler.close()
self.close()
- return
- self.outbuff = self.outbuff[sentbytes:]
+ else:
+ self.outbuff = self.outbuff[sentbytes:]
+
+ def handle_write(self):
+ """C{asyncore} interface"""
+ if self.state == SCGIConnection.RESP:
+ if len(self.outbuff) < self.blocksize:
+ self._try_send_headers()
+ for data in self.wsgiiterator:
+ assert isinstance(data, str)
+ if data:
+ self.outbuff += data
+ break
+ if len(self.outbuff) == 0:
+ self.close()
+ return
+ self.send_buff()
+ elif self.state == SCGIConnection.RESPH:
+ assert len(self.outbuff) > 0
+ self.send_buff()
+ if not self.outbuff:
+ self.state = SCGIConnection.TRANS
+ else:
+ assert self.state == SCGIConnection.TRANS
+ assert self.wsgihandler.can_transfer()
+ sent = self.wsgihandler.transfer(self.socket, self.blocksize)
+ if sent <= 0:
+ self.close()
+
+ def close(self):
+ # None doesn't have a close attribute
+ if hasattr(self.wsgihandler, "close"):
+ self.wsgihandler.close()
+ asyncore.dispatcher.close(self)
def handle_close(self):
"""C{asyncore} interface"""
diff --git a/wsgitools/scgi/forkpool.py b/wsgitools/scgi/forkpool.py
index aa316d1..cdd50f0 100644
--- a/wsgitools/scgi/forkpool.py
+++ b/wsgitools/scgi/forkpool.py
@@ -12,7 +12,7 @@ import sys
import errno
import signal
-from wsgitools.scgi import _convert_environ
+from wsgitools.scgi import _convert_environ, FileWrapper
if sys.version_info[0] >= 3:
def exc_info_for_raise(exc_info):
@@ -444,13 +444,19 @@ class SCGIServer:
result = self.wsgiapp(environ, start_response)
assert hasattr(result, "__iter__")
- assert response_head[0] is not None
- result_iter = iter(result)
- for data in result_iter:
- assert isinstance(data, str)
- dumbsend(data)
- if response_head[0] != True:
+ if isinstance(result, FileWrapper) and result.can_transfer():
sendheaders()
+ sent = 1
+ while sent > 0:
+ sent = result.transfer(con)
+ else:
+ assert response_head[0] is not None
+ result_iter = iter(result)
+ for data in result_iter:
+ assert isinstance(data, str)
+ dumbsend(data)
+ if response_head[0] != True:
+ sendheaders()
if hasattr(result, "close"):
result.close()
sfw.close()