Compare commits

..

4 Commits

Author SHA1 Message Date
Tyler Goodlet 668016d37b Absorb EoCs via `Context.open_stream()` silently
I swear long ago it used to operate this way but, I guess this finalizes
the design decision. It makes a lot more sense to *not* propagate any
`trio.EndOfChannel` raised from a `Context.open_stream() as stream:`
block when that EoC is due to graceful-explicit stream termination.
We use the EoC much like a `StopAsyncIteration` where the error
indicates termination of the stream due to either:
- reception of a stop IPC msg indicating the far end ended the stream
  (gracecfully),
- closure of the underlying `Context._recv_chan` either by the runtime
  or due to user code having called `MsgStream.aclose()`.

User code shouldn't expect to handle EoC outside the block since the
`@acm` having closed should indicate the exactly same lifetime state
(of said stream) ;)

Deats:
- add special EoC handler in `.open_stream()` which silently "absorbs"
  the error only when the stream is already marked as closed (meaning
  the EoC indeed corresponds to IPC closure) with an assert for now
  ensuring the error is the same as set to `MsgStream._eoc`.
- in `MsgStream.receive()` break up the handlers for EoC and
  `trio.ClosedResourceError` since the error instances are saved to
  different variables and we **don't** want to rewrite the exception in
  the eoc case (normally to mask `trio` internals in tbs) bc we need the
  instance to be the exact one for doing checks inside
  `.open_stream().__aexit__()` to absorb it.

Other surrounding "improvements":
- start using the new `Context.maybe_raise()` helper where it can easily
  replace existing equivalent block-sections.
- use new `RemoteActorError.src_uid` as required.
2024-03-19 18:40:50 -04:00
Tyler Goodlet 9221c57234 Adjust all `RemoteActorError.type` using tests
To instead use the new `.boxed_type` B)
2024-03-19 18:08:54 -04:00
Tyler Goodlet 78434f6317 Fix `.boxed_type` facepalm, drop `.src_actor_uid`
The misname of `._boxed_type` as `._src_type` was only manifesting as
a reallly strange boxing error with a packed exception-group, not sure
how or why only that but it's fixed now XD

Start refining/cleaning out stuff for sure we don't need (based on
multiple local test runs):

- discard `.src_actor_uid` fully since test set has been moved over to
  `.src_uid`; this means also removing the `.msgdata` insertion from
  `pack_error()`; a patch to all internals is coming next obvi!

- don't pass `boxed_type` to `RemoteActorError.__init__()` from
  `unpack_error()` since it's now set directly via the
  `.msgdata["boxed_type_str"]`/`error_msg: dict` input , but in the case
  where **it is passed as an arg** (only for ctxc in `._rpc._invoke()`
  rn) make sure we only do the `.__init__()` insert when `boxed_type is
  not None`.
2024-03-19 14:20:59 -04:00
Tyler Goodlet 5fb5682269 First try "relayed boxed errors", or "inceptions"
Since adding more complex inter-peer (actor) testing scenarios, we
definitely have an immediate need for `trio`'s style of "inceptions" but
for nesting `RemoteActorError`s as they're relayed through multiple
actor-IPC hops. So for example, a remote error relayed "through" some
proxy actor to another ends up packing a `RemoteActorError` into another
one such that there are 2 layers of RAEs with the first
containing/boxing an original src actor error (type).

In support of this extension to `RemoteActorError` we add:

- `get_err_type()` error type resolver helper (factored fromthe
  body of `unpack_error()`) to be used whenever rendering
  `.src_type`/`.boxed_type`.

- `.src_type_str: str` which is pulled from `.msgdata` and holds the
  above (eventually when unpacked) type as `str`.
- `._src_type: BaseException|None` for the original
  "source" actor's error as unpacked in any remote (actor's) env and
  exposed as a readonly property `.src_type`.

- `.boxed_type_str: str` the same as above but for the "last" boxed
  error's type; when the RAE is unpacked at its first hop this will
  be **the same as** `.src_type_str`.
- `._boxed_type: BaseException` which now similarly should be "rendered"
  from the below type-`str` field instead of passed in as a error-type
  via `boxed_type` (though we still do for the ctxc case atm, see
  notes).
 |_ new sanity checks in `.__init__()` mostly as a reminder to handle
   that ^ ctxc case ^ more elegantly at some point..
 |_ obvi we discard the previous `suberror_type` input arg.

- fully remove the `.type`/`.type_str` properties instead expecting
  usage of `.boxed_/.src_` equivalents.
- start deprecation of `.src_actor_uid` and make it delegate to new
  `.src_uid`
- add `.relay_uid` propery for the last relay/hop's actor uid.
- add `.relay_path: list[str]` which holds the per-hop updated sequence
  of relay actor uid's which consecutively did boxing of an RAE.
- only include `.src_uid` and `.relay_path` in reprol() output.
- factor field-to-str rendering into a new `_mk_fields_str()`
  and use it in `.__repr__()`/`.reprol()`.
- add an `.unwrap()` to (attempt to) render the src error.

- rework `pack_error()` to handle inceptions including,
  - packing the correct field-values for the new `boxed_type_str`, `relay_uid`,
    `src_uid`, `src_type_str`.
  - always updating the `relay_path` sequence with the uid of the
    current actor.

- adjust `unpack_error()` to match all these changes,
  - pulling `boxed_type_str` and passing any resolved `boxed_type` to
    `RemoteActorError.__init__()`.
  - use the new `Context.maybe_raise()` convenience method.

Adjust `._rpc` packing to `ContextCancelled(boxed_type=trio.Cancelled)`
and tweak some more log msg formats.
2024-03-18 14:28:24 -04:00
10 changed files with 437 additions and 184 deletions

View File

@ -32,7 +32,7 @@ async def main():
try:
await p1.run(name_error)
except tractor.RemoteActorError as rae:
assert rae.type is NameError
assert rae.boxed_type is NameError
async for i in stream:

View File

