Add trio resource semantics to eventfd

one_ring_to_rule_them_all
Guillermo Rodriguez 2025-04-06 21:04:18 -03:00
parent 0de70172fc
commit 2bf8ce84cf
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
2 changed files with 67 additions and 14 deletions

View File

@ -1,13 +1,13 @@
import trio
import pytest
from tractor.ipc import (
from tractor.linux.eventfd import (
open_eventfd,
EFDReadCancelled,
EventFD
)
def test_eventfd_read_cancellation():
def test_read_cancellation():
'''
Ensure EventFD.read raises EFDReadCancelled if EventFD.close()
is called.
@ -15,7 +15,7 @@ def test_eventfd_read_cancellation():
'''
fd = open_eventfd()
async def _read(event: EventFD):
async def bg_read(event: EventFD):
with pytest.raises(EFDReadCancelled):
await event.read()
@ -25,8 +25,42 @@ def test_eventfd_read_cancellation():
EventFD(fd, 'w') as event,
trio.fail_after(3)
):
n.start_soon(_read, event)
n.start_soon(bg_read, event)
await trio.sleep(0.2)
event.close()
trio.run(main)
def test_read_trio_semantics():
'''
Ensure EventFD.read raises trio.ClosedResourceError and
trio.BusyResourceError.
'''
fd = open_eventfd()
async def bg_read(event: EventFD):
try:
await event.read()
except EFDReadCancelled:
...
async def main():
async with trio.open_nursery() as n:
# start background read and attempt
# foreground read, should be busy
with EventFD(fd, 'w') as event:
n.start_soon(bg_read, event)
await trio.sleep(0.2)
with pytest.raises(trio.BusyResourceError):
await event.read()
# attempt read after close
with pytest.raises(trio.ClosedResourceError):
await event.read()
trio.run(main)

View File

@ -129,12 +129,21 @@ class EventFD:
self._omode: str = omode
self._fobj = None
self._cscope: trio.CancelScope | None = None
self._is_closed: bool = True
self._read_lock = trio.StrictFIFOLock()
@property
def closed(self) -> bool:
return self._is_closed
@property
def fd(self) -> int | None:
return self._fd
def write(self, value: int) -> int:
if self.closed:
raise trio.ClosedResourceError
return write_eventfd(self._fd, value)
async def read(self) -> int:
@ -145,19 +154,26 @@ class EventFD:
in order to make it cancellable when `self.close()` is called.
'''
self._cscope = trio.CancelScope()
with self._cscope:
return await trio.to_thread.run_sync(
read_eventfd, self._fd,
abandon_on_cancel=True
)
if self.closed:
raise trio.ClosedResourceError
if self._cscope.cancelled_caught:
raise EFDReadCancelled
if self._read_lock.locked():
raise trio.BusyResourceError
self._cscope = None
async with self._read_lock:
self._cscope = trio.CancelScope()
with self._cscope:
return await trio.to_thread.run_sync(
read_eventfd, self._fd,
abandon_on_cancel=True
)
def read_direct(self) -> int:
if self._cscope.cancelled_caught:
raise EFDReadCancelled
self._cscope = None
def read_nowait(self) -> int:
'''
Direct call to `read_eventfd(self.fd)`, unless `eventfd` was
opened with `EFD_NONBLOCK` its gonna block the thread.
@ -167,6 +183,7 @@ class EventFD:
def open(self):
self._fobj = os.fdopen(self._fd, self._omode)
self._is_closed = False
def close(self):
if self._fobj:
@ -179,6 +196,8 @@ class EventFD:
if self._cscope:
self._cscope.cancel()
self._is_closed = True
def __enter__(self):
self.open()
return self