diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 0d43b1a..943cccb 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -174,48 +174,6 @@ async def do_hard_kill( proc.kill() -@asynccontextmanager -async def spawn_subactor( - subactor: 'Actor', - parent_addr: Tuple[str, int], -): - spawn_cmd = [ - sys.executable, - "-m", - # Hardcode this (instead of using ``_child.__name__`` to avoid a - # double import warning: https://stackoverflow.com/a/45070583 - "tractor._child", - # We provide the child's unique identifier on this exec/spawn - # line for debugging purposes when viewing the process tree from - # the OS; it otherwise can be passed via the parent channel if - # we prefer in the future (for privacy). - "--uid", - str(subactor.uid), - # Address the child must connect to on startup - "--parent_addr", - str(parent_addr) - ] - - if subactor.loglevel: - spawn_cmd += [ - "--loglevel", - subactor.loglevel - ] - - proc = await trio.open_process(spawn_cmd) - try: - yield proc - - finally: - log.runtime(f"Attempting to kill {proc}") - - # XXX: do this **after** cancellation/tearfown - # to avoid killing the process too early - # since trio does this internally on ``__aexit__()`` - - await do_hard_kill(proc) - - async def new_proc( name: str, actor_nursery: 'ActorNursery', # type: ignore # noqa @@ -228,8 +186,10 @@ async def new_proc( *, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: - """Create a new ``multiprocessing.Process`` using the + """ + Create a new ``multiprocessing.Process`` using the spawn method as configured using ``try_set_start_method()``. 
+ """ cancel_scope = None @@ -237,43 +197,66 @@ async def new_proc( subactor._spawn_method = _spawn_method if _spawn_method == 'trio': - async with trio.open_nursery() as nursery: - async with spawn_subactor( - subactor, - parent_addr, - ) as proc: - log.runtime(f"Started {proc}") - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await actor_nursery._actor.wait_for_peer( - subactor.uid) - portal = Portal(chan) - actor_nursery._children[subactor.uid] = ( - subactor, proc, portal) + spawn_cmd = [ + sys.executable, + "-m", + # Hardcode this (instead of using ``_child.__name__`` to avoid a + # double import warning: https://stackoverflow.com/a/45070583 + "tractor._child", + # We provide the child's unique identifier on this exec/spawn + # line for debugging purposes when viewing the process tree from + # the OS; it otherwise can be passed via the parent channel if + # we prefer in the future (for privacy). 
+ "--uid", + str(subactor.uid), + # Address the child must connect to on startup + "--parent_addr", + str(parent_addr) + ] - # send additional init params - await chan.send({ - "_parent_main_data": subactor._parent_main_data, - "enable_modules": subactor.enable_modules, - "_arb_addr": subactor._arb_addr, - "bind_host": bind_addr[0], - "bind_port": bind_addr[1], - "_runtime_vars": _runtime_vars, - }) + if subactor.loglevel: + spawn_cmd += [ + "--loglevel", + subactor.loglevel + ] - # track subactor in current nursery - curr_actor = current_actor() - curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + try: + proc = await trio.open_process(spawn_cmd) - # resume caller at next checkpoint now that child is up - task_status.started(portal) + log.runtime(f"Started {proc}") - # wait for ActorNursery.wait() to be called - with trio.CancelScope(shield=True): - await actor_nursery._join_procs.wait() + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = ( + subactor, proc, portal) + # send additional init params + await chan.send({ + "_parent_main_data": subactor._parent_main_data, + "enable_modules": subactor.enable_modules, + "_arb_addr": subactor._arb_addr, + "bind_host": bind_addr[0], + "bind_port": bind_addr[1], + "_runtime_vars": _runtime_vars, + }) + + # track subactor in current nursery + curr_actor = current_actor() + curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + + # resume caller at next checkpoint now that child is up + task_status.started(portal) + + # wait for ActorNursery.wait() to be called + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() + + async with trio.open_nursery() as nursery: if portal in actor_nursery._cancel_after_result_on_exit: cancel_scope = await nursery.start( 
cancel_on_completion, @@ -285,25 +268,10 @@ async def new_proc( # Wait for proc termination but **dont' yet** call # ``trio.Process.__aexit__()`` (it tears down stdio # which will kill any waiting remote pdb trace). - - # TODO: No idea how we can enforce zombie - # reaping more stringently without the shield - # we used to have below... - - # with trio.CancelScope(shield=True): - # async with proc: - - # Always "hard" join sub procs since no actor zombies - # are allowed! - - # this is a "light" (cancellable) join, the hard join is - # in the enclosing scope (see above). + # This is a "soft" (cancellable) join/reap. await proc.wait() - log.debug(f"Joined {proc}") - # pop child entry to indicate we no longer managing this subactor - subactor, proc, portal = actor_nursery._children.pop(subactor.uid) - + finally: # cancel result waiter that may have been spawned in # tandem if not done already if cancel_scope: @@ -311,6 +279,19 @@ async def new_proc( "Cancelling existing result waiter task for " f"{subactor.uid}") cancel_scope.cancel() + + log.runtime(f"Attempting to kill {proc}") + + # The "hard" reap since no actor zombies are allowed! + # XXX: do this **after** cancellation/teardown to avoid + # killing the process too early. + with trio.CancelScope(shield=True): + await do_hard_kill(proc) + + log.debug(f"Joined {proc}") + # pop child entry to indicate we are no longer managing this subactor + subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + else: # `multiprocessing` # async with trio.open_nursery() as nursery: