forked from goodboy/tractor
				
			Set `._cancel_msg` to RPC `{cmd: 'self._cancel_task', ..}` msg
Like how we set `Context._cancel_msg` in `._deliver_msg()` (in
which case normally it's an `{'error': ..}` msg), do the same when any
RPC task is remotely cancelled via `Actor._cancel_task` where that task
doesn't yet have a cancel msg set yet.
This makes is much easier to distinguish between ctx cancellations due
to some remote error vs. Explicit remote requests via any of
`Actor.cancel()`, `Portal.cancel_actor()` or `Context.cancel()`.
			
			
				remotes/1757153874605917753/main
			
			
		
							parent
							
								
									9966dbdfc1
								
							
						
					
					
						commit
						c5228e7be5
					
				|  | @ -299,7 +299,7 @@ async def _errors_relayed_via_ipc( | |||
|                     ) | ||||
|                 ) | ||||
|             ): | ||||
|                 # await pause() | ||||
|                 # await _debug.pause() | ||||
|                 # XXX QUESTION XXX: is there any case where we'll | ||||
|                 # want to debug IPC disconnects as a default? | ||||
|                 # => I can't think of a reason that inspecting this | ||||
|  | @ -319,6 +319,12 @@ async def _errors_relayed_via_ipc( | |||
|             cid=ctx.cid, | ||||
|         ) | ||||
| 
 | ||||
|         # NOTE: the src actor should always be packed into the | ||||
|         # error.. but how should we verify this? | ||||
|         # assert err_msg['src_actor_uid'] | ||||
|         # if not err_msg['error'].get('src_actor_uid'): | ||||
|         #     import pdbp; pdbp.set_trace() | ||||
| 
 | ||||
|         if is_rpc: | ||||
|             try: | ||||
|                 await chan.send(err_msg) | ||||
|  | @ -559,6 +565,7 @@ async def _invoke( | |||
|             #   inside ._context._drain_to_final_msg()`.. | ||||
|             #   # TODO: remove this ^ right? | ||||
|             if ctx._scope.cancelled_caught: | ||||
|                 our_uid: tuple = actor.uid | ||||
| 
 | ||||
|                 # first check for and raise any remote error | ||||
|                 # before raising any context cancelled case | ||||
|  | @ -568,8 +575,9 @@ async def _invoke( | |||
|                     ctx._maybe_raise_remote_err(re) | ||||
| 
 | ||||
|                 cs: CancelScope = ctx._scope | ||||
| 
 | ||||
|                 if cs.cancel_called: | ||||
|                     our_uid: tuple = actor.uid | ||||
| 
 | ||||
|                     canceller: tuple = ctx.canceller | ||||
|                     msg: str = ( | ||||
|                         'actor was cancelled by ' | ||||
|  | @ -625,15 +633,6 @@ async def _invoke( | |||
|                         # f'  |_{ctx}' | ||||
|                     ) | ||||
| 
 | ||||
|                     # TODO: does this ever get set any more or can | ||||
|                     # we remove it? | ||||
|                     if ctx._cancel_msg: | ||||
|                         msg += ( | ||||
|                             # '------ - ------\n' | ||||
|                             # 'IPC msg:\n' | ||||
|                             f'\n\n{ctx._cancel_msg}' | ||||
|                         ) | ||||
| 
 | ||||
|                     # task-contex was either cancelled by request using | ||||
|                     # ``Portal.cancel_actor()`` or ``Context.cancel()`` | ||||
|                     # on the far end, or it was cancelled by the local | ||||
|  | @ -1677,7 +1676,9 @@ class Actor: | |||
|         self, | ||||
|         cid: str, | ||||
|         parent_chan: Channel, | ||||
|         requesting_uid: tuple[str, str] | None = None, | ||||
| 
 | ||||
|         requesting_uid: tuple[str, str]|None = None, | ||||
|         ipc_msg: dict|None|bool = False, | ||||
| 
 | ||||
|     ) -> bool: | ||||
|         ''' | ||||
|  | @ -1688,16 +1689,13 @@ class Actor: | |||
|         in the signature (for now). | ||||
| 
 | ||||
|         ''' | ||||
|         # this ctx based lookup ensures the requested task to | ||||
|         # be cancelled was indeed spawned by a request from | ||||
|         # this channel | ||||
| 
 | ||||
|         # this ctx based lookup ensures the requested task to be | ||||
|         # cancelled was indeed spawned by a request from its | ||||
|         # parent (or some grandparent's) channel | ||||
|         ctx: Context | ||||
|         func: Callable | ||||
|         is_complete: trio.Event | ||||
| 
 | ||||
|         # NOTE: right now this is only implicitly called by | ||||
|         # streaming IPC but it should be called | ||||
|         # to cancel any remotely spawned task | ||||
|         try: | ||||
|             ( | ||||
|                 ctx, | ||||
|  | @ -1725,20 +1723,23 @@ class Actor: | |||
| 
 | ||||
|         log.cancel( | ||||
|             'Cancel request for RPC task\n\n' | ||||
|             f'<= ._cancel_task(): {requesting_uid}\n' | ||||
|             f'  |_ @{ctx.dmaddr}\n\n' | ||||
|             f'<= Actor.cancel_task(): {requesting_uid}\n\n' | ||||
|             f'=> {ctx._task}\n' | ||||
|             f'  |_ >> {ctx.repr_rpc}\n' | ||||
|             # f'  >> Actor._cancel_task() => {ctx._task}\n' | ||||
|             # f'  |_ {ctx._task}\n\n' | ||||
| 
 | ||||
|             # TODO: better ascii repr for "supervisor" like | ||||
|             # a nursery or context scope? | ||||
|             # f'=> {parent_chan}\n' | ||||
|             f'=> {ctx._task}\n' | ||||
|             # f'   |_{ctx._task}\n' | ||||
|             # TODO: simplified `Context.__repr__()` fields output | ||||
|             # shows only application state-related stuff like, | ||||
|             # - ._stream | ||||
|             # - .closed | ||||
|             # - .started_called | ||||
|             # - .. etc. | ||||
|             f'  >> {ctx.repr_rpc}\n' | ||||
|             # f'     >> {ctx.repr_rpc}\n' | ||||
|             # f'  |_ctx: {cid}\n' | ||||
|             # f'    >> {ctx._nsf}()\n' | ||||
|         ) | ||||
|  | @ -1748,6 +1749,16 @@ class Actor: | |||
|         ): | ||||
|             ctx._canceller: tuple = requesting_uid | ||||
| 
 | ||||
|         # TODO: pack the RPC `{'cmd': <blah>}` msg into a ctxc and | ||||
|         # then raise and pack it here? | ||||
|         if ( | ||||
|             ipc_msg | ||||
|             and ctx._cancel_msg is None | ||||
|         ): | ||||
|             # assign RPC msg directly from the loop which usually | ||||
|             # the case with `ctx.cancel()` on the other side. | ||||
|             ctx._cancel_msg = ipc_msg | ||||
| 
 | ||||
|         # don't allow cancelling this function mid-execution | ||||
|         # (is this necessary?) | ||||
|         if func is self._cancel_task: | ||||
|  | @ -1828,10 +1839,15 @@ class Actor: | |||
|             else | ||||
|             "IPC channel's " | ||||
|         ) | ||||
| 
 | ||||
|         rent_chan_repr: str = ( | ||||
|             f'|_{parent_chan}' | ||||
|             if parent_chan | ||||
|             else '' | ||||
|         ) | ||||
|         log.cancel( | ||||
|             f'Cancelling {descr} {len(tasks)} rpc tasks\n\n' | ||||
|             f'<= .cancel_rpc_tasks(): {req_uid}\n' | ||||
|             f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n' | ||||
|             f'    {rent_chan_repr}\n' | ||||
|             # f'{self}\n' | ||||
|             # f'{tasks_str}' | ||||
|         ) | ||||
|  | @ -1851,9 +1867,6 @@ class Actor: | |||
|             ): | ||||
|                 continue | ||||
| 
 | ||||
|             # if func == self._cancel_task: | ||||
|             #     continue | ||||
| 
 | ||||
|             # TODO: this maybe block on the task cancellation | ||||
|             # and so should really done in a nursery batch? | ||||
|             await self._cancel_task( | ||||
|  | @ -2198,6 +2211,8 @@ async def process_messages( | |||
|                             await actor._cancel_task( | ||||
|                                 cid, | ||||
|                                 channel, | ||||
| 
 | ||||
|                                 ipc_msg=msg, | ||||
|                             ) | ||||
|                     break | ||||
| 
 | ||||
|  | @ -2308,6 +2323,7 @@ async def process_messages( | |||
|                             # cancel it! | ||||
|                             'parent_chan': chan, | ||||
|                             'requesting_uid': chan.uid, | ||||
|                             'ipc_msg': msg, | ||||
|                         } | ||||
|                         # TODO: remove? already have emit in meth. | ||||
|                         # log.runtime( | ||||
|  | @ -2575,7 +2591,7 @@ class Arbiter(Actor): | |||
|         sockaddr: tuple[str, int] | ||||
| 
 | ||||
|         for (aname, _), sockaddr in self._registry.items(): | ||||
|             log.info( | ||||
|             log.runtime( | ||||
|                 f'Actor mailbox info:\n' | ||||
|                 f'aname: {aname}\n' | ||||
|                 f'sockaddr: {sockaddr}\n' | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue