Compare commits

..

11 Commits

Author SHA1 Message Date
Guillermo Rodriguez efd11f7d74
Trying to make full suite pass with uds 2025-03-27 20:37:52 -03:00
Guillermo Rodriguez 76cee99fc2
Finally switch to using address protocol in all runtime 2025-03-27 20:37:52 -03:00
Guillermo Rodriguez 5f50206d84
Add root and random addr getters on MsgTransport type 2025-03-27 20:37:52 -03:00
Guillermo Rodriguez a47a7a39b1
Starting to make tractor.ipc.Channel work with multiple MsgTransports 2025-03-27 20:37:52 -03:00
Guillermo Rodriguez bab265b2d8
Important RingBuffBytesSender fix on non batched mode! & downgrade nix-shell python to lowest supported 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez 010874bed5
Catch trio cancellation on RingBuffReceiver bg eof listener task, add batched mode to RingBuffBytesSender 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez ea010ab46a
Add direct read method on EventFD
Type hint all ctx managers in _ringbuf.py
Remove unnecesary send lock on ring chan sender
Handle EOF on ring chan receiver
Rename ringbuf tests to make it less redundant
2025-03-27 20:36:46 -03:00
Guillermo Rodriguez be7fc89ae9
Add direct ctx managers for RB channels 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez 2a9a78651b
Improve test_ringbuf test, drop MsgTransport ring buf impl for now in favour of a trio.abc.Channel[bytes] impl, add docstrings 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez be818a720a
Switch `tractor.ipc.MsgTransport.stream` type to `trio.abc.Stream`
Add EOF signaling mechanism
Support proper `receive_some` end of stream semantics
Add StapledStream non-ipc test
Create MsgpackRBStream similar to MsgpackTCPStream for buffered whole-msg reads
Add EventFD.read cancellation on EventFD.close mechanism using cancel scope
Add test for eventfd cancellation
Improve and add docstrings
2025-03-27 20:36:46 -03:00
Guillermo Rodriguez ba353bf46f
Better encapsulate RingBuff ctx managment methods and support non ipc usage
Add trio.StrictFIFOLock on sender.send_all
Support max_bytes argument on receive_some, keep track of write_ptr on receiver
Add max_bytes receive test test_ringbuf_max_bytes
Add docstrings to all ringbuf tests
Remove EFD_NONBLOCK support, not necesary anymore since we can use abandon_on_cancel=True on trio.to_thread.run_sync
Close eventfd's after usage on open_ringbuf
2025-03-27 20:36:46 -03:00
8 changed files with 888 additions and 146 deletions

View File

@ -14,6 +14,6 @@ pkgs.mkShell {
shellHook = ''
set -e
uv venv .venv --python=3.12
uv venv .venv --python=3.11
'';
}

View File

@ -0,0 +1,32 @@
import trio
import pytest
from tractor.ipc import (
open_eventfd,
EFDReadCancelled,
EventFD
)
def test_eventfd_read_cancellation():
'''
Ensure EventFD.read raises EFDReadCancelled if EventFD.close()
is called.
'''
fd = open_eventfd()
async def _read(event: EventFD):
with pytest.raises(EFDReadCancelled):
await event.read()
async def main():
async with trio.open_nursery() as n:
with (
EventFD(fd, 'w') as event,
trio.fail_after(3)
):
n.start_soon(_read, event)
await trio.sleep(0.2)
event.close()
trio.run(main)

View File

@ -1,15 +1,21 @@
import time
import hashlib
import trio
import pytest
import tractor
from tractor.ipc import (
open_ringbuf,
attach_to_ringbuf_receiver,
attach_to_ringbuf_sender,
attach_to_ringbuf_stream,
attach_to_ringbuf_channel,
RBToken,
RingBuffSender,
RingBuffReceiver
)
from tractor._testing.samples import generate_sample_messages
from tractor._testing.samples import (
generate_single_byte_msgs,
generate_sample_messages
)
@tractor.context
@ -17,19 +23,27 @@ async def child_read_shm(
ctx: tractor.Context,
msg_amount: int,
token: RBToken,
total_bytes: int,
) -> None:
recvd_bytes = 0
await ctx.started()
start_ts = time.time()
async with RingBuffReceiver(token) as receiver:
while recvd_bytes < total_bytes:
msg = await receiver.receive_some()
recvd_bytes += len(msg)
) -> str:
'''
Sub-actor used in `test_ringbuf`.
# make sure we dont hold any memoryviews
# before the ctx manager aclose()
msg = None
Attach to a ringbuf and receive all messages until end of stream.
Keep track of how many bytes received and also calculate
sha256 of the whole byte stream.
Calculate and print performance stats, finally return calculated
hash.
'''
await ctx.started()
print('reader started')
recvd_bytes = 0
recvd_hash = hashlib.sha256()
start_ts = time.time()
async with attach_to_ringbuf_receiver(token) as receiver:
async for msg in receiver:
recvd_hash.update(msg)
recvd_bytes += len(msg)
end_ts = time.time()
elapsed = end_ts - start_ts
@ -38,6 +52,9 @@ async def child_read_shm(
print(f'\n\telapsed ms: {elapsed_ms}')
print(f'\tmsg/sec: {int(msg_amount / elapsed):,}')
print(f'\tbytes/sec: {int(recvd_bytes / elapsed):,}')
print(f'\treceived bytes: {recvd_bytes:,}')
return recvd_hash.hexdigest()
@tractor.context
@ -48,16 +65,32 @@ async def child_write_shm(
rand_max: int,
token: RBToken,
) -> None:
msgs, total_bytes = generate_sample_messages(
'''
Sub-actor used in `test_ringbuf`
Generate `msg_amount` payloads with
`random.randint(rand_min, rand_max)` random bytes at the end,
Calculate sha256 hash and send it to parent on `ctx.started`.
Attach to ringbuf and send all generated messages.
'''
msgs, _total_bytes = generate_sample_messages(
msg_amount,
rand_min=rand_min,
rand_max=rand_max,
)
await ctx.started(total_bytes)
async with RingBuffSender(token) as sender:
print('writer hashing payload...')
sent_hash = hashlib.sha256(b''.join(msgs)).hexdigest()
print('writer done hashing.')
await ctx.started(sent_hash)
print('writer started')
async with attach_to_ringbuf_sender(token, cleanup=False) as sender:
for msg in msgs:
await sender.send_all(msg)
print('writer exit')
@pytest.mark.parametrize(
'msg_amount,rand_min,rand_max,buf_size',
@ -83,19 +116,23 @@ def test_ringbuf(
rand_max: int,
buf_size: int
):
'''
- Open a new ring buf on root actor
- Open `child_write_shm` ctx in sub-actor which will generate a
random payload and send its hash on `ctx.started`, finally sending
the payload through the stream.
- Open `child_read_shm` ctx in sub-actor which will receive the
payload, calculate perf stats and return the hash.
- Compare both hashes
'''
async def main():
with open_ringbuf(
'test_ringbuf',
buf_size=buf_size
) as token:
proc_kwargs = {
'pass_fds': (token.write_eventfd, token.wrap_eventfd)
}
proc_kwargs = {'pass_fds': token.fds}
common_kwargs = {
'msg_amount': msg_amount,
'token': token,
}
async with tractor.open_nursery() as an:
send_p = await an.start_actor(
'ring_sender',
@ -110,17 +147,20 @@ def test_ringbuf(
async with (
send_p.open_context(
child_write_shm,
token=token,
msg_amount=msg_amount,
rand_min=rand_min,
rand_max=rand_max,
**common_kwargs
) as (sctx, total_bytes),
) as (_sctx, sent_hash),
recv_p.open_context(
child_read_shm,
**common_kwargs,
total_bytes=total_bytes,
) as (sctx, _sent),
token=token,
msg_amount=msg_amount
) as (rctx, _sent),
):
await recv_p.result()
recvd_hash = await rctx.result()
assert sent_hash == recvd_hash
await send_p.cancel_actor()
await recv_p.cancel_actor()
@ -134,23 +174,28 @@ async def child_blocked_receiver(
ctx: tractor.Context,
token: RBToken
):
async with RingBuffReceiver(token) as receiver:
async with attach_to_ringbuf_receiver(token) as receiver:
await ctx.started()
await receiver.receive_some()
def test_ring_reader_cancel():
def test_reader_cancel():
'''
Test that a receiver blocked on eventfd(2) read responds to
cancellation.
'''
async def main():
with open_ringbuf('test_ring_cancel_reader') as token:
async with (
tractor.open_nursery() as an,
RingBuffSender(token) as _sender,
attach_to_ringbuf_sender(token) as _sender,
):
recv_p = await an.start_actor(
'ring_blocked_receiver',
enable_modules=[__name__],
proc_kwargs={
'pass_fds': (token.write_eventfd, token.wrap_eventfd)
'pass_fds': token.fds
}
)
async with (
@ -172,12 +217,17 @@ async def child_blocked_sender(
ctx: tractor.Context,
token: RBToken
):
async with RingBuffSender(token) as sender:
async with attach_to_ringbuf_sender(token) as sender:
await ctx.started()
await sender.send_all(b'this will wrap')
def test_ring_sender_cancel():
def test_sender_cancel():
'''
Test that a sender blocked on eventfd(2) read responds to
cancellation.
'''
async def main():
with open_ringbuf(
'test_ring_cancel_sender',
@ -188,7 +238,7 @@ def test_ring_sender_cancel():
'ring_blocked_sender',
enable_modules=[__name__],
proc_kwargs={
'pass_fds': (token.write_eventfd, token.wrap_eventfd)
'pass_fds': token.fds
}
)
async with (
@ -203,3 +253,171 @@ def test_ring_sender_cancel():
with pytest.raises(tractor._exceptions.ContextCancelled):
trio.run(main)
def test_receiver_max_bytes():
'''
Test that RingBuffReceiver.receive_some's max_bytes optional
argument works correctly, send a msg of size 100, then
force receive of messages with max_bytes == 1, wait until
100 of these messages are received, then compare join of
msgs with original message
'''
msg = generate_single_byte_msgs(100)
msgs = []
async def main():
with open_ringbuf(
'test_ringbuf_max_bytes',
buf_size=10
) as token:
async with (
trio.open_nursery() as n,
attach_to_ringbuf_sender(token, cleanup=False) as sender,
attach_to_ringbuf_receiver(token, cleanup=False) as receiver
):
async def _send_and_close():
await sender.send_all(msg)
await sender.aclose()
n.start_soon(_send_and_close)
while len(msgs) < len(msg):
msg_part = await receiver.receive_some(max_bytes=1)
assert len(msg_part) == 1
msgs.append(msg_part)
trio.run(main)
assert msg == b''.join(msgs)
def test_stapled_ringbuf():
'''
Open two ringbufs and give tokens to tasks (swap them such that in/out tokens
are inversed on each task) which will open the streams and use trio.StapledStream
to have a single bidirectional stream.
Then take turns to send and receive messages.
'''
msg = generate_single_byte_msgs(100)
pair_0_msgs = []
pair_1_msgs = []
pair_0_done = trio.Event()
pair_1_done = trio.Event()
async def pair_0(token_in: RBToken, token_out: RBToken):
async with attach_to_ringbuf_stream(
token_in,
token_out,
cleanup_in=False,
cleanup_out=False
) as stream:
# first turn to send
await stream.send_all(msg)
# second turn to receive
while len(pair_0_msgs) != len(msg):
_msg = await stream.receive_some(max_bytes=1)
pair_0_msgs.append(_msg)
pair_0_done.set()
await pair_1_done.wait()
async def pair_1(token_in: RBToken, token_out: RBToken):
async with attach_to_ringbuf_stream(
token_in,
token_out,
cleanup_in=False,
cleanup_out=False
) as stream:
# first turn to receive
while len(pair_1_msgs) != len(msg):
_msg = await stream.receive_some(max_bytes=1)
pair_1_msgs.append(_msg)
# second turn to send
await stream.send_all(msg)
pair_1_done.set()
await pair_0_done.wait()
async def main():
with tractor.ipc.open_ringbuf_pair(
'test_stapled_ringbuf'
) as (token_0, token_1):
async with trio.open_nursery() as n:
n.start_soon(pair_0, token_0, token_1)
n.start_soon(pair_1, token_1, token_0)
trio.run(main)
assert msg == b''.join(pair_0_msgs)
assert msg == b''.join(pair_1_msgs)
@tractor.context
async def child_channel_sender(
ctx: tractor.Context,
msg_amount_min: int,
msg_amount_max: int,
token_in: RBToken,
token_out: RBToken
):
import random
msgs, _total_bytes = generate_sample_messages(
random.randint(msg_amount_min, msg_amount_max),
rand_min=256,
rand_max=1024,
)
async with attach_to_ringbuf_channel(
token_in,
token_out
) as chan:
await ctx.started(msgs)
for msg in msgs:
await chan.send(msg)
def test_channel():
msg_amount_min = 100
msg_amount_max = 1000
async def main():
with tractor.ipc.open_ringbuf_pair(
'test_ringbuf_transport'
) as (token_0, token_1):
async with (
attach_to_ringbuf_channel(token_0, token_1) as chan,
tractor.open_nursery() as an
):
recv_p = await an.start_actor(
'test_ringbuf_transport_sender',
enable_modules=[__name__],
proc_kwargs={
'pass_fds': token_0.fds + token_1.fds
}
)
async with (
recv_p.open_context(
child_channel_sender,
msg_amount_min=msg_amount_min,
msg_amount_max=msg_amount_max,
token_in=token_1,
token_out=token_0
) as (ctx, msgs),
):
recv_msgs = []
async for msg in chan:
recv_msgs.append(msg)
await recv_p.cancel_actor()
assert recv_msgs == msgs
trio.run(main)

View File

@ -2,19 +2,59 @@ import os
import random
def generate_single_byte_msgs(amount: int) -> bytes:
'''
Generate a byte instance of len `amount` with:
```
byte_at_index(i) = (i % 10).encode()
```
this results in constantly repeating sequences of:
b'0123456789'
'''
return b''.join(str(i % 10).encode() for i in range(amount))
def generate_sample_messages(
amount: int,
rand_min: int = 0,
rand_max: int = 0,
silent: bool = False
silent: bool = False,
) -> tuple[list[bytes], int]:
'''
Generate bytes msgs for tests.
Messages will have the following format:
```
b'[{i:08}]' + os.urandom(random.randint(rand_min, rand_max))
```
so for message index 25:
b'[00000025]' + random_bytes
'''
msgs = []
size = 0
log_interval = None
if not silent:
print(f'\ngenerating {amount} messages...')
# calculate an apropiate log interval based on
# max message size
max_msg_size = 10 + rand_max
if max_msg_size <= 32 * 1024:
log_interval = 10_000
else:
log_interval = 1000
for i in range(amount):
msg = f'[{i:08}]'.encode('utf-8')
@ -26,7 +66,13 @@ def generate_sample_messages(
msgs.append(msg)
if not silent and i and i % 10_000 == 0:
if (
not silent
and
i > 0
and
i % log_interval == 0
):
print(f'{i} generated')
if not silent:

View File

@ -44,12 +44,23 @@ if platform.system() == 'Linux':
write_eventfd as write_eventfd,
read_eventfd as read_eventfd,
close_eventfd as close_eventfd,
EFDReadCancelled as EFDReadCancelled,
EventFD as EventFD,
)
from ._ringbuf import (
RBToken as RBToken,
open_ringbuf as open_ringbuf,
RingBuffSender as RingBuffSender,
RingBuffReceiver as RingBuffReceiver,
open_ringbuf as open_ringbuf
open_ringbuf_pair as open_ringbuf_pair,
attach_to_ringbuf_receiver as attach_to_ringbuf_receiver,
attach_to_ringbuf_sender as attach_to_ringbuf_sender,
attach_to_ringbuf_stream as attach_to_ringbuf_stream,
RingBuffBytesSender as RingBuffBytesSender,
RingBuffBytesReceiver as RingBuffBytesReceiver,
RingBuffChannel as RingBuffChannel,
attach_to_ringbuf_schannel as attach_to_ringbuf_schannel,
attach_to_ringbuf_rchannel as attach_to_ringbuf_rchannel,
attach_to_ringbuf_channel as attach_to_ringbuf_channel,
)

View File

@ -108,6 +108,10 @@ def close_eventfd(fd: int) -> int:
raise OSError(errno.errorcode[ffi.errno], 'close failed')
class EFDReadCancelled(Exception):
...
class EventFD:
'''
Use a previously opened eventfd(2), meant to be used in
@ -124,6 +128,7 @@ class EventFD:
self._fd: int = fd
self._omode: str = omode
self._fobj = None
self._cscope: trio.CancelScope | None = None
@property
def fd(self) -> int | None:
@ -133,17 +138,46 @@ class EventFD:
return write_eventfd(self._fd, value)
async def read(self) -> int:
return await trio.to_thread.run_sync(
read_eventfd, self._fd,
abandon_on_cancel=True
)
'''
Async wrapper for `read_eventfd(self.fd)`
`trio.to_thread.run_sync` is used, need to use a `trio.CancelScope`
in order to make it cancellable when `self.close()` is called.
'''
self._cscope = trio.CancelScope()
with self._cscope:
return await trio.to_thread.run_sync(
read_eventfd, self._fd,
abandon_on_cancel=True
)
if self._cscope.cancelled_caught:
raise EFDReadCancelled
self._cscope = None
def read_direct(self) -> int:
'''
Direct call to `read_eventfd(self.fd)`, unless `eventfd` was
opened with `EFD_NONBLOCK` its gonna block the thread.
'''
return read_eventfd(self._fd)
def open(self):
self._fobj = os.fdopen(self._fd, self._omode)
def close(self):
if self._fobj:
self._fobj.close()
try:
self._fobj.close()
except OSError:
...
if self._cscope:
self._cscope.cancel()
def __enter__(self):
self.open()

View File

@ -18,7 +18,15 @@ IPC Reliable RingBuffer implementation
'''
from __future__ import annotations
from contextlib import contextmanager as cm
import struct
from typing import (
ContextManager,
AsyncContextManager
)
from contextlib import (
contextmanager as cm,
asynccontextmanager as acm
)
from multiprocessing.shared_memory import SharedMemory
import trio
@ -28,25 +36,37 @@ from msgspec import (
)
from ._linux import (
EFD_NONBLOCK,
open_eventfd,
EFDReadCancelled,
EventFD
)
from ._mp_bs import disable_mantracker
from tractor.log import get_logger
from tractor._exceptions import (
InternalError
)
log = get_logger(__name__)
disable_mantracker()
_DEFAULT_RB_SIZE = 10 * 1024
class RBToken(Struct, frozen=True):
'''
RingBuffer token contains necesary info to open the two
RingBuffer token contains necesary info to open the three
eventfds and the shared memory
'''
shm_name: str
write_eventfd: int
wrap_eventfd: int
write_eventfd: int # used to signal writer ptr advance
wrap_eventfd: int # used to signal reader ready after wrap around
eof_eventfd: int # used to signal writer closed
buf_size: int
def as_msg(self):
@ -59,62 +79,97 @@ class RBToken(Struct, frozen=True):
return RBToken(**msg)
@property
def fds(self) -> tuple[int, int, int]:
'''
Useful for `pass_fds` params
'''
return (
self.write_eventfd,
self.wrap_eventfd,
self.eof_eventfd
)
@cm
def open_ringbuf(
shm_name: str,
buf_size: int = 10 * 1024,
write_efd_flags: int = 0,
wrap_efd_flags: int = 0
) -> RBToken:
buf_size: int = _DEFAULT_RB_SIZE,
) -> ContextManager[RBToken]:
'''
Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
be used with `attach_to_ringbuf_sender` and `attach_to_ringbuf_receiver`
'''
shm = SharedMemory(
name=shm_name,
size=buf_size,
create=True
)
try:
token = RBToken(
shm_name=shm_name,
write_eventfd=open_eventfd(flags=write_efd_flags),
wrap_eventfd=open_eventfd(flags=wrap_efd_flags),
buf_size=buf_size
)
yield token
with (
EventFD(open_eventfd(), 'r') as write_event,
EventFD(open_eventfd(), 'r') as wrap_event,
EventFD(open_eventfd(), 'r') as eof_event,
):
token = RBToken(
shm_name=shm_name,
write_eventfd=write_event.fd,
wrap_eventfd=wrap_event.fd,
eof_eventfd=eof_event.fd,
buf_size=buf_size
)
yield token
finally:
shm.unlink()
Buffer = bytes | bytearray | memoryview
'''
IPC Reliable Ring Buffer
`eventfd(2)` is used for wrap around sync, to signal writes to
the reader and end of stream.
'''
class RingBuffSender(trio.abc.SendStream):
'''
IPC Reliable Ring Buffer sender side implementation
Ring Buffer sender side implementation
`eventfd(2)` is used for wrap around sync, and also to signal
writes to the reader.
Do not use directly! manage with `attach_to_ringbuf_sender`
after having opened a ringbuf context with `open_ringbuf`.
'''
def __init__(
self,
token: RBToken,
start_ptr: int = 0,
cleanup: bool = False
):
token = RBToken.from_msg(token)
self._shm = SharedMemory(
name=token.shm_name,
size=token.buf_size,
create=False
)
self._write_event = EventFD(token.write_eventfd, 'w')
self._wrap_event = EventFD(token.wrap_eventfd, 'r')
self._ptr = start_ptr
self._token = RBToken.from_msg(token)
self._shm: SharedMemory | None = None
self._write_event = EventFD(self._token.write_eventfd, 'w')
self._wrap_event = EventFD(self._token.wrap_eventfd, 'r')
self._eof_event = EventFD(self._token.eof_eventfd, 'w')
self._ptr = 0
self._cleanup = cleanup
self._send_lock = trio.StrictFIFOLock()
@property
def key(self) -> str:
def name(self) -> str:
if not self._shm:
raise ValueError('shared memory not initialized yet!')
return self._shm.name
@property
def size(self) -> int:
return self._shm.size
return self._token.buf_size
@property
def ptr(self) -> int:
@ -128,73 +183,97 @@ class RingBuffSender(trio.abc.SendStream):
def wrap_fd(self) -> int:
return self._wrap_event.fd
async def send_all(self, data: bytes | bytearray | memoryview):
# while data is larger than the remaining buf
target_ptr = self.ptr + len(data)
while target_ptr > self.size:
# write all bytes that fit
remaining = self.size - self.ptr
self._shm.buf[self.ptr:] = data[:remaining]
# signal write and wait for reader wrap around
self._write_event.write(remaining)
await self._wrap_event.read()
async def _wait_wrap(self):
await self._wrap_event.read()
# wrap around and trim already written bytes
self._ptr = 0
data = data[remaining:]
target_ptr = self._ptr + len(data)
async def send_all(self, data: Buffer):
async with self._send_lock:
# while data is larger than the remaining buf
target_ptr = self.ptr + len(data)
while target_ptr > self.size:
# write all bytes that fit
remaining = self.size - self.ptr
self._shm.buf[self.ptr:] = data[:remaining]
# signal write and wait for reader wrap around
self._write_event.write(remaining)
await self._wait_wrap()
# remaining data fits on buffer
self._shm.buf[self.ptr:target_ptr] = data
self._write_event.write(len(data))
self._ptr = target_ptr
# wrap around and trim already written bytes
self._ptr = 0
data = data[remaining:]
target_ptr = self._ptr + len(data)
# remaining data fits on buffer
self._shm.buf[self.ptr:target_ptr] = data
self._write_event.write(len(data))
self._ptr = target_ptr
async def wait_send_all_might_not_block(self):
raise NotImplementedError
async def aclose(self):
self._write_event.close()
self._wrap_event.close()
self._shm.close()
async def __aenter__(self):
def open(self):
self._shm = SharedMemory(
name=self._token.shm_name,
size=self._token.buf_size,
create=False
)
self._write_event.open()
self._wrap_event.open()
self._eof_event.open()
def close(self):
self._eof_event.write(
self._ptr if self._ptr > 0 else self.size
)
if self._cleanup:
self._write_event.close()
self._wrap_event.close()
self._eof_event.close()
self._shm.close()
async def aclose(self):
async with self._send_lock:
self.close()
async def __aenter__(self):
self.open()
return self
class RingBuffReceiver(trio.abc.ReceiveStream):
'''
IPC Reliable Ring Buffer receiver side implementation
Ring Buffer receiver side implementation
`eventfd(2)` is used for wrap around sync, and also to signal
writes to the reader.
Do not use directly! manage with `attach_to_ringbuf_receiver`
after having opened a ringbuf context with `open_ringbuf`.
'''
def __init__(
self,
token: RBToken,
start_ptr: int = 0,
flags: int = 0
cleanup: bool = True,
):
token = RBToken.from_msg(token)
self._shm = SharedMemory(
name=token.shm_name,
size=token.buf_size,
create=False
)
self._write_event = EventFD(token.write_eventfd, 'w')
self._wrap_event = EventFD(token.wrap_eventfd, 'r')
self._ptr = start_ptr
self._flags = flags
self._token = RBToken.from_msg(token)
self._shm: SharedMemory | None = None
self._write_event = EventFD(self._token.write_eventfd, 'w')
self._wrap_event = EventFD(self._token.wrap_eventfd, 'r')
self._eof_event = EventFD(self._token.eof_eventfd, 'r')
self._ptr: int = 0
self._write_ptr: int = 0
self._end_ptr: int = -1
self._cleanup: bool = cleanup
@property
def key(self) -> str:
def name(self) -> str:
if not self._shm:
raise ValueError('shared memory not initialized yet!')
return self._shm.name
@property
def size(self) -> int:
return self._shm.size
return self._token.buf_size
@property
def ptr(self) -> int:
@ -208,46 +287,368 @@ class RingBuffReceiver(trio.abc.ReceiveStream):
def wrap_fd(self) -> int:
return self._wrap_event.fd
async def receive_some(
self,
max_bytes: int | None = None,
nb_timeout: float = 0.1
) -> memoryview:
# if non blocking eventfd enabled, do polling
# until next write, this allows signal handling
if self._flags | EFD_NONBLOCK:
delta = None
while delta is None:
async def _eof_monitor_task(self):
'''
Long running EOF event monitor, automatically run in bg by
`attach_to_ringbuf_receiver` context manager, if EOF event
is set its value will be the end pointer (highest valid
index to be read from buf, after setting the `self._end_ptr`
we close the write event which should cancel any blocked
`self._write_event.read()`s on it.
'''
try:
self._end_ptr = await self._eof_event.read()
self._write_event.close()
except EFDReadCancelled:
...
except trio.Cancelled:
...
async def receive_some(self, max_bytes: int | None = None) -> bytes:
'''
Receive up to `max_bytes`, if no `max_bytes` is provided
a reasonable default is used.
'''
if max_bytes is None:
max_bytes: int = _DEFAULT_RB_SIZE
if max_bytes < 1:
raise ValueError("max_bytes must be >= 1")
# delta is remaining bytes we havent read
delta = self._write_ptr - self._ptr
if delta == 0:
# we have read all we can, see if new data is available
if self._end_ptr < 0:
# if we havent been signaled about EOF yet
try:
delta = await self._write_event.read()
self._write_ptr += delta
except OSError as e:
if e.errno == 'EAGAIN':
continue
except EFDReadCancelled:
# while waiting for new data `self._write_event` was closed
# this means writer signaled EOF
if self._end_ptr > 0:
# final self._write_ptr modification and recalculate delta
self._write_ptr = self._end_ptr
delta = self._end_ptr - self._ptr
raise e
else:
# shouldnt happen cause self._eof_monitor_task always sets
# self._end_ptr before closing self._write_event
raise InternalError(
'self._write_event.read cancelled but self._end_ptr is not set'
)
else:
delta = await self._write_event.read()
else:
# no more bytes to read and self._end_ptr set, EOF reached
return b''
# dont overflow caller
delta = min(delta, max_bytes)
target_ptr = self._ptr + delta
# fetch next segment and advance ptr
next_ptr = self._ptr + delta
segment = self._shm.buf[self._ptr:next_ptr]
self._ptr = next_ptr
segment = bytes(self._shm.buf[self._ptr:target_ptr])
self._ptr = target_ptr
if self.ptr == self.size:
if self._ptr == self.size:
# reached the end, signal wrap around
self._ptr = 0
self._write_ptr = 0
self._wrap_event.write(1)
return segment
async def aclose(self):
self._write_event.close()
self._wrap_event.close()
self._shm.close()
async def __aenter__(self):
def open(self):
self._shm = SharedMemory(
name=self._token.shm_name,
size=self._token.buf_size,
create=False
)
self._write_event.open()
self._wrap_event.open()
self._eof_event.open()
def close(self):
if self._cleanup:
self._write_event.close()
self._wrap_event.close()
self._eof_event.close()
self._shm.close()
async def aclose(self):
self.close()
async def __aenter__(self):
self.open()
return self
@acm
async def attach_to_ringbuf_receiver(
token: RBToken,
cleanup: bool = True
) -> AsyncContextManager[RingBuffReceiver]:
'''
Attach a RingBuffReceiver from a previously opened
RBToken.
Launches `receiver._eof_monitor_task` in a `trio.Nursery`.
'''
async with (
trio.open_nursery() as n,
RingBuffReceiver(
token,
cleanup=cleanup
) as receiver
):
n.start_soon(receiver._eof_monitor_task)
yield receiver
@acm
async def attach_to_ringbuf_sender(
token: RBToken,
cleanup: bool = True
) -> AsyncContextManager[RingBuffSender]:
'''
Attach a RingBuffSender from a previously opened
RBToken.
'''
async with RingBuffSender(
token,
cleanup=cleanup
) as sender:
yield sender
@cm
def open_ringbuf_pair(
name: str,
buf_size: int = _DEFAULT_RB_SIZE
) -> ContextManager[tuple(RBToken, RBToken)]:
'''
Handle resources for a ringbuf pair to be used for
bidirectional messaging.
'''
with (
open_ringbuf(
name + '.pair0',
buf_size=buf_size
) as token_0,
open_ringbuf(
name + '.pair1',
buf_size=buf_size
) as token_1
):
yield token_0, token_1
@acm
async def attach_to_ringbuf_stream(
token_in: RBToken,
token_out: RBToken,
cleanup_in: bool = True,
cleanup_out: bool = True
) -> AsyncContextManager[trio.StapledStream]:
'''
Attach a trio.StapledStream from a previously opened
ringbuf pair.
'''
async with (
attach_to_ringbuf_receiver(
token_in,
cleanup=cleanup_in
) as receiver,
attach_to_ringbuf_sender(
token_out,
cleanup=cleanup_out
) as sender,
):
yield trio.StapledStream(sender, receiver)
class RingBuffBytesSender(trio.abc.SendChannel[bytes]):
'''
In order to guarantee full messages are received, all bytes
sent by `RingBuffBytesSender` are preceded with a 4 byte header
which decodes into a uint32 indicating the actual size of the
next payload.
Optional batch mode:
If `batch_size` > 1 messages wont get sent immediately but will be
stored until `batch_size` messages are pending, then it will send
them all at once.
`batch_size` can be changed dynamically but always call, `flush()`
right before.
'''
def __init__(
self,
sender: RingBuffSender,
batch_size: int = 1
):
self._sender = sender
self.batch_size = batch_size
self._batch_msg_len = 0
self._batch: bytes = b''
async def flush(self) -> None:
await self._sender.send_all(self._batch)
self._batch = b''
self._batch_msg_len = 0
async def send(self, value: bytes) -> None:
msg: bytes = struct.pack("<I", len(value)) + value
if self.batch_size == 1:
await self._sender.send_all(msg)
return
self._batch += msg
self._batch_msg_len += 1
if self._batch_msg_len == self.batch_size:
await self.flush()
async def aclose(self) -> None:
await self._sender.aclose()
class RingBuffBytesReceiver(trio.abc.ReceiveChannel[bytes]):
'''
See `RingBuffBytesSender` docstring.
A `tricycle.BufferedReceiveStream` is used for the
`receive_exactly` API.
'''
def __init__(
self,
receiver: RingBuffReceiver
):
self._receiver = receiver
async def _receive_exactly(self, num_bytes: int) -> bytes:
'''
Fetch bytes from receiver until we read exactly `num_bytes`
or end of stream is signaled.
'''
payload = b''
while len(payload) < num_bytes:
remaining = num_bytes - len(payload)
new_bytes = await self._receiver.receive_some(
max_bytes=remaining
)
if new_bytes == b'':
raise trio.EndOfChannel
payload += new_bytes
return payload
async def receive(self) -> bytes:
header: bytes = await self._receive_exactly(4)
size: int
size, = struct.unpack("<I", header)
if size == 0:
raise trio.EndOfChannel
return await self._receive_exactly(size)
async def aclose(self) -> None:
await self._receiver.aclose()
@acm
async def attach_to_ringbuf_rchannel(
token: RBToken,
cleanup: bool = True
) -> AsyncContextManager[RingBuffBytesReceiver]:
'''
Attach a RingBuffBytesReceiver from a previously opened
RBToken.
'''
async with attach_to_ringbuf_receiver(
token, cleanup=cleanup
) as receiver:
yield RingBuffBytesReceiver(receiver)
@acm
async def attach_to_ringbuf_schannel(
token: RBToken,
cleanup: bool = True,
batch_size: int = 1,
) -> AsyncContextManager[RingBuffBytesSender]:
'''
Attach a RingBuffBytesSender from a previously opened
RBToken.
'''
async with attach_to_ringbuf_sender(
token, cleanup=cleanup
) as sender:
yield RingBuffBytesSender(sender, batch_size=batch_size)
class RingBuffChannel(trio.abc.Channel[bytes]):
'''
Combine `RingBuffBytesSender` and `RingBuffBytesReceiver`
in order to expose the bidirectional `trio.abc.Channel` API.
'''
def __init__(
self,
sender: RingBuffBytesSender,
receiver: RingBuffBytesReceiver
):
self._sender = sender
self._receiver = receiver
async def send(self, value: bytes):
await self._sender.send(value)
async def receive(self) -> bytes:
return await self._receiver.receive()
async def aclose(self):
await self._receiver.aclose()
await self._sender.aclose()
@acm
async def attach_to_ringbuf_channel(
token_in: RBToken,
token_out: RBToken,
cleanup_in: bool = True,
cleanup_out: bool = True
) -> AsyncContextManager[RingBuffChannel]:
'''
Attach to an already opened ringbuf pair and return
a `RingBuffChannel`.
'''
async with (
attach_to_ringbuf_rchannel(
token_in,
cleanup=cleanup_in
) as receiver,
attach_to_ringbuf_schannel(
token_out,
cleanup=cleanup_out
) as sender,
):
yield RingBuffChannel(sender, receiver)

View File

@ -73,7 +73,7 @@ class MsgTransport(Protocol[MsgType]):
# eventual msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
stream: trio.SocketStream
stream: trio.abc.Stream
drained: list[MsgType]
address_type: ClassVar[Type[Address]]