diff options
author | Helmut Grohne <helmut@subdivi.de> | 2023-06-18 18:25:46 +0200 |
---|---|---|
committer | Helmut Grohne <helmut@subdivi.de> | 2023-06-18 22:43:06 +0200 |
commit | 7c99665565ea6e3194bfbb93cdacf3332803446e (patch) | |
tree | caa46b802b36c030b3deca99aa84aabeb2689769 | |
parent | 7c20ce632e6f50c8c64708fb9b0d335feba095d6 (diff) | |
download | wsgitools-7c99665565ea6e3194bfbb93cdacf3332803446e.tar.gz |
add testcase for scgi.forkpool
-rwxr-xr-x | test.py | 49 |
1 files changed, 42 insertions, 7 deletions
@@ -416,6 +416,15 @@ import asyncore import socket import threading +def fetch_scgi(port, req, body=b""): + with socket.socket() as client: + client.connect(("localhost", port)) + req = req.copy() + req["CONTENT_LENGTH"] = str(len(body)) + reqb = str2bytes("".join(map("%s\0%s\0".__mod__, req.items()))) + client.sendall(b"%d:%s,%s" % (len(reqb), reqb, body)) + return client.recv(65536) + class ScgiAsynchronousTest(unittest.TestCase): def testSimple(self): app = applications.StaticContent( @@ -430,18 +439,43 @@ class ScgiAsynchronousTest(unittest.TestCase): target=asyncore.loop, kwargs={"count": 10, "timeout": 0.01} ) serverthread.start() - client = socket.socket() - client.connect(("localhost", port)) - req = {"CONTENT_LENGTH": "0", "REQUEST_METHOD": "GET"} - reqb = str2bytes("".join(map("%s\0%s\0".__mod__, req.items()))) - client.send(b"%d:%s," % (len(reqb), reqb)) - data = client.recv(65536) - client.close() + data = fetch_scgi(port, {"REQUEST_METHOD": "GET"}) self.assertTrue(data.startswith(b"Status: 200 OK\r\n")) self.assertTrue(data.endswith(b"\r\n\r\nnothing")) serverthread.join() sock.close() +from wsgitools.scgi.forkpool import SCGIServer as SCGIForkServer +import os +import signal + +class ScgiForkTest(unittest.TestCase): + def testSimple(self): + app = applications.StaticContent( + "200 OK", [("Content-Type", "text/plain")], b"nothing" + ) + sock = socket.socket() + sock.bind(("localhost", 0)) + sock.listen(5) + port = sock.getsockname()[1] + pid = os.fork() + if pid == 0: + try: + SCGIForkServer( + app, port, reusesocket=sock + ).enable_sighandler().run() + except SystemExit: + pass + # The workers and the main fork server will reach this. + # Avoid calling unittest cleanup handlers. + os._exit(0) + sock.close() + data = fetch_scgi(port, {"REQUEST_METHOD": "GET"}) + os.kill(pid, signal.SIGTERM) + os.waitpid(pid, 0) + self.assertTrue(data.startswith(b"Status: 200 OK\r\n")) + self.assertTrue(data.endswith(b"\r\n\r\nnothing")) + def alltests(case): return unittest.TestLoader().loadTestsFromTestCase(case) @@ -457,6 +491,7 @@ fullsuite.addTest(alltests(NoWriteCallableMiddlewareTest)) fullsuite.addTest(alltests(RequestLogWSGIFilterTest)) fullsuite.addTest(alltests(GzipWSGIFilterTest)) fullsuite.addTest(alltests(ScgiAsynchronousTest)) +fullsuite.addTest(alltests(ScgiForkTest)) if __name__ == "__main__": runner = unittest.TextTestRunner(verbosity=2) |