forked from goodboy/tractor
1
0
Fork 0

Fix `.receive()` re-assignment, drop `.clone()`

tokio_backup
Tyler Goodlet 2021-08-19 12:36:05 -04:00
parent 843a713f5a
commit 3ba01e7e40
1 changed files with 9 additions and 17 deletions

View File

@ -49,10 +49,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self,
ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel,
shield: bool = False,
_broadcaster: Optional[BroadcastReceiver] = None,
) -> None:
self._ctx = ctx
self._rx_chan = rx_chan
self._broadcaster: Optional[BroadcastReceiver] = None
self._broadcaster = _broadcaster
# flag to denote end of stream
self._eoc: bool = False
@ -233,16 +236,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``).
def clone(self):
"""Clone this receive channel allowing for multi-task
consumption from the same channel.
"""
return type(self)(
self._ctx,
self._rx_chan.clone(),
)
@asynccontextmanager
async def subscribe(
self,
@ -265,9 +258,12 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# allocated ``BroadcastReceiver`` before calling this method for
# the first time.
if self._broadcaster is None:
self._broadcaster = broadcast_receiver(
bcast = self._broadcaster = broadcast_receiver(
self,
# use memory channel size by default
self._rx_chan._state.max_buffer_size, # type: ignore
receive_afunc=self.receive,
)
# NOTE: we override the original stream instance's receive
@ -275,15 +271,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# such that new subscribers will be copied received values
# and this stream doesn't have to expect it's original
# consumer(s) to get a new broadcast rx handle.
self.receive = self._broadcaster.receive # type: ignore
self.receive = bcast.receive # type: ignore
# seems there's no graceful way to type this with ``mypy``?
# https://github.com/python/mypy/issues/708
async with self._broadcaster.subscribe() as bstream:
# a ``MsgStream`` clone is allocated for the
# broadcaster to track this entry's subscription
stream_clone = bstream._rx
assert stream_clone is not self
yield bstream