Update buncha log msg fmting in `._spawn`
Again using `Channel.aid.reprol()`, `.devx.pformat.nest_from_op()` and converting to multi-line code style an ' for str-report-contents. Tweak some imports to sub-mod level as well.enable_tpts
							parent
							
								
									ee9fa2e91d
								
							
						
					
					
						commit
						57e25411ee
					
				| 
						 | 
				
			
			@ -34,9 +34,9 @@ from typing import (
 | 
			
		|||
import trio
 | 
			
		||||
from trio import TaskStatus
 | 
			
		||||
 | 
			
		||||
from .devx.debug import (
 | 
			
		||||
    maybe_wait_for_debugger,
 | 
			
		||||
    acquire_debug_lock,
 | 
			
		||||
from .devx import (
 | 
			
		||||
    debug,
 | 
			
		||||
    pformat as _pformat
 | 
			
		||||
)
 | 
			
		||||
from tractor._state import (
 | 
			
		||||
    current_actor,
 | 
			
		||||
| 
						 | 
				
			
			@ -51,14 +51,17 @@ from tractor._portal import Portal
 | 
			
		|||
from tractor._runtime import Actor
 | 
			
		||||
from tractor._entry import _mp_main
 | 
			
		||||
from tractor._exceptions import ActorFailure
 | 
			
		||||
from tractor.msg.types import (
 | 
			
		||||
    Aid,
 | 
			
		||||
    SpawnSpec,
 | 
			
		||||
from tractor.msg import (
 | 
			
		||||
    types as msgtypes,
 | 
			
		||||
    pretty_struct,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    from ipc import IPCServer
 | 
			
		||||
    from ipc import (
 | 
			
		||||
        _server,
 | 
			
		||||
        Channel,
 | 
			
		||||
    )
 | 
			
		||||
    from ._supervise import ActorNursery
 | 
			
		||||
    ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -328,20 +331,21 @@ async def soft_kill(
 | 
			
		|||
    see `.hard_kill()`).
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    peer_aid: Aid = portal.channel.aid
 | 
			
		||||
    chan: Channel = portal.channel
 | 
			
		||||
    peer_aid: msgtypes.Aid = chan.aid
 | 
			
		||||
    try:
 | 
			
		||||
        log.cancel(
 | 
			
		||||
            f'Soft killing sub-actor via portal request\n'
 | 
			
		||||
            f'\n'
 | 
			
		||||
            f'(c=> {peer_aid}\n'
 | 
			
		||||
            f'  |_{proc}\n'
 | 
			
		||||
            f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n'
 | 
			
		||||
            f'   |_{proc}\n'
 | 
			
		||||
        )
 | 
			
		||||
        # wait on sub-proc to signal termination
 | 
			
		||||
        await wait_func(proc)
 | 
			
		||||
 | 
			
		||||
    except trio.Cancelled:
 | 
			
		||||
        with trio.CancelScope(shield=True):
 | 
			
		||||
            await maybe_wait_for_debugger(
 | 
			
		||||
            await debug.maybe_wait_for_debugger(
 | 
			
		||||
                child_in_debug=_runtime_vars.get(
 | 
			
		||||
                    '_debug_mode', False
 | 
			
		||||
                ),
 | 
			
		||||
| 
						 | 
				
			
			@ -465,7 +469,7 @@ async def trio_proc(
 | 
			
		|||
        "--uid",
 | 
			
		||||
        # TODO, how to pass this over "wire" encodings like
 | 
			
		||||
        # cmdline args?
 | 
			
		||||
        # -[ ] maybe we can add an `Aid.min_tuple()` ?
 | 
			
		||||
        # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
 | 
			
		||||
        str(subactor.uid),
 | 
			
		||||
        # Address the child must connect to on startup
 | 
			
		||||
        "--parent_addr",
 | 
			
		||||
| 
						 | 
				
			
			@ -483,13 +487,14 @@ async def trio_proc(
 | 
			
		|||
 | 
			
		||||
    cancelled_during_spawn: bool = False
 | 
			
		||||
    proc: trio.Process|None = None
 | 
			
		||||
    ipc_server: IPCServer = actor_nursery._actor.ipc_server
 | 
			
		||||
    ipc_server: _server.Server = actor_nursery._actor.ipc_server
 | 
			
		||||
    try:
 | 
			
		||||
        try:
 | 
			
		||||
            proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                'Started new child\n'
 | 
			
		||||
                f'|_{proc}\n'
 | 
			
		||||
                f'Started new child subproc\n'
 | 
			
		||||
                f'(>\n'
 | 
			
		||||
                f' |_{proc}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # wait for actor to spawn and connect back to us
 | 
			
		||||
| 
						 | 
				
			
			@ -507,10 +512,10 @@ async def trio_proc(
 | 
			
		|||
                with trio.CancelScope(shield=True):
 | 
			
		||||
                    # don't clobber an ongoing pdb
 | 
			
		||||
                    if is_root_process():
 | 
			
		||||
                        await maybe_wait_for_debugger()
 | 
			
		||||
                        await debug.maybe_wait_for_debugger()
 | 
			
		||||
 | 
			
		||||
                    elif proc is not None:
 | 
			
		||||
                        async with acquire_debug_lock(subactor.uid):
 | 
			
		||||
                        async with debug.acquire_debug_lock(subactor.uid):
 | 
			
		||||
                            # soft wait on the proc to terminate
 | 
			
		||||
                            with trio.move_on_after(0.5):
 | 
			
		||||
                                await proc.wait()
 | 
			
		||||
| 
						 | 
				
			
			@ -528,14 +533,19 @@ async def trio_proc(
 | 
			
		|||
 | 
			
		||||
        # send a "spawning specification" which configures the
 | 
			
		||||
        # initial runtime state of the child.
 | 
			
		||||
        sspec = SpawnSpec(
 | 
			
		||||
        sspec = msgtypes.SpawnSpec(
 | 
			
		||||
            _parent_main_data=subactor._parent_main_data,
 | 
			
		||||
            enable_modules=subactor.enable_modules,
 | 
			
		||||
            reg_addrs=subactor.reg_addrs,
 | 
			
		||||
            bind_addrs=bind_addrs,
 | 
			
		||||
            _runtime_vars=_runtime_vars,
 | 
			
		||||
        )
 | 
			
		||||
        log.runtime(f'Sending spawn spec: {str(sspec)}')
 | 
			
		||||
        log.runtime(
 | 
			
		||||
            f'Sending spawn spec to child\n'
 | 
			
		||||
            f'{{}}=> {chan.aid.reprol()!r}\n'
 | 
			
		||||
            f'\n'
 | 
			
		||||
            f'{pretty_struct.pformat(sspec)}\n'
 | 
			
		||||
        )
 | 
			
		||||
        await chan.send(sspec)
 | 
			
		||||
 | 
			
		||||
        # track subactor in current nursery
 | 
			
		||||
| 
						 | 
				
			
			@ -563,7 +573,7 @@ async def trio_proc(
 | 
			
		|||
            # condition.
 | 
			
		||||
            await soft_kill(
 | 
			
		||||
                proc,
 | 
			
		||||
                trio.Process.wait,
 | 
			
		||||
                trio.Process.wait,  # XXX, uses `pidfd_open()` below.
 | 
			
		||||
                portal
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -571,8 +581,7 @@ async def trio_proc(
 | 
			
		|||
            # tandem if not done already
 | 
			
		||||
            log.cancel(
 | 
			
		||||
                'Cancelling portal result reaper task\n'
 | 
			
		||||
                f'>c)\n'
 | 
			
		||||
                f' |_{subactor.uid}\n'
 | 
			
		||||
                f'c)> {subactor.aid.reprol()!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            nursery.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -581,21 +590,24 @@ async def trio_proc(
 | 
			
		|||
        # allowed! Do this **after** cancellation/teardown to avoid
 | 
			
		||||
        # killing the process too early.
 | 
			
		||||
        if proc:
 | 
			
		||||
            reap_repr: str = _pformat.nest_from_op(
 | 
			
		||||
                input_op='>x)',
 | 
			
		||||
                text=subactor.pformat(),
 | 
			
		||||
            )
 | 
			
		||||
            log.cancel(
 | 
			
		||||
                f'Hard reap sequence starting for subactor\n'
 | 
			
		||||
                f'>x)\n'
 | 
			
		||||
                f' |_{subactor}@{subactor.uid}\n'
 | 
			
		||||
                f'{reap_repr}'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            with trio.CancelScope(shield=True):
 | 
			
		||||
                # don't clobber an ongoing pdb
 | 
			
		||||
                if cancelled_during_spawn:
 | 
			
		||||
                    # Try again to avoid TTY clobbering.
 | 
			
		||||
                    async with acquire_debug_lock(subactor.uid):
 | 
			
		||||
                    async with debug.acquire_debug_lock(subactor.uid):
 | 
			
		||||
                        with trio.move_on_after(0.5):
 | 
			
		||||
                            await proc.wait()
 | 
			
		||||
 | 
			
		||||
                await maybe_wait_for_debugger(
 | 
			
		||||
                await debug.maybe_wait_for_debugger(
 | 
			
		||||
                    child_in_debug=_runtime_vars.get(
 | 
			
		||||
                        '_debug_mode', False
 | 
			
		||||
                    ),
 | 
			
		||||
| 
						 | 
				
			
			@ -624,7 +636,7 @@ async def trio_proc(
 | 
			
		|||
                #     acquire the lock and get notified of who has it,
 | 
			
		||||
                #     check that uid against our known children?
 | 
			
		||||
                # this_uid: tuple[str, str] = current_actor().uid
 | 
			
		||||
                # await acquire_debug_lock(this_uid)
 | 
			
		||||
                # await debug.acquire_debug_lock(this_uid)
 | 
			
		||||
 | 
			
		||||
                if proc.poll() is None:
 | 
			
		||||
                    log.cancel(f"Attempting to hard kill {proc}")
 | 
			
		||||
| 
						 | 
				
			
			@ -727,7 +739,7 @@ async def mp_proc(
 | 
			
		|||
 | 
			
		||||
    log.runtime(f"Started {proc}")
 | 
			
		||||
 | 
			
		||||
    ipc_server: IPCServer = actor_nursery._actor.ipc_server
 | 
			
		||||
    ipc_server: _server.Server = actor_nursery._actor.ipc_server
 | 
			
		||||
    try:
 | 
			
		||||
        # wait for actor to spawn and connect back to us
 | 
			
		||||
        # channel should have handshake completed by the
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue