From 053078ce8fd52284d767b200e77bb2bd23b5c812 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Tue, 22 Apr 2025 05:38:32 -0300 Subject: [PATCH] Fix rb non ipc case and tests in general --- tests/test_ring_pubsub.py | 31 ++++++------ tests/test_ringbuf.py | 16 +++--- tractor/ipc/_ringbuf/__init__.py | 85 ++++++++++++++++++++++---------- tractor/ipc/_ringbuf/_pubsub.py | 4 +- 4 files changed, 86 insertions(+), 50 deletions(-) diff --git a/tests/test_ring_pubsub.py b/tests/test_ring_pubsub.py index 3bdbeb0a..b3b0dade 100644 --- a/tests/test_ring_pubsub.py +++ b/tests/test_ring_pubsub.py @@ -120,21 +120,24 @@ async def open_pubsub_test_actors( publisher_child, batch_size=batch_size ) as (long_sctx, _), + + open_ringbufs(ring_names) as tokens, + + gather_contexts([ + open_sub_channel_at('sub', ring) + for ring in tokens + ]), + gather_contexts([ + open_pub_channel_at('pub', ring) + for ring in tokens + ]), + sub_portal.open_context(subscribe_range, size=size) as (rctx, _), + pub_portal.open_context(publish_range, size=size) as (sctx, _) ): - with open_ringbufs(ring_names) as tokens: - async with ( - gather_contexts([ - open_sub_channel_at('sub', ring) - for ring in tokens - ]), - gather_contexts([ - open_pub_channel_at('pub', ring) - for ring in tokens - ]), - sub_portal.open_context(subscribe_range, size=size) as (rctx, _), - pub_portal.open_context(publish_range, size=size) as (sctx, _) - ): - yield + yield + + await rctx.wait_for_result() + await sctx.wait_for_result() await long_sctx.cancel() await long_rctx.cancel() diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py index 3e6a5734..1f6a1927 100644 --- a/tests/test_ringbuf.py +++ b/tests/test_ringbuf.py @@ -18,9 +18,6 @@ from tractor._testing.samples import ( RandomBytesGenerator ) -# in case you don't want to melt your cores, uncomment dis! -pytestmark = pytest.mark.skip - @tractor.context async def child_read_shm( @@ -273,19 +270,24 @@ def test_receiver_max_bytes(): msg = generate_single_byte_msgs(100) msgs = [] + rb_common = { + 'cleanup': False, + 'is_ipc': False + } + async def main(): async with ( - tractor.open_nursery(), open_ringbuf( 'test_ringbuf_max_bytes', - buf_size=10 + buf_size=10, + is_ipc=False ) as token, trio.open_nursery() as n, - attach_to_ringbuf_sender(token, cleanup=False) as sender, + attach_to_ringbuf_sender(token, **rb_common) as sender, - attach_to_ringbuf_receiver(token, cleanup=False) as receiver + attach_to_ringbuf_receiver(token, **rb_common) as receiver ): async def _send_and_close(): await sender.send_all(msg) diff --git a/tractor/ipc/_ringbuf/__init__.py b/tractor/ipc/_ringbuf/__init__.py index f9a88813..12943707 100644 --- a/tractor/ipc/_ringbuf/__init__.py +++ b/tractor/ipc/_ringbuf/__init__.py @@ -72,7 +72,7 @@ class RBToken(Struct, frozen=True): even in the case that ringbuf was not allocated by current actor. ''' - owner: str # if owner != `current_actor().name` we must use fdshare + owner: str | None # if owner != `current_actor().name` we must use fdshare shm_name: str @@ -104,6 +104,7 @@ class RBToken(Struct, frozen=True): def alloc_ringbuf( shm_name: str, buf_size: int = _DEFAULT_RB_SIZE, + is_ipc: bool = True ) -> tuple[SharedMemory, RBToken]: ''' Allocate OS resources for a ringbuf. @@ -114,18 +115,21 @@ def alloc_ringbuf( create=True ) token = RBToken( - owner=current_actor().name, + owner=current_actor().name if is_ipc else None, shm_name=shm_name, write_eventfd=open_eventfd(), wrap_eventfd=open_eventfd(), eof_eventfd=open_eventfd(), buf_size=buf_size ) - # register fds for sharing - share_fds( - shm_name, - token.fds, - ) + + if is_ipc: + # register fds for sharing + share_fds( + shm_name, + token.fds, + ) + return shm, token @@ -133,6 +137,7 @@ def alloc_ringbuf( def open_ringbuf_sync( shm_name: str, buf_size: int = _DEFAULT_RB_SIZE, + is_ipc: bool = True ) -> ContextManager[RBToken]: ''' Handle resources for a ringbuf (shm, eventfd), yield `RBToken` to @@ -143,11 +148,15 @@ def open_ringbuf_sync( shm: SharedMemory | None = None token: RBToken | None = None try: - shm, token = alloc_ringbuf(shm_name, buf_size=buf_size) + shm, token = alloc_ringbuf( + shm_name, + buf_size=buf_size, + is_ipc=is_ipc + ) yield token finally: - if token: + if token and is_ipc: unshare_fds(shm_name) if shm: @@ -157,6 +166,7 @@ def open_ringbuf_sync( async def open_ringbuf( shm_name: str, buf_size: int = _DEFAULT_RB_SIZE, + is_ipc: bool = True ) -> AsyncContextManager[RBToken]: ''' Helper to use `open_ringbuf_sync` inside an async with block. @@ -164,7 +174,8 @@ async def open_ringbuf( ''' with open_ringbuf_sync( shm_name, - buf_size=buf_size + buf_size=buf_size, + is_ipc=is_ipc ) as token: yield token @@ -173,6 +184,7 @@ async def open_ringbuf( def open_ringbufs_sync( shm_names: list[str], buf_sizes: int | list[str] = _DEFAULT_RB_SIZE, + is_ipc: bool = True ) -> ContextManager[tuple[RBToken]]: ''' Handle resources for multiple ringbufs at once. @@ -194,7 +206,11 @@ def open_ringbufs_sync( # allocate resources rings: list[tuple[SharedMemory, RBToken]] = [ - alloc_ringbuf(shm_name, buf_size=buf_size) + alloc_ringbuf( + shm_name, + buf_size=buf_size, + is_ipc=is_ipc + ) for shm_name, buf_size in zip(shm_names, buf_size) ] @@ -204,11 +220,12 @@ def open_ringbufs_sync( finally: # attempt fd unshare and shm unlink for each for shm, token in rings: - try: - unshare_fds(token.shm_name) + if is_ipc: + try: + unshare_fds(token.shm_name) - except RuntimeError: - log.exception(f'while unsharing fds of {token}') + except RuntimeError: + log.exception(f'while unsharing fds of {token}') shm.unlink() @@ -217,6 +234,7 @@ def open_ringbufs_sync( async def open_ringbufs( shm_names: list[str], buf_sizes: int | list[str] = _DEFAULT_RB_SIZE, + is_ipc: bool = True ) -> AsyncContextManager[tuple[RBToken]]: ''' Helper to use `open_ringbufs_sync` inside an async with block. @@ -224,7 +242,8 @@ async def open_ringbufs( ''' with open_ringbufs_sync( shm_names, - buf_sizes=buf_sizes + buf_sizes=buf_sizes, + is_ipc=is_ipc ) as tokens: yield tokens @@ -232,7 +251,8 @@ async def open_ringbufs( @cm def open_ringbuf_pair_sync( shm_name: str, - buf_size: int = _DEFAULT_RB_SIZE + buf_size: int = _DEFAULT_RB_SIZE, + is_ipc: bool = True ) -> ContextManager[tuple(RBToken, RBToken)]: ''' Handle resources for a ringbuf pair to be used for @@ -244,7 +264,8 @@ def open_ringbuf_pair_sync( f'{shm_name}.send', f'{shm_name}.recv' ], - buf_sizes=buf_size + buf_sizes=buf_size, + is_ipc=is_ipc ) as tokens: yield tokens @@ -252,7 +273,8 @@ def open_ringbuf_pair_sync( @acm async def open_ringbuf_pair( shm_name: str, - buf_size: int = _DEFAULT_RB_SIZE + buf_size: int = _DEFAULT_RB_SIZE, + is_ipc: bool = True ) -> AsyncContextManager[tuple[RBToken, RBToken]]: ''' Helper to use `open_ringbuf_pair_sync` inside an async with block. @@ -260,7 +282,8 @@ async def open_ringbuf_pair( ''' with open_ringbuf_pair_sync( shm_name, - buf_size=buf_size + buf_size=buf_size, + is_ipc=is_ipc ) as tokens: yield tokens @@ -828,7 +851,8 @@ async def attach_to_ringbuf_receiver( token: RBToken, cleanup: bool = True, - decoder: Decoder | None = None + decoder: Decoder | None = None, + is_ipc: bool = True ) -> AsyncContextManager[RingBufferReceiveChannel]: ''' @@ -840,7 +864,8 @@ async def attach_to_ringbuf_receiver( Launches `receiver._eof_monitor_task` in a `trio.Nursery`. ''' - token = await _maybe_obtain_shared_resources(token) + if is_ipc: + token = await _maybe_obtain_shared_resources(token) async with ( trio.open_nursery(strict_exception_groups=False) as n, @@ -860,7 +885,8 @@ async def attach_to_ringbuf_sender( token: RBToken, batch_size: int = 1, cleanup: bool = True, - encoder: Encoder | None = None + encoder: Encoder | None = None, + is_ipc: bool = True ) -> AsyncContextManager[RingBufferSendChannel]: ''' @@ -871,7 +897,8 @@ async def attach_to_ringbuf_sender( originally allocated by a different actor. ''' - token = await _maybe_obtain_shared_resources(token) + if is_ipc: + token = await _maybe_obtain_shared_resources(token) async with RingBufferSendChannel( token, @@ -951,7 +978,9 @@ async def attach_to_ringbuf_channel( cleanup_in: bool = True, cleanup_out: bool = True, encoder: Encoder | None = None, - decoder: Decoder | None = None + decoder: Decoder | None = None, + sender_ipc: bool = True, + receiver_ipc: bool = True ) -> AsyncContextManager[trio.StapledStream]: ''' Attach to two previously opened `RBToken`s and return a `RingBufferChannel` @@ -961,13 +990,15 @@ async def attach_to_ringbuf_channel( attach_to_ringbuf_receiver( token_in, cleanup=cleanup_in, - decoder=decoder + decoder=decoder, + is_ipc=receiver_ipc ) as receiver, attach_to_ringbuf_sender( token_out, batch_size=batch_size, cleanup=cleanup_out, - encoder=encoder + encoder=encoder, + is_ipc=sender_ipc ) as sender, ): yield RingBufferChannel(sender, receiver) diff --git a/tractor/ipc/_ringbuf/_pubsub.py b/tractor/ipc/_ringbuf/_pubsub.py index 7de1d9b2..c85de9ca 100644 --- a/tractor/ipc/_ringbuf/_pubsub.py +++ b/tractor/ipc/_ringbuf/_pubsub.py @@ -590,7 +590,7 @@ def set_publisher(topic: str, pub: RingBufferPublisher): entry.is_set.set() -def get_publisher(topic: str) -> RingBufferPublisher: +def get_publisher(topic: str = 'default') -> RingBufferPublisher: entry = _publishers.get(topic, None) if not entry or not entry.publisher: raise RuntimeError( @@ -685,7 +685,7 @@ def set_subscriber(topic: str, sub: RingBufferSubscriber): entry.is_set.set() -def get_subscriber(topic: str) -> RingBufferSubscriber: +def get_subscriber(topic: str = 'default') -> RingBufferSubscriber: entry = _subscribers.get(topic, None) if not entry or not entry.subscriber: raise RuntimeError(