From 79ef97305832ed6b3b111dbc42ca4108d3a6f8e9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jun 2025 14:47:03 -0400 Subject: [PATCH] Try `nest_from_op()` in some `._rpc` spots To start trying out, - using in the `Start`-msg handler-block to repr the msg coming *from* a `repr(Channel)` using '<=)` sclang op. - for a completed RPC task in `_invoke_non_context()`. - for the msg loop task's termination report. --- tractor/_rpc.py | 101 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 30 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 6a1da1c7..b6f8eb9b 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -64,6 +64,7 @@ from .trionics import ( from .devx import ( debug, add_div, + pformat as _pformat, ) from . import _state from .log import get_logger @@ -72,7 +73,7 @@ from .msg import ( MsgCodec, PayloadT, NamespacePath, - # pretty_struct, + pretty_struct, _ops as msgops, ) from tractor.msg.types import ( @@ -220,11 +221,18 @@ async def _invoke_non_context( task_status.started(ctx) result = await coro fname: str = func.__name__ + + op_nested_task: str = _pformat.nest_from_op( + input_op=f')> cid: {ctx.cid!r}', + text=f'{ctx._task}', + nest_indent=1, # under > + ) log.runtime( - 'RPC complete:\n' - f'task: {ctx._task}\n' - f'|_cid={ctx.cid}\n' - f'|_{fname}() -> {pformat(result)}\n' + f'RPC task complete\n' + f'\n' + f'{op_nested_task}\n' + f'\n' + f')> {fname}() -> {pformat(result)}\n' ) # NOTE: only send result if we know IPC isn't down @@ -1043,7 +1051,7 @@ async def process_messages( ): target_cid: str = kwargs['cid'] kwargs |= { - 'requesting_uid': chan.uid, + 'requesting_aid': chan.aid, 'ipc_msg': msg, # XXX NOTE! ONLY the rpc-task-owning @@ -1079,21 +1087,34 @@ async def process_messages( ns=ns, func=funcname, kwargs=kwargs, # type-spec this? see `msg.types` - uid=actorid, + uid=actor_uuid, ): + if actor_uuid != chan.aid.uid: + raise RuntimeError( + f'IPC msg <-> chan.aid mismatch!?\n' + f'Channel.aid = {chan.aid!r}\n' + f'Start.uid = {actor_uuid!r}\n' + ) + # await debug.pause() + op_repr: str = 'Start <=) ' + req_repr: str = _pformat.nest_from_op( + input_op=op_repr, + op_suffix='', + nest_prefix='', + text=f'{chan}', + + nest_indent=len(op_repr)-1, + rm_from_first_ln='<', + # ^XXX, subtract -1 to account for + # > {actor.uid}\n' - f' |_{actor}\n' - f' -> nsp: `{ns}.{funcname}({kwargs})`\n' - - # f' |_{ns}.{funcname}({kwargs})\n\n' - - # f'{pretty_struct.pformat(msg)}\n' + 'Handling RPC request\n' + f'{req_repr}\n' + f'\n' + f'->{{ ipc-context-id: {cid!r}\n' + f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n' ) # runtime-internal endpoint: `Actor.` @@ -1122,10 +1143,6 @@ async def process_messages( await chan.send(err_msg) continue - start_status += ( - f' -> func: {func}\n' - ) - # schedule a task for the requested RPC function # in the actor's main "service nursery". # @@ -1133,7 +1150,7 @@ async def process_messages( # supervision isolation? would avoid having to # manage RPC tasks individually in `._rpc_tasks` # table? - start_status += ' -> scheduling new task..\n' + start_status += '->( scheduling new task..\n' log.runtime(start_status) try: ctx: Context = await actor._service_n.start( @@ -1222,7 +1239,7 @@ async def process_messages( f'|_{chan}\n' ) await actor.cancel_rpc_tasks( - req_uid=actor.uid, + req_aid=actor.aid, # a "self cancel" in terms of the lifetime of the # IPC connection which is presumed to be the # source of any requests for spawned tasks. @@ -1294,13 +1311,37 @@ async def process_messages( finally: # msg debugging for when he machinery is brokey if msg is None: - message: str = 'Exiting IPC msg loop without receiving a msg?' + message: str = 'Exiting RPC-loop without receiving a msg?' else: + task_op_repr: str = ')>' + task: trio.Task = trio.lowlevel.current_task() + + # maybe add cancelled opt prefix + if task._cancel_status.effectively_cancelled: + task_op_repr = 'c' + task_op_repr + + task_repr: str = _pformat.nest_from_op( + input_op=task_op_repr, + text=f'{task!r}', + nest_indent=1, + ) + # chan_op_repr: str = '<=} ' + # chan_repr: str = _pformat.nest_from_op( + # input_op=chan_op_repr, + # op_suffix='', + # nest_prefix='', + # text=chan.pformat(), + # nest_indent=len(chan_op_repr)-1, + # rm_from_first_ln='<', + # ) message: str = ( - 'Exiting IPC msg loop with final msg\n\n' - f'<= peer: {chan.uid}\n' - f' |_{chan}\n\n' - # f'{pretty_struct.pformat(msg)}' + f'Exiting RPC-loop with final msg\n' + f'\n' + # f'{chan_repr}\n' + f'{task_repr}\n' + f'\n' + f'{pretty_struct.pformat(msg)}' + f'\n' ) log.runtime(message)