forked from goodboy/tractor
1
0
Fork 0

Fix `.receive()` re-assignment, drop `.clone()`

live_on_air_from_tokio
Tyler Goodlet 2021-08-19 12:36:05 -04:00
parent 236ed0b0dd
commit a4cb0ef21f
1 changed files with 9 additions and 17 deletions

View File

@ -51,10 +51,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self, self,
ctx: 'Context', # typing: ignore # noqa ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel, rx_chan: trio.MemoryReceiveChannel,
shield: bool = False,
_broadcaster: Optional[BroadcastReceiver] = None,
) -> None: ) -> None:
self._ctx = ctx self._ctx = ctx
self._rx_chan = rx_chan self._rx_chan = rx_chan
self._broadcaster: Optional[BroadcastReceiver] = None self._broadcaster = _broadcaster
# flag to denote end of stream # flag to denote end of stream
self._eoc: bool = False self._eoc: bool = False
@ -234,16 +237,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# still need to consume msgs that are "in transit" from the far # still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``). # end (eg. for ``Context.result()``).
def clone(self):
"""Clone this receive channel allowing for multi-task
consumption from the same channel.
"""
return type(self)(
self._ctx,
self._rx_chan.clone(),
)
@asynccontextmanager @asynccontextmanager
async def subscribe( async def subscribe(
self, self,
@ -266,9 +259,12 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# allocated ``BroadcastReceiver`` before calling this method for # allocated ``BroadcastReceiver`` before calling this method for
# the first time. # the first time.
if self._broadcaster is None: if self._broadcaster is None:
self._broadcaster = broadcast_receiver(
bcast = self._broadcaster = broadcast_receiver(
self, self,
# use memory channel size by default
self._rx_chan._state.max_buffer_size, # type: ignore self._rx_chan._state.max_buffer_size, # type: ignore
receive_afunc=self.receive,
) )
# NOTE: we override the original stream instance's receive # NOTE: we override the original stream instance's receive
@ -276,15 +272,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# such that new subscribers will be copied received values # such that new subscribers will be copied received values
# and this stream doesn't have to expect it's original # and this stream doesn't have to expect it's original
# consumer(s) to get a new broadcast rx handle. # consumer(s) to get a new broadcast rx handle.
self.receive = self._broadcaster.receive # type: ignore self.receive = bcast.receive # type: ignore
# seems there's no graceful way to type this with ``mypy``? # seems there's no graceful way to type this with ``mypy``?
# https://github.com/python/mypy/issues/708 # https://github.com/python/mypy/issues/708
async with self._broadcaster.subscribe() as bstream: async with self._broadcaster.subscribe() as bstream:
# a ``MsgStream`` clone is allocated for the
# broadcaster to track this entry's subscription
stream_clone = bstream._rx
assert stream_clone is not self
yield bstream yield bstream