From 3ba01e7e40390bc59154cdb8b22cb35a78cce066 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Aug 2021 12:36:05 -0400 Subject: [PATCH] Fix `.receive()` re-assignment, drop `.clone()` --- tractor/_streaming.py | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 2956451..14067c6 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -49,10 +49,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, + shield: bool = False, + _broadcaster: Optional[BroadcastReceiver] = None, + ) -> None: self._ctx = ctx self._rx_chan = rx_chan - self._broadcaster: Optional[BroadcastReceiver] = None + self._broadcaster = _broadcaster # flag to denote end of stream self._eoc: bool = False @@ -233,16 +236,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). - def clone(self): - """Clone this receive channel allowing for multi-task - consumption from the same channel. - - """ - return type(self)( - self._ctx, - self._rx_chan.clone(), - ) - @asynccontextmanager async def subscribe( self, @@ -265,9 +258,12 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # allocated ``BroadcastReceiver`` before calling this method for # the first time. if self._broadcaster is None: - self._broadcaster = broadcast_receiver( + + bcast = self._broadcaster = broadcast_receiver( self, + # use memory channel size by default self._rx_chan._state.max_buffer_size, # type: ignore + receive_afunc=self.receive, ) # NOTE: we override the original stream instance's receive @@ -275,15 +271,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # such that new subscribers will be copied received values # and this stream doesn't have to expect it's original # consumer(s) to get a new broadcast rx handle. - self.receive = self._broadcaster.receive # type: ignore + self.receive = bcast.receive # type: ignore # seems there's no graceful way to type this with ``mypy``? # https://github.com/python/mypy/issues/708 async with self._broadcaster.subscribe() as bstream: - # a ``MsgStream`` clone is allocated for the - # broadcaster to track this entry's subscription - stream_clone = bstream._rx - assert stream_clone is not self yield bstream