Fix rb non ipc case and tests in general

one_ring_to_rule_them_all ring-latest
Guillermo Rodriguez 2025-04-22 05:38:32 -03:00
parent 7766caf623
commit 053078ce8f
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
4 changed files with 86 additions and 50 deletions

View File

@ -120,21 +120,24 @@ async def open_pubsub_test_actors(
publisher_child,
batch_size=batch_size
) as (long_sctx, _),
open_ringbufs(ring_names) as tokens,
gather_contexts([
open_sub_channel_at('sub', ring)
for ring in tokens
]),
gather_contexts([
open_pub_channel_at('pub', ring)
for ring in tokens
]),
sub_portal.open_context(subscribe_range, size=size) as (rctx, _),
pub_portal.open_context(publish_range, size=size) as (sctx, _)
):
with open_ringbufs(ring_names) as tokens:
async with (
gather_contexts([
open_sub_channel_at('sub', ring)
for ring in tokens
]),
gather_contexts([
open_pub_channel_at('pub', ring)
for ring in tokens
]),
sub_portal.open_context(subscribe_range, size=size) as (rctx, _),
pub_portal.open_context(publish_range, size=size) as (sctx, _)
):
yield
yield
await rctx.wait_for_result()
await sctx.wait_for_result()
await long_sctx.cancel()
await long_rctx.cancel()

View File

