forked from goodboy/tractor
				
			Tweak `Actor` cancel method signatures
Besides improving a bunch more log msg contents similarly as before this changes the cancel method signatures slightly with different arg names: for `.cancel()`: - instead of `requesting_uid: str` take in a `req_chan: Channel` since we can always just read its `.uid: tuple` for logging and further we can then offer the `chan=None` case indicating a "self cancel" (since there's no "requesting channel"). - the semantics of "requesting" here better indicate that the IPC connection is an IPC peer and further (eventually) will allow permission checking against given peers for cancellation requests. - when `chan==None` we also define a meth-internal `requester_type: str` differently for logging content :) - add much more detailed `.cancel()` content around the requester, its type, and any debugger related locking steps. for `._cancel_task()`: - change the `chan` arg to `parent_chan: Channel` since "parent" correctly indicates that the channel is the parent of the locally spawned rpc task to cancel; in fact no other chan should be able to cancel tasks parented/spawned by other channels obvi! - also add more extensive meth-internal `.cancel()` logging with a #TODO around showing only the "relevant/lasest" `Context` state vars in such logging content. for `.cancel_rpc_tasks()`: - shorten `requesting_uid` -> `req_uid`. - add `parent_chan: Channel` to be similar as above in `._cancel_task()` (since it's internally delegated to anyway) which replaces the prior `only_chan` and use it to filter to only tasks spawned by this channel (thus as their "parent") as before. - instead of `if tasks:` to enter, invert and `return` early on `if not tasks`, for less indentation B) - add WIP str-repr format (for `.cancel()` emissions) to show a multi-address (maddr) + task func (via the new `Context._nsf`) and report all cancel task targets with it a "tree"; include #TODO to finalize and implement some utils for all this! To match ensure we adjust `process_messages()` self/`Actor` cancel handling blocks to provide the new `kwargs` (now with `dict`-merge syntax) to `._invoke()`.remotes/1757153874605917753/main
							parent
							
								
									9194e5774b
								
							
						
					
					
						commit
						9420ea0c14
					
				|  | @ -315,7 +315,7 @@ async def _invoke( | |||
|                     else f'result: {ctx._result}' | ||||
|                 ) | ||||
|                 log.cancel( | ||||
|                     f'IPC context terminated with final {res_str}\n' | ||||
|                     f'IPC context terminated with final {res_str}\n\n' | ||||
|                     f'|_{pformat(ctx)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|  | @ -957,14 +957,14 @@ class Actor: | |||
|                         poll = getattr(proc, 'poll', None) | ||||
|                         if poll and poll() is None: | ||||
|                             log.cancel( | ||||
|                                 f'Peer actor IPC broke but proc is alive?\n' | ||||
|                                 f'uid: {uid}\n' | ||||
|                                 f'|_{proc}\n' | ||||
|                                 f'Peer IPC broke but subproc is alive?\n\n' | ||||
| 
 | ||||
|                                 f'<=x @{chan.raddr}\n' | ||||
|                                 f'   |_{proc}\n' | ||||
|                             ) | ||||
| 
 | ||||
|             # ``Channel`` teardown and closure sequence | ||||
| 
 | ||||
|             # Drop ref to channel so it can be gc-ed and disconnected | ||||
|             # drop ref to channel so it can be gc-ed and disconnected | ||||
|             log.runtime( | ||||
|                 f'Disconnected IPC channel:\n' | ||||
|                 f'uid: {chan.uid}\n' | ||||
|  | @ -1112,8 +1112,12 @@ class Actor: | |||
|             ctx: Context = self._contexts[(uid, cid)] | ||||
|         except KeyError: | ||||
|             log.warning( | ||||
|                 f'Ignoring msg from [no-longer/un]known context {uid}:' | ||||
|                 f'\n{msg}') | ||||
|                 'Ignoring invalid IPC ctx msg!\n\n' | ||||
|                 f'<= sender: {uid}\n' | ||||
|                 f'=> cid: {cid}\n\n' | ||||
| 
 | ||||
|                 f'{msg}\n' | ||||
|             ) | ||||
|             return | ||||
| 
 | ||||
|         return await ctx._deliver_msg(msg) | ||||
|  | @ -1310,9 +1314,12 @@ class Actor: | |||
| 
 | ||||
|         except OSError:  # failed to connect | ||||
|             log.warning( | ||||
|                 f"Failed to connect to parent @ {parent_addr}," | ||||
|                 " closing server") | ||||
|             await self.cancel(requesting_uid=self.uid) | ||||
|                 f'Failed to connect to parent!?\n\n' | ||||
|                 'Closing IPC [TCP] transport server to\n' | ||||
|                 f'{parent_addr}\n' | ||||
|                 f'|_{self}\n\n' | ||||
|             ) | ||||
|             await self.cancel(chan=None)  # self cancel | ||||
|             raise | ||||
| 
 | ||||
|     async def _serve_forever( | ||||
|  | @ -1368,28 +1375,55 @@ class Actor: | |||
| 
 | ||||
|         ''' | ||||
|         assert self._service_n | ||||
|         self._service_n.start_soon(self.cancel) | ||||
|         self._service_n.start_soon( | ||||
|             self.cancel, | ||||
|             None,  # self cancel all rpc tasks | ||||
|         ) | ||||
| 
 | ||||
|     async def cancel( | ||||
|         self, | ||||
|         requesting_uid: tuple[str, str], | ||||
| 
 | ||||
|         # chan whose lifetime limits the lifetime of its remotely | ||||
|         # requested and locally spawned RPC tasks - similar to the | ||||
|         # supervision semantics of a nursery wherein the actual | ||||
|         # implementation does start all such tasks in | ||||
|         # a sub-nursery. | ||||
|         req_chan: Channel|None, | ||||
| 
 | ||||
|     ) -> bool: | ||||
|         ''' | ||||
|         Cancel this actor's runtime. | ||||
|         Cancel this actor's runtime, eventually resulting in | ||||
|         the exit its containing process. | ||||
| 
 | ||||
|         The "deterministic" teardown sequence in order is: | ||||
|             - cancel all ongoing rpc tasks by cancel scope | ||||
|             - cancel the channel server to prevent new inbound | ||||
|               connections | ||||
|             - cancel the "service" nursery reponsible for | ||||
|               spawning new rpc tasks | ||||
|             - return control the parent channel message loop | ||||
|         The ideal "deterministic" teardown sequence in order is: | ||||
|          - cancel all ongoing rpc tasks by cancel scope | ||||
|          - cancel the channel server to prevent new inbound | ||||
|            connections | ||||
|          - cancel the "service" nursery reponsible for | ||||
|            spawning new rpc tasks | ||||
|          - return control the parent channel message loop | ||||
| 
 | ||||
|         ''' | ||||
|         log.cancel( | ||||
|             f'{self.uid} requested to cancel by:\n' | ||||
|             f'{requesting_uid}' | ||||
|         ( | ||||
|             requesting_uid, | ||||
|             requester_type, | ||||
|             req_chan, | ||||
| 
 | ||||
|         ) = ( | ||||
|             req_chan.uid, | ||||
|             'peer', | ||||
|             req_chan, | ||||
| 
 | ||||
|         ) if req_chan else ( | ||||
| 
 | ||||
|             # a self cancel of ALL rpc tasks | ||||
|             self.uid, | ||||
|             'self', | ||||
|             self | ||||
|         ) | ||||
|         msg: str = ( | ||||
|             f'`Actor.cancel()` request from {requester_type}:\n' | ||||
|             f'<= {requesting_uid}\n' | ||||
|         ) | ||||
| 
 | ||||
|         # TODO: what happens here when we self-cancel tho? | ||||
|  | @ -1403,12 +1437,16 @@ class Actor: | |||
|             # with the root actor in this tree | ||||
|             dbcs = _debug.Lock._debugger_request_cs | ||||
|             if dbcs is not None: | ||||
|                 log.cancel("Cancelling active debugger request") | ||||
|                 msg += ( | ||||
|                     '>> Cancelling active debugger request..\n' | ||||
|                     f'|_{_debug.Lock}\n' | ||||
|                 ) | ||||
|                 dbcs.cancel() | ||||
| 
 | ||||
|             # kill all ongoing tasks | ||||
|             # self-cancel **all** ongoing RPC tasks | ||||
|             await self.cancel_rpc_tasks( | ||||
|                 requesting_uid=requesting_uid, | ||||
|                 req_uid=requesting_uid, | ||||
|                 parent_chan=None, | ||||
|             ) | ||||
| 
 | ||||
|             # stop channel server | ||||
|  | @ -1417,13 +1455,14 @@ class Actor: | |||
|                 await self._server_down.wait() | ||||
|             else: | ||||
|                 log.warning( | ||||
|                     f'{self.uid} was likely cancelled before it started') | ||||
|                     'Transport[TCP] server was cancelled start?' | ||||
|                 ) | ||||
| 
 | ||||
|             # cancel all rpc tasks permanently | ||||
|             if self._service_n: | ||||
|                 self._service_n.cancel_scope.cancel() | ||||
| 
 | ||||
|         log.cancel(f"{self.uid} called `Actor.cancel()`") | ||||
|         log.cancel(msg) | ||||
|         self._cancel_complete.set() | ||||
|         return True | ||||
| 
 | ||||
|  | @ -1438,7 +1477,7 @@ class Actor: | |||
|     async def _cancel_task( | ||||
|         self, | ||||
|         cid: str, | ||||
|         chan: Channel, | ||||
|         parent_chan: Channel, | ||||
|         requesting_uid: tuple[str, str] | None = None, | ||||
| 
 | ||||
|     ) -> bool: | ||||
|  | @ -1450,13 +1489,25 @@ class Actor: | |||
|         in the signature (for now). | ||||
| 
 | ||||
|         ''' | ||||
|         # right now this is only implicitly called by | ||||
|         # this ctx based lookup ensures the requested task to | ||||
|         # be cancelled was indeed spawned by a request from | ||||
|         # this channel | ||||
|         ctx: Context | ||||
|         func: Callable | ||||
|         is_complete: trio.Event | ||||
| 
 | ||||
|         # NOTE: right now this is only implicitly called by | ||||
|         # streaming IPC but it should be called | ||||
|         # to cancel any remotely spawned task | ||||
|         try: | ||||
|             # this ctx based lookup ensures the requested task to | ||||
|             # be cancelled was indeed spawned by a request from this channel | ||||
|             ctx, func, is_complete = self._rpc_tasks[(chan, cid)] | ||||
|             ( | ||||
|                 ctx, | ||||
|                 func, | ||||
|                 is_complete, | ||||
|             ) = self._rpc_tasks[( | ||||
|                 parent_chan, | ||||
|                 cid, | ||||
|             )] | ||||
|             scope: CancelScope = ctx._scope | ||||
| 
 | ||||
|         except KeyError: | ||||
|  | @ -1467,17 +1518,28 @@ class Actor: | |||
|             # - callee errors prior to cancel req. | ||||
|             log.cancel( | ||||
|                 'Cancel request invalid, RPC task already completed?\n' | ||||
|                 f'<= canceller: {requesting_uid}\n' | ||||
|                 f'  |_{chan}\n\n' | ||||
| 
 | ||||
|                 f'=> ctx id: {cid}\n' | ||||
|                 f'<= canceller: {requesting_uid}\n\n' | ||||
|                 f'=>{parent_chan}\n' | ||||
|                 f'  |_ctx-id: {cid}\n' | ||||
|             ) | ||||
|             return True | ||||
| 
 | ||||
|         log.cancel( | ||||
|             f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" | ||||
|             f"peer: {chan.uid}\n") | ||||
|             'Cancel request for RPC task\n' | ||||
|             f'<= canceller: {requesting_uid}\n\n' | ||||
| 
 | ||||
|             # TODO: better ascii repr for "supervisor" like | ||||
|             # a nursery or context scope? | ||||
|             f'=> ipc-parent: {parent_chan}\n' | ||||
|             # TODO: simplified `Context.__repr__()` fields output | ||||
|             # shows only application state-related stuff like, | ||||
|             # - ._stream | ||||
|             # - .closed | ||||
|             # - .started_called | ||||
|             # - .. etc. | ||||
|             f'  |_ctx: {cid}\n' | ||||
|             f'    >> {ctx._nsf}()\n' | ||||
|         ) | ||||
|         if ( | ||||
|             ctx._canceller is None | ||||
|             and requesting_uid | ||||
|  | @ -1487,6 +1549,7 @@ class Actor: | |||
|         # don't allow cancelling this function mid-execution | ||||
|         # (is this necessary?) | ||||
|         if func is self._cancel_task: | ||||
|             log.error('Do not cancel a cancel!?') | ||||
|             return True | ||||
| 
 | ||||
|         # TODO: shouldn't we eventually be calling ``Context.cancel()`` | ||||
|  | @ -1496,23 +1559,29 @@ class Actor: | |||
|         scope.cancel() | ||||
| 
 | ||||
|         # wait for _invoke to mark the task complete | ||||
|         flow_info: str = ( | ||||
|             f'<= canceller: {requesting_uid}\n' | ||||
|             f'=> ipc-parent: {parent_chan}\n' | ||||
|             f'  |_{ctx}\n' | ||||
|         ) | ||||
|         log.runtime( | ||||
|             'Waiting on task to cancel:\n' | ||||
|             f'cid: {cid}\nfunc: {func}\n' | ||||
|             f'peer: {chan.uid}\n' | ||||
|             'Waiting on RPC task to cancel\n' | ||||
|             f'{flow_info}' | ||||
|         ) | ||||
|         await is_complete.wait() | ||||
| 
 | ||||
|         log.runtime( | ||||
|             f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" | ||||
|             f"peer: {chan.uid}\n") | ||||
| 
 | ||||
|             f'Sucessfully cancelled RPC task\n' | ||||
|             f'{flow_info}' | ||||
|         ) | ||||
|         return True | ||||
| 
 | ||||
|     async def cancel_rpc_tasks( | ||||
|         self, | ||||
|         only_chan: Channel | None = None, | ||||
|         requesting_uid: tuple[str, str] | None = None, | ||||
|         req_uid: tuple[str, str], | ||||
| 
 | ||||
|         # NOTE: when None is passed we cancel **all** rpc | ||||
|         # tasks running in this actor! | ||||
|         parent_chan: Channel|None, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|  | @ -1521,38 +1590,76 @@ class Actor: | |||
| 
 | ||||
|         ''' | ||||
|         tasks: dict = self._rpc_tasks | ||||
|         if tasks: | ||||
|             tasks_str: str = '' | ||||
|             for (ctx, func, _) in tasks.values(): | ||||
|                 tasks_str += ( | ||||
|                     f' |_{func.__name__}() [cid={ctx.cid[-6:]}..]\n' | ||||
|                 ) | ||||
| 
 | ||||
|             log.cancel( | ||||
|                 f'Cancelling all {len(tasks)} rpc tasks:\n' | ||||
|                 f'{tasks_str}' | ||||
|         if not tasks: | ||||
|             log.warning( | ||||
|                 'Actor has no cancellable RPC tasks?\n' | ||||
|                 f'<= cancel requester: {req_uid}\n' | ||||
|                 f'=> {self}\n\n' | ||||
|             ) | ||||
|             for ( | ||||
|                 (chan, cid), | ||||
|                 (ctx, func, is_complete), | ||||
|             ) in tasks.copy().items(): | ||||
|                 if only_chan is not None: | ||||
|                     if only_chan != chan: | ||||
|                         continue | ||||
|             return | ||||
| 
 | ||||
|                 # TODO: this should really done in a nursery batch | ||||
|                 if func != self._cancel_task: | ||||
|                     await self._cancel_task( | ||||
|                         cid, | ||||
|                         chan, | ||||
|                         requesting_uid=requesting_uid, | ||||
|                     ) | ||||
|         # TODO: seriously factor this into some helper funcs XD | ||||
|         tasks_str: str = '' | ||||
|         for (ctx, func, _) in tasks.values(): | ||||
| 
 | ||||
|             log.cancel( | ||||
|                 'Waiting for remaining rpc tasks to complete:\n' | ||||
|                 f'{tasks}' | ||||
|             # TODO: std repr of all primitives in | ||||
|             # a hierarchical tree format, since we can!! | ||||
|             # like => repr for funcs/addrs/msg-typing: | ||||
|             # | ||||
|             # -[ ] use a proper utf8 "arm" like | ||||
|             #     `stackscope` has! | ||||
|             # -[ ] for typed msging, show the | ||||
|             #      py-type-annot style? | ||||
|             #  - maybe auto-gen via `inspect` / `typing` type-sig: | ||||
|             #   https://stackoverflow.com/a/57110117 | ||||
|             #   => see ex. code pasted into `.msg.types` | ||||
|             # | ||||
|             # -[ ] proper .maddr() for IPC primitives? | ||||
|             #   - `Channel.maddr() -> str:` obvi! | ||||
|             #   - `Context.maddr() -> str:` | ||||
|             tasks_str += ( | ||||
|                 f' |_@ /ipv4/tcp/cid="{ctx.cid[-16:]} .."\n' | ||||
|                 f'   |>> {ctx._nsf}() -> dict:\n' | ||||
|             ) | ||||
|             await self._ongoing_rpc_tasks.wait() | ||||
| 
 | ||||
|         log.cancel( | ||||
|             f'Cancelling all {len(tasks)} rpc tasks:\n\n' | ||||
|             f'<= .cancel() from {req_uid}\n' | ||||
|             f'{self}\n' | ||||
|             f'{tasks_str}' | ||||
|         ) | ||||
|         for ( | ||||
|             (task_caller_chan, cid), | ||||
|             (ctx, func, is_complete), | ||||
|         ) in tasks.copy().items(): | ||||
| 
 | ||||
|             if ( | ||||
|                 # maybe filter to specific IPC channel? | ||||
|                 (parent_chan | ||||
|                  and | ||||
|                  task_caller_chan != parent_chan) | ||||
| 
 | ||||
|                 # never "cancel-a-cancel" XD | ||||
|                 or (func == self._cancel_task) | ||||
|             ): | ||||
|                 continue | ||||
| 
 | ||||
|             # if func == self._cancel_task: | ||||
|             #     continue | ||||
| 
 | ||||
|             # TODO: this maybe block on the task cancellation | ||||
|             # and so should really done in a nursery batch? | ||||
|             await self._cancel_task( | ||||
|                 cid, | ||||
|                 task_caller_chan, | ||||
|                 requesting_uid=req_uid, | ||||
|             ) | ||||
| 
 | ||||
|         log.cancel( | ||||
|             'Waiting for remaining rpc tasks to complete\n' | ||||
|             f'|_{tasks}' | ||||
|         ) | ||||
|         await self._ongoing_rpc_tasks.wait() | ||||
| 
 | ||||
|     def cancel_server(self) -> None: | ||||
|         ''' | ||||
|  | @ -1943,10 +2050,11 @@ async def process_messages( | |||
|                     f'=> {ns}.{funcname}({kwargs})\n' | ||||
|                 ) | ||||
|                 if ns == 'self': | ||||
|                     uid: tuple = chan.uid | ||||
|                     if funcname == 'cancel': | ||||
|                         func: Callable = actor.cancel | ||||
|                         kwargs['requesting_uid'] = uid | ||||
|                         kwargs |= { | ||||
|                             'req_chan': chan, | ||||
|                         } | ||||
| 
 | ||||
|                         # don't start entire actor runtime cancellation | ||||
|                         # if this actor is currently in debug mode! | ||||
|  | @ -1960,11 +2068,6 @@ async def process_messages( | |||
|                         # and immediately start the core runtime | ||||
|                         # machinery shutdown! | ||||
|                         with CancelScope(shield=True): | ||||
|                             log.cancel( | ||||
|                                 f'Cancel request for `Actor` runtime\n' | ||||
|                                 f'<= canceller: {uid}\n' | ||||
|                                 # f'=> uid: {actor.uid}\n' | ||||
|                             ) | ||||
|                             await _invoke( | ||||
|                                 actor, | ||||
|                                 cid, | ||||
|  | @ -1974,25 +2077,32 @@ async def process_messages( | |||
|                                 is_rpc=False, | ||||
|                             ) | ||||
| 
 | ||||
|                         log.cancel( | ||||
|                             f'Cancelling IPC msg-loop with {chan.uid}' | ||||
|                         log.runtime( | ||||
|                             'Cancelling IPC transport msg-loop with peer:\n' | ||||
|                             f'|_{chan}\n' | ||||
|                         ) | ||||
|                         loop_cs.cancel() | ||||
|                         break | ||||
| 
 | ||||
|                     if funcname == '_cancel_task': | ||||
|                         func = actor._cancel_task | ||||
|                         func: Callable = actor._cancel_task | ||||
| 
 | ||||
|                         # we immediately start the runtime machinery | ||||
|                         # shutdown | ||||
|                         # with CancelScope(shield=True): | ||||
|                         kwargs['chan'] = chan | ||||
|                         target_cid = kwargs['cid'] | ||||
|                         kwargs['requesting_uid'] = chan.uid | ||||
|                         target_cid: str = kwargs['cid'] | ||||
|                         kwargs |= { | ||||
|                             # NOTE: ONLY the rpc-task-owning | ||||
|                             # parent IPC channel should be able to | ||||
|                             # cancel it! | ||||
|                             'parent_chan': chan, | ||||
|                             'requesting_uid': chan.uid, | ||||
|                         } | ||||
|                         log.cancel( | ||||
|                             f'Rx task cancel request\n' | ||||
|                             f'<= canceller: {chan.uid}\n' | ||||
|                             f'=> uid: {actor.uid}\n' | ||||
|                             f'  |_{chan}\n\n' | ||||
|                             f'=> {actor}\n' | ||||
|                             f'  |_cid: {target_cid}\n' | ||||
|                         ) | ||||
|                         try: | ||||
|  | @ -2005,8 +2115,13 @@ async def process_messages( | |||
|                                 is_rpc=False, | ||||
|                             ) | ||||
|                         except BaseException: | ||||
|                             log.exception("failed to cancel task?") | ||||
| 
 | ||||
|                             log.exception( | ||||
|                                 'Failed to cancel task?\n' | ||||
|                                 f'<= canceller: {chan.uid}\n' | ||||
|                                 f'  |_{chan}\n\n' | ||||
|                                 f'=> {actor}\n' | ||||
|                                 f'  |_cid: {target_cid}\n' | ||||
|                             ) | ||||
|                         continue | ||||
|                     else: | ||||
|                         # normally registry methods, eg. | ||||
|  | @ -2025,9 +2140,25 @@ async def process_messages( | |||
|                         await chan.send(err_msg) | ||||
|                         continue | ||||
| 
 | ||||
|                 # spin up a task for the requested function | ||||
|                 log.runtime(f"Spawning task for {func}") | ||||
|                 assert actor._service_n | ||||
|                 # schedule a task for the requested RPC function | ||||
|                 # in the actor's main "service nursery". | ||||
|                 # TODO: possibly a service-tn per IPC channel for | ||||
|                 # supervision isolation? would avoid having to | ||||
|                 # manage RPC tasks individually in `._rpc_tasks` | ||||
|                 # table? | ||||
|                 log.runtime( | ||||
|                     f'Spawning task for RPC request\n' | ||||
|                     f'<= caller: {chan.uid}\n' | ||||
|                     f'  |_{chan}\n\n' | ||||
|                     # TODO: maddr style repr? | ||||
|                     # f'  |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' | ||||
|                     # f'cid="{cid[-16:]} .."\n\n' | ||||
| 
 | ||||
|                     f'=> {actor}\n' | ||||
|                     f'  |_cid: {cid}\n' | ||||
|                     f'   |>> {func}()\n' | ||||
|                 ) | ||||
|                 assert actor._service_n  # wait why? do it at top? | ||||
|                 try: | ||||
|                     ctx: Context = await actor._service_n.start( | ||||
|                         partial( | ||||
|  | @ -2085,7 +2216,13 @@ async def process_messages( | |||
|             log.runtime( | ||||
|                 f"{chan} for {chan.uid} disconnected, cancelling tasks" | ||||
|             ) | ||||
|             await actor.cancel_rpc_tasks(chan) | ||||
|             await actor.cancel_rpc_tasks( | ||||
|                 req_uid=actor.uid, | ||||
|                 # a "self cancel" in terms of the lifetime of the | ||||
|                 # IPC connection which is presumed to be the | ||||
|                 # source of any requests for spawned tasks. | ||||
|                 parent_chan=chan, | ||||
|             ) | ||||
| 
 | ||||
|     except ( | ||||
|         TransportClosed, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue