summaryrefslogtreecommitdiff
path: root/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'test.py')
-rwxr-xr-xtest.py88
1 files changed, 41 insertions, 47 deletions
diff --git a/test.py b/test.py
index 5cbf842..6aef5bf 100755
--- a/test.py
+++ b/test.py
@@ -1,26 +1,15 @@
#!/usr/bin/env python
+import base64
import unittest
import doctest
import re
import wsgiref.validate
-# Cannot use io module as it is broken in 2.6.
-# Writing a str to a io.StringIO results in an exception.
-try:
- import cStringIO as io
-except ImportError:
- import StringIO as io
-try:
- from hashlib import md5
-except ImportError:
- from md5 import md5
+import io
+from hashlib import md5
import sys
-try:
- next
-except NameError:
- def next(iterator):
- return iterator.next()
+from wsgitools.internal import bytes2str, str2bytes
class Request(object):
def __init__(self, case):
@@ -37,7 +26,7 @@ class Request(object):
QUERY_STRING="")
self.environ.update({
"wsgi.version": (1, 0),
- "wsgi.input": io.StringIO(),
+ "wsgi.input": io.BytesIO(),
"wsgi.errors": sys.stderr,
"wsgi.url_scheme": "http",
"wsgi.multithread": False,
@@ -136,14 +125,14 @@ class Result(object):
self.testcase.fail("header %s not found" % name)
def get_data(self):
- return "".join(self.writtendata) + "".join(self.returneddata)
+ return b"".join(self.writtendata) + b"".join(self.returneddata)
from wsgitools import applications
class StaticContentTest(unittest.TestCase):
def setUp(self):
self.app = applications.StaticContent(
- "200 Found", [("Content-Type", "text/plain")], "nothing")
+ "200 Found", [("Content-Type", "text/plain")], b"nothing")
self.req = Request(self)
def testGet(self):
@@ -160,7 +149,7 @@ class StaticContentTest(unittest.TestCase):
class StaticFileTest(unittest.TestCase):
def setUp(self):
- self.app = applications.StaticFile(io.StringIO("success"), "200 Found",
+ self.app = applications.StaticFile(io.BytesIO(b"success"), "200 Found",
[("Content-Type", "text/plain")])
self.req = Request(self)
@@ -181,7 +170,7 @@ from wsgitools import digest
class AuthDigestMiddlewareTest(unittest.TestCase):
def setUp(self):
self.staticapp = applications.StaticContent(
- "200 Found", [("Content-Type", "text/plain")], "success")
+ "200 Found", [("Content-Type", "text/plain")], b"success")
token_gen = digest.AuthTokenGenerator("foo", lambda _: "baz")
self.app = digest.AuthDigestMiddleware(
wsgiref.validate.validator(self.staticapp), token_gen)
@@ -212,9 +201,9 @@ class AuthDigestMiddlewareTest(unittest.TestCase):
res.getheader("WWW-Authenticate").split())))
nonce = nonce.split('"')[1]
req = self.req.copy()
- token = md5("bar:foo:%s" % password).hexdigest()
- other = md5("GET:").hexdigest()
- resp = md5("%s:%s:%s" % (token, nonce, other)).hexdigest()
+ token = md5(str2bytes("bar:foo:%s" % password)).hexdigest()
+ other = md5(str2bytes("GET:")).hexdigest()
+ resp = md5(str2bytes("%s:%s:%s" % (token, nonce, other))).hexdigest()
req.setheader('Authorization', 'Digest algorithm=md5,nonce="%s",' \
'uri=,username=bar,response="%s"' % (nonce, resp))
res = req(self.app)
@@ -232,9 +221,10 @@ class AuthDigestMiddlewareTest(unittest.TestCase):
res.getheader("WWW-Authenticate").split())))
nonce = nonce.split('"')[1]
req = self.req.copy()
- token = md5("bar:foo:baz").hexdigest()
- other = md5("GET:").hexdigest()
- resp = md5("%s:%s:1:qux:auth:%s" % (token, nonce, other)).hexdigest()
+ token = md5(str2bytes("bar:foo:baz")).hexdigest()
+ other = md5(str2bytes("GET:")).hexdigest()
+ resp = "%s:%s:1:qux:auth:%s" % (token, nonce, other)
+ resp = md5(str2bytes(resp)).hexdigest()
req.setheader('Authorization', 'Digest algorithm=md5,nonce="%s",' \
'uri=,username=bar,response="%s",qop=auth,nc=1,' \
'cnonce=qux' % (nonce, resp))
@@ -246,28 +236,28 @@ from wsgitools import middlewares
def writing_application(environ, start_response):
write = start_response("404 Not found", [("Content-Type", "text/plain")])
write = start_response("200 Ok", [("Content-Type", "text/plain")])
- write("first")
- yield ""
- yield "second"
+ write(b"first")
+ yield b""
+ yield b"second"
def write_only_application(environ, start_response):
write = start_response("200 Ok", [("Content-Type", "text/plain")])
- write("first")
- write("second")
- yield ""
+ write(b"first")
+ write(b"second")
+ yield b""
class NoWriteCallableMiddlewareTest(unittest.TestCase):
def testWrite(self):
app = middlewares.NoWriteCallableMiddleware(writing_application)
res = Request(self)(app)
self.assertEqual(res.writtendata, [])
- self.assertEqual("".join(res.returneddata), "firstsecond")
+ self.assertEqual(b"".join(res.returneddata), b"firstsecond")
def testWriteOnly(self):
app = middlewares.NoWriteCallableMiddleware(write_only_application)
res = Request(self)(app)
self.assertEqual(res.writtendata, [])
- self.assertEqual("".join(res.returneddata), "firstsecond")
+ self.assertEqual(b"".join(res.returneddata), b"firstsecond")
class StupidIO(object):
"""file-like without tell method, so StaticFile is not able to
@@ -287,7 +277,7 @@ class StupidIO(object):
class ContentLengthMiddlewareTest(unittest.TestCase):
def setUp(self):
- self.staticapp = applications.StaticFile(StupidIO("success"),
+ self.staticapp = applications.StaticFile(StupidIO(b"success"),
"200 Found", [("Content-Type", "text/plain")])
self.app = middlewares.ContentLengthMiddleware(self.staticapp,
maxstore=10)
@@ -319,29 +309,29 @@ class CachingMiddlewareTest(unittest.TestCase):
if "maxage0" in environ["SCRIPT_NAME"]:
headers.append(("Cache-Control", "max-age=0"))
start_response("200 Found", headers)
- return ["%d" % count]
+ return [str2bytes("%d" % count)]
def testCache(self):
res = Request(self)(self.cached)
res.status(200)
- self.assertEqual(res.get_data(), "1")
+ self.assertEqual(res.get_data(), b"1")
res = Request(self)(self.cached)
res.status(200)
- self.assertEqual(res.get_data(), "1")
+ self.assertEqual(res.get_data(), b"1")
def testNoCache(self):
res = Request(self)(self.cached)
res.status(200)
- self.assertEqual(res.get_data(), "1")
+ self.assertEqual(res.get_data(), b"1")
res = Request(self).setheader(
"Cache-Control", "max-age=0")(self.cached)
res.status(200)
- self.assertEqual(res.get_data(), "2")
+ self.assertEqual(res.get_data(), b"2")
class BasicAuthMiddlewareTest(unittest.TestCase):
def setUp(self):
self.staticapp = applications.StaticContent(
- "200 Found", [("Content-Type", "text/plain")], "success")
+ "200 Found", [("Content-Type", "text/plain")], b"success")
checkpw = middlewares.DictAuthChecker({"bar": "baz"})
self.app = middlewares.BasicAuthMiddleware(
wsgiref.validate.validator(self.staticapp), checkpw)
@@ -368,7 +358,8 @@ class BasicAuthMiddlewareTest(unittest.TestCase):
def doauth(self, password="baz", status=200):
req = self.req.copy()
- token = ("bar:%s" % password).encode("base64").strip()
+ token = "bar:%s" % password
+ token = bytes2str(base64.b64encode(str2bytes(token)))
req.setheader('Authorization', 'Basic %s' % token)
res = req(self.app)
res.status(status)
@@ -385,8 +376,11 @@ import gzip
class RequestLogWSGIFilterTest(unittest.TestCase):
def testSimple(self):
app = applications.StaticContent("200 Found",
- [("Content-Type", "text/plain")], "nothing")
- log = io.StringIO()
+ [("Content-Type", "text/plain")], b"nothing")
+ if isinstance("x", bytes):
+ log = io.BytesIO()
+ else:
+ log = io.StringIO()
logfilter = filters.RequestLogWSGIFilter.creator(log)
app = filters.WSGIFilterMiddleware(app, logfilter)
req = Request(self)
@@ -401,13 +395,13 @@ class RequestLogWSGIFilterTest(unittest.TestCase):
class GzipWSGIFilterTest(unittest.TestCase):
def testSimple(self):
app = applications.StaticContent("200 Found",
- [("Content-Type", "text/plain")], "nothing")
+ [("Content-Type", "text/plain")], b"nothing")
app = filters.WSGIFilterMiddleware(app, filters.GzipWSGIFilter)
req = Request(self)
req.environ["HTTP_ACCEPT_ENCODING"] = "gzip"
res = req(app)
- data = gzip.GzipFile(fileobj=io.StringIO(res.get_data())).read()
- self.assertEqual(data, "nothing")
+ data = gzip.GzipFile(fileobj=io.BytesIO(res.get_data())).read()
+ self.assertEqual(data, b"nothing")
def alltests(case):
return unittest.TestLoader().loadTestsFromTestCase(case)