diff --git a/tractor/_actor.py b/tractor/_actor.py index 66b32c9..df754c3 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -650,81 +650,56 @@ class Actor: return send_chan = ctx._send_chan - assert send_chan - error = msg.get('error') - if error: - # If this is an error message from a context opened by - # ``Portal.open_context()`` we want to interrupt any ongoing - # (child) tasks within that context to be notified of the remote - # error relayed here. - # - # The reason we may want to raise the remote error immediately - # is that there is no guarantee the associated local task(s) - # will attempt to read from any locally opened stream any time - # soon. - # - # NOTE: this only applies when - # ``Portal.open_context()`` has been called since it is assumed - # (currently) that other portal APIs (``Portal.run()``, - # ``.run_in_actor()``) do their own error checking at the point - # of the call and result processing. - log.warning(f'Remote context for {chan.uid}:{cid} errored {msg}') - ctx._maybe_error_from_remote_msg(msg) + log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") + # XXX: we do **not** maintain backpressure and instead + # opt to relay stream overrun errors to the sender. try: - log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") + send_chan.send_nowait(msg) + # if an error is deteced we should always + # expect it to be raised by any context (stream) + # consumer task + await ctx._maybe_raise_from_remote_msg(msg) - # XXX: we do **not** maintain backpressure and instead - # opt to relay stream overrun errors to the sender. - try: - send_chan.send_nowait(msg) - except trio.WouldBlock: + except trio.WouldBlock: + # XXX: always push an error even if the local + # receiver is in overrun state. + await ctx._maybe_raise_from_remote_msg(msg) - # XXX: do we need this? - # if we're trying to push an error but we're in - # an overrun state we'll just get stuck sending - # the error that was sent to us back to it's sender - # instead of it actually being raises in the target - # task.. - # if error: - # raise unpack_error(msg, chan) from None + uid = chan.uid + lines = [ + 'Task context stream was overrun', + f'local task: {cid} @ {self.uid}', + f'remote sender: {uid}', + ] + if not ctx._stream_opened: + lines.insert( + 1, + f'\n*** No stream open on `{self.uid[0]}` side! ***\n' + ) + text = '\n'.join(lines) - uid = chan.uid - - lines = [ - 'Task context stream was overrun', - f'local task: {cid} @ {self.uid}', - f'remote sender: {chan.uid}', - ] - if not ctx._stream_opened: - lines.insert( - 1, - f'\n*** No stream open on `{self.uid[0]}` side! ***\n' - ) - - text = '\n'.join(lines) + if ctx._backpressure: log.warning(text) - if ctx._backpressure: - await send_chan.send(msg) - else: + await send_chan.send(msg) + else: + try: + raise StreamOverrun(text) from None + except StreamOverrun as err: + err_msg = pack_error(err) + err_msg['cid'] = cid try: - raise StreamOverrun(text) from None - except StreamOverrun as err: - err_msg = pack_error(err) - err_msg['cid'] = cid + # TODO: what is the right way to handle the case where the + # local task has already sent a 'stop' / StopAsyncInteration + # to the other side but and possibly has closed the local + # feeder mem chan? Do we wait for some kind of ack or just + # let this fail silently and bubble up (currently)? await chan.send(err_msg) - - except trio.BrokenResourceError: - # TODO: what is the right way to handle the case where the - # local task has already sent a 'stop' / StopAsyncInteration - # to the other side but and possibly has closed the local - # feeder mem chan? Do we wait for some kind of ack or just - # let this fail silently and bubble up (currently)? - - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{send_chan} consumer is already closed") + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") def get_context( self, diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 6c57065..05a3073 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -364,33 +364,55 @@ class Context: async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) - def _maybe_error_from_remote_msg( + async def _maybe_raise_from_remote_msg( self, msg: Dict[str, Any], ) -> None: ''' - Unpack and raise a msg error into the local scope + (Maybe) unpack and raise a msg error into the local scope nursery for this context. Acts as a form of "relay" for a remote error raised in the corresponding remote callee task. ''' - self._error = unpack_error(msg, self.chan) + error = msg.get('error') + if error: + # If this is an error message from a context opened by + # ``Portal.open_context()`` we want to interrupt any ongoing + # (child) tasks within that context to be notified of the remote + # error relayed here. + # + # The reason we may want to raise the remote error immediately + # is that there is no guarantee the associated local task(s) + # will attempt to read from any locally opened stream any time + # soon. + # + # NOTE: this only applies when + # ``Portal.open_context()`` has been called since it is assumed + # (currently) that other portal APIs (``Portal.run()``, + # ``.run_in_actor()``) do their own error checking at the point + # of the call and result processing. + log.error( + f'Remote context error for {self.chan.uid}:{self.cid}:\n' + f'{msg["error"]["tb_str"]}' + ) + # await ctx._maybe_error_from_remote_msg(msg) + self._error = unpack_error(msg, self.chan) - # TODO: tempted to **not** do this by-reraising in a - # nursery and instead cancel a surrounding scope, detect - # the cancellation, then lookup the error that was set? - if self._scope_nursery: + # TODO: tempted to **not** do this by-reraising in a + # nursery and instead cancel a surrounding scope, detect + # the cancellation, then lookup the error that was set? + if self._scope_nursery: - async def raiser(): - raise self._error from None + async def raiser(): + raise self._error from None - # from trio.testing import wait_all_tasks_blocked - # await wait_all_tasks_blocked() - if not self._scope_nursery._closed: # type: ignore - self._scope_nursery.start_soon(raiser) + # from trio.testing import wait_all_tasks_blocked + # await wait_all_tasks_blocked() + if not self._scope_nursery._closed: # type: ignore + self._scope_nursery.start_soon(raiser) async def cancel(self) -> None: '''