diff --git a/tractor/_streaming.py b/tractor/_streaming.py index d41729e..6e08272 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -2,6 +2,7 @@ Message stream types and APIs. """ +from __future__ import annotations import inspect from contextlib import contextmanager, asynccontextmanager from dataclasses import dataclass @@ -17,6 +18,7 @@ import trio from ._ipc import Channel from ._exceptions import unpack_error, ContextCancelled from ._state import current_actor +from ._broadcast import broadcast_receiver, BroadcastReceiver from .log import get_logger @@ -49,6 +51,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): ) -> None: self._ctx = ctx self._rx_chan = rx_chan + self._broadcaster: Optional[BroadcastReceiver] = None # flag to denote end of stream self._eoc: bool = False @@ -229,6 +232,56 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). + def clone(self): + """Clone this receive channel allowing for multi-task + consumption from the same channel. + + """ + return type(self)( + self._ctx, + self._rx_chan.clone(), + ) + + @asynccontextmanager + async def subscribe( + self, + + ) -> BroadcastReceiver: + '''Allocate and return a ``BroadcastReceiver`` which delegates + to this message stream. + + This allows multiple local tasks to receive each their own copy + of this message stream. + + This operation is indempotent and and mutates this stream's + receive machinery to copy and window-length-store each received + value from the far end via the internally created broudcast + receiver wrapper. + + ''' + if self._broadcaster is None: + self._broadcaster = broadcast_receiver( + self, + self._rx_chan._state.max_buffer_size, + ) + # override the original stream instance's receive to + # delegate to the broadcaster receive such that + # new subscribers will be copied received values + # XXX: this operation is indempotent and non-reversible, + # so be sure you can deal with any (theoretical) overhead + # of the the ``BroadcastReceiver`` before calling + # this method for the first time. + + # XXX: why does this work without a recursion issue?! + self.receive = self._broadcaster.receive + + async with self._broadcaster.subscribe() as bstream: + # a ``MsgStream`` clone is allocated for the + # broadcaster to track this entry's subscription + stream_clone = bstream._rx + assert stream_clone is not self + yield bstream + class MsgStream(ReceiveMsgStream, trio.abc.Channel): """ @@ -245,17 +298,6 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) - # TODO: but make it broadcasting to consumers - def clone(self): - """Clone this receive channel allowing for multi-task - consumption from the same channel. - - """ - return MsgStream( - self._ctx, - self._rx_chan.clone(), - ) - @dataclass class Context: