diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9d832b2..2477531 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -51,7 +51,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, - shield: bool = False, _broadcaster: Optional[BroadcastReceiver] = None, ) -> None: @@ -295,6 +294,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): '''Send a message over this stream to the far end. ''' + # if self._eoc: + # raise trio.ClosedResourceError('This stream is already ded') + await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -365,7 +367,7 @@ class Context: ''' side = 'caller' if self._portal else 'callee' - log.warning(f'Cancelling {side} side of context to {self.chan}') + log.warning(f'Cancelling {side} side of context to {self.chan.uid}') self._cancel_called = True @@ -396,6 +398,10 @@ class Context: log.warning( "May have failed to cancel remote task " f"{cid} for {self._portal.channel.uid}") + else: + log.warning( + "Timed out on cancelling remote task " + f"{cid} for {self._portal.channel.uid}") else: # callee side remote task @@ -439,16 +445,6 @@ class Context: # here we create a mem chan that corresponds to the # far end caller / callee. - # NOTE: in one way streaming this only happens on the - # caller side inside `Actor.send_cmd()` so if you try - # to send a stop from the caller to the callee in the - # single-direction-stream case you'll get a lookup error - # currently. - _, recv_chan = actor.get_memchans( - self.chan.uid, - self.cid - ) - # Likewise if the surrounding context has been cancelled we error here # since it likely means the surrounding block was exited or # killed @@ -459,6 +455,16 @@ class Context: f'Context around {actor.uid[0]}:{task} was already cancelled!' ) + # NOTE: in one way streaming this only happens on the + # caller side inside `Actor.send_cmd()` so if you try + # to send a stop from the caller to the callee in the + # single-direction-stream case you'll get a lookup error + # currently. + _, recv_chan = actor.get_memchans( + self.chan.uid, + self.cid + ) + # XXX: If the underlying channel feeder receive mem chan has # been closed then likely client code has already exited # a ``.open_stream()`` block prior or there was some other @@ -482,12 +488,6 @@ class Context: # await trio.lowlevel.checkpoint() yield rchan - except trio.EndOfChannel: - # likely the far end sent us a 'stop' message to - # terminate the stream. - raise - - else: # XXX: Make the stream "one-shot use". On exit, signal # ``trio.EndOfChannel``/``StopAsyncIteration`` to the # far end.