diff --git a/tractor/_context.py b/tractor/_context.py index 0df1e80..c14f16b 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -86,30 +86,51 @@ class Context: ''' chan: Channel - cid: str + cid: str # "context id", more or less a unique linked-task-pair id - # these are the "feeder" channels for delivering - # message values to the local task from the runtime - # msg processing loop. + # the "feeder" channels for delivering message values to the + # local task from the runtime's msg processing loop. _recv_chan: trio.MemoryReceiveChannel _send_chan: trio.MemorySendChannel + # the "invocation type" of the far end task-entry-point + # function, normally matching a logic block inside + # `._runtime.invoke()`. _remote_func_type: str | None = None - # only set on the caller side - _portal: Portal | None = None # type: ignore # noqa + # NOTE: (for now) only set (a portal) on the caller side since + # the callee doesn't generally need a ref to one and should + # normally need to explicitly ask for handle to its peer if + # more the the `Context` is needed? + _portal: Portal | None = None + + # NOTE: each side of the context has its own cancel scope + # which is exactly the primitive that allows for + # cross-actor-task-supervision and thus SC. + _scope: trio.CancelScope | None = None _result: Any | int = None _remote_error: BaseException | None = None # cancellation state _cancel_called: bool = False # did WE cancel the far end? _cancelled_remote: tuple[str, str] | None = None - _cancel_msg: str | None = None - _scope: trio.CancelScope | None = None - # NOTE: this is set by the `.devx._debug` machinery - # to indicate whether code in `._runtime` should handle - # cancelled context crashes in the pdbp REPL. + # NOTE: we try to ensure assignment of a "cancel msg" since + # there's always going to be an "underlying reason" that any + # context was closed due to either a remote side error or + # a call to `.cancel()` which triggers `ContextCancelled`. + _cancel_msg: str | dict | None = None + + # NOTE: this state var used by the runtime to determine if the + # `pdbp` REPL is allowed to engage on contexts terminated via + # a `ContextCancelled` due to a call to `.cancel()` triggering + # "graceful closure" on either side: + # - `._runtime._invoke()` will check this flag before engaging + # the crash handler REPL in such cases where the "callee" + # raises the cancellation, + # - `.devx._debug.lock_tty_for_child()` will set it to `False` if + # the global tty-lock has been configured to filter out some + # actors from being able to acquire the debugger lock. _enter_debugger_on_cancel: bool = True @property @@ -177,36 +198,71 @@ class Context: async def _maybe_cancel_and_set_remote_error( self, - error_msg: dict[str, Any], + error: BaseException, ) -> None: ''' - (Maybe) unpack and raise a msg error into the local scope - nursery for this context. + (Maybe) cancel this local scope due to a received remote + error (normally via an IPC msg) which the actor runtime + routes to this context. - Acts as a form of "relay" for a remote error raised - in the corresponding remote callee task. + Acts as a form of "relay" for a remote error raised in the + corresponding remote task's `Context` wherein the next time + the local task exectutes a checkpoint, a `trio.Cancelled` + will be raised and depending on the type and source of the + original remote error, and whether or not the local task + called `.cancel()` itself prior, an equivalent + `ContextCancelled` or `RemoteActorError` wrapping the + remote error may be raised here by any of, + + - `Portal.open_context()` + - `Portal.result()` + - `Context.open_stream()` + - `Context.result()` + + when called/closed by actor local task(s). + + NOTEs & TODOs: + - It is expected that the caller has previously unwrapped + the remote error using a call to `unpack_error()` and + provides that output exception value as the input + `error` argument here. + - If this is an error message from a context opened by + `Portal.open_context()` we want to interrupt any + ongoing local tasks operating within that `Context`'s + cancel-scope so as to be notified ASAP of the remote + error and engage any caller handling (eg. for + cross-process task supervision). + - In some cases we may want to raise the remote error + immediately since there is no guarantee the locally + operating task(s) will attempt to execute a checkpoint + any time soon; in such cases there are 2 possible + approaches depending on the current task's work and + wrapping "thread" type: + + - `trio`-native-and-graceful: only ever wait for tasks + to exec a next `trio.lowlevel.checkpoint()` assuming + that any such task must do so to interact with the + actor runtime and IPC interfaces. + + - (NOT IMPLEMENTED) system-level-aggressive: maybe we + could eventually interrupt sync code (invoked using + `trio.to_thread` or some other adapter layer) with + a signal (a custom unix one for example? + https://stackoverflow.com/a/5744185) depending on the + task's wrapping thread-type such that long running + sync code should never cause the delay of actor + supervision tasks such as cancellation and respawn + logic. ''' - # If this is an error message from a context opened by - # ``Portal.open_context()`` we want to interrupt any ongoing - # (child) tasks within that context to be notified of the remote - # error relayed here. - # - # The reason we may want to raise the remote error immediately - # is that there is no guarantee the associated local task(s) - # will attempt to read from any locally opened stream any time - # soon. - # - # NOTE: this only applies when - # ``Portal.open_context()`` has been called since it is assumed - # (currently) that other portal APIs (``Portal.run()``, - # ``.run_in_actor()``) do their own error checking at the point - # of the call and result processing. - error = unpack_error( - error_msg, - self.chan, - ) + # XXX: currently this should only be used when + # `Portal.open_context()` has been opened since it's + # assumed that other portal APIs like, + # - `Portal.run()`, + # - `ActorNursery.run_in_actor()` + # do their own error checking at their own call points and + # result processing. # XXX: set the remote side's error so that after we cancel # whatever task is the opener of this context it can raise @@ -236,35 +292,25 @@ class Context: else: log.error( f'Remote context error for {self.chan.uid}:{self.cid}:\n' - f'{error_msg["error"]["tb_str"]}' + f'{error}' ) # TODO: tempted to **not** do this by-reraising in a # nursery and instead cancel a surrounding scope, detect # the cancellation, then lookup the error that was set? # YES! this is way better and simpler! - if ( - self._scope - ): + if self._scope: # from trio.testing import wait_all_tasks_blocked # await wait_all_tasks_blocked() # self._cancelled_remote = self.chan.uid self._scope.cancel() - # NOTE: this usage actually works here B) - # from .devx._debug import breakpoint - # await breakpoint() - - # XXX: this will break early callee results sending - # since when `.result()` is finally called, this - # chan will be closed.. - # if self._recv_chan: - # await self._recv_chan.aclose() + # this REPL usage actually works here BD + # from .devx._debug import pause + # await pause() async def cancel( self, - msg: str | None = None, timeout: float = 0.616, - # timeout: float = 1000, ) -> None: ''' @@ -274,15 +320,12 @@ class Context: Timeout quickly in an attempt to sidestep 2-generals... ''' - side = 'caller' if self._portal else 'callee' - if msg: - assert side == 'callee', 'Only callee side can provide cancel msg' + side: str = 'caller' if self._portal else 'callee' + log.cancel( + f'Cancelling {side} side of context to {self.chan.uid}' + ) - log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') - - self._cancel_called = True - # await devx._debug.breakpoint() - # breakpoint() + self._cancel_called: bool = True if side == 'caller': if not self._portal: @@ -290,12 +333,13 @@ class Context: "No portal found, this is likely a callee side context" ) - cid = self.cid + cid: str = self.cid with trio.move_on_after(timeout) as cs: cs.shield = True log.cancel( - f"Cancelling stream {cid} to " - f"{self._portal.channel.uid}") + f'Cancelling stream {cid} to ' + f'{self._portal.channel.uid}' + ) # NOTE: we're telling the far end actor to cancel a task # corresponding to *this actor*. The far end local channel @@ -314,17 +358,17 @@ class Context: # if not self._portal.channel.connected(): if not self.chan.connected(): log.cancel( - "May have failed to cancel remote task " - f"{cid} for {self._portal.channel.uid}") + 'May have failed to cancel remote task ' + f'{cid} for {self._portal.channel.uid}' + ) else: log.cancel( - "Timed out on cancelling remote task " - f"{cid} for {self._portal.channel.uid}") + 'Timed out on cancel request of remote task ' + f'{cid} for {self._portal.channel.uid}' + ) # callee side remote task else: - self._cancel_msg = msg - # TODO: should we have an explicit cancel message # or is relaying the local `trio.Cancelled` as an # {'error': trio.Cancelled, cid: "blah"} enough? @@ -335,7 +379,6 @@ class Context: @acm async def open_stream( - self, allow_overruns: bool | None = False, msg_buffer_size: int | None = None, @@ -354,10 +397,10 @@ class Context: ``Portal.open_context()``. In the future this may change but currently there seems to be no obvious reason to support "re-opening": - - pausing a stream can be done with a message. - - task errors will normally require a restart of the entire - scope of the inter-actor task context due to the nature of - ``trio``'s cancellation system. + - pausing a stream can be done with a message. + - task errors will normally require a restart of the entire + scope of the inter-actor task context due to the nature of + ``trio``'s cancellation system. ''' actor = current_actor() @@ -439,18 +482,19 @@ class Context: self, err: Exception, ) -> None: + ''' + Maybe raise a remote error depending on who (which task from + which actor) requested a cancellation (if any). + + ''' # NOTE: whenever the context's "opener" side (task) **is** # the side which requested the cancellation (likekly via # ``Context.cancel()``), we don't want to re-raise that # cancellation signal locally (would be akin to # a ``trio.Nursery`` nursery raising ``trio.Cancelled`` - # whenever ``CancelScope.cancel()`` was called) and instead - # silently reap the expected cancellation "error"-msg. - # if 'pikerd' in err.msgdata['tb_str']: - # # from . import _debug - # # await _debug.breakpoint() - # breakpoint() - + # whenever ``CancelScope.cancel()`` was called) and + # instead silently reap the expected cancellation + # "error"-msg. if ( isinstance(err, ContextCancelled) and ( @@ -461,7 +505,18 @@ class Context: ): return err - raise err # from None + # NOTE: currently we are masking underlying runtime errors + # which are often superfluous to user handler code. not + # sure if this is still needed / desired for all operation? + # TODO: maybe we can only NOT mask if: + # - [ ] debug mode is enabled or, + # - [ ] a certain log level is set? + # - [ ] consider using `.with_traceback()` to filter out + # runtime frames from the tb explicitly? + # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement + # https://stackoverflow.com/a/24752607 + __tracebackhide__: bool = True + raise err from None async def result(self) -> Any | Exception: ''' @@ -489,12 +544,12 @@ class Context: of the remote cancellation. ''' + __tracebackhide__: bool = True assert self._portal, "Context.result() can not be called from callee!" assert self._recv_chan if re := self._remote_error: - self._maybe_raise_remote_err(re) - return re + return self._maybe_raise_remote_err(re) if ( self._result == id(self) @@ -505,8 +560,8 @@ class Context: # and discarding any bi dir stream msgs still # in transit from the far end. while True: - msg = await self._recv_chan.receive() try: + msg = await self._recv_chan.receive() self._result: Any = msg['return'] # NOTE: we don't need to do this right? @@ -519,17 +574,22 @@ class Context: # NOTE: we get here if the far end was # `ContextCancelled` in 2 cases: - # - we requested the cancellation and thus - # SHOULD NOT raise that far end error, - # - WE DID NOT REQUEST that cancel and thus - # SHOULD RAISE HERE! + # 1. we requested the cancellation and thus + # SHOULD NOT raise that far end error, + # 2. WE DID NOT REQUEST that cancel and thus + # SHOULD RAISE HERE! except trio.Cancelled: - if not self._cancel_called: - raise self._remote_error - else: - # if we DID request the cancel we simply - # continue as normal. - raise + + # CASE 2: mask the local cancelled-error(s) + # only when we are sure the remote error is the + # (likely) source cause of this local runtime + # task's cancellation. + if re := self._remote_error: + self._maybe_raise_remote_err(re) + + # CASE 1: we DID request the cancel we simply + # continue to bubble up as normal. + raise except KeyError: # as msgerr: @@ -544,7 +604,8 @@ class Context: # internal error should never get here assert msg.get('cid'), ( - "Received internal error at portal?") + "Received internal error at portal?" + ) err = unpack_error( msg, @@ -554,7 +615,10 @@ class Context: err = self._maybe_raise_remote_err(err) self._remote_error = err - return self._remote_error or self._result + if re := self._remote_error: + return self._maybe_raise_remote_err(re) + + return self._result async def started( self, @@ -563,7 +627,7 @@ class Context: ) -> None: ''' Indicate to calling actor's task that this linked context - has started and send ``value`` to the other side. + has started and send ``value`` to the other side via IPC. On the calling side ``value`` is the second item delivered in the tuple returned by ``Portal.open_context()``. @@ -571,19 +635,17 @@ class Context: ''' if self._portal: raise RuntimeError( - f"Caller side context {self} can not call started!") + f'Caller side context {self} can not call started!' + ) elif self._started_called: raise RuntimeError( - f"called 'started' twice on context with {self.chan.uid}") + f'called `.started()` twice on context with {self.chan.uid}' + ) await self.chan.send({'started': value, 'cid': self.cid}) self._started_called = True - # TODO: do we need a restart api? - # async def restart(self) -> None: - # pass - async def _drain_overflows( self, ) -> None: @@ -638,10 +700,21 @@ class Context: self, msg: dict, - draining: bool = False, + # draining: bool = False, ) -> bool: + ''' + Deliver an IPC msg received from a transport-channel to + this context's underlying mem chan for handling by + user operating tasks; deliver a bool indicating whether the + msg was immediately sent. + If `._allow_overruns == True` (maybe) append the msg to an + "overflow queue" and start a "drainer task" (inside the + `._scope_nursery: trio.Nursery`) which ensures that such + messages are eventually sent if possible. + + ''' cid = self.cid chan = self.chan uid = chan.uid @@ -652,8 +725,12 @@ class Context: ) error = msg.get('error') - if error: - await self._maybe_cancel_and_set_remote_error(msg) + if error := unpack_error( + msg, + self.chan, + ): + self._cancel_msg = msg + await self._maybe_cancel_and_set_remote_error(error) if ( self._in_overrun @@ -685,6 +762,7 @@ class Context: # the sender; the main motivation is that using bp can block the # msg handling loop which calls into this method! except trio.WouldBlock: + # XXX: always push an error even if the local # receiver is in overrun state. # await self._maybe_cancel_and_set_remote_error(msg)