diff --git a/tractor/_actor.py b/tractor/_actor.py index 0c6c83e..efc9703 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -87,9 +87,10 @@ async def _invoke( ''' __tracebackhide__ = True - treat_as_gen = False + treat_as_gen: bool = False + failed_resp: bool = False - # possible a traceback (not sure what typing is for this..) + # possibly a traceback (not sure what typing is for this..) tb = None cancel_scope = trio.CancelScope() @@ -190,7 +191,8 @@ async def _invoke( ctx._scope_nursery = scope_nursery cs = scope_nursery.cancel_scope task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + res = await coro + await chan.send({'return': res, 'cid': cid}) except trio.MultiError: # if a context error was set then likely @@ -204,7 +206,12 @@ async def _invoke( # XXX: only pop the context tracking if # a ``@tractor.context`` entrypoint was called assert chan.uid + + # don't pop the local context until we know the + # associated child isn't in debug any more + await _debug.maybe_wait_for_debugger() ctx = actor._contexts.pop((chan.uid, cid)) + if ctx: log.runtime( f'Context entrypoint {func} was terminated:\n{ctx}' @@ -235,10 +242,24 @@ async def _invoke( else: # regular async function - await chan.send({'functype': 'asyncfunc', 'cid': cid}) + try: + await chan.send({'functype': 'asyncfunc', 'cid': cid}) + except trio.BrokenResourceError: + failed_resp = True + if is_rpc: + raise + else: + log.warning( + f'Failed to respond to non-rpc request: {func}' + ) + with cancel_scope as cs: task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + result = await coro + log.cancel(f'result: {result}') + if not failed_resp: + # only send result if we know IPC isn't down + await chan.send({'return': result, 'cid': cid}) except ( Exception, @@ -283,6 +304,7 @@ async def _invoke( except ( trio.ClosedResourceError, trio.BrokenResourceError, + BrokenPipeError, ): # if we can't propagate the error that's a big boo boo log.error( @@ -933,11 +955,13 @@ class Actor: chan._exc = exc raise exc - log.runtime( + log.info( f"Processing request from {actorid}\n" f"{ns}.{funcname}({kwargs})") + if ns == 'self': func = getattr(self, funcname) + if funcname == 'cancel': # don't start entire actor runtime @@ -974,12 +998,17 @@ class Actor: # ``_async_main()`` kwargs['chan'] = chan log.cancel( - f'{self.uid} was remotely cancelled by\n' - f'{chan.uid}!' - ) - await _invoke( - self, cid, chan, func, kwargs, is_rpc=False + f'Remote request to cancel task\n' + f'remote actor: {chan.uid}\n' + f'task: {cid}' ) + try: + await _invoke( + self, cid, chan, func, kwargs, is_rpc=False + ) + except BaseException: + log.exception("failed to cancel task?") + continue else: # complain to client about restricted modules @@ -1417,12 +1446,14 @@ class Actor: # n.cancel_scope.cancel() async def _cancel_task(self, cid, chan): - """Cancel a local task by call-id / channel. + ''' + Cancel a local task by call-id / channel. Note this method will be treated as a streaming function by remote actor-callers due to the declaration of ``ctx`` in the signature (for now). - """ + + ''' # right now this is only implicitly called by # streaming IPC but it should be called # to cancel any remotely spawned task diff --git a/tractor/_portal.py b/tractor/_portal.py index 9492f0d..c7c8700 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -542,6 +542,17 @@ class Portal: f'value from callee `{result}`' ) + # XXX: (MEGA IMPORTANT) if this is a root opened process we + # wait for any immediate child in debug before popping the + # context from the runtime msg loop otherwise inside + # ``Actor._push_result()`` the msg will be discarded and in + # the case where that msg is global debugger unlock (via + # a "stop" msg for a stream), this can result in a deadlock + # where the root is waiting on the lock to clear but the + # child has already cleared it and clobbered IPC. + from ._debug import maybe_wait_for_debugger + await maybe_wait_for_debugger() + # remove the context from runtime tracking self.actor._contexts.pop((self.channel.uid, ctx.cid))