diff --git a/tractor/_actor.py b/tractor/_actor.py
index 1f7d9f5..3a843e1 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -171,19 +171,17 @@ async def _invoke(
         except trio.Cancelled as err:
             tb = err.__traceback__
 
-            if ctx._error is not None:
-                tb = ctx._error.__traceback__
-                raise ctx._error
 
-        except trio.MultiError as err:
+        except trio.MultiError:
+            # if a context error was set then likely
+            # the multierror was raised due to that
             if ctx._error is not None:
-                tb = ctx._error.__traceback__
-                raise ctx._error from err
-            else:
-                raise
+                raise ctx._error from None
+
+            raise
 
         assert cs
-        if cs.cancelled_caught or ctx._error:
+        if cs.cancelled_caught:
 
             # TODO: pack in ``trio.Cancelled.__traceback__`` here
             # so they can be unwrapped and displayed on the caller
@@ -257,6 +255,11 @@ async def _invoke(
             task_status.started(err)
 
     finally:
+        assert chan.uid
+        ctx = actor._contexts.pop((chan.uid, cid))
+        if ctx:
+            log.cancel(f'{ctx} was terminated')
+
         # RPC task bookeeping
         try:
            scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
@@ -594,6 +597,10 @@ class Actor:
                 log.runtime(f"No more channels for {chan.uid}")
                 self._peers.pop(chan.uid, None)
 
+        # for (uid, cid) in self._contexts.copy():
+        #     if chan.uid == uid:
+        #         self._contexts.pop((uid, cid))
+
         log.runtime(f"Peers is {self._peers}")
 
         if not self._peers:  # no more channels connected
@@ -629,12 +636,20 @@ class Actor:
         Push an RPC result to the local consumer's queue.
 
         '''
-        assert chan.uid, f"`chan.uid` can't be {chan.uid}"
-        ctx = self._contexts[(chan.uid, cid)]
+        uid = chan.uid
+        assert uid, f"`chan.uid` can't be {uid}"
+        try:
+            ctx = self._contexts[(uid, cid)]
+        except KeyError:
+            log.warning(
+                f'Ignoring {msg} for unknown context with {uid}')
+            return
+
         send_chan = ctx._send_chan
         assert send_chan
 
-        if msg.get('error'):
+        error = msg.get('error')
+        if error:
             # If this is an error message from a context opened by
             # ``Portal.open_context()`` we want to interrupt any ongoing
             # (child) tasks within that context to be notified of the remote
@@ -650,7 +665,7 @@ class Actor:
             # (currently) that other portal APIs (``Portal.run()``,
             # ``.run_in_actor()``) do their own error checking at the point
             # of the call and result processing.
-            log.warning(f'Remote context for {chan.uid}:{cid} errored')
+            log.warning(f'Remote context for {chan.uid}:{cid} errored {msg}')
             ctx._maybe_error_from_remote_msg(msg)
 
         try:
@@ -661,14 +676,36 @@ class Actor:
             try:
                 send_chan.send_nowait(msg)
             except trio.WouldBlock:
+
+                # XXX: do we need this?
+                # if we're trying to push an error but we're in
+                # an overrun state we'll just get stuck sending
+                # the error that was sent to us back to its sender
+                # instead of it actually being raised in the target
+                # task..
+                # if error:
+                #     raise unpack_error(msg, chan) from None
+
+                uid = chan.uid
+
+                lines = [
+                    'Task context stream was overrun',
+                    f'local task: {cid} @ {self.uid}',
+                    f'remote sender: {chan.uid}',
+                ]
+                if not ctx._stream_opened:
+                    lines.insert(
+                        1,
+                        f'\n*** No stream open on {self.uid[0]} side! ***\n'
+                    )
+
+                text = '\n'.join(lines)
+                log.warning(text)
-                log.warning(f'Caller task {cid} was overrun!?')
                 if ctx._backpressure:
                     await send_chan.send(msg)
                 else:
                     try:
-                        raise StreamOverrun(
-                            f'Context stream {cid} for {chan.uid} was overrun!'
-                        )
+                        raise StreamOverrun(text) from None
                     except StreamOverrun as err:
                         err_msg = pack_error(err)
                         err_msg['cid'] = cid