diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 4e7f9fa..c41f6f5 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -48,15 +48,12 @@ import trio from trio import ( CancelScope, ) -from trio.lowlevel import ( - current_task, - Task, -) from trio_typing import ( Nursery, TaskStatus, ) +from .msg import NamespacePath from ._ipc import Channel from ._context import ( mk_context, @@ -145,8 +142,9 @@ async def _invoke( cs: CancelScope | None = None ctx = actor.get_context( - chan, - cid, + chan=chan, + cid=cid, + nsf=NamespacePath.from_ref(func), # We shouldn't ever need to pass this through right? # it's up to the soon-to-be called rpc task to # open the stream with this option. @@ -276,8 +274,8 @@ async def _invoke( # TODO: should would be nice to have our # `TaskMngr` nursery here! - # res: Any = await coro - res = await coro + res: Any = await coro + ctx._result = res # deliver final result to caller side. await chan.send({ @@ -315,11 +313,13 @@ async def _invoke( # associated child isn't in debug any more await maybe_wait_for_debugger() ctx: Context = actor._contexts.pop((chan.uid, cid)) - log.cancel( - f'Context task was terminated:\n' - f'func: {func}\n' - f'ctx: {pformat(ctx)}' + res_msg: str = ( + 'IPC context terminated with result:\n' + f'result={ctx._result}\n' + f'error={ctx._local_error}\n' + f'|_{pformat(ctx)}\n\n' ) + log.cancel(res_msg) if ctx.cancelled_caught: @@ -331,7 +331,6 @@ async def _invoke( ctx._maybe_raise_remote_err(re) # fname: str = func.__name__ - task: Task = current_task() cs: CancelScope = ctx._scope if cs.cancel_called: our_uid: tuple = actor.uid @@ -378,16 +377,16 @@ async def _invoke( div_str + f'<= canceller: {canceller}\n' f'=> uid: {our_uid}\n' - f' |_ task: `{task.name}()`' + f' |_{ctx._task}()\n' ) # TODO: does this ever get set any more or can # we remove it? if ctx._cancel_msg: msg += ( - '------ - ------\n' - 'IPC msg:\n' - f'{ctx._cancel_msg}' + # '------ - ------\n' + # 'IPC msg:\n' + f'\n{ctx._cancel_msg}' ) # task-contex was either cancelled by request using @@ -435,7 +434,12 @@ async def _invoke( task_status.started(ctx) result = await coro fname: str = func.__name__ - log.runtime(f'{fname}() result: {result}') + log.runtime( + 'RPC complete:\n' + f'task: {ctx._task}\n' + f'|_cid={ctx.cid}\n' + f'|_{fname}() -> {pformat(result)}\n' + ) # NOTE: only send result if we know IPC isn't down if ( @@ -965,7 +969,7 @@ class Actor: # and bail after timeout (2-generals on closure). assert chan.msgstream - log.runtime( + log.warning( f'Draining lingering msgs from stream {chan.msgstream}' ) @@ -977,13 +981,24 @@ class Actor: # making sure any RPC response to that call is # delivered the local calling task. # TODO: factor this into a helper? - log.runtime(f'drained {msg} for {chan.uid}') + log.warning( + 'Draining msg from disconnected\n' + f'peer: {chan.uid}]\n\n' + f'{pformat(msg)}\n' + ) cid = msg.get('cid') if cid: # deliver response to local caller/waiter - await self._push_result(chan, cid, msg) + await self._push_result( + chan, + cid, + msg, + ) - log.runtime('Waiting on actor nursery to exit..') + log.runtime( + 'Waiting on local actor nursery to exit..\n' + f'|_{local_nursery}\n' + ) await local_nursery.exited.wait() if disconnected: @@ -1167,6 +1182,7 @@ class Actor: self, chan: Channel, cid: str, + nsf: NamespacePath, msg_buffer_size: int | None = None, allow_overruns: bool = False, @@ -1180,11 +1196,15 @@ class Actor: task-as-function invocation. ''' - log.runtime(f"Getting result queue for {chan.uid} cid {cid}") actor_uid = chan.uid assert actor_uid try: ctx = self._contexts[(actor_uid, cid)] + log.runtime( + f'Retreived cached IPC ctx for\n' + f'peer: {chan.uid}\n' + f'cid:{cid}\n' + ) ctx._allow_overruns = allow_overruns # adjust buffer size if specified @@ -1193,9 +1213,15 @@ class Actor: state.max_buffer_size = msg_buffer_size except KeyError: + log.runtime( + f'Creating NEW IPC ctx for\n' + f'peer: {chan.uid}\n' + f'cid: {cid}\n' + ) ctx = mk_context( chan, cid, + nsf=nsf, msg_buffer_size=msg_buffer_size or self.msg_buffer_size, _allow_overruns=allow_overruns, ) @@ -1206,11 +1232,13 @@ class Actor: async def start_remote_task( self, chan: Channel, - ns: str, - func: str, + nsf: NamespacePath, kwargs: dict, + + # IPC channel config msg_buffer_size: int | None = None, allow_overruns: bool = False, + load_nsf: bool = False, ) -> Context: ''' @@ -1225,20 +1253,43 @@ class Actor: cid = str(uuid.uuid4()) assert chan.uid ctx = self.get_context( - chan, - cid, + chan=chan, + cid=cid, + nsf=nsf, msg_buffer_size=msg_buffer_size, allow_overruns=allow_overruns, ) - log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") + + if ( + 'self' in nsf + or not load_nsf + ): + ns, _, func = nsf.partition(':') + else: + # TODO: pass nsf directly over wire! + # -[ ] but, how to do `self:`?? + ns, func = nsf.to_tuple() + + log.runtime( + 'Sending cmd to\n' + f'peer: {chan.uid} => \n' + '\n' + f'=> {ns}.{func}({kwargs})\n' + ) await chan.send( - {'cmd': (ns, func, kwargs, self.uid, cid)} + {'cmd': ( + ns, + func, + kwargs, + self.uid, + cid, + )} ) # Wait on first response msg and validate; this should be # immediate. - first_msg = await ctx._recv_chan.receive() - functype = first_msg.get('functype') + first_msg: dict = await ctx._recv_chan.receive() + functype: str = first_msg.get('functype') if 'error' in first_msg: raise unpack_error(first_msg, chan) @@ -1280,14 +1331,19 @@ class Actor: parent_data: dict[str, Any] parent_data = await chan.recv() log.runtime( - "Received state from parent:\n" - f"{parent_data}" + 'Received state from parent:\n\n' + # TODO: eventually all these msgs as + # `msgspec.Struct` with a special mode that + # pformats them in multi-line mode, BUT only + # if "trace"/"util" mode is enabled? + f'{pformat(parent_data)}\n' ) accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') rvs = parent_data.pop('_runtime_vars') if rvs['_debug_mode']: try: + log.info('Enabling `stackscope` traces on SIGUSR1') from .devx import enable_stack_on_sig enable_stack_on_sig() except ImportError: @@ -1368,7 +1424,8 @@ class Actor: for listener in listeners ] log.runtime( - f'Started tcp server(s) on {sockets}' + 'Started TCP server(s)\n' + f'|_{sockets}\n' ) self._listeners.extend(listeners) @@ -1923,7 +1980,7 @@ async def process_messages( log.runtime( 'Entering IPC msg loop:\n' f'peer: {chan.uid}\n' - f'|_{chan}' + f'|_{chan}\n' ) nursery_cancelled_before_task: bool = False msg: dict | None = None @@ -1969,12 +2026,17 @@ async def process_messages( if cid: # deliver response to local caller/waiter # via its per-remote-context memory channel. - await actor._push_result(chan, cid, msg) + await actor._push_result( + chan, + cid, + msg, + ) log.runtime( - f'Waiting on next IPC msg from {chan.uid}:\n' + 'Waiting on next IPC msg from\n' + f'peer: {chan.uid}:\n' + f'|_{chan}\n' # f'last msg: {msg}\n' - f'|_{chan}' ) continue @@ -1994,9 +2056,11 @@ async def process_messages( raise exc log.runtime( - f"Processing request from {actorid}\n" - f"{ns}.{funcname}({kwargs})") - + 'Handling RPC cmd from\n' + f'peer: {actorid}\n' + '\n' + f'=> {ns}.{funcname}({kwargs})\n' + ) if ns == 'self': if funcname == 'cancel': func: Callable = actor.cancel @@ -2105,17 +2169,18 @@ async def process_messages( # in the lone case where a ``Context`` is not # delivered, it's likely going to be a locally # scoped exception from ``_invoke()`` itself. - if isinstance(ctx, Exception): + if isinstance(err := ctx, Exception): log.warning( - f"Task for RPC func {func} failed with" - f"{ctx}" + 'Task for RPC failed?' + f'|_ {func}()\n\n' + + f'{err}' ) continue else: # mark that we have ongoing rpc tasks actor._ongoing_rpc_tasks = trio.Event() - log.runtime(f"RPC func is {func}") # store cancel scope such that the rpc task can be # cancelled gracefully if requested @@ -2126,7 +2191,10 @@ async def process_messages( ) log.runtime( - f"Waiting on next msg for {chan} from {chan.uid}") + 'Waiting on next IPC msg from\n' + f'peer: {chan.uid}\n' + f'|_{chan}\n' + ) # end of async for, channel disconnect vis # ``trio.EndOfChannel`` @@ -2143,9 +2211,12 @@ async def process_messages( # handshake for them (yet) and instead we simply bail out of # the message loop and expect the teardown sequence to clean # up. + # TODO: don't show this msg if it's an emphemeral + # discovery ep call? log.runtime( - f'channel from {chan.uid} closed abruptly:\n' - f'-> {chan.raddr}\n' + f'channel closed abruptly with\n' + f'peer: {chan.uid}\n' + f'|_{chan.raddr}\n' ) # transport **was** disconnected @@ -2187,9 +2258,11 @@ async def process_messages( finally: # msg debugging for when he machinery is brokey log.runtime( - f'Exiting IPC msg loop with {chan.uid} ' - f'final msg: {msg}\n' - f'|_{chan}' + 'Exiting IPC msg loop with\n' + f'peer: {chan.uid}\n' + f'|_{chan}\n\n' + 'final msg:\n' + f'{pformat(msg)}\n' ) # transport **was not** disconnected