From 2bf8ce84cfa548ec51702e693b0bb997241c989d Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sun, 6 Apr 2025 21:04:18 -0300 Subject: [PATCH] Add trio resource semantics to eventfd --- tests/test_eventfd.py | 42 ++++++++++++++++++++++++++++++++++++---- tractor/linux/eventfd.py | 39 +++++++++++++++++++++++++++---------- 2 files changed, 67 insertions(+), 14 deletions(-) diff --git a/tests/test_eventfd.py b/tests/test_eventfd.py index 3d757169..3432048b 100644 --- a/tests/test_eventfd.py +++ b/tests/test_eventfd.py @@ -1,13 +1,13 @@ import trio import pytest -from tractor.ipc import ( +from tractor.linux.eventfd import ( open_eventfd, EFDReadCancelled, EventFD ) -def test_eventfd_read_cancellation(): +def test_read_cancellation(): ''' Ensure EventFD.read raises EFDReadCancelled if EventFD.close() is called. @@ -15,7 +15,7 @@ def test_eventfd_read_cancellation(): ''' fd = open_eventfd() - async def _read(event: EventFD): + async def bg_read(event: EventFD): with pytest.raises(EFDReadCancelled): await event.read() @@ -25,8 +25,42 @@ def test_eventfd_read_cancellation(): EventFD(fd, 'w') as event, trio.fail_after(3) ): - n.start_soon(_read, event) + n.start_soon(bg_read, event) await trio.sleep(0.2) event.close() trio.run(main) + + +def test_read_trio_semantics(): + ''' + Ensure EventFD.read raises trio.ClosedResourceError and + trio.BusyResourceError. + + ''' + + fd = open_eventfd() + + async def bg_read(event: EventFD): + try: + await event.read() + + except EFDReadCancelled: + ... + + async def main(): + async with trio.open_nursery() as n: + + # start background read and attempt + # foreground read, should be busy + with EventFD(fd, 'w') as event: + n.start_soon(bg_read, event) + await trio.sleep(0.2) + with pytest.raises(trio.BusyResourceError): + await event.read() + + # attempt read after close + with pytest.raises(trio.ClosedResourceError): + await event.read() + + trio.run(main) diff --git a/tractor/linux/eventfd.py b/tractor/linux/eventfd.py index f262c051..8ddf3669 100644 --- a/tractor/linux/eventfd.py +++ b/tractor/linux/eventfd.py @@ -129,12 +129,21 @@ class EventFD: self._omode: str = omode self._fobj = None self._cscope: trio.CancelScope | None = None + self._is_closed: bool = True + self._read_lock = trio.StrictFIFOLock() + + @property + def closed(self) -> bool: + return self._is_closed @property def fd(self) -> int | None: return self._fd def write(self, value: int) -> int: + if self.closed: + raise trio.ClosedResourceError + return write_eventfd(self._fd, value) async def read(self) -> int: @@ -145,19 +154,26 @@ class EventFD: in order to make it cancellable when `self.close()` is called. ''' - self._cscope = trio.CancelScope() - with self._cscope: - return await trio.to_thread.run_sync( - read_eventfd, self._fd, - abandon_on_cancel=True - ) + if self.closed: + raise trio.ClosedResourceError - if self._cscope.cancelled_caught: - raise EFDReadCancelled + if self._read_lock.locked(): + raise trio.BusyResourceError - self._cscope = None + async with self._read_lock: + self._cscope = trio.CancelScope() + with self._cscope: + return await trio.to_thread.run_sync( + read_eventfd, self._fd, + abandon_on_cancel=True + ) - def read_direct(self) -> int: + if self._cscope.cancelled_caught: + raise EFDReadCancelled + + self._cscope = None + + def read_nowait(self) -> int: ''' Direct call to `read_eventfd(self.fd)`, unless `eventfd` was opened with `EFD_NONBLOCK` its gonna block the thread. @@ -167,6 +183,7 @@ class EventFD: def open(self): self._fobj = os.fdopen(self._fd, self._omode) + self._is_closed = False def close(self): if self._fobj: @@ -179,6 +196,8 @@ class EventFD: if self._cscope: self._cscope.cancel() + self._is_closed = True + def __enter__(self): self.open() return self