@ -77,7 +77,7 @@ def test_remote_error(reg_addr, args_err):
# of this actor nursery.
await portal.result()
except tractor.RemoteActorError as err:
assert err.type == errtype
assert err.boxed_type == errtype
print("Look Maa that actor failed hard, hehh")
raise
@ -86,7 +86,7 @@ def test_remote_error(reg_addr, args_err):
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.type == errtype
assert excinfo.value.boxed_type == errtype
else:
# the root task will also error on the `.result()` call
@ -96,7 +96,7 @@ def test_remote_error(reg_addr, args_err):
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == errtype
assert exc.boxed_type == errtype
def test_multierror(reg_addr):
@ -117,7 +117,7 @@ def test_multierror(reg_addr):
try:
await portal2.result()
except tractor.RemoteActorError as err:
assert err.type == AssertionError
assert err.boxed_type == AssertionError
print("Look Maa that first actor failed hard, hehh")
raise
@ -169,7 +169,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError)
assert exc.type == AssertionError
assert exc.boxed_type == AssertionError
async def do_nothing():
@ -310,7 +310,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
await portal.run(func, **kwargs)
except tractor.RemoteActorError as err:
assert err.type == err_type
assert err.boxed_type == err_type
# we only expect this first error to propogate
# (all other daemons are cancelled before they
# can be scheduled)
@ -329,11 +329,11 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError):
assert exc.type == err_type
assert exc.boxed_type == err_type
else:
assert isinstance(exc, trio.Cancelled)
elif isinstance(err, tractor.RemoteActorError):
assert err.type == err_type
assert err.boxed_type == err_type
assert n.cancelled is True
assert not n._children
@ -412,7 +412,7 @@ async def test_nested_multierrors(loglevel, start_method):
elif isinstance(subexc, tractor.RemoteActorError):
# on windows it seems we can't exactly be sure wtf
# will happen..
assert subexc.type in (
assert subexc.boxed_type in (
tractor.RemoteActorError,
trio.Cancelled,
BaseExceptionGroup,
@ -422,7 +422,7 @@ async def test_nested_multierrors(loglevel, start_method):
for subsub in subexc.exceptions:
if subsub in (tractor.RemoteActorError,):
subsub = subsub.type
subsub = subsub.boxed_type
assert type(subsub) in (
trio.Cancelled,
@ -437,16 +437,16 @@ async def test_nested_multierrors(loglevel, start_method):
# we get back the (sent) cancel signal instead
if is_win():
if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in (
assert subexc.boxed_type in (
BaseExceptionGroup,
tractor.RemoteActorError
)
else:
assert isinstance(subexc, BaseExceptionGroup)
else:
assert subexc.type is ExceptionGroup
assert subexc.boxed_type is ExceptionGroup
else:
assert subexc.type in (
assert subexc.boxed_type in (
tractor.RemoteActorError,
trio.Cancelled
)

View File

@ -171,4 +171,4 @@ def test_actor_managed_trio_nursery_task_error_cancels_aio(
# verify boxed error
err = excinfo.value
assert isinstance(err.type(), NameError)
assert err.boxed_type is NameError

View File

@ -795,7 +795,7 @@ async def test_callee_cancels_before_started(
# raises a special cancel signal
except tractor.ContextCancelled as ce:
ce.type == trio.Cancelled
ce.boxed_type == trio.Cancelled
# the traceback should be informative
assert 'itself' in ce.msgdata['tb_str']
@ -903,7 +903,7 @@ def test_one_end_stream_not_opened(
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.type == StreamOverrun
assert excinfo.value.boxed_type == StreamOverrun
elif overrunner == 'callee':
with pytest.raises(tractor.RemoteActorError) as excinfo:
@ -912,7 +912,7 @@ def test_one_end_stream_not_opened(
# TODO: embedded remote errors so that we can verify the source
# error? the callee delivers an error which is an overrun
# wrapped in a remote actor error.
assert excinfo.value.type == tractor.RemoteActorError
assert excinfo.value.boxed_type == tractor.RemoteActorError
else:
trio.run(main)
@ -1131,7 +1131,7 @@ def test_maybe_allow_overruns_stream(
# NOTE: i tried to isolate to a deterministic case here
# based on timeing, but i was kinda wasted, and i don't
# think it's sane to catch them..
assert err.type in (
assert err.boxed_type in (
tractor.RemoteActorError,
StreamOverrun,
)
@ -1139,10 +1139,10 @@ def test_maybe_allow_overruns_stream(
elif (
slow_side == 'child'
):
assert err.type == StreamOverrun
assert err.boxed_type == StreamOverrun
elif slow_side == 'parent':
assert err.type == tractor.RemoteActorError
assert err.boxed_type == tractor.RemoteActorError
assert 'StreamOverrun' in err.msgdata['tb_str']
else:

View File

@ -128,7 +128,7 @@ def test_aio_simple_error(reg_addr):
assert err
assert isinstance(err, RemoteActorError)
assert err.type == AssertionError
assert err.boxed_type == AssertionError
def test_tractor_cancels_aio(reg_addr):
@ -272,7 +272,7 @@ def test_context_spawns_aio_task_that_errors(
err = excinfo.value
assert isinstance(err, expect)
assert err.type == AssertionError
assert err.boxed_type == AssertionError
async def aio_cancel():
@ -314,7 +314,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
assert err
# ensure boxed error is correct
assert err.type == to_asyncio.AsyncioCancelled
assert err.boxed_type == to_asyncio.AsyncioCancelled
# TODO: verify open_channel_from will fail on this..
@ -466,7 +466,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
assert exc.boxed_type == Exception
def test_trio_closes_early_and_channel_exits(reg_addr):
@ -500,7 +500,7 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
assert exc.boxed_type == Exception
@tractor.context

View File

@ -36,7 +36,7 @@ async def sleep_back_actor(
if not exposed_mods:
expect = tractor.ModuleNotExposed
assert err.type is expect
assert err.boxed_type is expect
raise
else:
await trio.sleep(float('inf'))
@ -150,4 +150,4 @@ def test_rpc_errors(
))
if getattr(value, 'type', None):
assert value.type is inside_err
assert value.boxed_type is inside_err

View File

@ -169,8 +169,7 @@ async def _drain_to_final_msg(
# only when we are sure the remote error is
# the source cause of this local task's
# cancellation.
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
ctx.maybe_raise()
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
@ -257,6 +256,13 @@ async def _drain_to_final_msg(
)
# XXX fallthrough to handle expected error XXX
# TODO: replace this with `ctx.maybe_raise()`
#
# TODO: would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
re: Exception|None = ctx._remote_error
if re:
log.critical(
@ -595,7 +601,7 @@ class Context:
if not re:
return False
if from_uid := re.src_actor_uid:
if from_uid := re.src_uid:
from_uid: tuple = tuple(from_uid)
our_uid: tuple = self._actor.uid
@ -825,7 +831,7 @@ class Context:
# cancellation.
maybe_error_src: tuple = getattr(
error,
'src_actor_uid',
'src_uid',
None,
)
self._canceller = (
@ -1030,8 +1036,8 @@ class Context:
@acm
async def open_stream(
self,
allow_overruns: bool | None = False,
msg_buffer_size: int | None = None,
allow_overruns: bool|None = False,
msg_buffer_size: int|None = None,
) -> AsyncGenerator[MsgStream, None]:
'''
@ -1071,13 +1077,16 @@ class Context:
# absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already
# sent to the other side!
if self._remote_error:
self.maybe_raise(
raise_ctxc_from_self_call=True,
)
# NOTE: this is diff then calling
# `._maybe_raise_remote_err()` specifically
# because any task entering this `.open_stream()`
# AFTER cancellation has already been requested,
# because we want to raise a ctxc on any task entering this `.open_stream()`
# AFTER cancellation was already been requested,
# we DO NOT want to absorb any ctxc ACK silently!
raise self._remote_error
# if self._remote_error:
# raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded
# back from the other side (yet), we raise a different
@ -1158,7 +1167,6 @@ class Context:
# await trio.lowlevel.checkpoint()
yield stream
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
@ -1183,12 +1191,23 @@ class Context:
#
# await stream.aclose()
# if re := ctx._remote_error:
# ctx._maybe_raise_remote_err(
# re,
# raise_ctxc_from_self_call=True,
# )
# await trio.lowlevel.checkpoint()
# NOTE: absorb and do not raise any
# EoC received from the other side such that
# it is not raised inside the surrounding
# context block's scope!
except trio.EndOfChannel as eoc:
if (
eoc
and stream.closed
):
# sanity, can remove?
assert eoc is stream._eoc
# from .devx import pause
# await pause()
log.warning(
'Stream was terminated by EoC\n\n'
f'{repr(eoc)}\n'
)
finally:
if self._portal:
@ -1204,7 +1223,6 @@ class Context:
# TODO: replace all the instances of this!! XD
def maybe_raise(
self,
hide_tb: bool = True,
**kwargs,
@ -1388,16 +1406,7 @@ class Context:
f'{drained_msgs}'
)
if (
(re := self._remote_error)
# and self._result == res_placeholder
):
self._maybe_raise_remote_err(
re,
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
# raise_overrun_from_self=False,
self.maybe_raise(
raise_overrun_from_self=(
raise_overrun
and
@ -1406,15 +1415,32 @@ class Context:
# raising something we know might happen
# during cancellation ;)
(not self._cancel_called)
),
)
)
# if (
# (re := self._remote_error)
# # and self._result == res_placeholder
# ):
# self._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# # raise_overrun_from_self=False,
# raise_overrun_from_self=(
# raise_overrun
# and
# # only when we ARE NOT the canceller
# # should we raise overruns, bc ow we're
# # raising something we know might happen
# # during cancellation ;)
# (not self._cancel_called)
# ),
# )
# if maybe_err:
# self._result = maybe_err
return self.outcome
# None if self._result == res_placeholder
# else self._result
# )
# TODO: switch this with above which should be named
# `.wait_for_outcome()` and instead do
@ -1863,8 +1889,9 @@ async def open_context_from_portal(
# TODO: if we set this the wrapping `@acm` body will
# still be shown (awkwardly) on pdb REPL entry. Ideally
# we can similarly annotate that frame to NOT show?
hide_tb: bool = True,
# we can similarly annotate that frame to NOT show? for now
# we DO SHOW this frame since it's awkward ow..
hide_tb: bool = False,
# proxied to RPC
**kwargs,

View File

@ -58,16 +58,44 @@ class InternalError(RuntimeError):
'''
_body_fields: list[str] = [
'src_actor_uid',
'boxed_type',
'src_type',
# TODO: format this better if we're going to include it.
# 'relay_path',
'src_uid',
# only in sub-types
'canceller',
'sender',
]
_msgdata_keys: list[str] = [
'type_str',
'boxed_type_str',
] + _body_fields
def get_err_type(type_name: str) -> BaseException|None:
'''
Look up an exception type by name from the set of locally
known namespaces:
- `builtins`
- `tractor._exceptions`
- `trio`
'''
for ns in [
builtins,
_this_mod,
trio,
]:
if type_ref := getattr(
ns,
type_name,
False,
):
return type_ref
# TODO: rename to just `RemoteError`?
class RemoteActorError(Exception):
@ -81,13 +109,15 @@ class RemoteActorError(Exception):
'''
reprol_fields: list[str] = [
'src_actor_uid',
'src_uid',
'relay_path',
# 'relay_uid',
]
def __init__(
self,
message: str,
suberror_type: Type[BaseException] | None = None,
boxed_type: Type[BaseException]|None = None,
**msgdata
) -> None:
@ -101,20 +131,112 @@ class RemoteActorError(Exception):
# - .remote_type
# also pertains to our long long oustanding issue XD
# https://github.com/goodboy/tractor/issues/5
self.boxed_type: str = suberror_type
#
# TODO: always set ._boxed_type` as `None` by default
# and instead render if from `.boxed_type_str`?
self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None
self.msgdata: dict[str, Any] = msgdata
@property
def type(self) -> str:
return self.boxed_type
# TODO: mask out eventually or place in `pack_error()`
# pre-`return` lines?
# sanity on inceptions
if boxed_type is RemoteActorError:
assert self.src_type_str != 'RemoteActorError'
assert self.src_uid not in self.relay_path
# ensure type-str matches and round-tripping from that
# str results in same error type.
#
# TODO NOTE: this is currently exclusively for the
# `ContextCancelled(boxed_type=trio.Cancelled)` case as is
# used inside `._rpc._invoke()` atm though probably we
# should better emphasize that special (one off?) case
# either by customizing `ContextCancelled.__init__()` or
# through a special factor func?
elif boxed_type:
if not self.msgdata.get('boxed_type_str'):
self.msgdata['boxed_type_str'] = str(
type(boxed_type).__name__
)
assert self.boxed_type_str == self.msgdata['boxed_type_str']
assert self.boxed_type is boxed_type
@property
def type_str(self) -> str:
return str(type(self.boxed_type).__name__)
def src_type_str(self) -> str:
'''
String-name of the source error's type.
This should be the same as `.boxed_type_str` when unpacked
at the first relay/hop's receiving actor.
'''
return self.msgdata['src_type_str']
@property
def src_actor_uid(self) -> tuple[str, str]|None:
return self.msgdata.get('src_actor_uid')
def src_type(self) -> str:
'''
Error type raised by original remote faulting actor.
'''
if self._src_type is None:
self._src_type = get_err_type(
self.msgdata['src_type_str']
)
return self._src_type
@property
def boxed_type_str(self) -> str:
'''
String-name of the (last hop's) boxed error type.
'''
return self.msgdata['boxed_type_str']
@property
def boxed_type(self) -> str:
'''
Error type boxed by last actor IPC hop.
'''
if self._boxed_type is None:
self._boxed_type = get_err_type(
self.msgdata['boxed_type_str']
)
return self._boxed_type
@property
def relay_path(self) -> list[tuple]:
'''
Return the list of actors which consecutively relayed
a boxed `RemoteActorError` the src error up until THIS
actor's hop.
NOTE: a `list` field with the same name is expected to be
passed/updated in `.msgdata`.
'''
return self.msgdata['relay_path']
@property
def relay_uid(self) -> tuple[str, str]|None:
return tuple(
self.msgdata['relay_path'][-1]
)
@property
def src_uid(self) -> tuple[str, str]|None:
if src_uid := (
self.msgdata.get('src_uid')
):
return tuple(src_uid)
# TODO: use path lookup instead?
# return tuple(
# self.msgdata['relay_path'][0]
# )
@property
def tb_str(
@ -129,28 +251,56 @@ class RemoteActorError(Exception):
return ''
def _mk_fields_str(
self,
fields: list[str],
end_char: str = '\n',
) -> str:
_repr: str = ''
for key in fields:
val: Any|None = (
getattr(self, key, None)
or
self.msgdata.get(key)
)
# TODO: for `.relay_path` on multiline?
# if not isinstance(val, str):
# val_str = pformat(val)
# else:
val_str: str = repr(val)
if val:
_repr += f'{key}={val_str}{end_char}'
return _repr
def reprol(self) -> str:
'''
Represent this error for "one line" display, like in
a field of our `Context.__repr__()` output.
'''
_repr: str = f'{type(self).__name__}('
for key in self.reprol_fields:
val: Any|None = self.msgdata.get(key)
if val:
_repr += f'{key}={repr(val)} '
return _repr
# TODO: use this matryoshka emjoi XD
# => 🪆
reprol_str: str = f'{type(self).__name__}('
_repr: str = self._mk_fields_str(
self.reprol_fields,
end_char=' ',
)
return (
reprol_str
+
_repr
)
def __repr__(self) -> str:
'''
Nicely formatted boxed error meta data + traceback.
fields: str = ''
for key in _body_fields:
val: str|None = self.msgdata.get(key)
if val:
fields += f'{key}={val}\n'
'''
fields: str = self._mk_fields_str(
_body_fields,
)
fields: str = textwrap.indent(
fields,
# prefix=' '*2,
@ -165,8 +315,6 @@ class RemoteActorError(Exception):
f' ------ - ------\n'
f' _|\n'
)
# f'|\n'
# f' |\n'
if indent:
body: str = textwrap.indent(
body,
@ -178,9 +326,47 @@ class RemoteActorError(Exception):
')>'
)
# TODO: local recontruction of remote exception deats
def unwrap(
self,
) -> BaseException:
'''
Unpack the inner-most source error from it's original IPC msg data.
We attempt to reconstruct (as best as we can) the original
`Exception` from as it would have been raised in the
failing actor's remote env.
'''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611
return src_type_ref(self.tb_str)
# TODO: local recontruction of nested inception for a given
# "hop" / relay-node in this error's relay_path?
# => so would render a `RAE[RAE[RAE[Exception]]]` instance
# with all inner errors unpacked?
# -[ ] if this is useful shouldn't be too hard to impl right?
# def unbox(self) -> BaseException:
# ...
# '''
# Unbox to the prior relays (aka last boxing actor's)
# inner error.
# '''
# if not self.relay_path:
# return self.unwrap()
# # TODO..
# # return self.boxed_type(
# # boxed_type=get_type_ref(..
# raise NotImplementedError
class InternalActorError(RemoteActorError):
@ -232,7 +418,7 @@ class ContextCancelled(RemoteActorError):
f'{self}'
)
# to make `.__repr__()` work uniformly
# TODO: to make `.__repr__()` work uniformly?
# src_actor_uid = canceller
@ -283,7 +469,8 @@ class MessagingError(Exception):
def pack_error(
exc: BaseException,
exc: BaseException|RemoteActorError,
tb: str|None = None,
cid: str|None = None,
@ -300,27 +487,54 @@ def pack_error(
else:
tb_str = traceback.format_exc()
our_uid: tuple = current_actor().uid
error_msg: dict[
str,
str | tuple[str, str]
] = {
'tb_str': tb_str,
'type_str': type(exc).__name__,
'boxed_type': type(exc).__name__,
'src_actor_uid': current_actor().uid,
'relay_uid': our_uid,
}
# TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if (
isinstance(exc, ContextCancelled)
or isinstance(exc, StreamOverrun)
isinstance(exc, RemoteActorError)
):
error_msg.update(exc.msgdata)
# an onion/inception we need to pack
if (
type(exc) is RemoteActorError
and (boxed := exc.boxed_type)
and boxed != RemoteActorError
):
# sanity on source error (if needed when tweaking this)
assert (src_type := exc.src_type) != RemoteActorError
assert error_msg['src_type_str'] != 'RemoteActorError'
assert error_msg['src_type_str'] == src_type.__name__
assert error_msg['src_uid'] != our_uid
# set the boxed type to be another boxed type thus
# creating an "inception" when unpacked by
# `unpack_error()` in another actor who gets "relayed"
# this error Bo
#
# NOTE on WHY: since we are re-boxing and already
# boxed src error, we want to overwrite the original
# `boxed_type_str` and instead set it to the type of
# the input `exc` type.
error_msg['boxed_type_str'] = 'RemoteActorError'
else:
error_msg['src_uid'] = our_uid
error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__
# XXX alawys append us the last relay in error propagation path
error_msg.setdefault(
'relay_path',
[],
).append(our_uid)
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
@ -329,7 +543,6 @@ def pack_error(
def unpack_error(
msg: dict[str, Any],
chan: Channel|None = None,
@ -357,35 +570,32 @@ def unpack_error(
# retrieve the remote error's msg encoded details
tb_str: str = error_dict.get('tb_str', '')
message: str = f'{chan.uid}\n' + tb_str
type_name: str = (
error_dict.get('type_str')
or error_dict['boxed_type']
message: str = (
f'{chan.uid}\n'
+
tb_str
)
suberror_type: Type[BaseException] = Exception
if type_name == 'ContextCancelled':
# try to lookup a suitable error type from the local runtime
# env then use it to construct a local instance.
boxed_type_str: str = error_dict['boxed_type_str']
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
if boxed_type_str == 'ContextCancelled':
box_type = ContextCancelled
suberror_type = box_type
assert boxed_type is box_type
else: # try to lookup a suitable local error type
for ns in [
builtins,
_this_mod,
trio,
]:
if suberror_type := getattr(
ns,
type_name,
False,
):
break
# TODO: already included by `_this_mod` in else loop right?
#
# we have an inception/onion-error so ensure
# we include the relay_path info and the
# original source error.
elif boxed_type_str == 'RemoteActorError':
assert boxed_type is RemoteActorError
assert len(error_dict['relay_path']) >= 1
exc = box_type(
message,
suberror_type=suberror_type,
# unpack other fields into error type init
**error_dict,
)
@ -501,6 +711,11 @@ def _raise_from_no_key_in_msg(
# destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc
# in case there already is some underlying remote error
# that arrived which is probably the source of this stream
# closure
ctx.maybe_raise()
raise eoc from src_err
if (

View File

@ -273,7 +273,10 @@ async def _errors_relayed_via_ipc(
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
log.exception('Actor crashed:\n')
log.exception(
'RPC task crashed\n'
f'|_{ctx}'
)
# always (try to) ship RPC errors back to caller
if is_rpc:
@ -613,7 +616,8 @@ async def _invoke(
# other side.
ctxc = ContextCancelled(
msg,
suberror_type=trio.Cancelled,
boxed_type=trio.Cancelled,
# boxed_type_str='Cancelled',
canceller=canceller,
)
# assign local error so that the `.outcome`
@ -671,7 +675,7 @@ async def _invoke(
f'`{repr(ctx.outcome)}`',
)
)
log.cancel(
log.runtime(
f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}\n'
)
@ -704,12 +708,6 @@ async def try_ship_error_to_remote(
# TODO: special tb fmting for ctxc cases?
# tb=tb,
)
# NOTE: the src actor should always be packed into the
# error.. but how should we verify this?
# actor: Actor = _state.current_actor()
# assert err_msg['src_actor_uid']
# if not err_msg['error'].get('src_actor_uid'):
# import pdbp; pdbp.set_trace()
await channel.send(msg)
# XXX NOTE XXX in SC terms this is one of the worst things

View File

@ -136,7 +136,7 @@ class MsgStream(trio.abc.Channel):
# return await self.receive()
# except trio.EndOfChannel:
# raise StopAsyncIteration
#
# see ``.aclose()`` for notes on the old behaviour prior to
# introducing this
if self._eoc:
@ -152,7 +152,6 @@ class MsgStream(trio.abc.Channel):
return msg['yield']
except KeyError as kerr:
# log.exception('GOT KEYERROR')
src_err = kerr
# NOTE: may raise any of the below error types
@ -166,30 +165,20 @@ class MsgStream(trio.abc.Channel):
stream=self,
)
# XXX: we close the stream on any of these error conditions:
# XXX: the stream terminates on either of:
# - via `self._rx_chan.receive()` raising after manual closure
# by the rpc-runtime OR,
# - via a received `{'stop': ...}` msg from remote side.
# |_ NOTE: previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel inside
# `Actor._push_result()`, but now the 'stop' message handling
# has been put just above inside `_raise_from_no_key_in_msg()`.
except (
# trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
trio.EndOfChannel,
) as eoc:
# log.exception('GOT EOC')
src_err = eoc
self._eoc = eoc
# a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the
# runtime after the associated transport-channel
# disconnected or broke.
# an ``EndOfChannel`` indicates either the internal recv
# memchan exhausted **or** we raisesd it just above after
# receiving a `stop` message from the far end of the stream.
# Previously this was triggered by calling ``.aclose()`` on
# the send side of the channel inside
# ``Actor._push_result()`` (should still be commented code
# there - which should eventually get removed), but now the
# 'stop' message handling has been put just above.
# TODO: Locally, we want to close this stream gracefully, by
# terminating any local consumers tasks deterministically.
# Once we have broadcast support, we **don't** want to be
@ -210,8 +199,11 @@ class MsgStream(trio.abc.Channel):
# raise eoc
except trio.ClosedResourceError as cre: # by self._rx_chan
# log.exception('GOT CRE')
# a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the
# runtime after the associated transport-channel
# disconnected or broke.
except trio.ClosedResourceError as cre: # by self._rx_chan.receive()
src_err = cre
log.warning(
'`Context._rx_chan` was already closed?'
@ -237,15 +229,30 @@ class MsgStream(trio.abc.Channel):
# over the end-of-stream connection error since likely
# the remote error was the source cause?
ctx: Context = self._ctx
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(
re,
ctx.maybe_raise(
raise_ctxc_from_self_call=True,
)
# propagate any error but hide low-level frames from
# caller by default.
if hide_tb:
# propagate any error but hide low-level frame details
# from the caller by default for debug noise reduction.
if (
hide_tb
# XXX NOTE XXX don't reraise on certain
# stream-specific internal error types like,
#
# - `trio.EoC` since we want to use the exact instance
# to ensure that it is the error that bubbles upward
# for silent absorption by `Context.open_stream()`.
and not self._eoc
# - `RemoteActorError` (or `ContextCancelled`) if it gets
# raised from `_raise_from_no_key_in_msg()` since we
# want the same (as the above bullet) for any
# `.open_context()` block bubbled error raised by
# any nearby ctx API remote-failures.
# and not isinstance(src_err, RemoteActorError)
):
raise type(src_err)(*src_err.args) from src_err
else:
raise src_err
@ -370,6 +377,10 @@ class MsgStream(trio.abc.Channel):
# await rx_chan.aclose()
if not self._eoc:
log.cancel(
'Stream closed before it received an EoC?\n'
'Setting eoc manually..\n..'
)
self._eoc: bool = trio.EndOfChannel(
f'Context stream closed by {self._ctx.side}\n'
f'|_{self}\n'
@ -414,13 +425,11 @@ class MsgStream(trio.abc.Channel):
@property
def closed(self) -> bool:
if (
(rxc := self._rx_chan._closed)
or
(_closed := self._closed)
or
(_eoc := self._eoc)
):
rxc: bool = self._rx_chan._closed
_closed: bool|Exception = self._closed
_eoc: bool|trio.EndOfChannel = self._eoc
if rxc or _closed or _eoc:
log.runtime(
f'`MsgStream` is already closed\n'
f'{self}\n'
@ -496,7 +505,11 @@ class MsgStream(trio.abc.Channel):
'''
__tracebackhide__: bool = hide_tb
# raise any alreay known error immediately
self._ctx.maybe_raise()
if self._eoc:
raise self._eoc
if self._closed:
raise self._closed