Run first soft wait inside a task

Theoretically we can actually concurrently soft wait the process and the
nursery exit event, the only problem at the moment is some pubsub tests
puking. You're right in guessing this isn't really changing anything but
it is meant to be a reminder. If we can add this the spawn task can
report when a process dies earlier then expected and in the longer term
once we remove the `ActorNursery.run_in_actor()` API, we can probably do
away with both the nursery exit event and the portal result fetching.
sigint2
Tyler Goodlet 2022-02-14 10:25:42 -05:00
parent b8117dad2a
commit f4af2b9fda
1 changed files with 50 additions and 21 deletions

View File

@ -295,7 +295,7 @@ async def new_proc(
# the OS; it otherwise can be passed via the parent channel if # the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy). # we prefer in the future (for privacy).
"--uid", "--uid",
str(subactor.uid), str(uid),
# Address the child must connect to on startup # Address the child must connect to on startup
"--parent_addr", "--parent_addr",
str(parent_addr) str(parent_addr)
@ -321,8 +321,7 @@ async def new_proc(
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer( event, chan = await actor_nursery._actor.wait_for_peer(uid)
subactor.uid)
except trio.Cancelled: except trio.Cancelled:
cancelled_during_spawn = True cancelled_during_spawn = True
@ -363,10 +362,54 @@ async def new_proc(
task_status.started(portal) task_status.started(portal)
# wait for ActorNursery.wait() to be called # wait for ActorNursery.wait() to be called
n_exited = actor_nursery._join_procs
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait() await n_exited.wait()
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
async def soft_wait_and_maybe_cancel_ria_task():
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
if n_exited.is_set():
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}"
)
nursery.cancel_scope.cancel()
else:
log.warning(
f'Process for actor {uid} terminated before'
'nursery exit. ' 'This may mean an IPC'
'connection failed!'
)
nursery.start_soon(soft_wait_and_maybe_cancel_ria_task)
# TODO: when we finally remove the `.run_in_actor()` api
# we should be able to entirely drop these 2 blocking calls:
# - we don't need to wait on nursery exit to capture
# process-spawn-machinery level errors (and propagate them).
# - we don't need to wait on final results from ria portals
# since this will be done in some higher level wrapper API.
# XXX: interestingly we can't put this here bc it causes
# the pub-sub tests to fail? wth.. should probably drop
# those XD
# wait for ActorNursery.wait() to be called
# with trio.CancelScope(shield=True):
# await n_exited.wait()
if portal in actor_nursery._cancel_after_result_on_exit: if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon( nursery.start_soon(
cancel_on_completion, cancel_on_completion,
@ -375,22 +418,6 @@ async def new_proc(
errors errors
) )
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()
finally: finally:
# The "hard" reap since no actor zombies are allowed! # The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid # XXX: do this **after** cancellation/tearfown to avoid
@ -407,8 +434,10 @@ async def new_proc(
await proc.wait() await proc.wait()
if is_root_process(): if is_root_process():
await maybe_wait_for_debugger( await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get('_debug_mode', False), child_in_debug=_runtime_vars.get(
'_debug_mode', False)
) )
if proc.poll() is None: if proc.poll() is None: