From f5513ba0059ef4a7504ff18ec0c36b6411366d76 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sun, 13 Apr 2025 13:36:02 -0300 Subject: [PATCH] Adapt ringbuf pubsub to new RBToken owner system --- tests/test_ring_pubsub.py | 182 +++++++++++++++ tests/test_ringd.py | 272 ---------------------- tractor/ipc/_ringbuf/_pubsub.py | 330 ++++++++++++-------------- tractor/ipc/_ringbuf/_ringd.py | 401 -------------------------------- 4 files changed, 336 insertions(+), 849 deletions(-) create mode 100644 tests/test_ring_pubsub.py delete mode 100644 tests/test_ringd.py delete mode 100644 tractor/ipc/_ringbuf/_ringd.py diff --git a/tests/test_ring_pubsub.py b/tests/test_ring_pubsub.py new file mode 100644 index 00000000..3bdbeb0a --- /dev/null +++ b/tests/test_ring_pubsub.py @@ -0,0 +1,182 @@ +from typing import AsyncContextManager +from contextlib import asynccontextmanager as acm + +import trio +import pytest +import tractor + +from tractor.trionics import gather_contexts + +from tractor.ipc._ringbuf import open_ringbufs +from tractor.ipc._ringbuf._pubsub import ( + open_ringbuf_publisher, + open_ringbuf_subscriber, + get_publisher, + get_subscriber, + open_pub_channel_at, + open_sub_channel_at +) + + +log = tractor.log.get_console_log(level='info') + + +@tractor.context +async def publish_range( + ctx: tractor.Context, + size: int +): + pub = get_publisher() + await ctx.started() + for i in range(size): + await pub.send(i.to_bytes(4)) + log.info(f'sent {i}') + + await pub.flush() + + log.info('range done') + + +@tractor.context +async def subscribe_range( + ctx: tractor.Context, + size: int +): + sub = get_subscriber() + await ctx.started() + + for i in range(size): + recv = int.from_bytes(await sub.receive()) + if recv != i: + raise AssertionError( + f'received: {recv} expected: {i}' + ) + + log.info(f'received: {recv}') + + log.info('range done') + + +@tractor.context +async def subscriber_child(ctx: tractor.Context): + try: + async with open_ringbuf_subscriber(guarantee_order=True): + await ctx.started() + await trio.sleep_forever() + + finally: + log.info('subscriber exit') + + +@tractor.context +async def publisher_child( + ctx: tractor.Context, + batch_size: int +): + try: + async with open_ringbuf_publisher( + guarantee_order=True, + batch_size=batch_size + ): + await ctx.started() + await trio.sleep_forever() + + finally: + log.info('publisher exit') + + +@acm +async def open_pubsub_test_actors( + + ring_names: list[str], + size: int, + batch_size: int + +) -> AsyncContextManager[tuple[tractor.Portal, tractor.Portal]]: + + with trio.fail_after(5): + async with tractor.open_nursery( + enable_modules=[ + 'tractor.linux._fdshare' + ] + ) as an: + modules = [ + __name__, + 'tractor.linux._fdshare', + 'tractor.ipc._ringbuf._pubsub' + ] + sub_portal = await an.start_actor( + 'sub', + enable_modules=modules + ) + pub_portal = await an.start_actor( + 'pub', + enable_modules=modules + ) + + async with ( + sub_portal.open_context(subscriber_child) as (long_rctx, _), + pub_portal.open_context( + publisher_child, + batch_size=batch_size + ) as (long_sctx, _), + ): + with open_ringbufs(ring_names) as tokens: + async with ( + gather_contexts([ + open_sub_channel_at('sub', ring) + for ring in tokens + ]), + gather_contexts([ + open_pub_channel_at('pub', ring) + for ring in tokens + ]), + sub_portal.open_context(subscribe_range, size=size) as (rctx, _), + pub_portal.open_context(publish_range, size=size) as (sctx, _) + ): + yield + + await long_sctx.cancel() + await long_rctx.cancel() + + await an.cancel() + + +@pytest.mark.parametrize( + ('ring_names', 'size', 'batch_size'), + [ + ( + ['ring-first'], + 100, + 1 + ), + ( + ['ring-first'], + 69, + 1 + ), + ( + [f'multi-ring-{i}' for i in range(3)], + 1000, + 100 + ), + ], + ids=[ + 'simple', + 'redo-simple', + 'multi-ring', + ] +) +def test_pubsub( + request, + ring_names: list[str], + size: int, + batch_size: int +): + async def main(): + async with open_pubsub_test_actors( + ring_names, size, batch_size + ): + ... + + trio.run(main) diff --git a/tests/test_ringd.py b/tests/test_ringd.py deleted file mode 100644 index e08b7c1c..00000000 --- a/tests/test_ringd.py +++ /dev/null @@ -1,272 +0,0 @@ -import trio -import tractor -import msgspec - -from tractor.ipc import ( - attach_to_ringbuf_receiver, - attach_to_ringbuf_sender -) -from tractor.ipc._ringbuf._pubsub import ( - open_ringbuf_publisher, - open_ringbuf_subscriber -) - -import tractor.ipc._ringbuf._ringd as ringd - - -log = tractor.log.get_console_log(level='info') - - -@tractor.context -async def recv_child( - ctx: tractor.Context, - ring_name: str -): - async with ( - ringd.open_ringbuf(ring_name) as token, - - attach_to_ringbuf_receiver(token) as chan, - ): - await ctx.started() - async for msg in chan: - log.info(f'received {int.from_bytes(msg)}') - - -@tractor.context -async def send_child( - ctx: tractor.Context, - ring_name: str -): - async with ( - ringd.attach_ringbuf(ring_name) as token, - - attach_to_ringbuf_sender(token) as chan, - ): - await ctx.started() - for i in range(100): - await chan.send(i.to_bytes(4)) - log.info(f'sent {i}') - - - -def test_ringd(): - ''' - Spawn ringd actor and two childs that access same ringbuf through ringd. - - Both will use `ringd.open_ringbuf` to allocate the ringbuf, then attach to - them as sender and receiver. - - ''' - async def main(): - async with ( - tractor.open_nursery() as an, - - ringd.open_ringd() - ): - recv_portal = await an.start_actor( - 'recv', - enable_modules=[__name__] - ) - send_portal = await an.start_actor( - 'send', - enable_modules=[__name__] - ) - - async with ( - recv_portal.open_context( - recv_child, - ring_name='ring' - ) as (rctx, _), - - send_portal.open_context( - send_child, - ring_name='ring' - ) as (sctx, _), - ): - ... - - await an.cancel() - - trio.run(main) - - -class Struct(msgspec.Struct): - - def encode(self) -> bytes: - return msgspec.msgpack.encode(self) - - -class AddChannelMsg(Struct, frozen=True, tag=True): - name: str - - -class RemoveChannelMsg(Struct, frozen=True, tag=True): - name: str - - -class RangeMsg(Struct, frozen=True, tag=True): - size: int - - -ControlMessages = AddChannelMsg | RemoveChannelMsg | RangeMsg - - -@tractor.context -async def subscriber_child(ctx: tractor.Context): - await ctx.started() - async with ( - open_ringbuf_subscriber(guarantee_order=True) as subs, - trio.open_nursery() as n, - ctx.open_stream() as stream - ): - range_msg = None - range_event = trio.Event() - range_scope = trio.CancelScope() - - async def _control_listen_task(): - nonlocal range_msg, range_event - async for msg in stream: - msg = msgspec.msgpack.decode(msg, type=ControlMessages) - match msg: - case AddChannelMsg(): - await subs.add_channel(msg.name) - - case RemoveChannelMsg(): - subs.remove_channel(msg.name) - - case RangeMsg(): - range_msg = msg - range_event.set() - - await stream.send(b'ack') - - range_scope.cancel() - - n.start_soon(_control_listen_task) - - with range_scope: - while True: - await range_event.wait() - range_event = trio.Event() - for i in range(range_msg.size): - recv = int.from_bytes(await subs.receive()) - if recv != i: - raise AssertionError( - f'received: {recv} expected: {i}' - ) - - log.info(f'received: {recv}') - - await stream.send(b'valid range') - log.info('finished range') - - log.info('subscriber exit') - - -@tractor.context -async def publisher_child(ctx: tractor.Context): - await ctx.started() - async with ( - open_ringbuf_publisher(guarantee_order=True) as pub, - ctx.open_stream() as stream - ): - async for msg in stream: - msg = msgspec.msgpack.decode(msg, type=ControlMessages) - match msg: - case AddChannelMsg(): - await pub.add_channel(msg.name, must_exist=True) - - case RemoveChannelMsg(): - pub.remove_channel(msg.name) - - case RangeMsg(): - for i in range(msg.size): - await pub.send(i.to_bytes(4)) - log.info(f'sent {i}') - - await stream.send(b'ack') - - log.info('publisher exit') - - - -def test_pubsub(): - ''' - Spawn to childs a publisher and a subscriber, use context streams - to dynamically test different scenarios with different channel - configurations between them. - - ''' - async def main(): - async with ( - tractor.open_nursery( - loglevel='info', - # debug_mode=True, - # enable_stack_on_sig=True - ) as an, - - ringd.open_ringd() - ): - recv_portal = await an.start_actor( - 'recv', - enable_modules=[__name__] - ) - send_portal = await an.start_actor( - 'send', - enable_modules=[__name__] - ) - - async with ( - recv_portal.open_context(subscriber_child) as (rctx, _), - rctx.open_stream() as recv_stream, - send_portal.open_context(publisher_child) as (sctx, _), - sctx.open_stream() as send_stream, - ): - async def send_wait_ack(msg: bytes): - await recv_stream.send(msg) - ack = await recv_stream.receive() - assert ack == b'ack' - - await send_stream.send(msg) - ack = await send_stream.receive() - assert ack == b'ack' - - async def add_channel(name: str): - await send_wait_ack(AddChannelMsg(name=name).encode()) - - async def remove_channel(name: str): - await send_wait_ack(RemoveChannelMsg(name=name).encode()) - - async def send_range(size: int): - await send_wait_ack(RangeMsg(size=size).encode()) - range_ack = await recv_stream.receive() - assert range_ack == b'valid range' - - # simple test, open one channel and send 0..100 range - ring_name = 'ring-first' - await add_channel(ring_name) - await send_range(100) - await remove_channel(ring_name) - - # redo - ring_name = 'ring-redo' - await add_channel(ring_name) - await send_range(100) - await remove_channel(ring_name) - - # multi chan test - ring_names = [] - for i in range(3): - ring_names.append(f'multi-ring-{i}') - - for name in ring_names: - await add_channel(name) - - await send_range(1000) - - for name in ring_names: - await remove_channel(name) - - await an.cancel() - - trio.run(main) diff --git a/tractor/ipc/_ringbuf/_pubsub.py b/tractor/ipc/_ringbuf/_pubsub.py index e2575ab6..40adf611 100644 --- a/tractor/ipc/_ringbuf/_pubsub.py +++ b/tractor/ipc/_ringbuf/_pubsub.py @@ -31,7 +31,8 @@ from dataclasses import dataclass import trio import tractor -from tractor.ipc import ( +from tractor.ipc._ringbuf import ( + RBToken, RingBufferSendChannel, RingBufferReceiveChannel, attach_to_ringbuf_sender, @@ -42,7 +43,8 @@ from tractor.trionics import ( order_send_channel, order_receive_channel ) -import tractor.ipc._ringbuf._ringd as ringd + +import tractor.linux._fdshare as fdshare log = tractor.log.get_logger(__name__) @@ -53,9 +55,10 @@ ChannelType = TypeVar('ChannelType') @dataclass class ChannelInfo: - name: str + token: RBToken channel: ChannelType cancel_scope: trio.CancelScope + teardown: trio.Event class ChannelManager(Generic[ChannelType]): @@ -88,8 +91,6 @@ class ChannelManager(Generic[ChannelType]): self._is_closed: bool = True - self._teardown = trio.Event() - @property def closed(self) -> bool: return self._is_closed @@ -100,9 +101,9 @@ class ChannelManager(Generic[ChannelType]): async def _channel_handler_task( self, - name: str, - must_exist: bool = False, + token: RBToken, task_status=trio.TASK_STATUS_IGNORED, + **kwargs ): ''' Open channel resources, add to internal data structures, signal channel @@ -114,12 +115,16 @@ class ChannelManager(Generic[ChannelType]): kwargs are proxied to `self._open_channel` acm. ''' - async with self._open_channel(name, must_exist=must_exist) as chan: + async with self._open_channel( + token, + **kwargs + ) as chan: cancel_scope = trio.CancelScope() info = ChannelInfo( - name=name, + token=token, channel=chan, - cancel_scope=cancel_scope + cancel_scope=cancel_scope, + teardown=trio.Event() ) self._channels.append(info) @@ -131,10 +136,7 @@ class ChannelManager(Generic[ChannelType]): with cancel_scope: await self._channel_task(info) - self._maybe_destroy_channel(name) - - if len(self) == 0: - self._teardown.set() + self._maybe_destroy_channel(token.shm_name) def _find_channel(self, name: str) -> tuple[int, ChannelInfo] | None: ''' @@ -145,7 +147,7 @@ class ChannelManager(Generic[ChannelType]): ''' for entry in enumerate(self._channels): i, info = entry - if info.name == name: + if info.token.shm_name == name: return entry return None @@ -161,9 +163,14 @@ class ChannelManager(Generic[ChannelType]): if maybe_entry: i, info = maybe_entry info.cancel_scope.cancel() + info.teardown.set() del self._channels[i] - async def add_channel(self, name: str, must_exist: bool = False): + async def add_channel( + self, + token: RBToken, + **kwargs + ): ''' Add a new channel to be handled @@ -173,11 +180,11 @@ class ChannelManager(Generic[ChannelType]): await self._n.start(partial( self._channel_handler_task, - name, - must_exist=must_exist + RBToken.from_msg(token), + **kwargs )) - def remove_channel(self, name: str): + async def remove_channel(self, name: str): ''' Remove a channel and stop its handling @@ -185,8 +192,18 @@ class ChannelManager(Generic[ChannelType]): if self.closed: raise trio.ClosedResourceError + maybe_entry = self._find_channel(name) + if not maybe_entry: + # return + raise RuntimeError( + f'tried to remove channel {name} but if does not exist' + ) + + i, info = maybe_entry self._maybe_destroy_channel(name) + await info.teardown.wait() + # if that was last channel reset connect event if len(self) == 0: self._connect_event = trio.Event() @@ -225,15 +242,7 @@ class ChannelManager(Generic[ChannelType]): if info.channel.closed: continue - self.remove_channel(info.name) - - try: - await self._teardown.wait() - - except trio.Cancelled: - # log.exception('close was cancelled') - raise - + await self.remove_channel(info.name) self._is_closed = True @@ -257,16 +266,12 @@ class RingBufferPublisher(trio.abc.SendChannel[bytes]): self, n: trio.Nursery, - # new ringbufs created will have this buf_size - buf_size: int = 10 * 1024, - # amount of msgs to each ring before switching turns msgs_per_turn: int = 1, # global batch size for all channels batch_size: int = 1 ): - self._buf_size = buf_size self._batch_size: int = batch_size self.msgs_per_turn = msgs_per_turn @@ -331,63 +336,32 @@ class RingBufferPublisher(trio.abc.SendChannel[bytes]): async def add_channel( self, - name: str, - must_exist: bool = False + token: RBToken, ): - await self._chanmngr.add_channel(name, must_exist=must_exist) + await self._chanmngr.add_channel(token) - def remove_channel(self, name: str): - self._chanmngr.remove_channel(name) + async def remove_channel(self, name: str): + await self._chanmngr.remove_channel(name) @acm async def _open_channel( self, - name: str, - must_exist: bool = False + token: RBToken ) -> AsyncContextManager[RingBufferSendChannel]: - ''' - Open a ringbuf through `ringd` and attach as send side - ''' - if must_exist: - ringd_fn = ringd.attach_ringbuf - kwargs = {} - - else: - ringd_fn = ringd.open_ringbuf - kwargs = {'buf_size': self._buf_size} - - async with ( - ringd_fn( - name=name, - **kwargs - ) as token, - - attach_to_ringbuf_sender( - token, - batch_size=self._batch_size - ) as ring, - ): + async with attach_to_ringbuf_sender( + token, + batch_size=self._batch_size + ) as ring: yield ring - # try: - # # ensure all messages are sent - # await ring.flush() - - # except Exception as e: - # e.add_note(f'while closing ringbuf send channel {name}') - # log.exception(e) async def _channel_task(self, info: ChannelInfo) -> None: ''' Wait forever until channel cancellation ''' - try: - await trio.sleep_forever() - - except trio.Cancelled: - ... + await trio.sleep_forever() async def send(self, msg: bytes): ''' @@ -441,8 +415,7 @@ class RingBufferPublisher(trio.abc.SendChannel[bytes]): log.warning('tried to close RingBufferPublisher but its already closed...') return - with trio.CancelScope(shield=True): - await self._chanmngr.close() + await self._chanmngr.close() self._is_closed = True @@ -467,15 +440,10 @@ class RingBufferSubscriber(trio.abc.ReceiveChannel[bytes]): self, n: trio.Nursery, - # new ringbufs created will have this buf_size - buf_size: int = 10 * 1024, - # if connecting to a publisher that has already sent messages set # to the next expected payload index this subscriber will receive start_index: int = 0 ): - self._buf_size = buf_size - self._chanmngr = ChannelManager[RingBufferReceiveChannel]( n, self._open_channel, @@ -499,40 +467,24 @@ class RingBufferSubscriber(trio.abc.ReceiveChannel[bytes]): def get_channel(self, name: str): return self._chanmngr[name] - async def add_channel(self, name: str, must_exist: bool = False): - await self._chanmngr.add_channel(name, must_exist=must_exist) + async def add_channel( + self, + token: RBToken + ): + await self._chanmngr.add_channel(token) - def remove_channel(self, name: str): - self._chanmngr.remove_channel(name) + async def remove_channel(self, name: str): + await self._chanmngr.remove_channel(name) @acm async def _open_channel( self, - name: str, - must_exist: bool = False + token: RBToken - ) -> AsyncContextManager[RingBufferReceiveChannel]: - ''' - Open a ringbuf through `ringd` and attach as receiver side - ''' - if must_exist: - ringd_fn = ringd.attach_ringbuf - kwargs = {} - - else: - ringd_fn = ringd.open_ringbuf - kwargs = {'buf_size': self._buf_size} - - async with ( - ringd_fn( - name=name, - **kwargs - ) as token, - - attach_to_ringbuf_receiver(token) as chan - ): - yield chan + ) -> AsyncContextManager[RingBufferSendChannel]: + async with attach_to_ringbuf_receiver(token) as ring: + yield ring async def _channel_task(self, info: ChannelInfo) -> None: ''' @@ -582,6 +534,7 @@ class RingBufferSubscriber(trio.abc.ReceiveChannel[bytes]): await self._chanmngr.close() await self._schan.aclose() await self._rchan.aclose() + self._is_closed = True @@ -641,89 +594,128 @@ def get_subscriber() -> RingBufferSubscriber: @tractor.context -async def open_pub_channel( +async def _add_pub_channel( ctx: tractor.Context, - ring_name: str, - must_exist: bool = False + token: RBToken ): publisher = get_publisher() - await publisher.add_channel( - ring_name, - must_exist=must_exist - ) - await ctx.started() + await publisher.add_channel(token) - try: - await trio.sleep_forever() - finally: - try: - publisher.remove_channel(ring_name) - - except trio.ClosedResourceError: - ... +@tractor.context +async def _remove_pub_channel( + ctx: tractor.Context, + ring_name: str +): + publisher = get_publisher() + await ctx.started() + maybe_token = fdshare.maybe_get_fds(ring_name) + if maybe_token: + await publisher.remove_channel(ring_name) @acm async def open_pub_channel_at( actor_name: str, - ring_name: str, - must_exist: bool = False + token: RBToken, + cleanup: bool = True, ): async with ( tractor.find_actor(actor_name) as portal, + portal.open_context( - open_pub_channel, - ring_name=ring_name, - must_exist=must_exist + _add_pub_channel, + token=token ) as (ctx, _) ): + ... + + try: yield - await ctx.cancel() + + except trio.Cancelled: + log.exception( + 'open_pub_channel_at got cancelled!\n' + f'\tactor_name = {actor_name}\n' + f'\ttoken = {token}\n' + ) + raise + + finally: + if not cleanup: + return + + async with tractor.find_actor(actor_name) as portal: + if portal: + async with portal.open_context( + _remove_pub_channel, + ring_name=token.shm_name + ) as (ctx, _): + ... @tractor.context -async def open_sub_channel( +async def _add_sub_channel( ctx: tractor.Context, - ring_name: str, - must_exist: bool = False + token: RBToken ): subscriber = get_subscriber() - await subscriber.add_channel( - ring_name, - must_exist=must_exist - ) - await ctx.started() + await subscriber.add_channel(token) - try: - await trio.sleep_forever() - finally: - try: - subscriber.remove_channel(ring_name) - - except trio.ClosedResourceError: - ... +@tractor.context +async def _remove_sub_channel( + ctx: tractor.Context, + ring_name: str +): + subscriber = get_subscriber() + await ctx.started() + maybe_token = fdshare.maybe_get_fds(ring_name) + if maybe_token: + await subscriber.remove_channel(ring_name) @acm async def open_sub_channel_at( actor_name: str, - ring_name: str, - must_exist: bool = False + token: RBToken, + cleanup: bool = True, ): async with ( tractor.find_actor(actor_name) as portal, + portal.open_context( - open_sub_channel, - ring_name=ring_name, - must_exist=must_exist + _add_sub_channel, + token=token ) as (ctx, _) ): + ... + + try: yield - await ctx.cancel() + + except trio.Cancelled: + log.exception( + 'open_sub_channel_at got cancelled!\n' + f'\tactor_name = {actor_name}\n' + f'\ttoken = {token}\n' + ) + raise + + finally: + if not cleanup: + return + + async with tractor.find_actor(actor_name) as portal: + if portal: + async with portal.open_context( + _remove_sub_channel, + ring_name=token.shm_name + ) as (ctx, _): + ... + ''' @@ -733,9 +725,6 @@ High level helpers to open publisher & subscriber @acm async def open_ringbuf_publisher( - # buf size for created rings - buf_size: int = 10 * 1024, - # global batch size for channels batch_size: int = 1, @@ -747,9 +736,6 @@ async def open_ringbuf_publisher( # index guarantee_order: bool = False, - # explicit nursery cancel call on cleanup - force_cancel: bool = False, - # on creation, set the `_publisher` global in order to use the provided # tractor.context & helper utils for adding and removing new channels from # remote actors @@ -764,7 +750,6 @@ async def open_ringbuf_publisher( trio.open_nursery(strict_exception_groups=False) as n, RingBufferPublisher( n, - buf_size=buf_size, batch_size=batch_size ) as publisher ): @@ -777,23 +762,17 @@ async def open_ringbuf_publisher( try: yield publisher - finally: - if force_cancel: - # implicitly cancel any running channel handler task - n.cancel_scope.cancel() + except trio.Cancelled: + with trio.CancelScope(shield=True): + await publisher.aclose() + raise @acm async def open_ringbuf_subscriber( - # buf size for created rings - buf_size: int = 10 * 1024, - # expect indexed payloads and unwrap them in order guarantee_order: bool = False, - # explicit nursery cancel call on cleanup - force_cancel: bool = False, - # on creation, set the `_subscriber` global in order to use the provided # tractor.context & helper utils for adding and removing new channels from # remote actors @@ -805,10 +784,7 @@ async def open_ringbuf_subscriber( ''' async with ( trio.open_nursery(strict_exception_groups=False) as n, - RingBufferSubscriber( - n, - buf_size=buf_size - ) as subscriber + RingBufferSubscriber(n) as subscriber ): # maybe monkey patch `.receive` to use indexed payloads if guarantee_order: @@ -819,8 +795,10 @@ async def open_ringbuf_subscriber( global _subscriber set_subscriber(subscriber) - yield subscriber + try: + yield subscriber - if force_cancel: - # implicitly cancel any running channel handler task - n.cancel_scope.cancel() + except trio.Cancelled: + with trio.CancelScope(shield=True): + await subscriber.aclose() + raise diff --git a/tractor/ipc/_ringbuf/_ringd.py b/tractor/ipc/_ringbuf/_ringd.py deleted file mode 100644 index 51818e34..00000000 --- a/tractor/ipc/_ringbuf/_ringd.py +++ /dev/null @@ -1,401 +0,0 @@ -# tractor: structured concurrent "actors". -# Copyright 2018-eternity Tyler Goodlet. - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -''' -Actor to broker ringbuf resources, creates and allocates -the resources, then automatically does fd passing. - -call open_ringd in your root actor - -then on actors that need a ringbuf resource use - -open_ringbuf acm, will automatically contact ringd. -''' -import os -import tempfile -from typing import AsyncContextManager -from pathlib import Path -from contextlib import ( - asynccontextmanager as acm -) -from dataclasses import dataclass - -import trio -import tractor -from tractor.linux import ( - send_fds, - recv_fds, -) - -import tractor.ipc._ringbuf as ringbuf -from tractor.ipc._ringbuf import RBToken - - -log = tractor.log.get_logger(__name__) - - -''' -Daemon implementation - -''' - - -_ringd_actor_name: str = 'ringd' - - -_root_name: str = f'{_ringd_actor_name}-{os.getpid()}' - - -def _make_ring_name(name: str) -> str: - ''' - User provided ring names will be prefixed by the ringd actor name and pid. - ''' - return f'{_root_name}.{name}' - - -@dataclass -class RingInfo: - token: RBToken - creator: str - - -_rings: dict[str, RingInfo] = {} -_ring_lock = trio.StrictFIFOLock() - - -def _maybe_get_ring(name: str) -> RingInfo | None: - ''' - Maybe return RingInfo for a given name str - - ''' - # if full name was passed, strip root name - if _root_name in name: - name = name.replace(f'{_root_name}.', '') - - return _rings.get(name, None) - - -def _get_ring(name: str) -> RingInfo: - ''' - Return a RingInfo for a given name or raise - ''' - info = _maybe_get_ring(name) - - if not info: - raise RuntimeError(f'Ring \"{name}\" not found!') - - return info - - -def _insert_ring(name: str, info: RingInfo): - ''' - Add a new ring - ''' - if name in _rings: - raise RuntimeError(f'A ring with name {name} already exists!') - - _rings[name] = info - - -def _destroy_ring(name: str): - ''' - Delete information about a ring - ''' - if name not in _rings: - raise RuntimeError(f'Tried to delete non existant {name} ring!') - - del _rings[name] - - -@tractor.context -async def _pass_fds( - ctx: tractor.Context, - name: str, - sock_path: str -): - ''' - Ringd endpoint to request passing fds of a ring. - - Supports passing fullname or not (ringd actor name and pid before ring - name). - - See `_attach_to_ring` function for usage. - ''' - async with _ring_lock: - # get ring fds or raise error - token = _get_ring(name).token - - # start fd passing context using socket on `sock_path` - async with send_fds(token.fds, sock_path): - log.info(f'connected to {sock_path} for fd passing') - # use started to signal socket is ready and send token in order for - # client to get extra info like buf_size - await ctx.started(token) - # send_fds will block until receive side acks - - log.info(f'ring {name} fds: {token.fds}, sent') - - -@tractor.context -async def _open_ringbuf( - ctx: tractor.Context, - caller: str, - name: str, - buf_size: int = 10 * 1024 -): - ''' - Ringd endpoint to create and allocate resources for a new ring. - - ''' - await _ring_lock.acquire() - maybe_info = _maybe_get_ring(name) - - if maybe_info: - raise RuntimeError( - f'Tried to create ringbuf but it already exists: {name}' - ) - - fullname = _make_ring_name(name) - - with ringbuf.open_ringbuf( - fullname, - buf_size=buf_size - ) as token: - - _insert_ring( - name, - RingInfo( - token=token, - creator=caller, - ) - ) - - _ring_lock.release() - - # yield full ring name to rebuild token after fd passing - await ctx.started(fullname) - - # await ctx cancel to remove ring from tracking and cleanup - try: - log.info(f'ring {name} created by {caller}') - await trio.sleep_forever() - - finally: - _destroy_ring(name) - - log.info(f'ring {name} destroyed by {caller}') - - -@tractor.context -async def _attach_ringbuf( - ctx: tractor.Context, - caller: str, - name: str -) -> str: - ''' - Ringd endpoint to "attach" to an existing ring, this just ensures ring - actually exists and returns its full name. - ''' - async with _ring_lock: - info = _maybe_get_ring(name) - - if not info: - raise RuntimeError( - f'{caller} tried to open_ringbuf but it doesn\'t exist: {name}' - ) - - await ctx.started() - - # return full ring name to rebuild token after fd passing - return info.token.shm_name - - -@tractor.context -async def _maybe_open_ringbuf( - ctx: tractor.Context, - caller: str, - name: str, - buf_size: int = 10 * 1024, -): - ''' - If ring already exists attach, if not create it. - ''' - maybe_info = _maybe_get_ring(name) - - if maybe_info: - return await _attach_ringbuf(ctx, caller, name) - - return await _open_ringbuf(ctx, caller, name, buf_size=buf_size) - - -''' -Ringd client side helpers - -''' - - -@acm -async def open_ringd(**kwargs) -> tractor.Portal: - ''' - Spawn new ringd actor. - - ''' - async with tractor.open_nursery(**kwargs) as an: - portal = await an.start_actor( - _ringd_actor_name, - enable_modules=[__name__] - ) - yield portal - await an.cancel() - - -@acm -async def wait_for_ringd() -> tractor.Portal: - ''' - Wait for ringd actor to be up. - - ''' - async with tractor.wait_for_actor( - _ringd_actor_name - ) as portal: - yield portal - - -async def _request_ring_fds( - fullname: str -) -> RBToken: - ''' - Private helper to fetch ring fds from ringd actor. - ''' - actor = tractor.current_actor() - - fd_amount = 3 - sock_path = str( - Path(tempfile.gettempdir()) - / - f'{fullname}-to-{actor.name}.sock' - ) - - log.info(f'trying to attach to {fullname}...') - - async with ( - tractor.find_actor(_ringd_actor_name) as ringd, - - ringd.open_context( - _pass_fds, - name=fullname, - sock_path=sock_path - ) as (ctx, token), - ): - fds = await recv_fds(sock_path, fd_amount) - write, wrap, eof = fds - log.info( - f'received fds, write: {write}, wrap: {wrap}, eof: {eof}' - ) - - token = RBToken.from_msg(token) - - return RBToken( - shm_name=fullname, - write_eventfd=write, - wrap_eventfd=wrap, - eof_eventfd=eof, - buf_size=token.buf_size - ) - - - -@acm -async def open_ringbuf( - name: str, - buf_size: int = 10 * 1024, -) -> AsyncContextManager[RBToken]: - ''' - Create a new ring and retrieve its fds. - - ''' - actor = tractor.current_actor() - async with ( - wait_for_ringd() as ringd, - - ringd.open_context( - _open_ringbuf, - caller=actor.name, - name=name, - buf_size=buf_size, - ) as (ctx, fullname), - ): - token = await _request_ring_fds(fullname) - log.info(f'{actor.name} opened {token}') - try: - yield token - - finally: - with trio.CancelScope(shield=True): - await ctx.cancel() - - -@acm -async def attach_ringbuf( - name: str, -) -> AsyncContextManager[RBToken]: - ''' - Attach to an existing ring and retreive its fds. - - ''' - actor = tractor.current_actor() - async with ( - wait_for_ringd() as ringd, - - ringd.open_context( - _attach_ringbuf, - caller=actor.name, - name=name, - ) as (ctx, _), - ): - fullname = await ctx.wait_for_result() - token = await _request_ring_fds(fullname) - log.info(f'{actor.name} attached {token}') - yield token - - -@acm -async def maybe_open_ringbuf( - name: str, - buf_size: int = 10 * 1024, -) -> AsyncContextManager[RBToken]: - ''' - Attach or create a ring and retreive its fds. - - ''' - actor = tractor.current_actor() - async with ( - wait_for_ringd() as ringd, - - ringd.open_context( - _maybe_open_ringbuf, - caller=actor.name, - name=name, - buf_size=buf_size, - ) as (ctx, fullname), - ): - token = await _request_ring_fds(fullname) - log.info(f'{actor.name} opened {token}') - try: - yield token - - finally: - with trio.CancelScope(shield=True): - await ctx.cancel()