Add trio resource semantics to eventfd
parent
1451feb159
commit
8e1f95881c
|
@ -1,13 +1,13 @@
|
||||||
import trio
|
import trio
|
||||||
import pytest
|
import pytest
|
||||||
from tractor.ipc import (
|
from tractor.linux.eventfd import (
|
||||||
open_eventfd,
|
open_eventfd,
|
||||||
EFDReadCancelled,
|
EFDReadCancelled,
|
||||||
EventFD
|
EventFD
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_eventfd_read_cancellation():
|
def test_read_cancellation():
|
||||||
'''
|
'''
|
||||||
Ensure EventFD.read raises EFDReadCancelled if EventFD.close()
|
Ensure EventFD.read raises EFDReadCancelled if EventFD.close()
|
||||||
is called.
|
is called.
|
||||||
|
@ -15,7 +15,7 @@ def test_eventfd_read_cancellation():
|
||||||
'''
|
'''
|
||||||
fd = open_eventfd()
|
fd = open_eventfd()
|
||||||
|
|
||||||
async def _read(event: EventFD):
|
async def bg_read(event: EventFD):
|
||||||
with pytest.raises(EFDReadCancelled):
|
with pytest.raises(EFDReadCancelled):
|
||||||
await event.read()
|
await event.read()
|
||||||
|
|
||||||
|
@ -25,8 +25,42 @@ def test_eventfd_read_cancellation():
|
||||||
EventFD(fd, 'w') as event,
|
EventFD(fd, 'w') as event,
|
||||||
trio.fail_after(3)
|
trio.fail_after(3)
|
||||||
):
|
):
|
||||||
n.start_soon(_read, event)
|
n.start_soon(bg_read, event)
|
||||||
await trio.sleep(0.2)
|
await trio.sleep(0.2)
|
||||||
event.close()
|
event.close()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
def test_read_trio_semantics():
|
||||||
|
'''
|
||||||
|
Ensure EventFD.read raises trio.ClosedResourceError and
|
||||||
|
trio.BusyResourceError.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
fd = open_eventfd()
|
||||||
|
|
||||||
|
async def bg_read(event: EventFD):
|
||||||
|
try:
|
||||||
|
await event.read()
|
||||||
|
|
||||||
|
except EFDReadCancelled:
|
||||||
|
...
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
|
||||||
|
# start background read and attempt
|
||||||
|
# foreground read, should be busy
|
||||||
|
with EventFD(fd, 'w') as event:
|
||||||
|
n.start_soon(bg_read, event)
|
||||||
|
await trio.sleep(0.2)
|
||||||
|
with pytest.raises(trio.BusyResourceError):
|
||||||
|
await event.read()
|
||||||
|
|
||||||
|
# attempt read after close
|
||||||
|
with pytest.raises(trio.ClosedResourceError):
|
||||||
|
await event.read()
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
|
@ -129,12 +129,21 @@ class EventFD:
|
||||||
self._omode: str = omode
|
self._omode: str = omode
|
||||||
self._fobj = None
|
self._fobj = None
|
||||||
self._cscope: trio.CancelScope | None = None
|
self._cscope: trio.CancelScope | None = None
|
||||||
|
self._is_closed: bool = True
|
||||||
|
self._read_lock = trio.StrictFIFOLock()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def closed(self) -> bool:
|
||||||
|
return self._is_closed
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def fd(self) -> int | None:
|
def fd(self) -> int | None:
|
||||||
return self._fd
|
return self._fd
|
||||||
|
|
||||||
def write(self, value: int) -> int:
|
def write(self, value: int) -> int:
|
||||||
|
if self.closed:
|
||||||
|
raise trio.ClosedResourceError
|
||||||
|
|
||||||
return write_eventfd(self._fd, value)
|
return write_eventfd(self._fd, value)
|
||||||
|
|
||||||
async def read(self) -> int:
|
async def read(self) -> int:
|
||||||
|
@ -145,19 +154,26 @@ class EventFD:
|
||||||
in order to make it cancellable when `self.close()` is called.
|
in order to make it cancellable when `self.close()` is called.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
self._cscope = trio.CancelScope()
|
if self.closed:
|
||||||
with self._cscope:
|
raise trio.ClosedResourceError
|
||||||
return await trio.to_thread.run_sync(
|
|
||||||
read_eventfd, self._fd,
|
|
||||||
abandon_on_cancel=True
|
|
||||||
)
|
|
||||||
|
|
||||||
if self._cscope.cancelled_caught:
|
if self._read_lock.locked():
|
||||||
raise EFDReadCancelled
|
raise trio.BusyResourceError
|
||||||
|
|
||||||
self._cscope = None
|
async with self._read_lock:
|
||||||
|
self._cscope = trio.CancelScope()
|
||||||
|
with self._cscope:
|
||||||
|
return await trio.to_thread.run_sync(
|
||||||
|
read_eventfd, self._fd,
|
||||||
|
abandon_on_cancel=True
|
||||||
|
)
|
||||||
|
|
||||||
def read_direct(self) -> int:
|
if self._cscope.cancelled_caught:
|
||||||
|
raise EFDReadCancelled
|
||||||
|
|
||||||
|
self._cscope = None
|
||||||
|
|
||||||
|
def read_nowait(self) -> int:
|
||||||
'''
|
'''
|
||||||
Direct call to `read_eventfd(self.fd)`, unless `eventfd` was
|
Direct call to `read_eventfd(self.fd)`, unless `eventfd` was
|
||||||
opened with `EFD_NONBLOCK` its gonna block the thread.
|
opened with `EFD_NONBLOCK` its gonna block the thread.
|
||||||
|
@ -167,6 +183,7 @@ class EventFD:
|
||||||
|
|
||||||
def open(self):
|
def open(self):
|
||||||
self._fobj = os.fdopen(self._fd, self._omode)
|
self._fobj = os.fdopen(self._fd, self._omode)
|
||||||
|
self._is_closed = False
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if self._fobj:
|
if self._fobj:
|
||||||
|
@ -179,6 +196,8 @@ class EventFD:
|
||||||
if self._cscope:
|
if self._cscope:
|
||||||
self._cscope.cancel()
|
self._cscope.cancel()
|
||||||
|
|
||||||
|
self._is_closed = True
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
self.open()
|
self.open()
|
||||||
return self
|
return self
|
||||||
|
|
Loading…
Reference in New Issue