diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 3d7e6b1..b3657b7 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -295,7 +295,7 @@ async def new_proc( # the OS; it otherwise can be passed via the parent channel if # we prefer in the future (for privacy). "--uid", - str(subactor.uid), + str(uid), # Address the child must connect to on startup "--parent_addr", str(parent_addr) @@ -321,8 +321,7 @@ async def new_proc( # wait for actor to spawn and connect back to us # channel should have handshake completed by the # local actor by the time we get a ref to it - event, chan = await actor_nursery._actor.wait_for_peer( - subactor.uid) + event, chan = await actor_nursery._actor.wait_for_peer(uid) except trio.Cancelled: cancelled_during_spawn = True @@ -363,10 +362,54 @@ async def new_proc( task_status.started(portal) # wait for ActorNursery.wait() to be called + n_exited = actor_nursery._join_procs with trio.CancelScope(shield=True): - await actor_nursery._join_procs.wait() + await n_exited.wait() async with trio.open_nursery() as nursery: + + async def soft_wait_and_maybe_cancel_ria_task(): + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. + await soft_wait( + proc, + trio.Process.wait, + portal + ) + + if n_exited.is_set(): + # cancel result waiter that may have been spawned in + # tandem if not done already + log.warning( + "Cancelling existing result waiter task for " + f"{subactor.uid}" + ) + nursery.cancel_scope.cancel() + + else: + log.warning( + f'Process for actor {uid} terminated before' + 'nursery exit. ' 'This may mean an IPC' + 'connection failed!' + ) + + nursery.start_soon(soft_wait_and_maybe_cancel_ria_task) + + # TODO: when we finally remove the `.run_in_actor()` api + # we should be able to entirely drop these 2 blocking calls: + # - we don't need to wait on nursery exit to capture + # process-spawn-machinery level errors (and propagate them). + # - we don't need to wait on final results from ria portals + # since this will be done in some higher level wrapper API. + + # XXX: interestingly we can't put this here bc it causes + # the pub-sub tests to fail? wth.. should probably drop + # those XD + # wait for ActorNursery.wait() to be called + # with trio.CancelScope(shield=True): + # await n_exited.wait() + if portal in actor_nursery._cancel_after_result_on_exit: nursery.start_soon( cancel_on_completion, @@ -375,22 +418,6 @@ async def new_proc( errors ) - # This is a "soft" (cancellable) join/reap which - # will remote cancel the actor on a ``trio.Cancelled`` - # condition. - await soft_wait( - proc, - trio.Process.wait, - portal - ) - - # cancel result waiter that may have been spawned in - # tandem if not done already - log.warning( - "Cancelling existing result waiter task for " - f"{subactor.uid}") - nursery.cancel_scope.cancel() - finally: # The "hard" reap since no actor zombies are allowed! # XXX: do this **after** cancellation/tearfown to avoid @@ -407,8 +434,10 @@ async def new_proc( await proc.wait() if is_root_process(): + await maybe_wait_for_debugger( - child_in_debug=_runtime_vars.get('_debug_mode', False), + child_in_debug=_runtime_vars.get( + '_debug_mode', False) ) if proc.poll() is None: