Always consider the debugger when exiting contexts
When in an uncertain teardown state and in debug mode a context can be popped from actor runtime before a child finished debugging (the case when the parent is tearing down but the child hasn't closed/completed its tty lock IPC exit phase) and the child sends the "stop" message to unlock the debugger but it's ignored bc the parent has already dropped the ctx. Instead we call `._debug.maybe_wait_for_deugger()` before these context removals to avoid the root getting stuck thinking the lock was never released. Further, add special `Actor._cancel_task()` handling code inside `_invoke()` which continues to execute the method despite the IPC channel to the caller being broken and thus avoiding potential hangs due to a target (child) actor task remaining alive.signint_saviour
parent
4779badd96
commit
b21f2e16ad
|
@ -87,9 +87,10 @@ async def _invoke(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
treat_as_gen = False
|
treat_as_gen: bool = False
|
||||||
|
failed_resp: bool = False
|
||||||
|
|
||||||
# possible a traceback (not sure what typing is for this..)
|
# possibly a traceback (not sure what typing is for this..)
|
||||||
tb = None
|
tb = None
|
||||||
|
|
||||||
cancel_scope = trio.CancelScope()
|
cancel_scope = trio.CancelScope()
|
||||||
|
@ -190,7 +191,8 @@ async def _invoke(
|
||||||
ctx._scope_nursery = scope_nursery
|
ctx._scope_nursery = scope_nursery
|
||||||
cs = scope_nursery.cancel_scope
|
cs = scope_nursery.cancel_scope
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
res = await coro
|
||||||
|
await chan.send({'return': res, 'cid': cid})
|
||||||
|
|
||||||
except trio.MultiError:
|
except trio.MultiError:
|
||||||
# if a context error was set then likely
|
# if a context error was set then likely
|
||||||
|
@ -204,7 +206,12 @@ async def _invoke(
|
||||||
# XXX: only pop the context tracking if
|
# XXX: only pop the context tracking if
|
||||||
# a ``@tractor.context`` entrypoint was called
|
# a ``@tractor.context`` entrypoint was called
|
||||||
assert chan.uid
|
assert chan.uid
|
||||||
|
|
||||||
|
# don't pop the local context until we know the
|
||||||
|
# associated child isn't in debug any more
|
||||||
|
await _debug.maybe_wait_for_debugger()
|
||||||
ctx = actor._contexts.pop((chan.uid, cid))
|
ctx = actor._contexts.pop((chan.uid, cid))
|
||||||
|
|
||||||
if ctx:
|
if ctx:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Context entrypoint {func} was terminated:\n{ctx}'
|
f'Context entrypoint {func} was terminated:\n{ctx}'
|
||||||
|
@ -235,10 +242,24 @@ async def _invoke(
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# regular async function
|
# regular async function
|
||||||
await chan.send({'functype': 'asyncfunc', 'cid': cid})
|
try:
|
||||||
|
await chan.send({'functype': 'asyncfunc', 'cid': cid})
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
failed_resp = True
|
||||||
|
if is_rpc:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
f'Failed to respond to non-rpc request: {func}'
|
||||||
|
)
|
||||||
|
|
||||||
with cancel_scope as cs:
|
with cancel_scope as cs:
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
result = await coro
|
||||||
|
log.cancel(f'result: {result}')
|
||||||
|
if not failed_resp:
|
||||||
|
# only send result if we know IPC isn't down
|
||||||
|
await chan.send({'return': result, 'cid': cid})
|
||||||
|
|
||||||
except (
|
except (
|
||||||
Exception,
|
Exception,
|
||||||
|
@ -283,6 +304,7 @@ async def _invoke(
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
BrokenPipeError,
|
||||||
):
|
):
|
||||||
# if we can't propagate the error that's a big boo boo
|
# if we can't propagate the error that's a big boo boo
|
||||||
log.error(
|
log.error(
|
||||||
|
@ -933,11 +955,13 @@ class Actor:
|
||||||
chan._exc = exc
|
chan._exc = exc
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
log.runtime(
|
log.info(
|
||||||
f"Processing request from {actorid}\n"
|
f"Processing request from {actorid}\n"
|
||||||
f"{ns}.{funcname}({kwargs})")
|
f"{ns}.{funcname}({kwargs})")
|
||||||
|
|
||||||
if ns == 'self':
|
if ns == 'self':
|
||||||
func = getattr(self, funcname)
|
func = getattr(self, funcname)
|
||||||
|
|
||||||
if funcname == 'cancel':
|
if funcname == 'cancel':
|
||||||
|
|
||||||
# don't start entire actor runtime
|
# don't start entire actor runtime
|
||||||
|
@ -974,12 +998,17 @@ class Actor:
|
||||||
# ``_async_main()``
|
# ``_async_main()``
|
||||||
kwargs['chan'] = chan
|
kwargs['chan'] = chan
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'{self.uid} was remotely cancelled by\n'
|
f'Remote request to cancel task\n'
|
||||||
f'{chan.uid}!'
|
f'remote actor: {chan.uid}\n'
|
||||||
)
|
f'task: {cid}'
|
||||||
await _invoke(
|
|
||||||
self, cid, chan, func, kwargs, is_rpc=False
|
|
||||||
)
|
)
|
||||||
|
try:
|
||||||
|
await _invoke(
|
||||||
|
self, cid, chan, func, kwargs, is_rpc=False
|
||||||
|
)
|
||||||
|
except BaseException:
|
||||||
|
log.exception("failed to cancel task?")
|
||||||
|
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
# complain to client about restricted modules
|
# complain to client about restricted modules
|
||||||
|
@ -1417,12 +1446,14 @@ class Actor:
|
||||||
# n.cancel_scope.cancel()
|
# n.cancel_scope.cancel()
|
||||||
|
|
||||||
async def _cancel_task(self, cid, chan):
|
async def _cancel_task(self, cid, chan):
|
||||||
"""Cancel a local task by call-id / channel.
|
'''
|
||||||
|
Cancel a local task by call-id / channel.
|
||||||
|
|
||||||
Note this method will be treated as a streaming function
|
Note this method will be treated as a streaming function
|
||||||
by remote actor-callers due to the declaration of ``ctx``
|
by remote actor-callers due to the declaration of ``ctx``
|
||||||
in the signature (for now).
|
in the signature (for now).
|
||||||
"""
|
|
||||||
|
'''
|
||||||
# right now this is only implicitly called by
|
# right now this is only implicitly called by
|
||||||
# streaming IPC but it should be called
|
# streaming IPC but it should be called
|
||||||
# to cancel any remotely spawned task
|
# to cancel any remotely spawned task
|
||||||
|
|
|
@ -542,6 +542,17 @@ class Portal:
|
||||||
f'value from callee `{result}`'
|
f'value from callee `{result}`'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||||
|
# wait for any immediate child in debug before popping the
|
||||||
|
# context from the runtime msg loop otherwise inside
|
||||||
|
# ``Actor._push_result()`` the msg will be discarded and in
|
||||||
|
# the case where that msg is global debugger unlock (via
|
||||||
|
# a "stop" msg for a stream), this can result in a deadlock
|
||||||
|
# where the root is waiting on the lock to clear but the
|
||||||
|
# child has already cleared it and clobbered IPC.
|
||||||
|
from ._debug import maybe_wait_for_debugger
|
||||||
|
await maybe_wait_for_debugger()
|
||||||
|
|
||||||
# remove the context from runtime tracking
|
# remove the context from runtime tracking
|
||||||
self.actor._contexts.pop((self.channel.uid, ctx.cid))
|
self.actor._contexts.pop((self.channel.uid, ctx.cid))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue