# Copyright 2024 Helmut Grohne # SPDX-License-Identifier: GPL-3 import asyncio import errno import os import pathlib import socket import unittest import pytest import linuxnamespaces class MountFlagsTest(unittest.TestCase): def test_tostrfromstr(self) -> None: for bit1 in range(32): for bit2 in range(bit1, 32): flag = ( linuxnamespaces.MountFlags(1 << bit1) | linuxnamespaces.MountFlags(1 << bit2) ) try: text = flag.tostr() except ValueError: continue self.assertEqual( linuxnamespaces.MountFlags.fromstr(text), flag ) class IDAllocationTest(unittest.TestCase): def test_idalloc(self) -> None: alloc = linuxnamespaces.IDAllocation() alloc.add_range(1, 2) alloc.add_range(5, 4) self.assertIn(alloc.find(3), (5, 6)) self.assertIn(alloc.allocate(3), (5, 6)) self.assertRaises(ValueError, alloc.find, 3) self.assertRaises(ValueError, alloc.allocate, 3) self.assertEqual(alloc.find(2), 1) def test_merge(self) -> None: alloc = linuxnamespaces.IDAllocation() alloc.add_range(1, 2) alloc.add_range(3, 2) self.assertIn(alloc.allocate(3), (1, 2)) class AsnycioTest(unittest.IsolatedAsyncioTestCase): async def test_eventfd(self) -> None: with linuxnamespaces.EventFD( 1, linuxnamespaces.EventFDFlags.NONBLOCK ) as efd: fut = asyncio.ensure_future(efd.aread()) await asyncio.sleep(0.000001) # Let the loop run self.assertTrue(fut.done()) self.assertEqual(await fut, 1) fut = asyncio.ensure_future(efd.aread()) await asyncio.sleep(0.000001) # Let the loop run self.assertFalse(fut.done()) efd.write() self.assertEqual(await fut, 1) async def test_run_in_fork(self) -> None: with linuxnamespaces.EventFD( 0, linuxnamespaces.EventFDFlags.NONBLOCK ) as efd: fut = asyncio.ensure_future(efd.aread()) set_ready = linuxnamespaces.async_run_in_fork(efd.write) await asyncio.sleep(0.000001) # Let the loop run self.assertFalse(fut.done()) await set_ready() await asyncio.wait_for(fut, 10) async def test_copyfd(self) -> None: rfd1, wfd1 = os.pipe2(os.O_NONBLOCK) rfd2, wfd2 = os.pipe2(os.O_NONBLOCK) fut = asyncio.ensure_future(linuxnamespaces.async_copyfd(rfd1, wfd2)) os.write(wfd1, b"hello") await asyncio.sleep(0.000001) # Let the loop run os.write(wfd1, b"world") loop = asyncio.get_running_loop() fut2 = loop.create_future() def callback() -> None: loop.remove_reader(rfd2) fut2.set_result(None) loop.add_reader(rfd2, callback) await fut2 self.assertEqual(os.read(rfd2, 11), b"helloworld") self.assertFalse(fut.done()) os.close(wfd1) await asyncio.sleep(0.000001) # Let the loop run self.assertTrue(fut.done()) os.close(rfd1) os.close(rfd2) os.close(wfd2) self.assertEqual(await fut, 10) async def test_copyfd_epipe(self) -> None: rfd1, wfd1 = os.pipe2(os.O_NONBLOCK) rfd2, wfd2 = os.pipe2(os.O_NONBLOCK) fut = asyncio.ensure_future(linuxnamespaces.async_copyfd(rfd1, wfd2)) os.close(rfd2) os.write(wfd1, b"hello") await asyncio.sleep(0.000001) # Let the loop run self.assertTrue(fut.done()) os.close(rfd1) os.close(wfd1) os.close(wfd2) exc = fut.exception() self.assertIsInstance(exc, OSError) self.assertEqual(exc.errno, errno.EPIPE) class UnshareTest(unittest.TestCase): @pytest.mark.forked def test_unshare_user(self) -> None: overflowuid = int(pathlib.Path("/proc/sys/fs/overflowuid").read_text()) idmap = linuxnamespaces.IDMapping(0, os.getuid(), 1) linuxnamespaces.unshare(linuxnamespaces.CloneFlags.NEWUSER) self.assertEqual(os.getuid(), overflowuid) linuxnamespaces.newuidmap(-1, [idmap], False) self.assertEqual(os.getuid(), 0) # UID 1 is not mapped. self.assertRaises(OSError, os.setuid, 1) @pytest.mark.forked def test_mount_proc(self) -> None: idmap = linuxnamespaces.IDMapping(0, os.getuid(), 1) linuxnamespaces.unshare( linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWNS | linuxnamespaces.CloneFlags.NEWPID ) linuxnamespaces.newuidmap(-1, [idmap], False) @linuxnamespaces.run_in_fork def setup() -> None: self.assertEqual(os.getpid(), 1) linuxnamespaces.mount("proc", "/proc", "proc") setup() @pytest.mark.forked def test_sethostname(self) -> None: self.assertRaises(socket.error, socket.sethostname, "example") linuxnamespaces.unshare( linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWUTS ) socket.sethostname("example") @pytest.mark.forked def test_populate_dev(self) -> None: linuxnamespaces.unshare_user_idmap_nohelper( 0, 0, linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWNS, ) linuxnamespaces.mount("tmpfs", "/mnt", "tmpfs", data="mode=0755") os.mkdir("/mnt/dev") linuxnamespaces.populate_dev("/", "/mnt", pidns=False) self.assertTrue(os.access("/mnt/dev/null", os.W_OK)) pathlib.Path("/mnt/dev/null").write_text("") class UnshareIdmapTest(unittest.TestCase): def setUp(self) -> None: super().setUp() self.uidalloc = linuxnamespaces.IDAllocation.loadsubid("uid") self.gidalloc = linuxnamespaces.IDAllocation.loadsubid("gid") try: self.uidalloc.find(65536) self.gidalloc.find(65536) except ValueError: self.skipTest("insufficient /etc/sub?id allocation") @pytest.mark.forked def test_unshare_user_idmap(self) -> None: overflowuid = int(pathlib.Path("/proc/sys/fs/overflowuid").read_text()) uidmap = linuxnamespaces.IDMapping( 0, self.uidalloc.allocate(65536), 65536 ) self.assertNotEqual(os.getuid(), uidmap.outerstart) gidmap = linuxnamespaces.IDMapping( 0, self.gidalloc.allocate(65536), 65536 ) pid = os.getpid() @linuxnamespaces.run_in_fork def setup() -> None: linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) linuxnamespaces.unshare(linuxnamespaces.CloneFlags.NEWUSER) setup() self.assertEqual(os.getuid(), overflowuid) os.setuid(0) self.assertEqual(os.getuid(), 0) os.setuid(1) self.assertEqual(os.getuid(), 1) @pytest.mark.forked def test_populate_dev(self) -> None: uidmap = linuxnamespaces.IDMapping( 0, self.uidalloc.allocate(65536), 65536 ) self.assertNotEqual(os.getuid(), uidmap.outerstart) gidmap = linuxnamespaces.IDMapping( 0, self.gidalloc.allocate(65536), 65536 ) pid = os.getpid() @linuxnamespaces.run_in_fork def setup() -> None: linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) linuxnamespaces.unshare( linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWNS | linuxnamespaces.CloneFlags.NEWPID ) setup() os.setreuid(0, 0) os.setregid(0, 0) linuxnamespaces.mount("tmpfs", "/mnt", "tmpfs") os.mkdir("/mnt/dev") @linuxnamespaces.run_in_fork def test() -> None: linuxnamespaces.populate_dev("/", "/mnt") test()