diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a3e3194e..1faadd73 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -34,9 +34,9 @@ from typing import ( import trio from trio import TaskStatus -from .devx.debug import ( - maybe_wait_for_debugger, - acquire_debug_lock, +from .devx import ( + debug, + pformat as _pformat ) from tractor._state import ( current_actor, @@ -51,14 +51,17 @@ from tractor._portal import Portal from tractor._runtime import Actor from tractor._entry import _mp_main from tractor._exceptions import ActorFailure -from tractor.msg.types import ( - Aid, - SpawnSpec, +from tractor.msg import ( + types as msgtypes, + pretty_struct, ) if TYPE_CHECKING: - from ipc import IPCServer + from ipc import ( + _server, + Channel, + ) from ._supervise import ActorNursery ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) @@ -328,20 +331,21 @@ async def soft_kill( see `.hard_kill()`). ''' - peer_aid: Aid = portal.channel.aid + chan: Channel = portal.channel + peer_aid: msgtypes.Aid = chan.aid try: log.cancel( f'Soft killing sub-actor via portal request\n' f'\n' - f'(c=> {peer_aid}\n' - f' |_{proc}\n' + f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n' + f' |_{proc}\n' ) # wait on sub-proc to signal termination await wait_func(proc) except trio.Cancelled: with trio.CancelScope(shield=True): - await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=_runtime_vars.get( '_debug_mode', False ), @@ -465,7 +469,7 @@ async def trio_proc( "--uid", # TODO, how to pass this over "wire" encodings like # cmdline args? - # -[ ] maybe we can add an `Aid.min_tuple()` ? + # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ? str(subactor.uid), # Address the child must connect to on startup "--parent_addr", @@ -483,13 +487,14 @@ async def trio_proc( cancelled_during_spawn: bool = False proc: trio.Process|None = None - ipc_server: IPCServer = actor_nursery._actor.ipc_server + ipc_server: _server.Server = actor_nursery._actor.ipc_server try: try: proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs) log.runtime( - 'Started new child\n' - f'|_{proc}\n' + f'Started new child subproc\n' + f'(>\n' + f' |_{proc}\n' ) # wait for actor to spawn and connect back to us @@ -507,10 +512,10 @@ async def trio_proc( with trio.CancelScope(shield=True): # don't clobber an ongoing pdb if is_root_process(): - await maybe_wait_for_debugger() + await debug.maybe_wait_for_debugger() elif proc is not None: - async with acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock(subactor.uid): # soft wait on the proc to terminate with trio.move_on_after(0.5): await proc.wait() @@ -528,14 +533,19 @@ async def trio_proc( # send a "spawning specification" which configures the # initial runtime state of the child. - sspec = SpawnSpec( + sspec = msgtypes.SpawnSpec( _parent_main_data=subactor._parent_main_data, enable_modules=subactor.enable_modules, reg_addrs=subactor.reg_addrs, bind_addrs=bind_addrs, _runtime_vars=_runtime_vars, ) - log.runtime(f'Sending spawn spec: {str(sspec)}') + log.runtime( + f'Sending spawn spec to child\n' + f'{{}}=> {chan.aid.reprol()!r}\n' + f'\n' + f'{pretty_struct.pformat(sspec)}\n' + ) await chan.send(sspec) # track subactor in current nursery @@ -563,7 +573,7 @@ async def trio_proc( # condition. await soft_kill( proc, - trio.Process.wait, + trio.Process.wait, # XXX, uses `pidfd_open()` below. portal ) @@ -571,8 +581,7 @@ async def trio_proc( # tandem if not done already log.cancel( 'Cancelling portal result reaper task\n' - f'>c)\n' - f' |_{subactor.uid}\n' + f'c)> {subactor.aid.reprol()!r}\n' ) nursery.cancel_scope.cancel() @@ -581,21 +590,24 @@ async def trio_proc( # allowed! Do this **after** cancellation/teardown to avoid # killing the process too early. if proc: + reap_repr: str = _pformat.nest_from_op( + input_op='>x)', + text=subactor.pformat(), + ) log.cancel( f'Hard reap sequence starting for subactor\n' - f'>x)\n' - f' |_{subactor}@{subactor.uid}\n' + f'{reap_repr}' ) with trio.CancelScope(shield=True): # don't clobber an ongoing pdb if cancelled_during_spawn: # Try again to avoid TTY clobbering. - async with acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock(subactor.uid): with trio.move_on_after(0.5): await proc.wait() - await maybe_wait_for_debugger( + await debug.maybe_wait_for_debugger( child_in_debug=_runtime_vars.get( '_debug_mode', False ), @@ -624,7 +636,7 @@ async def trio_proc( # acquire the lock and get notified of who has it, # check that uid against our known children? # this_uid: tuple[str, str] = current_actor().uid - # await acquire_debug_lock(this_uid) + # await debug.acquire_debug_lock(this_uid) if proc.poll() is None: log.cancel(f"Attempting to hard kill {proc}") @@ -727,7 +739,7 @@ async def mp_proc( log.runtime(f"Started {proc}") - ipc_server: IPCServer = actor_nursery._actor.ipc_server + ipc_server: _server.Server = actor_nursery._actor.ipc_server try: # wait for actor to spawn and connect back to us # channel should have handshake completed by the