@ -18,9 +18,6 @@ from tractor._testing.samples import (
RandomBytesGenerator
)
# in case you don't want to melt your cores, uncomment dis!
pytestmark = pytest.mark.skip
@tractor.context
async def child_read_shm(
@ -273,19 +270,24 @@ def test_receiver_max_bytes():
msg = generate_single_byte_msgs(100)
msgs = []
rb_common = {
'cleanup': False,
'is_ipc': False
}
async def main():
async with (
tractor.open_nursery(),
open_ringbuf(
'test_ringbuf_max_bytes',
buf_size=10
buf_size=10,
is_ipc=False
) as token,
trio.open_nursery() as n,
attach_to_ringbuf_sender(token, cleanup=False) as sender,
attach_to_ringbuf_sender(token, **rb_common) as sender,
attach_to_ringbuf_receiver(token, cleanup=False) as receiver
attach_to_ringbuf_receiver(token, **rb_common) as receiver
):
async def _send_and_close():
await sender.send_all(msg)

View File

@ -72,7 +72,7 @@ class RBToken(Struct, frozen=True):
even in the case that ringbuf was not allocated by current actor.
'''
owner: str # if owner != `current_actor().name` we must use fdshare
owner: str | None # if owner != `current_actor().name` we must use fdshare
shm_name: str
@ -104,6 +104,7 @@ class RBToken(Struct, frozen=True):
def alloc_ringbuf(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE,
is_ipc: bool = True
) -> tuple[SharedMemory, RBToken]:
'''
Allocate OS resources for a ringbuf.
@ -114,18 +115,21 @@ def alloc_ringbuf(
create=True
)
token = RBToken(
owner=current_actor().name,
owner=current_actor().name if is_ipc else None,
shm_name=shm_name,
write_eventfd=open_eventfd(),
wrap_eventfd=open_eventfd(),
eof_eventfd=open_eventfd(),
buf_size=buf_size
)
# register fds for sharing
share_fds(
shm_name,
token.fds,
)
if is_ipc:
# register fds for sharing
share_fds(
shm_name,
token.fds,
)
return shm, token
@ -133,6 +137,7 @@ def alloc_ringbuf(
def open_ringbuf_sync(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE,
is_ipc: bool = True
) -> ContextManager[RBToken]:
'''
Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to
@ -143,11 +148,15 @@ def open_ringbuf_sync(
shm: SharedMemory | None = None
token: RBToken | None = None
try:
shm, token = alloc_ringbuf(shm_name, buf_size=buf_size)
shm, token = alloc_ringbuf(
shm_name,
buf_size=buf_size,
is_ipc=is_ipc
)
yield token
finally:
if token:
if token and is_ipc:
unshare_fds(shm_name)
if shm:
@ -157,6 +166,7 @@ def open_ringbuf_sync(
async def open_ringbuf(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE,
is_ipc: bool = True
) -> AsyncContextManager[RBToken]:
'''
Helper to use `open_ringbuf_sync` inside an async with block.
@ -164,7 +174,8 @@ async def open_ringbuf(
'''
with open_ringbuf_sync(
shm_name,
buf_size=buf_size
buf_size=buf_size,
is_ipc=is_ipc
) as token:
yield token
@ -173,6 +184,7 @@ async def open_ringbuf(
def open_ringbufs_sync(
shm_names: list[str],
buf_sizes: int | list[str] = _DEFAULT_RB_SIZE,
is_ipc: bool = True
) -> ContextManager[tuple[RBToken]]:
'''
Handle resources for multiple ringbufs at once.
@ -194,7 +206,11 @@ def open_ringbufs_sync(
# allocate resources
rings: list[tuple[SharedMemory, RBToken]] = [
alloc_ringbuf(shm_name, buf_size=buf_size)
alloc_ringbuf(
shm_name,
buf_size=buf_size,
is_ipc=is_ipc
)
for shm_name, buf_size in zip(shm_names, buf_size)
]
@ -204,11 +220,12 @@ def open_ringbufs_sync(
finally:
# attempt fd unshare and shm unlink for each
for shm, token in rings:
try:
unshare_fds(token.shm_name)
if is_ipc:
try:
unshare_fds(token.shm_name)
except RuntimeError:
log.exception(f'while unsharing fds of {token}')
except RuntimeError:
log.exception(f'while unsharing fds of {token}')
shm.unlink()
@ -217,6 +234,7 @@ def open_ringbufs_sync(
async def open_ringbufs(
shm_names: list[str],
buf_sizes: int | list[str] = _DEFAULT_RB_SIZE,
is_ipc: bool = True
) -> AsyncContextManager[tuple[RBToken]]:
'''
Helper to use `open_ringbufs_sync` inside an async with block.
@ -224,7 +242,8 @@ async def open_ringbufs(
'''
with open_ringbufs_sync(
shm_names,
buf_sizes=buf_sizes
buf_sizes=buf_sizes,
is_ipc=is_ipc
) as tokens:
yield tokens
@ -232,7 +251,8 @@ async def open_ringbufs(
@cm
def open_ringbuf_pair_sync(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE
buf_size: int = _DEFAULT_RB_SIZE,
is_ipc: bool = True
) -> ContextManager[tuple(RBToken, RBToken)]:
'''
Handle resources for a ringbuf pair to be used for
@ -244,7 +264,8 @@ def open_ringbuf_pair_sync(
f'{shm_name}.send',
f'{shm_name}.recv'
],
buf_sizes=buf_size
buf_sizes=buf_size,
is_ipc=is_ipc
) as tokens:
yield tokens
@ -252,7 +273,8 @@ def open_ringbuf_pair_sync(
@acm
async def open_ringbuf_pair(
shm_name: str,
buf_size: int = _DEFAULT_RB_SIZE
buf_size: int = _DEFAULT_RB_SIZE,
is_ipc: bool = True
) -> AsyncContextManager[tuple[RBToken, RBToken]]:
'''
Helper to use `open_ringbuf_pair_sync` inside an async with block.
@ -260,7 +282,8 @@ async def open_ringbuf_pair(
'''
with open_ringbuf_pair_sync(
shm_name,
buf_size=buf_size
buf_size=buf_size,
is_ipc=is_ipc
) as tokens:
yield tokens
@ -828,7 +851,8 @@ async def attach_to_ringbuf_receiver(
token: RBToken,
cleanup: bool = True,
decoder: Decoder | None = None
decoder: Decoder | None = None,
is_ipc: bool = True
) -> AsyncContextManager[RingBufferReceiveChannel]:
'''
@ -840,7 +864,8 @@ async def attach_to_ringbuf_receiver(
Launches `receiver._eof_monitor_task` in a `trio.Nursery`.
'''
token = await _maybe_obtain_shared_resources(token)
if is_ipc:
token = await _maybe_obtain_shared_resources(token)
async with (
trio.open_nursery(strict_exception_groups=False) as n,
@ -860,7 +885,8 @@ async def attach_to_ringbuf_sender(
token: RBToken,
batch_size: int = 1,
cleanup: bool = True,
encoder: Encoder | None = None
encoder: Encoder | None = None,
is_ipc: bool = True
) -> AsyncContextManager[RingBufferSendChannel]:
'''
@ -871,7 +897,8 @@ async def attach_to_ringbuf_sender(
originally allocated by a different actor.
'''
token = await _maybe_obtain_shared_resources(token)
if is_ipc:
token = await _maybe_obtain_shared_resources(token)
async with RingBufferSendChannel(
token,
@ -951,7 +978,9 @@ async def attach_to_ringbuf_channel(
cleanup_in: bool = True,
cleanup_out: bool = True,
encoder: Encoder | None = None,
decoder: Decoder | None = None
decoder: Decoder | None = None,
sender_ipc: bool = True,
receiver_ipc: bool = True
) -> AsyncContextManager[trio.StapledStream]:
'''
Attach to two previously opened `RBToken`s and return a `RingBufferChannel`
@ -961,13 +990,15 @@ async def attach_to_ringbuf_channel(
attach_to_ringbuf_receiver(
token_in,
cleanup=cleanup_in,
decoder=decoder
decoder=decoder,
is_ipc=receiver_ipc
) as receiver,
attach_to_ringbuf_sender(
token_out,
batch_size=batch_size,
cleanup=cleanup_out,
encoder=encoder
encoder=encoder,
is_ipc=sender_ipc
) as sender,
):
yield RingBufferChannel(sender, receiver)

View File

@ -590,7 +590,7 @@ def set_publisher(topic: str, pub: RingBufferPublisher):
entry.is_set.set()
def get_publisher(topic: str) -> RingBufferPublisher:
def get_publisher(topic: str = 'default') -> RingBufferPublisher:
entry = _publishers.get(topic, None)
if not entry or not entry.publisher:
raise RuntimeError(
@ -685,7 +685,7 @@ def set_subscriber(topic: str, sub: RingBufferSubscriber):
entry.is_set.set()
def get_subscriber(topic: str) -> RingBufferSubscriber:
def get_subscriber(topic: str = 'default') -> RingBufferSubscriber:
entry = _subscribers.get(topic, None)
if not entry or not entry.subscriber:
raise RuntimeError(