summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--linuxnamespaces/__init__.py62
-rw-r--r--tests/test_simple.py4
2 files changed, 48 insertions, 18 deletions
diff --git a/linuxnamespaces/__init__.py b/linuxnamespaces/__init__.py
index 8adf45b..5b163df 100644
--- a/linuxnamespaces/__init__.py
+++ b/linuxnamespaces/__init__.py
@@ -26,27 +26,39 @@ class run_in_fork:
"""Decorator for running the decorated function once in a separate process.
"""
- def __init__(self, function: typing.Callable[[], None]):
- """Fork a new process that will eventually run the given function and
- then exit.
+ def __init__(
+ self, function: typing.Callable[[], None], start: bool = False
+ ):
+ """Fork a new process that will run the given function and then exit.
+ If start is true, run it immediately, otherwise the start or __call__
+ method should be used.
"""
- self.efd = EventFD()
+ self.efd = None if start else EventFD()
self.pid = os.fork()
if self.pid == 0:
try:
- self.efd.read()
- self.efd.close()
+ if self.efd is not None:
+ self.efd.read()
+ self.efd.close()
+ self.efd = None
function()
except:
os._exit(1)
os._exit(0)
+ @classmethod
+ def now(cls, function: typing.Callable[[], None]) -> typing.Self:
+ """Fork a new process that will immediately run the given function and
+ then exit."""
+ return cls(function, start=True)
+
def start(self) -> None:
"""Start the decorated function. It can only be started once."""
if not self.efd:
raise ValueError("this function can only be called once")
self.efd.write(1)
self.efd.close()
+ self.efd = None
def wait(self) -> None:
"""Wait for the process running the decorated function to finish."""
@@ -57,8 +69,11 @@ class run_in_fork:
raise ValueError("something failed")
def __call__(self) -> None:
- """Start the decorated function and wait for its process to finish."""
- self.start()
+ """Start the decorated function if needed and wait for its process to
+ finish.
+ """
+ if self.efd:
+ self.start()
self.wait()
@@ -69,9 +84,12 @@ class async_run_in_fork:
synchronous and it must not access the event loop of the main process.
"""
- def __init__(self, function: typing.Callable[[], None]):
- """Fork a new process that will eventually run the given function and
- then exit.
+ def __init__(
+ self, function: typing.Callable[[], None], start: bool = False
+ ):
+ """Fork a new process that will run the given function and then exit.
+ If start is true, run it immediately, otherwise the start or __call__
+ method should be used.
"""
loop = asyncio.get_running_loop()
with asyncio.get_child_watcher() as watcher:
@@ -80,18 +98,26 @@ class async_run_in_fork:
"active child watcher required for creating a process"
)
self.future = loop.create_future()
- self.efd = EventFD()
+ self.efd = None if start else EventFD()
self.pid = os.fork()
if self.pid == 0:
try:
- self.efd.read()
- self.efd.close()
+ if self.efd:
+ self.efd.read()
+ self.efd.close()
+ self.efd = None
function()
except:
os._exit(1)
os._exit(0)
watcher.add_child_handler(self.pid, self._child_callback)
+ @classmethod
+ def now(cls, function: typing.Callable[[], None]) -> typing.Self:
+ """Fork a new process that will immediately run the given function and
+ then exit."""
+ return cls(function, start=True)
+
def _child_callback(self, pid: int, returncode: int) -> None:
if self.pid != pid:
return
@@ -103,6 +129,7 @@ class async_run_in_fork:
raise ValueError("this function can only be called once")
self.efd.write(1)
self.efd.close()
+ self.efd = None
async def wait(self) -> None:
"""Wait for the process running the decorated function to finish."""
@@ -113,8 +140,11 @@ class async_run_in_fork:
raise ValueError("something failed")
async def __call__(self) -> None:
- """Start the decorated function and wait for its process to finish."""
- self.start()
+ """Start the decorated function if needed and wait for its process to
+ finish.
+ """
+ if self.efd:
+ self.start()
await self.wait()
diff --git a/tests/test_simple.py b/tests/test_simple.py
index 2b5252f..38a253d 100644
--- a/tests/test_simple.py
+++ b/tests/test_simple.py
@@ -214,7 +214,7 @@ class UnshareTest(unittest.TestCase):
| linuxnamespaces.CloneFlags.NEWPID
)
linuxnamespaces.newuidmap(-1, [idmap], False)
- @linuxnamespaces.run_in_fork
+ @linuxnamespaces.run_in_fork.now
def setup() -> None:
self.assertEqual(os.getpid(), 1)
linuxnamespaces.mount("proc", "/proc", "proc")
@@ -307,7 +307,7 @@ class UnshareIdmapTest(unittest.TestCase):
os.setregid(0, 0)
linuxnamespaces.mount("tmpfs", "/mnt", "tmpfs")
os.mkdir("/mnt/dev")
- @linuxnamespaces.run_in_fork
+ @linuxnamespaces.run_in_fork.now
def test() -> None:
linuxnamespaces.populate_dev("/", "/mnt")
test()