Add direct read method on EventFD
Type hint all ctx managers in _ringbuf.py Remove unnecesary send lock on ring chan sender Handle EOF on ring chan receiver Rename ringbuf tests to make it less redundant
parent
5902ad2ffb
commit
1bbf1f7ab5
tests
tractor/ipc
|
@ -179,7 +179,7 @@ async def child_blocked_receiver(
|
||||||
await receiver.receive_some()
|
await receiver.receive_some()
|
||||||
|
|
||||||
|
|
||||||
def test_ring_reader_cancel():
|
def test_reader_cancel():
|
||||||
'''
|
'''
|
||||||
Test that a receiver blocked on eventfd(2) read responds to
|
Test that a receiver blocked on eventfd(2) read responds to
|
||||||
cancellation.
|
cancellation.
|
||||||
|
@ -222,7 +222,7 @@ async def child_blocked_sender(
|
||||||
await sender.send_all(b'this will wrap')
|
await sender.send_all(b'this will wrap')
|
||||||
|
|
||||||
|
|
||||||
def test_ring_sender_cancel():
|
def test_sender_cancel():
|
||||||
'''
|
'''
|
||||||
Test that a sender blocked on eventfd(2) read responds to
|
Test that a sender blocked on eventfd(2) read responds to
|
||||||
cancellation.
|
cancellation.
|
||||||
|
@ -255,7 +255,7 @@ def test_ring_sender_cancel():
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
def test_ringbuf_max_bytes():
|
def test_receiver_max_bytes():
|
||||||
'''
|
'''
|
||||||
Test that RingBuffReceiver.receive_some's max_bytes optional
|
Test that RingBuffReceiver.receive_some's max_bytes optional
|
||||||
argument works correctly, send a msg of size 100, then
|
argument works correctly, send a msg of size 100, then
|
||||||
|
@ -384,7 +384,7 @@ async def child_channel_sender(
|
||||||
await chan.send(msg)
|
await chan.send(msg)
|
||||||
|
|
||||||
|
|
||||||
def test_ringbuf_channel():
|
def test_channel():
|
||||||
|
|
||||||
msg_amount_min = 100
|
msg_amount_min = 100
|
||||||
msg_amount_max = 1000
|
msg_amount_max = 1000
|
||||||
|
|
|
@ -157,6 +157,14 @@ class EventFD:
|
||||||
|
|
||||||
self._cscope = None
|
self._cscope = None
|
||||||
|
|
||||||
|
def read_direct(self) -> int:
|
||||||
|
'''
|
||||||
|
Direct call to `read_eventfd(self.fd)`, unless `eventfd` was
|
||||||
|
opened with `EFD_NONBLOCK` its gonna block the thread.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return read_eventfd(self._fd)
|
||||||
|
|
||||||
def open(self):
|
def open(self):
|
||||||
self._fobj = os.fdopen(self._fd, self._omode)
|
self._fobj = os.fdopen(self._fd, self._omode)
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,10 @@ IPC Reliable RingBuffer implementation
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import struct
|
import struct
|
||||||
|
from typing import (
|
||||||
|
ContextManager,
|
||||||
|
AsyncContextManager
|
||||||
|
)
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
asynccontextmanager as acm
|
asynccontextmanager as acm
|
||||||
|
@ -26,7 +30,6 @@ from contextlib import (
|
||||||
from multiprocessing.shared_memory import SharedMemory
|
from multiprocessing.shared_memory import SharedMemory
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from tricycle import BufferedReceiveStream
|
|
||||||
from msgspec import (
|
from msgspec import (
|
||||||
Struct,
|
Struct,
|
||||||
to_builtins
|
to_builtins
|
||||||
|
@ -34,7 +37,6 @@ from msgspec import (
|
||||||
|
|
||||||
from ._linux import (
|
from ._linux import (
|
||||||
open_eventfd,
|
open_eventfd,
|
||||||
close_eventfd,
|
|
||||||
EFDReadCancelled,
|
EFDReadCancelled,
|
||||||
EventFD
|
EventFD
|
||||||
)
|
)
|
||||||
|
@ -94,7 +96,7 @@ class RBToken(Struct, frozen=True):
|
||||||
def open_ringbuf(
|
def open_ringbuf(
|
||||||
shm_name: str,
|
shm_name: str,
|
||||||
buf_size: int = _DEFAULT_RB_SIZE,
|
buf_size: int = _DEFAULT_RB_SIZE,
|
||||||
) -> RBToken:
|
) -> ContextManager[RBToken]:
|
||||||
'''
|
'''
|
||||||
Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
|
Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
|
||||||
be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`
|
be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`
|
||||||
|
@ -106,31 +108,19 @@ def open_ringbuf(
|
||||||
create=True
|
create=True
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
token = RBToken(
|
with (
|
||||||
shm_name=shm_name,
|
EventFD(open_eventfd(), 'r') as write_event,
|
||||||
write_eventfd=open_eventfd(),
|
EventFD(open_eventfd(), 'r') as wrap_event,
|
||||||
wrap_eventfd=open_eventfd(),
|
EventFD(open_eventfd(), 'r') as eof_event,
|
||||||
eof_eventfd=open_eventfd(),
|
):
|
||||||
buf_size=buf_size
|
token = RBToken(
|
||||||
)
|
shm_name=shm_name,
|
||||||
yield token
|
write_eventfd=write_event.fd,
|
||||||
try:
|
wrap_eventfd=wrap_event.fd,
|
||||||
close_eventfd(token.write_eventfd)
|
eof_eventfd=eof_event.fd,
|
||||||
|
buf_size=buf_size
|
||||||
except OSError:
|
)
|
||||||
...
|
yield token
|
||||||
|
|
||||||
try:
|
|
||||||
close_eventfd(token.wrap_eventfd)
|
|
||||||
|
|
||||||
except OSError:
|
|
||||||
...
|
|
||||||
|
|
||||||
try:
|
|
||||||
close_eventfd(token.eof_eventfd)
|
|
||||||
|
|
||||||
except OSError:
|
|
||||||
...
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
shm.unlink()
|
shm.unlink()
|
||||||
|
@ -232,6 +222,7 @@ class RingBuffSender(trio.abc.SendStream):
|
||||||
self._eof_event.write(
|
self._eof_event.write(
|
||||||
self._ptr if self._ptr > 0 else self.size
|
self._ptr if self._ptr > 0 else self.size
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._cleanup:
|
if self._cleanup:
|
||||||
self._write_event.close()
|
self._write_event.close()
|
||||||
self._wrap_event.close()
|
self._wrap_event.close()
|
||||||
|
@ -239,7 +230,8 @@ class RingBuffSender(trio.abc.SendStream):
|
||||||
self._shm.close()
|
self._shm.close()
|
||||||
|
|
||||||
async def aclose(self):
|
async def aclose(self):
|
||||||
self.close()
|
async with self._send_lock:
|
||||||
|
self.close()
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
self.open()
|
self.open()
|
||||||
|
@ -396,7 +388,7 @@ class RingBuffReceiver(trio.abc.ReceiveStream):
|
||||||
async def attach_to_ringbuf_receiver(
|
async def attach_to_ringbuf_receiver(
|
||||||
token: RBToken,
|
token: RBToken,
|
||||||
cleanup: bool = True
|
cleanup: bool = True
|
||||||
):
|
) -> AsyncContextManager[RingBuffReceiver]:
|
||||||
'''
|
'''
|
||||||
Attach a RingBuffReceiver from a previously opened
|
Attach a RingBuffReceiver from a previously opened
|
||||||
RBToken.
|
RBToken.
|
||||||
|
@ -418,7 +410,7 @@ async def attach_to_ringbuf_receiver(
|
||||||
async def attach_to_ringbuf_sender(
|
async def attach_to_ringbuf_sender(
|
||||||
token: RBToken,
|
token: RBToken,
|
||||||
cleanup: bool = True
|
cleanup: bool = True
|
||||||
):
|
) -> AsyncContextManager[RingBuffSender]:
|
||||||
'''
|
'''
|
||||||
Attach a RingBuffSender from a previously opened
|
Attach a RingBuffSender from a previously opened
|
||||||
RBToken.
|
RBToken.
|
||||||
|
@ -435,7 +427,7 @@ async def attach_to_ringbuf_sender(
|
||||||
def open_ringbuf_pair(
|
def open_ringbuf_pair(
|
||||||
name: str,
|
name: str,
|
||||||
buf_size: int = _DEFAULT_RB_SIZE
|
buf_size: int = _DEFAULT_RB_SIZE
|
||||||
):
|
) -> ContextManager[tuple(RBToken, RBToken)]:
|
||||||
'''
|
'''
|
||||||
Handle resources for a ringbuf pair to be used for
|
Handle resources for a ringbuf pair to be used for
|
||||||
bidirectional messaging.
|
bidirectional messaging.
|
||||||
|
@ -461,7 +453,7 @@ async def attach_to_ringbuf_stream(
|
||||||
token_out: RBToken,
|
token_out: RBToken,
|
||||||
cleanup_in: bool = True,
|
cleanup_in: bool = True,
|
||||||
cleanup_out: bool = True
|
cleanup_out: bool = True
|
||||||
):
|
) -> AsyncContextManager[trio.StapledStream]:
|
||||||
'''
|
'''
|
||||||
Attach a trio.StapledStream from a previously opened
|
Attach a trio.StapledStream from a previously opened
|
||||||
ringbuf pair.
|
ringbuf pair.
|
||||||
|
@ -494,16 +486,13 @@ class RingBuffBytesSender(trio.abc.SendChannel[bytes]):
|
||||||
sender: RingBuffSender
|
sender: RingBuffSender
|
||||||
):
|
):
|
||||||
self._sender = sender
|
self._sender = sender
|
||||||
self._send_lock = trio.StrictFIFOLock()
|
|
||||||
|
|
||||||
async def send(self, value: bytes) -> None:
|
async def send(self, value: bytes) -> None:
|
||||||
async with self._send_lock:
|
size: bytes = struct.pack("<I", len(value))
|
||||||
size: bytes = struct.pack("<I", len(value))
|
return await self._sender.send_all(size + value)
|
||||||
return await self._sender.send_all(size + value)
|
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
async with self._send_lock:
|
await self._sender.aclose()
|
||||||
await self._sender.aclose()
|
|
||||||
|
|
||||||
|
|
||||||
class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
|
class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
|
||||||
|
@ -544,6 +533,8 @@ class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
|
||||||
header: bytes = await self._receive_exactly(4)
|
header: bytes = await self._receive_exactly(4)
|
||||||
size: int
|
size: int
|
||||||
size, = struct.unpack("<I", header)
|
size, = struct.unpack("<I", header)
|
||||||
|
if size == 0:
|
||||||
|
raise trio.EndOfChannel
|
||||||
return await self._receive_exactly(size)
|
return await self._receive_exactly(size)
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
|
@ -554,7 +545,7 @@ class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
|
||||||
async def attach_to_ringbuf_rchannel(
|
async def attach_to_ringbuf_rchannel(
|
||||||
token: RBToken,
|
token: RBToken,
|
||||||
cleanup: bool = True
|
cleanup: bool = True
|
||||||
):
|
) -> AsyncContextManager[RingBuffBytesReceiver]:
|
||||||
'''
|
'''
|
||||||
Attach a RingBuffBytesReceiver from a previously opened
|
Attach a RingBuffBytesReceiver from a previously opened
|
||||||
RBToken.
|
RBToken.
|
||||||
|
@ -569,7 +560,7 @@ async def attach_to_ringbuf_rchannel(
|
||||||
async def attach_to_ringbuf_schannel(
|
async def attach_to_ringbuf_schannel(
|
||||||
token: RBToken,
|
token: RBToken,
|
||||||
cleanup: bool = True
|
cleanup: bool = True
|
||||||
):
|
) -> AsyncContextManager[RingBuffBytesSender]:
|
||||||
'''
|
'''
|
||||||
Attach a RingBuffBytesSender from a previously opened
|
Attach a RingBuffBytesSender from a previously opened
|
||||||
RBToken.
|
RBToken.
|
||||||
|
@ -611,7 +602,7 @@ async def attach_to_ringbuf_channel(
|
||||||
token_out: RBToken,
|
token_out: RBToken,
|
||||||
cleanup_in: bool = True,
|
cleanup_in: bool = True,
|
||||||
cleanup_out: bool = True
|
cleanup_out: bool = True
|
||||||
):
|
) -> AsyncContextManager[RingBuffChannel]:
|
||||||
'''
|
'''
|
||||||
Attach to an already opened ringbuf pair and return
|
Attach to an already opened ringbuf pair and return
|
||||||
a `RingBuffChannel`.
|
a `RingBuffChannel`.
|
||||||
|
|
Loading…
Reference in New Issue