forked from goodboy/tractor
				
			Use `NamespacePath` in `Context` mgmt internals
The only case where we can't is in `Portal.run_from_ns()` usage (since we pass a path with `self:<Actor.meth>`) and because `.to_tuple()` internally uses `.load_ref()` which will of course fail on such a path.. So or now impl as, - mk `Actor.start_remote_task()` take a `nsf: NamespacePath` but also offer a `load_nsf: bool = False` such that by default we bypass ref loading (maybe this is fine for perf long run as well?) for the `Actor`/'self:'` case mentioned above. - mk `.get_context()` take an instance `nsf` obvi. More logging msg format tweaks: - change msg-flow related content to show the `Context._nsf`, which, right, is coming follow up commit.. - bunch more `.runtime()` format updates to show `msg: dict` contents and internal primitives with trailing `'\n'` for easier reading. - report import loading `stackscope` in subactors.remotes/1757153874605917753/main
							parent
							
								
									b28df738fe
								
							
						
					
					
						commit
						3c385c6949
					
				|  | @ -49,15 +49,12 @@ import trio | |||
| from trio import ( | ||||
|     CancelScope, | ||||
| ) | ||||
| from trio.lowlevel import ( | ||||
|     current_task, | ||||
|     Task, | ||||
| ) | ||||
| from trio_typing import ( | ||||
|     Nursery, | ||||
|     TaskStatus, | ||||
| ) | ||||
| 
 | ||||
| from .msg import NamespacePath | ||||
| from ._ipc import Channel | ||||
| from ._context import ( | ||||
|     mk_context, | ||||
|  | @ -138,8 +135,9 @@ async def _invoke( | |||
|     cs: CancelScope | None = None | ||||
| 
 | ||||
|     ctx = actor.get_context( | ||||
|         chan, | ||||
|         cid, | ||||
|         chan=chan, | ||||
|         cid=cid, | ||||
|         nsf=NamespacePath.from_ref(func), | ||||
|         # We shouldn't ever need to pass this through right? | ||||
|         # it's up to the soon-to-be called rpc task to | ||||
|         # open the stream with this option. | ||||
|  | @ -269,8 +267,8 @@ async def _invoke( | |||
| 
 | ||||
|                     # TODO: should would be nice to have our | ||||
|                     # `TaskMngr` nursery here! | ||||
|                     # res: Any = await coro | ||||
|                     res = await coro | ||||
|                     res: Any = await coro | ||||
|                     ctx._result = res | ||||
| 
 | ||||
|                     # deliver final result to caller side. | ||||
|                     await chan.send({ | ||||
|  | @ -308,11 +306,13 @@ async def _invoke( | |||
|                 # associated child isn't in debug any more | ||||
|                 await _debug.maybe_wait_for_debugger() | ||||
|                 ctx: Context = actor._contexts.pop((chan.uid, cid)) | ||||
|                 log.cancel( | ||||
|                     f'Context task was terminated:\n' | ||||
|                     f'func: {func}\n' | ||||
|                     f'ctx: {pformat(ctx)}' | ||||
|                 res_msg: str = ( | ||||
|                     'IPC context terminated with result:\n' | ||||
|                     f'result={ctx._result}\n' | ||||
|                     f'error={ctx._local_error}\n' | ||||
|                     f'|_{pformat(ctx)}\n\n' | ||||
|                 ) | ||||
|                 log.cancel(res_msg) | ||||
| 
 | ||||
|             if ctx.cancelled_caught: | ||||
| 
 | ||||
|  | @ -324,7 +324,6 @@ async def _invoke( | |||
|                     ctx._maybe_raise_remote_err(re) | ||||
| 
 | ||||
|                 # fname: str = func.__name__ | ||||
|                 task: Task = current_task() | ||||
|                 cs: CancelScope = ctx._scope | ||||
|                 if cs.cancel_called: | ||||
|                     our_uid: tuple = actor.uid | ||||
|  | @ -371,16 +370,16 @@ async def _invoke( | |||
|                         div_str + | ||||
|                         f'<= canceller: {canceller}\n' | ||||
|                         f'=> uid: {our_uid}\n' | ||||
|                         f'  |_ task: `{task.name}()`' | ||||
|                         f'  |_{ctx._task}()\n' | ||||
|                     ) | ||||
| 
 | ||||
|                     # TODO: does this ever get set any more or can | ||||
|                     # we remove it? | ||||
|                     if ctx._cancel_msg: | ||||
|                         msg += ( | ||||
|                             '------ - ------\n' | ||||
|                             'IPC msg:\n' | ||||
|                             f'{ctx._cancel_msg}' | ||||
|                             # '------ - ------\n' | ||||
|                             # 'IPC msg:\n' | ||||
|                             f'\n{ctx._cancel_msg}' | ||||
|                         ) | ||||
| 
 | ||||
|                     # task-contex was either cancelled by request using | ||||
|  | @ -428,7 +427,12 @@ async def _invoke( | |||
|                 task_status.started(ctx) | ||||
|                 result = await coro | ||||
|                 fname: str = func.__name__ | ||||
|                 log.runtime(f'{fname}() result: {result}') | ||||
|                 log.runtime( | ||||
|                     'RPC complete:\n' | ||||
|                     f'task: {ctx._task}\n' | ||||
|                     f'|_cid={ctx.cid}\n' | ||||
|                     f'|_{fname}() -> {pformat(result)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|                 # NOTE: only send result if we know IPC isn't down | ||||
|                 if ( | ||||
|  | @ -903,7 +907,7 @@ class Actor: | |||
|                     # and bail after timeout (2-generals on closure). | ||||
|                     assert chan.msgstream | ||||
| 
 | ||||
|                     log.runtime( | ||||
|                     log.warning( | ||||
|                         f'Draining lingering msgs from stream {chan.msgstream}' | ||||
|                     ) | ||||
| 
 | ||||
|  | @ -915,12 +919,24 @@ class Actor: | |||
|                         # making sure any RPC response to that call is | ||||
|                         # delivered the local calling task. | ||||
|                         # TODO: factor this into a helper? | ||||
|                         log.runtime(f'drained {msg} for {chan.uid}') | ||||
|                         log.warning( | ||||
|                             'Draining msg from disconnected\n' | ||||
|                             f'peer:  {chan.uid}]\n\n' | ||||
|                             f'{pformat(msg)}\n' | ||||
|                         ) | ||||
|                         cid = msg.get('cid') | ||||
|                         if cid: | ||||
|                             # deliver response to local caller/waiter | ||||
|                             await self._push_result(chan, cid, msg) | ||||
|                             await self._push_result( | ||||
|                                 chan, | ||||
|                                 cid, | ||||
|                                 msg, | ||||
|                             ) | ||||
| 
 | ||||
|                     log.runtime( | ||||
|                         'Waiting on local actor nursery to exit..\n' | ||||
|                         f'|_{local_nursery}\n' | ||||
|                     ) | ||||
|                     await local_nursery.exited.wait() | ||||
| 
 | ||||
|                 if disconnected: | ||||
|  | @ -1101,6 +1117,7 @@ class Actor: | |||
|         self, | ||||
|         chan: Channel, | ||||
|         cid: str, | ||||
|         nsf: NamespacePath, | ||||
| 
 | ||||
|         msg_buffer_size: int | None = None, | ||||
|         allow_overruns: bool = False, | ||||
|  | @ -1114,11 +1131,15 @@ class Actor: | |||
|         task-as-function invocation. | ||||
| 
 | ||||
|         ''' | ||||
|         log.runtime(f"Getting result queue for {chan.uid} cid {cid}") | ||||
|         actor_uid = chan.uid | ||||
|         assert actor_uid | ||||
|         try: | ||||
|             ctx = self._contexts[(actor_uid, cid)] | ||||
|             log.runtime( | ||||
|                 f'Retreived cached IPC ctx for\n' | ||||
|                 f'peer: {chan.uid}\n' | ||||
|                 f'cid:{cid}\n' | ||||
|             ) | ||||
|             ctx._allow_overruns = allow_overruns | ||||
| 
 | ||||
|             # adjust buffer size if specified | ||||
|  | @ -1127,9 +1148,15 @@ class Actor: | |||
|                 state.max_buffer_size = msg_buffer_size | ||||
| 
 | ||||
|         except KeyError: | ||||
|             log.runtime( | ||||
|                 f'Creating NEW IPC ctx for\n' | ||||
|                 f'peer: {chan.uid}\n' | ||||
|                 f'cid: {cid}\n' | ||||
|             ) | ||||
|             ctx = mk_context( | ||||
|                 chan, | ||||
|                 cid, | ||||
|                 nsf=nsf, | ||||
|                 msg_buffer_size=msg_buffer_size or self.msg_buffer_size, | ||||
|                 _allow_overruns=allow_overruns, | ||||
|             ) | ||||
|  | @ -1140,11 +1167,13 @@ class Actor: | |||
|     async def start_remote_task( | ||||
|         self, | ||||
|         chan: Channel, | ||||
|         ns: str, | ||||
|         func: str, | ||||
|         nsf: NamespacePath, | ||||
|         kwargs: dict, | ||||
| 
 | ||||
|         # IPC channel config | ||||
|         msg_buffer_size: int | None = None, | ||||
|         allow_overruns: bool = False, | ||||
|         load_nsf: bool = False, | ||||
| 
 | ||||
|     ) -> Context: | ||||
|         ''' | ||||
|  | @ -1159,20 +1188,43 @@ class Actor: | |||
|         cid = str(uuid.uuid4()) | ||||
|         assert chan.uid | ||||
|         ctx = self.get_context( | ||||
|             chan, | ||||
|             cid, | ||||
|             chan=chan, | ||||
|             cid=cid, | ||||
|             nsf=nsf, | ||||
|             msg_buffer_size=msg_buffer_size, | ||||
|             allow_overruns=allow_overruns, | ||||
|         ) | ||||
|         log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") | ||||
| 
 | ||||
|         if ( | ||||
|             'self' in nsf | ||||
|             or not load_nsf | ||||
|         ): | ||||
|             ns, _, func = nsf.partition(':') | ||||
|         else: | ||||
|             # TODO: pass nsf directly over wire! | ||||
|             # -[ ] but, how to do `self:<Actor.meth>`?? | ||||
|             ns, func = nsf.to_tuple() | ||||
| 
 | ||||
|         log.runtime( | ||||
|             'Sending cmd to\n' | ||||
|             f'peer: {chan.uid} => \n' | ||||
|             '\n' | ||||
|             f'=> {ns}.{func}({kwargs})\n' | ||||
|         ) | ||||
|         await chan.send( | ||||
|             {'cmd': (ns, func, kwargs, self.uid, cid)} | ||||
|             {'cmd': ( | ||||
|                 ns, | ||||
|                 func, | ||||
|                 kwargs, | ||||
|                 self.uid, | ||||
|                 cid, | ||||
|             )} | ||||
|         ) | ||||
| 
 | ||||
|         # Wait on first response msg and validate; this should be | ||||
|         # immediate. | ||||
|         first_msg = await ctx._recv_chan.receive() | ||||
|         functype = first_msg.get('functype') | ||||
|         first_msg: dict = await ctx._recv_chan.receive() | ||||
|         functype: str = first_msg.get('functype') | ||||
| 
 | ||||
|         if 'error' in first_msg: | ||||
|             raise unpack_error(first_msg, chan) | ||||
|  | @ -1210,8 +1262,12 @@ class Actor: | |||
|                 parent_data: dict[str, Any] | ||||
|                 parent_data = await chan.recv() | ||||
|                 log.runtime( | ||||
|                     "Received state from parent:\n" | ||||
|                     f"{parent_data}" | ||||
|                     'Received state from parent:\n\n' | ||||
|                     # TODO: eventually all these msgs as | ||||
|                     # `msgspec.Struct` with a special mode that | ||||
|                     # pformats them in multi-line mode, BUT only | ||||
|                     # if "trace"/"util" mode is enabled? | ||||
|                     f'{pformat(parent_data)}\n' | ||||
|                 ) | ||||
|                 accept_addr = ( | ||||
|                     parent_data.pop('bind_host'), | ||||
|  | @ -1221,6 +1277,7 @@ class Actor: | |||
| 
 | ||||
|                 if rvs['_debug_mode']: | ||||
|                     try: | ||||
|                         log.info('Enabling `stackscope` traces on SIGUSR1') | ||||
|                         from .devx import enable_stack_on_sig | ||||
|                         enable_stack_on_sig() | ||||
|                     except ImportError: | ||||
|  | @ -1288,7 +1345,9 @@ class Actor: | |||
|                     for listener in listeners | ||||
|                 ] | ||||
|                 log.runtime( | ||||
|                     f'Started tcp server(s) on {sockets}') | ||||
|                     'Started TCP server(s)\n' | ||||
|                     f'|_{sockets}\n' | ||||
|                 ) | ||||
|                 self._listeners.extend(listeners) | ||||
|                 task_status.started(server_n) | ||||
|         finally: | ||||
|  | @ -1772,7 +1831,7 @@ async def process_messages( | |||
|     log.runtime( | ||||
|         'Entering IPC msg loop:\n' | ||||
|         f'peer: {chan.uid}\n' | ||||
|         f'|_{chan}' | ||||
|         f'|_{chan}\n' | ||||
|     ) | ||||
|     nursery_cancelled_before_task: bool = False | ||||
|     msg: dict | None = None | ||||
|  | @ -1818,12 +1877,17 @@ async def process_messages( | |||
|                 if cid: | ||||
|                     # deliver response to local caller/waiter | ||||
|                     # via its per-remote-context memory channel. | ||||
|                     await actor._push_result(chan, cid, msg) | ||||
|                     await actor._push_result( | ||||
|                         chan, | ||||
|                         cid, | ||||
|                         msg, | ||||
|                     ) | ||||
| 
 | ||||
|                     log.runtime( | ||||
|                         f'Waiting on next IPC msg from {chan.uid}:\n' | ||||
|                         'Waiting on next IPC msg from\n' | ||||
|                         f'peer: {chan.uid}:\n' | ||||
|                         f'|_{chan}\n' | ||||
|                         # f'last msg: {msg}\n' | ||||
|                         f'|_{chan}' | ||||
|                     ) | ||||
|                     continue | ||||
| 
 | ||||
|  | @ -1843,9 +1907,11 @@ async def process_messages( | |||
|                     raise exc | ||||
| 
 | ||||
|                 log.runtime( | ||||
|                     f"Processing request from {actorid}\n" | ||||
|                     f"{ns}.{funcname}({kwargs})") | ||||
| 
 | ||||
|                     'Handling RPC cmd from\n' | ||||
|                     f'peer: {actorid}\n' | ||||
|                     '\n' | ||||
|                     f'=> {ns}.{funcname}({kwargs})\n' | ||||
|                 ) | ||||
|                 if ns == 'self': | ||||
|                     if funcname == 'cancel': | ||||
|                         func: Callable = actor.cancel | ||||
|  | @ -1954,17 +2020,18 @@ async def process_messages( | |||
|                 # in the lone case where a ``Context`` is not | ||||
|                 # delivered, it's likely going to be a locally | ||||
|                 # scoped exception from ``_invoke()`` itself. | ||||
|                 if isinstance(ctx, Exception): | ||||
|                 if isinstance(err := ctx, Exception): | ||||
|                     log.warning( | ||||
|                         f"Task for RPC func {func} failed with" | ||||
|                         f"{ctx}" | ||||
|                         'Task for RPC failed?' | ||||
|                         f'|_ {func}()\n\n' | ||||
| 
 | ||||
|                         f'{err}' | ||||
|                     ) | ||||
|                     continue | ||||
| 
 | ||||
|                 else: | ||||
|                     # mark that we have ongoing rpc tasks | ||||
|                     actor._ongoing_rpc_tasks = trio.Event() | ||||
|                     log.runtime(f"RPC func is {func}") | ||||
| 
 | ||||
|                     # store cancel scope such that the rpc task can be | ||||
|                     # cancelled gracefully if requested | ||||
|  | @ -1975,7 +2042,10 @@ async def process_messages( | |||
|                     ) | ||||
| 
 | ||||
|                 log.runtime( | ||||
|                     f"Waiting on next msg for {chan} from {chan.uid}") | ||||
|                     'Waiting on next IPC msg from\n' | ||||
|                     f'peer: {chan.uid}\n' | ||||
|                     f'|_{chan}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # end of async for, channel disconnect vis | ||||
|             # ``trio.EndOfChannel`` | ||||
|  | @ -1992,9 +2062,12 @@ async def process_messages( | |||
|         # handshake for them (yet) and instead we simply bail out of | ||||
|         # the message loop and expect the teardown sequence to clean | ||||
|         # up. | ||||
|         # TODO: don't show this msg if it's an emphemeral | ||||
|         # discovery ep call? | ||||
|         log.runtime( | ||||
|             f'channel from {chan.uid} closed abruptly:\n' | ||||
|             f'-> {chan.raddr}\n' | ||||
|             f'channel closed abruptly with\n' | ||||
|             f'peer: {chan.uid}\n'  | ||||
|             f'|_{chan.raddr}\n' | ||||
|         ) | ||||
| 
 | ||||
|         # transport **was** disconnected | ||||
|  | @ -2036,9 +2109,11 @@ async def process_messages( | |||
|     finally: | ||||
|         # msg debugging for when he machinery is brokey | ||||
|         log.runtime( | ||||
|             f'Exiting IPC msg loop with {chan.uid} ' | ||||
|             f'final msg: {msg}\n' | ||||
|             f'|_{chan}' | ||||
|             'Exiting IPC msg loop with\n' | ||||
|             f'peer: {chan.uid}\n' | ||||
|             f'|_{chan}\n\n' | ||||
|             'final msg:\n' | ||||
|             f'{pformat(msg)}\n' | ||||
|         ) | ||||
| 
 | ||||
|     # transport **was not** disconnected | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue