summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2024-03-16 14:21:56 +0100
committerHelmut Grohne <helmut@subdivi.de>2024-03-16 14:21:56 +0100
commitddd02e390644a7ecf9a8d5a4b451cc19724dcd77 (patch)
treed001b753e3f2b65ba503926c21096033b3580754
parent0eaaa74e51408458a162bf5d2d7813056365268e (diff)
downloadpython-linuxnamespaces-ddd02e390644a7ecf9a8d5a4b451cc19724dcd77.tar.gz
add an asyncio variant of run_in_fork
-rw-r--r--linuxnamespaces/__init__.py53
-rw-r--r--tests/test_simple.py39
2 files changed, 80 insertions, 12 deletions
diff --git a/linuxnamespaces/__init__.py b/linuxnamespaces/__init__.py
index 02d1feb..d27fd7b 100644
--- a/linuxnamespaces/__init__.py
+++ b/linuxnamespaces/__init__.py
@@ -5,6 +5,7 @@
Python.
"""
+import asyncio
import bisect
import contextlib
import dataclasses
@@ -233,6 +234,58 @@ class run_in_fork:
self.wait()
+class async_run_in_fork:
+ """Decorator for running the decorated function once in a separate process.
+ Note that the decorator can only be used inside asynchronous code as it
+ uses the running event loop. The decorated function insetad must be
+ synchronous and it must not access the event loop of the main process.
+ """
+ def __init__(self, function: typing.Callable[[], None]):
+ """Fork a new process that will eventually run the given function and
+ then exit.
+ """
+ loop = asyncio.get_running_loop()
+ with asyncio.get_child_watcher() as watcher:
+ if not watcher.is_active():
+ raise RuntimeError(
+ "active child watcher required for creating a process"
+ )
+ self.future = loop.create_future()
+ self.efd = EventFD()
+ self.pid = os.fork()
+ if self.pid == 0:
+ self.efd.read()
+ self.efd.close()
+ function()
+ os._exit(0)
+ watcher.add_child_handler(self.pid, self._child_callback)
+
+ def _child_callback(self, pid: int, returncode: int) -> None:
+ if self.pid != pid:
+ return
+ self.future.set_result(returncode)
+
+ def start(self) -> None:
+ """Start the decorated function. It can only be started once."""
+ if not self.efd:
+ raise ValueError("this function can only be called once")
+ self.efd.write(1)
+ self.efd.close()
+
+ async def wait(self) -> None:
+ """Wait for the process running the decorated function to finish."""
+ if self.efd:
+ raise ValueError("start must be called before wait")
+ ret = await self.future
+ if ret != 0:
+ raise ValueError("something failed")
+
+ async def __call__(self) -> None:
+ """Start the decorated function and wait for its process to finish."""
+ self.start()
+ await self.wait()
+
+
def bind_mount(
source: AtLocationLike,
target: AtLocationLike,
diff --git a/tests/test_simple.py b/tests/test_simple.py
index 63f7804..e3d0fd9 100644
--- a/tests/test_simple.py
+++ b/tests/test_simple.py
@@ -48,18 +48,33 @@ class IDAllocationTest(unittest.TestCase):
self.assertIn(alloc.allocate(3), (1, 2))
-class EventFDTest(unittest.IsolatedAsyncioTestCase):
- async def test_async(self) -> None:
- efd = linuxnamespaces.EventFD(1, linuxnamespaces.EventFDFlags.NONBLOCK)
- fut = asyncio.ensure_future(efd.aread())
- await asyncio.sleep(0.000001) # Let the loop run
- self.assertTrue(fut.done())
- self.assertEqual(await fut, 1)
- fut = asyncio.ensure_future(efd.aread())
- await asyncio.sleep(0.000001) # Let the loop run
- self.assertFalse(fut.done())
- efd.write()
- self.assertEqual(await fut, 1)
+class AsnycioTest(unittest.IsolatedAsyncioTestCase):
+ async def test_eventfd(self) -> None:
+ with linuxnamespaces.EventFD(
+ 1, linuxnamespaces.EventFDFlags.NONBLOCK
+ ) as efd:
+ fut = asyncio.ensure_future(efd.aread())
+ await asyncio.sleep(0.000001) # Let the loop run
+ self.assertTrue(fut.done())
+ self.assertEqual(await fut, 1)
+ fut = asyncio.ensure_future(efd.aread())
+ await asyncio.sleep(0.000001) # Let the loop run
+ self.assertFalse(fut.done())
+ efd.write()
+ self.assertEqual(await fut, 1)
+
+ async def test_run_in_fork(self) -> None:
+ with linuxnamespaces.EventFD(
+ 0, linuxnamespaces.EventFDFlags.NONBLOCK
+ ) as efd:
+ fut = asyncio.ensure_future(efd.aread())
+ @linuxnamespaces.async_run_in_fork
+ def set_ready():
+ efd.write()
+ await asyncio.sleep(0.000001) # Let the loop run
+ self.assertFalse(fut.done())
+ await set_ready()
+ await asyncio.wait_for(fut, 10)
class UnshareTest(unittest.TestCase):