diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 1bb1b81..3dcc24b 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -51,10 +51,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, + shield: bool = False, + _broadcaster: Optional[BroadcastReceiver] = None, + ) -> None: self._ctx = ctx self._rx_chan = rx_chan - self._broadcaster: Optional[BroadcastReceiver] = None + self._broadcaster = _broadcaster # flag to denote end of stream self._eoc: bool = False @@ -234,16 +237,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). - def clone(self): - """Clone this receive channel allowing for multi-task - consumption from the same channel. - - """ - return type(self)( - self._ctx, - self._rx_chan.clone(), - ) - @asynccontextmanager async def subscribe( self, @@ -266,9 +259,12 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # allocated ``BroadcastReceiver`` before calling this method for # the first time. if self._broadcaster is None: - self._broadcaster = broadcast_receiver( + + bcast = self._broadcaster = broadcast_receiver( self, + # use memory channel size by default self._rx_chan._state.max_buffer_size, # type: ignore + receive_afunc=self.receive, ) # NOTE: we override the original stream instance's receive @@ -276,15 +272,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # such that new subscribers will be copied received values # and this stream doesn't have to expect it's original # consumer(s) to get a new broadcast rx handle. - self.receive = self._broadcaster.receive # type: ignore + self.receive = bcast.receive # type: ignore # seems there's no graceful way to type this with ``mypy``? # https://github.com/python/mypy/issues/708 async with self._broadcaster.subscribe() as bstream: - # a ``MsgStream`` clone is allocated for the - # broadcaster to track this entry's subscription - stream_clone = bstream._rx - assert stream_clone is not self yield bstream