From c322a193f2609f75be3db3f623be8a5f099f256e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Apr 2022 17:07:06 -0400 Subject: [PATCH] Make `LinkedTaskChannel` trio-task-broadcastable with `.subscribe()` --- tractor/to_asyncio.py | 46 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index ca15b00..ab4ff34 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -38,6 +38,10 @@ from outcome import Error from .log import get_logger from ._state import current_actor from ._exceptions import AsyncioCancelled +from .trionics._broadcast import ( + broadcast_receiver, + BroadcastReceiver, +) log = get_logger(__name__) @@ -63,6 +67,7 @@ class LinkedTaskChannel(trio.abc.Channel): # set after ``asyncio.create_task()`` _aio_task: Optional[asyncio.Task] = None _aio_err: Optional[BaseException] = None + _broadcaster: Optional[BroadcastReceiver] = None async def aclose(self) -> None: await self._from_aio.aclose() @@ -79,7 +84,7 @@ class LinkedTaskChannel(trio.abc.Channel): return await self._from_aio.receive() - async def wait_ayncio_complete(self) -> None: + async def wait_asyncio_complete(self) -> None: await self._aio_task_complete.wait() # def cancel_asyncio_task(self) -> None: @@ -94,6 +99,43 @@ class LinkedTaskChannel(trio.abc.Channel): ''' self._to_aio.put_nowait(item) + def closed(self) -> bool: + return self._from_aio._closed + + # TODO: shoud we consider some kind of "decorator" system + # that checks for structural-typing compatibliity and then + # automatically adds this ctx-mngr-as-method machinery? + @acm + async def subscribe( + self, + + ) -> AsyncIterator[BroadcastReceiver]: + ''' + Allocate and return a ``BroadcastReceiver`` which delegates + to this inter-task channel. + + This allows multiple local tasks to receive each their own copy + of this message stream. + + See ``tractor._streaming.MsgStream.subscribe()`` for further + similar details. + ''' + if self._broadcaster is None: + + bcast = self._broadcaster = broadcast_receiver( + self, + # use memory channel size by default + self._from_aio._state.max_buffer_size, # type: ignore + receive_afunc=self.receive, + ) + + self.receive = bcast.receive # type: ignore + + async with self._broadcaster.subscribe() as bstream: + assert bstream.key != self._broadcaster.key + assert bstream._recv == self._broadcaster._recv + yield bstream + def _run_asyncio_task( @@ -334,7 +376,6 @@ async def translate_aio_errors( maybe_raise_aio_err() - async def run_task( func: Callable, *, @@ -425,7 +466,6 @@ def run_as_asyncio_guest( trio_done_fut = asyncio.Future() def trio_done_callback(main_outcome): - actor = current_actor() if isinstance(main_outcome, Error): error = main_outcome.error