diff --git a/tractor/_context.py b/tractor/_context.py index 6a63416..69f28ac 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -49,20 +49,21 @@ from ._exceptions import ( InternalError, RemoteActorError, StreamOverrun, - pack_error, + pack_from_raise, unpack_error, _raise_from_no_key_in_msg, ) from .log import get_logger from .msg import ( + Error, + MsgType, + MsgCodec, NamespacePath, - Msg, Return, Started, Stop, Yield, current_codec, - MsgCodec, pretty_struct, ) from ._ipc import Channel @@ -107,8 +108,7 @@ async def _drain_to_final_msg( # wait for a final context result by collecting (but # basically ignoring) any bi-dir-stream msgs still in transit # from the far end. - # pre_result_drained: list[dict] = [] - pre_result_drained: list[Msg] = [] + pre_result_drained: list[MsgType] = [] while not ( ctx.maybe_error and not ctx._final_result_is_set() @@ -168,7 +168,7 @@ async def _drain_to_final_msg( # pray to the `trio` gawds that we're corrent with this # msg: dict = await ctx._recv_chan.receive() - msg: Msg = await ctx._recv_chan.receive() + msg: MsgType = await ctx._recv_chan.receive() # always capture unexpected/non-result msgs pre_result_drained.append(msg) @@ -191,13 +191,12 @@ async def _drain_to_final_msg( raise match msg: + + # final result arrived! case Return( - cid=cid, + # cid=cid, pld=res, ): - # try: - # ctx._result: Any = msg['return'] - # ctx._result: Any = msg.pld ctx._result: Any = res log.runtime( 'Context delivered final draining msg:\n' @@ -210,13 +209,9 @@ async def _drain_to_final_msg( # TODO: ^ we don't need it right? break - # except KeyError: - # except AttributeError: + # far end task is still streaming to us so discard + # and report depending on local ctx state. case Yield(): - # if 'yield' in msg: - - # far end task is still streaming to us so discard - # and report per local context state. if ( (ctx._stream.closed and (reason := 'stream was already closed') @@ -257,45 +252,34 @@ async def _drain_to_final_msg( ) continue + # stream terminated, but no result yet.. + # # TODO: work out edge cases here where # a stream is open but the task also calls # this? # -[ ] should be a runtime error if a stream is open right? # Stop() case Stop(): - # elif 'stop' in msg: log.cancel( 'Remote stream terminated due to "stop" msg:\n\n' f'{pformat(msg)}\n' ) continue - # It's an internal error if any other msg type without - # a`'cid'` field arrives here! - case _: - # if not msg.get('cid'): - if not msg.cid: - raise InternalError( - 'Unexpected cid-missing msg?\n\n' - f'{msg}\n' - ) + # remote error msg, likely already handled inside + # `Context._deliver_msg()` + case Error(): - # XXX fallthrough to handle expected error XXX - # TODO: replace this with `ctx.maybe_raise()` + # TODO: can we replace this with `ctx.maybe_raise()`? + # -[ ] would this be handier for this case maybe? + # async with maybe_raise_on_exit() as raises: + # if raises: + # log.error('some msg about raising..') # - # TODO: would this be handier for this case maybe? - # async with maybe_raise_on_exit() as raises: - # if raises: - # log.error('some msg about raising..') - re: Exception|None = ctx._remote_error if re: - log.critical( - 'Remote ctx terminated due to "error" msg:\n' - f'{re}' - ) assert msg is ctx._cancel_msg - # NOTE: this solved a super dupe edge case XD + # NOTE: this solved a super duper edge case XD # this was THE super duper edge case of: # - local task opens a remote task, # - requests remote cancellation of far end @@ -312,9 +296,10 @@ async def _drain_to_final_msg( # does not re-raise any ctxc it receives # IFF **it** was the cancellation # requester.. - # will raise if necessary, ow break from - # loop presuming any error terminates the - # context! + # + # XXX will raise if necessary but ow break + # from loop presuming any supressed error + # (ctxc) should terminate the context! ctx._maybe_raise_remote_err( re, # NOTE: obvi we don't care if we @@ -338,6 +323,7 @@ async def _drain_to_final_msg( log.critical('SHOULD NEVER GET HERE!?') assert msg is ctx._cancel_msg assert error.msgdata == ctx._remote_error.msgdata + assert error.ipc_msg == ctx._remote_error.ipc_msg from .devx._debug import pause await pause() ctx._maybe_cancel_and_set_remote_error(error) @@ -346,6 +332,20 @@ async def _drain_to_final_msg( else: # bubble the original src key error raise + + # XXX should pretty much never get here unless someone + # overrides the default `MsgType` spec. + case _: + # It's definitely an internal error if any other + # msg type without a`'cid'` field arrives here! + if not msg.cid: + raise InternalError( + 'Unexpected cid-missing msg?\n\n' + f'{msg}\n' + ) + + raise RuntimeError('Unknown msg type: {msg}') + else: log.cancel( 'Skipping `MsgStream` drain since final outcome is set\n\n' @@ -1342,8 +1342,11 @@ class Context: # `._cancel_called == True`. not raise_overrun_from_self and isinstance(remote_error, RemoteActorError) - and remote_error.msgdata['boxed_type_str'] == 'StreamOverrun' - and tuple(remote_error.msgdata['sender']) == our_uid + + and remote_error.boxed_type_str == 'StreamOverrun' + + # and tuple(remote_error.msgdata['sender']) == our_uid + and tuple(remote_error.sender) == our_uid ): # NOTE: we set the local scope error to any "self # cancellation" error-response thus "absorbing" @@ -1412,16 +1415,11 @@ class Context: assert self._recv_chan raise_overrun: bool = not self._allow_overruns - # res_placeholder: int = id(self) if ( - # self._result == res_placeholder - # and not self._remote_error self.maybe_error is None - # not self._remote_error - # and not self._local_error - and not self._recv_chan._closed # type: ignore + and + not self._recv_chan._closed # type: ignore ): - # wait for a final context result/error by "draining" # (by more or less ignoring) any bi-dir-stream "yield" # msgs still in transit from the far end. @@ -1432,7 +1430,6 @@ class Context: for msg in drained_msgs: # TODO: mask this by default.. - # if 'return' in msg: if isinstance(msg, Return): # from .devx import pause # await pause() @@ -1448,6 +1445,9 @@ class Context: ) self.maybe_raise( + # NOTE: obvi we don't care if we + # overran the far end if we're already + # waiting on a final result (msg). raise_overrun_from_self=( raise_overrun and @@ -1458,34 +1458,12 @@ class Context: (not self._cancel_called) ) ) - # if ( - # (re := self._remote_error) - # # and self._result == res_placeholder - # ): - # self._maybe_raise_remote_err( - # re, - # # NOTE: obvi we don't care if we - # # overran the far end if we're already - # # waiting on a final result (msg). - # # raise_overrun_from_self=False, - # raise_overrun_from_self=( - # raise_overrun - # and - # # only when we ARE NOT the canceller - # # should we raise overruns, bc ow we're - # # raising something we know might happen - # # during cancellation ;) - # (not self._cancel_called) - # ), - # ) - # if maybe_err: - # self._result = maybe_err - return self.outcome - # TODO: switch this with above which should be named - # `.wait_for_outcome()` and instead do - # a `.outcome.Outcome.unwrap()` ? + # TODO: switch this with above! + # -[ ] should be named `.wait_for_outcome()` and instead do + # a `.outcome.Outcome.unwrap()` ? + # # @property # def result(self) -> Any|None: # if self._final_result_is_set(): @@ -1544,7 +1522,6 @@ class Context: return None def _final_result_is_set(self) -> bool: - # return not (self._result == id(self)) return self._result is not Unresolved # def get_result_nowait(self) -> Any|None: @@ -1761,8 +1738,7 @@ class Context: async def _deliver_msg( self, - # msg: dict, - msg: Msg, + msg: MsgType, ) -> bool: ''' @@ -1776,6 +1752,20 @@ class Context: `._scope_nursery: trio.Nursery`) which ensures that such messages are queued up and eventually sent if possible. + XXX RULES XXX + ------ - ------ + - NEVER raise remote errors from this method; a runtime task caller. + An error "delivered" to a ctx should always be raised by + the corresponding local task operating on the + `Portal`/`Context` APIs. + + - NEVER `return` early before delivering the msg! + bc if the error is a ctxc and there is a task waiting on + `.result()` we need the msg to be + `send_chan.send_nowait()`-ed over the `._recv_chan` so + that the error is relayed to that waiter task and thus + raised in user code! + ''' cid: str = self.cid chan: Channel = self.chan @@ -1806,28 +1796,14 @@ class Context: ) self._cancel_msg: dict = msg - # NOTE: this will not raise an error, merely set + # XXX NOTE: this will not raise an error, merely set # `._remote_error` and maybe cancel any task currently # entered in `Portal.open_context()` presuming the # error is "cancel causing" (i.e. a `ContextCancelled` # or `RemoteActorError`). self._maybe_cancel_and_set_remote_error(re) - # XXX NEVER do this XXX..!! - # bc if the error is a ctxc and there is a task - # waiting on `.result()` we need the msg to be sent - # over the `send_chan`/`._recv_chan` so that the error - # is relayed to that waiter task.. - # return True - # - # XXX ALSO NO!! XXX - # => NEVER raise remote errors from the calling - # runtime task, they should always be raised by - # consumer side tasks operating on the - # `Portal`/`Context` APIs. - # if self._remote_error: - # self._maybe_raise_remote_err(error) - + # XXX only case where returning early is fine! if self._in_overrun: log.warning( f'Queueing OVERRUN msg on caller task:\n' @@ -1946,31 +1922,27 @@ class Context: # anything different. return False else: - # txt += f'\n{msg}\n' # raise local overrun and immediately pack as IPC # msg for far end. - try: - raise StreamOverrun( + err_msg: Error = pack_from_raise( + local_err=StreamOverrun( txt, sender=from_uid, - ) - except StreamOverrun as err: - err_msg: dict[str, dict] = pack_error( - err, - cid=cid, - ) - try: - # relay condition to sender side remote task - await chan.send(err_msg) - return True + ), + cid=cid, + ) + try: + # relay condition to sender side remote task + await chan.send(err_msg) + return True - except trio.BrokenResourceError: - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning( - 'Channel for ctx is already closed?\n' - f'|_{chan}\n' - ) + # XXX: local consumer has closed their side of + # the IPC so cancel the far end streaming task + except trio.BrokenResourceError: + log.warning( + 'Channel for ctx is already closed?\n' + f'|_{chan}\n' + ) # ow, indicate unable to deliver by default return False @@ -2379,28 +2351,17 @@ async def open_context_from_portal( # an exception type boxed in a `RemoteActorError` # is returned (meaning it was obvi not raised) # that we want to log-report on. - msgdata: str|None = getattr( - result_or_err, - 'msgdata', - None - ) - match (msgdata, result_or_err): - case ( - {'tb_str': tbstr}, - ContextCancelled(), - ): - log.cancel(tbstr) + match result_or_err: + case ContextCancelled() as ctxc: + log.cancel(ctxc.tb_str) - case ( - {'tb_str': tbstr}, - RemoteActorError(), - ): + case RemoteActorError() as rae: log.exception( 'Context remotely errored!\n' f'<= peer: {uid}\n' f' |_ {nsf}()\n\n' - f'{tbstr}' + f'{rae.tb_str}' ) case (None, _): log.runtime( @@ -2410,7 +2371,6 @@ async def open_context_from_portal( f'`{result_or_err}`\n' ) - finally: # XXX: (MEGA IMPORTANT) if this is a root opened process we # wait for any immediate child in debug before popping the diff --git a/tractor/_portal.py b/tractor/_portal.py index 957eae5..e4db93a 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -46,6 +46,7 @@ from ._state import ( from ._ipc import Channel from .log import get_logger from .msg import ( + Error, NamespacePath, Return, ) @@ -69,8 +70,7 @@ log = get_logger(__name__) # `._raise_from_no_key_in_msg()` (after tweak to # accept a `chan: Channel` arg) in key block! def _unwrap_msg( - # msg: dict[str, Any], - msg: Return, + msg: Return|Error, channel: Channel, hide_tb: bool = True, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 4be5ea1..4d90c59 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -49,7 +49,6 @@ from pprint import pformat import signal import sys from typing import ( - Any, Callable, TYPE_CHECKING, )