Compare commits
	
		
			4 Commits 
		
	
	
		
			af3745684c
			...
			9be821a5cf
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 9be821a5cf | |
|  | b46400a86f | |
|  | 02812b9f51 | |
|  | 3c5816c977 | 
|  | @ -933,13 +933,14 @@ class Context: | |||
|         self.cancel_called = True | ||||
| 
 | ||||
|         header: str = ( | ||||
|             f'Cancelling ctx with peer from {side.upper()} side\n\n' | ||||
|             f'Cancelling ctx from {side.upper()}-side\n' | ||||
|         ) | ||||
|         reminfo: str = ( | ||||
|             # ' =>\n' | ||||
|             f'Context.cancel() => {self.chan.uid}\n' | ||||
|             # f'Context.cancel() => {self.chan.uid}\n' | ||||
|             f'c)=> {self.chan.uid}\n' | ||||
|             # f'{self.chan.uid}\n' | ||||
|             f'  |_ @{self.dst_maddr}\n' | ||||
|             f' |_ @{self.dst_maddr}\n' | ||||
|             f'    >> {self.repr_rpc}\n' | ||||
|             # f'    >> {self._nsf}() -> {codec}[dict]:\n\n' | ||||
|             # TODO: pull msg-type from spec re #320 | ||||
|  | @ -1267,6 +1268,12 @@ class Context: | |||
| 
 | ||||
|     @property | ||||
|     def maybe_error(self) -> BaseException|None: | ||||
|         ''' | ||||
|         Return the (remote) error as outcome or `None`. | ||||
| 
 | ||||
|         Remote errors take precedence over local ones. | ||||
| 
 | ||||
|         ''' | ||||
|         le: BaseException|None = self._local_error | ||||
|         re: RemoteActorError|ContextCancelled|None = self._remote_error | ||||
| 
 | ||||
|  | @ -2182,9 +2189,16 @@ async def open_context_from_portal( | |||
|         # handled in the block above ^^^ !! | ||||
|         # await _debug.pause() | ||||
|         # log.cancel( | ||||
|         log.exception( | ||||
|             f'{ctx.side}-side of `Context` terminated with ' | ||||
|             f'.outcome => {ctx.repr_outcome()}\n' | ||||
|         match scope_err: | ||||
|             case trio.Cancelled: | ||||
|                 logmeth = log.cancel | ||||
| 
 | ||||
|             # XXX explicitly report on any non-graceful-taskc cases | ||||
|             case _: | ||||
|                 logmeth = log.exception | ||||
| 
 | ||||
|         logmeth( | ||||
|             f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n' | ||||
|         ) | ||||
| 
 | ||||
|         if debug_mode(): | ||||
|  |  | |||
|  | @ -265,7 +265,7 @@ def _trio_main( | |||
|     except BaseException as err: | ||||
|         logmeth = log.error | ||||
|         exit_status: str = ( | ||||
|             'Main actor task crashed during exit?\n' | ||||
|             'Main actor task exited due to crash?\n' | ||||
|             + | ||||
|             nest_from_op( | ||||
|                 input_op='x)>',  # closed by error | ||||
|  |  | |||
|  | @ -97,7 +97,7 @@ class Portal: | |||
|         channel: Channel, | ||||
|     ) -> None: | ||||
| 
 | ||||
|         self.chan = channel | ||||
|         self._chan: Channel = channel | ||||
|         # during the portal's lifetime | ||||
|         self._final_result_pld: Any|None = None | ||||
|         self._final_result_msg: PayloadMsg|None = None | ||||
|  | @ -109,6 +109,10 @@ class Portal: | |||
|         self._streams: set[MsgStream] = set() | ||||
|         self.actor: Actor = current_actor() | ||||
| 
 | ||||
|     @property | ||||
|     def chan(self) -> Channel: | ||||
|         return self._chan | ||||
| 
 | ||||
|     @property | ||||
|     def channel(self) -> Channel: | ||||
|         ''' | ||||
|  |  | |||
|  | @ -66,10 +66,11 @@ from trio import ( | |||
| ) | ||||
| 
 | ||||
| from tractor.msg import ( | ||||
|     pretty_struct, | ||||
|     NamespacePath, | ||||
|     types as msgtypes, | ||||
|     MsgType, | ||||
|     NamespacePath, | ||||
|     Stop, | ||||
|     pretty_struct, | ||||
|     types as msgtypes, | ||||
| ) | ||||
| from ._ipc import Channel | ||||
| from ._context import ( | ||||
|  | @ -545,7 +546,8 @@ class Actor: | |||
|             ): | ||||
|                 log.cancel( | ||||
|                     'Waiting on cancel request to peer\n' | ||||
|                     f'`Portal.cancel_actor()` => {chan.uid}\n' | ||||
|                     f'c)=>\n' | ||||
|                     f' |_{chan.uid}\n' | ||||
|                 ) | ||||
| 
 | ||||
|                 # XXX: this is a soft wait on the channel (and its | ||||
|  | @ -642,12 +644,14 @@ class Actor: | |||
|                         # and | ||||
|                         an_exit_cs.cancelled_caught | ||||
|                     ): | ||||
|                         log.warning( | ||||
|                         report: str = ( | ||||
|                             'Timed out waiting on local actor-nursery to exit?\n' | ||||
|                             f'{local_nursery}\n' | ||||
|                             f' |_{pformat(local_nursery._children)}\n' | ||||
|                         ) | ||||
|                         # await _debug.pause() | ||||
|                         if children := local_nursery._children: | ||||
|                             report += f' |_{pformat(children)}\n' | ||||
| 
 | ||||
|                         log.warning(report) | ||||
| 
 | ||||
|                 if disconnected: | ||||
|                     # if the transport died and this actor is still | ||||
|  | @ -819,14 +823,17 @@ class Actor: | |||
|                 # side, | ||||
|             )] | ||||
|         except KeyError: | ||||
|             log.warning( | ||||
|             report: str = ( | ||||
|                 'Ignoring invalid IPC ctx msg!\n\n' | ||||
|                 f'<= sender: {uid}\n\n' | ||||
|                 # XXX don't need right since it's always in msg? | ||||
|                 # f'=> cid: {cid}\n\n' | ||||
| 
 | ||||
|                 f'{pretty_struct.pformat(msg)}\n' | ||||
|                 f'<=? {uid}\n\n' | ||||
|                 f'  |_{pretty_struct.pformat(msg)}\n' | ||||
|             ) | ||||
|             match msg: | ||||
|                 case Stop(): | ||||
|                     log.runtime(report) | ||||
|                 case _: | ||||
|                     log.warning(report) | ||||
| 
 | ||||
|             return | ||||
| 
 | ||||
|         # if isinstance(msg, MsgTypeError): | ||||
|  | @ -1338,10 +1345,11 @@ class Actor: | |||
|             return True | ||||
| 
 | ||||
|         log.cancel( | ||||
|             'Cancel request for RPC task\n\n' | ||||
|             f'<= Actor._cancel_task(): {requesting_uid}\n\n' | ||||
|             f'=> {ctx._task}\n' | ||||
|             f'  |_ >> {ctx.repr_rpc}\n' | ||||
|             'Rxed cancel request for RPC task\n' | ||||
|             f'<=c) {requesting_uid}\n' | ||||
|             f'  |_{ctx._task}\n' | ||||
|             f'    >> {ctx.repr_rpc}\n' | ||||
|             # f'=> {ctx._task}\n' | ||||
|             # f'  >> Actor._cancel_task() => {ctx._task}\n' | ||||
|             # f'  |_ {ctx._task}\n\n' | ||||
| 
 | ||||
|  |  | |||
|  | @ -246,8 +246,9 @@ async def hard_kill( | |||
| 
 | ||||
|     ''' | ||||
|     log.cancel( | ||||
|         'Terminating sub-proc:\n' | ||||
|         f'|_{proc}\n' | ||||
|         'Terminating sub-proc\n' | ||||
|         f'>x)\n' | ||||
|         f' |_{proc}\n' | ||||
|     ) | ||||
|     # NOTE: this timeout used to do nothing since we were shielding | ||||
|     # the ``.wait()`` inside ``new_proc()`` which will pretty much | ||||
|  | @ -293,8 +294,8 @@ async def hard_kill( | |||
|         log.critical( | ||||
|             # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n' | ||||
|             '#T-800 deployed to collect zombie B0\n' | ||||
|             f'|\n' | ||||
|             f'|_{proc}\n' | ||||
|             f'>x)\n' | ||||
|             f' |_{proc}\n' | ||||
|         ) | ||||
|         proc.kill() | ||||
| 
 | ||||
|  | @ -322,8 +323,9 @@ async def soft_kill( | |||
|     uid: tuple[str, str] = portal.channel.uid | ||||
|     try: | ||||
|         log.cancel( | ||||
|             'Soft killing sub-actor via `Portal.cancel_actor()`\n' | ||||
|             f'|_{proc}\n' | ||||
|             'Soft killing sub-actor via portal request\n' | ||||
|             f'c)> {portal.chan.uid}\n' | ||||
|             f' |_{proc}\n' | ||||
|         ) | ||||
|         # wait on sub-proc to signal termination | ||||
|         await wait_func(proc) | ||||
|  | @ -552,8 +554,9 @@ async def trio_proc( | |||
|             # cancel result waiter that may have been spawned in | ||||
|             # tandem if not done already | ||||
|             log.cancel( | ||||
|                 'Cancelling existing result waiter task for ' | ||||
|                 f'{subactor.uid}' | ||||
|                 'Cancelling portal result reaper task\n' | ||||
|                 f'>c)\n' | ||||
|                 f' |_{subactor.uid}\n' | ||||
|             ) | ||||
|             nursery.cancel_scope.cancel() | ||||
| 
 | ||||
|  | @ -562,7 +565,11 @@ async def trio_proc( | |||
|         # allowed! Do this **after** cancellation/teardown to avoid | ||||
|         # killing the process too early. | ||||
|         if proc: | ||||
|             log.cancel(f'Hard reap sequence starting for {subactor.uid}') | ||||
|             log.cancel( | ||||
|                 f'Hard reap sequence starting for subactor\n' | ||||
|                 f'>x)\n' | ||||
|                 f' |_{subactor}@{subactor.uid}\n' | ||||
|             ) | ||||
| 
 | ||||
|             with trio.CancelScope(shield=True): | ||||
|                 # don't clobber an ongoing pdb | ||||
|  |  | |||
|  | @ -36,8 +36,8 @@ import warnings | |||
| import trio | ||||
| 
 | ||||
| from ._exceptions import ( | ||||
|     # _raise_from_no_key_in_msg, | ||||
|     ContextCancelled, | ||||
|     RemoteActorError, | ||||
| ) | ||||
| from .log import get_logger | ||||
| from .trionics import ( | ||||
|  | @ -101,7 +101,7 @@ class MsgStream(trio.abc.Channel): | |||
|     @property | ||||
|     def ctx(self) -> Context: | ||||
|         ''' | ||||
|         This stream's IPC `Context` ref. | ||||
|         A read-only ref to this stream's inter-actor-task `Context`. | ||||
| 
 | ||||
|         ''' | ||||
|         return self._ctx | ||||
|  | @ -145,9 +145,8 @@ class MsgStream(trio.abc.Channel): | |||
|         ''' | ||||
|         __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|         # NOTE: `trio.ReceiveChannel` implements | ||||
|         # EOC handling as follows (aka uses it | ||||
|         # to gracefully exit async for loops): | ||||
|         # NOTE FYI: `trio.ReceiveChannel` implements EOC handling as | ||||
|         # follows (aka uses it to gracefully exit async for loops): | ||||
|         # | ||||
|         # async def __anext__(self) -> ReceiveType: | ||||
|         #     try: | ||||
|  | @ -165,48 +164,29 @@ class MsgStream(trio.abc.Channel): | |||
| 
 | ||||
|         src_err: Exception|None = None  # orig tb | ||||
|         try: | ||||
| 
 | ||||
|             ctx: Context = self._ctx | ||||
|             return await ctx._pld_rx.recv_pld(ipc=self) | ||||
| 
 | ||||
|         # XXX: the stream terminates on either of: | ||||
|         # - via `self._rx_chan.receive()` raising  after manual closure | ||||
|         #   by the rpc-runtime OR, | ||||
|         # - via a received `{'stop': ...}` msg from remote side. | ||||
|         #   |_ NOTE: previously this was triggered by calling | ||||
|         #   ``._rx_chan.aclose()`` on the send side of the channel inside | ||||
|         #   `Actor._deliver_ctx_payload()`, but now the 'stop' message handling | ||||
|         #   has been put just above inside `_raise_from_no_key_in_msg()`. | ||||
|         except ( | ||||
|             trio.EndOfChannel, | ||||
|         ) as eoc: | ||||
|             src_err = eoc | ||||
|         # - `self._rx_chan.receive()` raising  after manual closure | ||||
|         #   by the rpc-runtime, | ||||
|         #   OR | ||||
|         # - via a `Stop`-msg received from remote peer task. | ||||
|         #   NOTE | ||||
|         #   |_ previously this was triggered by calling | ||||
|         #   ``._rx_chan.aclose()`` on the send side of the channel | ||||
|         #   inside `Actor._deliver_ctx_payload()`, but now the 'stop' | ||||
|         #   message handling gets delegated to `PldRFx.recv_pld()` | ||||
|         #   internals. | ||||
|         except trio.EndOfChannel as eoc: | ||||
|             # a graceful stream finished signal | ||||
|             self._eoc = eoc | ||||
|             src_err = eoc | ||||
| 
 | ||||
|             # TODO: Locally, we want to close this stream gracefully, by | ||||
|             # terminating any local consumers tasks deterministically. | ||||
|             # Once we have broadcast support, we **don't** want to be | ||||
|             # closing this stream and not flushing a final value to | ||||
|             # remaining (clone) consumers who may not have been | ||||
|             # scheduled to receive it yet. | ||||
|             # try: | ||||
|             #     maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait() | ||||
|             #     if maybe_err_msg_or_res: | ||||
|             #         log.warning( | ||||
|             #             'Discarding un-processed msg:\n' | ||||
|             #             f'{maybe_err_msg_or_res}' | ||||
|             #         ) | ||||
|             # except trio.WouldBlock: | ||||
|             #     # no queued msgs that might be another remote | ||||
|             #     # error, so just raise the original EoC | ||||
|             #     pass | ||||
| 
 | ||||
|             # raise eoc | ||||
| 
 | ||||
|         # a ``ClosedResourceError`` indicates that the internal | ||||
|         # feeder memory receive channel was closed likely by the | ||||
|         # runtime after the associated transport-channel | ||||
|         # disconnected or broke. | ||||
|         # a `ClosedResourceError` indicates that the internal feeder | ||||
|         # memory receive channel was closed likely by the runtime | ||||
|         # after the associated transport-channel disconnected or | ||||
|         # broke. | ||||
|         except trio.ClosedResourceError as cre:  # by self._rx_chan.receive() | ||||
|             src_err = cre | ||||
|             log.warning( | ||||
|  | @ -218,14 +198,15 @@ class MsgStream(trio.abc.Channel): | |||
|         # terminated and signal this local iterator to stop | ||||
|         drained: list[Exception|dict] = await self.aclose() | ||||
|         if drained: | ||||
|             # ?TODO? pass these to the `._ctx._drained_msgs: deque` | ||||
|             # and then iterate them as part of any `.wait_for_result()` call? | ||||
|             # | ||||
|             # from .devx import pause | ||||
|             # await pause() | ||||
|             log.warning( | ||||
|                 'Drained context msgs during closure:\n' | ||||
|                 'Drained context msgs during closure\n\n' | ||||
|                 f'{drained}' | ||||
|             ) | ||||
|         # TODO: pass these to the `._ctx._drained_msgs: deque` | ||||
|         # and then iterate them as part of any `.result()` call? | ||||
| 
 | ||||
|         # NOTE XXX: if the context was cancelled or remote-errored | ||||
|         # but we received the stream close msg first, we | ||||
|  | @ -238,28 +219,36 @@ class MsgStream(trio.abc.Channel): | |||
|             from_src_exc=src_err, | ||||
|         ) | ||||
| 
 | ||||
|         # propagate any error but hide low-level frame details | ||||
|         # from the caller by default for debug noise reduction. | ||||
|         # propagate any error but hide low-level frame details from | ||||
|         # the caller by default for console/debug-REPL noise | ||||
|         # reduction. | ||||
|         if ( | ||||
|             hide_tb | ||||
|             and ( | ||||
| 
 | ||||
|             # XXX NOTE XXX don't reraise on certain | ||||
|             # stream-specific internal error types like, | ||||
|             # | ||||
|             # - `trio.EoC` since we want to use the exact instance | ||||
|             #   to ensure that it is the error that bubbles upward | ||||
|             #   for silent absorption by `Context.open_stream()`. | ||||
|             and not self._eoc | ||||
|                 # XXX NOTE special conditions: don't reraise on | ||||
|                 # certain stream-specific internal error types like, | ||||
|                 # | ||||
|                 # - `trio.EoC` since we want to use the exact instance | ||||
|                 #   to ensure that it is the error that bubbles upward | ||||
|                 #   for silent absorption by `Context.open_stream()`. | ||||
|                 not self._eoc | ||||
| 
 | ||||
|             # - `RemoteActorError` (or `ContextCancelled`) if it gets | ||||
|             #   raised from `_raise_from_no_key_in_msg()` since we | ||||
|             #   want the same (as the above bullet) for any | ||||
|             #   `.open_context()` block bubbled error raised by | ||||
|             #   any nearby ctx API remote-failures. | ||||
|             # and not isinstance(src_err, RemoteActorError) | ||||
|                 # - `RemoteActorError` (or subtypes like ctxc) | ||||
|                 #    since we want to present the error as though it is | ||||
|                 #    "sourced" directly from this `.receive()` call and | ||||
|                 #    generally NOT include the stack frames raised from | ||||
|                 #    inside the `PldRx` and/or the transport stack | ||||
|                 #    layers. | ||||
|                 or isinstance(src_err, RemoteActorError) | ||||
|             ) | ||||
|         ): | ||||
|             raise type(src_err)(*src_err.args) from src_err | ||||
|         else: | ||||
|             # for any non-graceful-EOC we want to NOT hide this frame | ||||
|             if not self._eoc: | ||||
|                 __tracebackhide__: bool = False | ||||
| 
 | ||||
|             raise src_err | ||||
| 
 | ||||
|     async def aclose(self) -> list[Exception|dict]: | ||||
|  | @ -385,6 +374,8 @@ class MsgStream(trio.abc.Channel): | |||
|         if not self._eoc: | ||||
|             message: str = ( | ||||
|                 f'Stream self-closed by {self._ctx.side!r}-side before EoC\n' | ||||
|                 # } bc a stream is a "scope"/msging-phase inside an IPC | ||||
|                 f'x}}>\n' | ||||
|                 f'|_{self}\n' | ||||
|             ) | ||||
|             log.cancel(message) | ||||
|  |  | |||
|  | @ -299,7 +299,6 @@ class Lock: | |||
|     @pdbp.hideframe | ||||
|     def release( | ||||
|         cls, | ||||
|         force: bool = False, | ||||
|         raise_on_thread: bool = True, | ||||
| 
 | ||||
|     ) -> bool: | ||||
|  | @ -347,12 +346,9 @@ class Lock: | |||
|             lock: trio.StrictFIFOLock = cls._debug_lock | ||||
|             owner: Task = lock.statistics().owner | ||||
|             if ( | ||||
|                 (lock.locked() or force) | ||||
|                 # ^-TODO-NOTE-^ should we just remove this, since the | ||||
|                 # RTE case above will always happen when you force | ||||
|                 # from the wrong task? | ||||
| 
 | ||||
|                 and (owner is task) | ||||
|                 lock.locked() | ||||
|                 and | ||||
|                 (owner is task) | ||||
|                 # ^-NOTE-^ if we do NOT ensure this, `trio` will | ||||
|                 # raise a RTE when a non-owner tries to releasee the | ||||
|                 # lock. | ||||
|  | @ -553,6 +549,7 @@ async def lock_stdio_for_peer( | |||
|     # can try to avoid clobbering any connection from a child | ||||
|     # that's currently relying on it. | ||||
|     we_finished = Lock.req_handler_finished = trio.Event() | ||||
|     lock_blocked: bool = False | ||||
|     try: | ||||
|         if ctx.cid in Lock._blocked: | ||||
|             raise RuntimeError( | ||||
|  | @ -565,7 +562,8 @@ async def lock_stdio_for_peer( | |||
|                 'Consider that an internal bug exists given the TTY ' | ||||
|                 '`Lock`ing IPC dialog..\n' | ||||
|             ) | ||||
| 
 | ||||
|         Lock._blocked.add(ctx.cid) | ||||
|         lock_blocked = True | ||||
|         root_task_name: str = current_task().name | ||||
|         if tuple(subactor_uid) in Lock._blocked: | ||||
|             log.warning( | ||||
|  | @ -575,7 +573,11 @@ async def lock_stdio_for_peer( | |||
|             ) | ||||
|             ctx._enter_debugger_on_cancel: bool = False | ||||
|             message: str = ( | ||||
|                 f'Debug lock blocked for {subactor_uid}\n' | ||||
|                 f'Debug lock blocked for subactor\n\n' | ||||
|                 f'x)<= {subactor_uid}\n\n' | ||||
| 
 | ||||
|                 f'Likely because the root actor already started shutdown and is ' | ||||
|                 'closing IPC connections for this child!\n\n' | ||||
|                 'Cancelling debug request!\n' | ||||
|             ) | ||||
|             log.cancel(message) | ||||
|  | @ -589,7 +591,6 @@ async def lock_stdio_for_peer( | |||
|             f'remote task: {subactor_task_uid}\n' | ||||
|         ) | ||||
|         DebugStatus.shield_sigint() | ||||
|         Lock._blocked.add(ctx.cid) | ||||
| 
 | ||||
|         # NOTE: we use the IPC ctx's cancel scope directly in order to | ||||
|         # ensure that on any transport failure, or cancellation request | ||||
|  | @ -648,31 +649,34 @@ async def lock_stdio_for_peer( | |||
|         ) | ||||
| 
 | ||||
|     except BaseException as req_err: | ||||
|         message: str = ( | ||||
|             f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n' | ||||
|             'Forcing `Lock.release()` for req-ctx since likely an ' | ||||
|             'internal error!\n\n' | ||||
|             f'{ctx}' | ||||
|         fail_reason: str = ( | ||||
|             f'on behalf of peer\n\n' | ||||
|             f'x)<=\n' | ||||
|             f'  |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n\n' | ||||
| 
 | ||||
|             'Forcing `Lock.release()` due to acquire failure!\n\n' | ||||
|             f'x)=> {ctx}\n' | ||||
|         ) | ||||
|         if isinstance(req_err, trio.Cancelled): | ||||
|             message = ( | ||||
|                 'Cancelled during root TTY-lock dialog\n' | ||||
|             fail_reason = ( | ||||
|                 'Cancelled during stdio-mutex request ' | ||||
|                 + | ||||
|                 message | ||||
|                 fail_reason | ||||
|             ) | ||||
|         else: | ||||
|             message = ( | ||||
|                 'Errored during root TTY-lock dialog\n' | ||||
|             fail_reason = ( | ||||
|                 'Failed to deliver stdio-mutex request ' | ||||
|                 + | ||||
|                 message | ||||
|                 fail_reason | ||||
|             ) | ||||
| 
 | ||||
|         log.exception(message) | ||||
|         Lock.release() #force=True) | ||||
|         log.exception(fail_reason) | ||||
|         Lock.release() | ||||
|         raise | ||||
| 
 | ||||
|     finally: | ||||
|         Lock._blocked.remove(ctx.cid) | ||||
|         if lock_blocked: | ||||
|             Lock._blocked.remove(ctx.cid) | ||||
| 
 | ||||
|         # wakeup any waiters since the lock was (presumably) | ||||
|         # released, possibly only temporarily. | ||||
|  | @ -1167,7 +1171,7 @@ async def request_root_stdio_lock( | |||
|             ): | ||||
|                 log.cancel( | ||||
|                     'Debug lock request was CANCELLED?\n\n' | ||||
|                     f'{req_ctx}\n' | ||||
|                     f'<=c) {req_ctx}\n' | ||||
|                     # f'{pformat_cs(req_cs, var_name="req_cs")}\n\n' | ||||
|                     # f'{pformat_cs(req_ctx._scope, var_name="req_ctx._scope")}\n\n' | ||||
|                 ) | ||||
|  | @ -1179,22 +1183,26 @@ async def request_root_stdio_lock( | |||
|                 message: str = ( | ||||
|                     'Failed during debug request dialog with root actor?\n\n' | ||||
|                 ) | ||||
| 
 | ||||
|                 if req_ctx: | ||||
|                 if (req_ctx := DebugStatus.req_ctx): | ||||
|                     message += ( | ||||
|                         f'{req_ctx}\n' | ||||
|                         f'<=x) {req_ctx}\n\n' | ||||
|                         f'Cancelling IPC ctx!\n' | ||||
|                     ) | ||||
|                     await req_ctx.cancel() | ||||
|                     try: | ||||
|                         await req_ctx.cancel() | ||||
|                     except trio.ClosedResourceError  as terr: | ||||
|                         ctx_err.add_note( | ||||
|                             # f'Failed with {type(terr)!r} x)> `req_ctx.cancel()` ' | ||||
|                             f'Failed with `req_ctx.cancel()` <x) {type(terr)!r} ' | ||||
|                         ) | ||||
| 
 | ||||
|                 else: | ||||
|                     message += 'Failed during `Portal.open_context()` ?\n' | ||||
|                     message += 'Failed in `Portal.open_context()` call ??\n' | ||||
| 
 | ||||
|                 log.exception(message) | ||||
|                 ctx_err.add_note(message) | ||||
|                 raise ctx_err | ||||
| 
 | ||||
| 
 | ||||
|     except ( | ||||
|         tractor.ContextCancelled, | ||||
|         trio.Cancelled, | ||||
|  | @ -1218,9 +1226,10 @@ async def request_root_stdio_lock( | |||
|         # -[ ]FURTHER, after we 'continue', we should be able to | ||||
|         #   ctl-c out of the currently hanging task!  | ||||
|         raise DebugRequestError( | ||||
|             'Failed to lock stdio from subactor IPC ctx!\n\n' | ||||
|             'Failed during stdio-locking dialog from root actor\n\n' | ||||
| 
 | ||||
|             f'req_ctx: {DebugStatus.req_ctx}\n' | ||||
|             f'<=x)\n' | ||||
|             f'|_{DebugStatus.req_ctx}\n' | ||||
|         ) from req_err | ||||
| 
 | ||||
|     finally: | ||||
|  | @ -1998,10 +2007,10 @@ async def _pause( | |||
|         # sanity, for when hackin on all this? | ||||
|         if not isinstance(pause_err, trio.Cancelled): | ||||
|             req_ctx: Context = DebugStatus.req_ctx | ||||
|             if req_ctx: | ||||
|                 # XXX, bc the child-task in root might cancel it? | ||||
|                 # assert req_ctx._scope.cancel_called | ||||
|                 assert req_ctx.maybe_error | ||||
|             # if req_ctx: | ||||
|             #     # XXX, bc the child-task in root might cancel it? | ||||
|             #     # assert req_ctx._scope.cancel_called | ||||
|             #     assert req_ctx.maybe_error | ||||
| 
 | ||||
|         raise | ||||
| 
 | ||||
|  | @ -2041,11 +2050,12 @@ def _set_trace( | |||
|     # root here? Bo | ||||
|     log.pdb( | ||||
|         f'{_pause_msg}\n' | ||||
|         '|\n' | ||||
|         # TODO: more compact pformating? | ||||
|         # '|\n' | ||||
|         f'>(\n' | ||||
|         f' |_ {task} @ {actor.uid}\n' | ||||
|         # ^-TODO-^ more compact pformating? | ||||
|         # -[ ] make an `Actor.__repr()__` | ||||
|         # -[ ] should we use `log.pformat_task_uid()`? | ||||
|         f'|_ {task} @ {actor.uid}\n' | ||||
|     ) | ||||
|     # presuming the caller passed in the "api frame" | ||||
|     # (the last frame before user code - like `.pause()`) | ||||
|  | @ -2541,9 +2551,9 @@ def _post_mortem( | |||
|     # here! Bo | ||||
|     log.pdb( | ||||
|         f'{_crash_msg}\n' | ||||
|         '|\n' | ||||
|         # f'|_ {current_task()}\n' | ||||
|         f'|_ {current_task()} @ {actor.uid}\n' | ||||
|         # '|\n' | ||||
|         f'x>(\n' | ||||
|         f'  |_ {current_task()} @ {actor.uid}\n' | ||||
| 
 | ||||
|         # f'|_ @{actor.uid}\n' | ||||
|         # TODO: make an `Actor.__repr()__` | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue