Compare commits

...

4 Commits

Author SHA1 Message Date
Tyler Goodlet 668016d37b Absorb EoCs via `Context.open_stream()` silently
I swear long ago it used to operate this way but, I guess this finalizes
the design decision. It makes a lot more sense to *not* propagate any
`trio.EndOfChannel` raised from a `Context.open_stream() as stream:`
block when that EoC is due to graceful-explicit stream termination.
We use the EoC much like a `StopAsyncIteration` where the error
indicates termination of the stream due to either:
- reception of a stop IPC msg indicating the far end ended the stream
  (gracecfully),
- closure of the underlying `Context._recv_chan` either by the runtime
  or due to user code having called `MsgStream.aclose()`.

User code shouldn't expect to handle EoC outside the block since the
`@acm` having closed should indicate the exactly same lifetime state
(of said stream) ;)

Deats:
- add special EoC handler in `.open_stream()` which silently "absorbs"
  the error only when the stream is already marked as closed (meaning
  the EoC indeed corresponds to IPC closure) with an assert for now
  ensuring the error is the same as set to `MsgStream._eoc`.
- in `MsgStream.receive()` break up the handlers for EoC and
  `trio.ClosedResourceError` since the error instances are saved to
  different variables and we **don't** want to rewrite the exception in
  the eoc case (normally to mask `trio` internals in tbs) bc we need the
  instance to be the exact one for doing checks inside
  `.open_stream().__aexit__()` to absorb it.

Other surrounding "improvements":
- start using the new `Context.maybe_raise()` helper where it can easily
  replace existing equivalent block-sections.
- use new `RemoteActorError.src_uid` as required.
2024-03-19 18:40:50 -04:00
Tyler Goodlet 9221c57234 Adjust all `RemoteActorError.type` using tests
To instead use the new `.boxed_type` B)
2024-03-19 18:08:54 -04:00
Tyler Goodlet 78434f6317 Fix `.boxed_type` facepalm, drop `.src_actor_uid`
The misname of `._boxed_type` as `._src_type` was only manifesting as
a reallly strange boxing error with a packed exception-group, not sure
how or why only that but it's fixed now XD

Start refining/cleaning out stuff for sure we don't need (based on
multiple local test runs):

- discard `.src_actor_uid` fully since test set has been moved over to
  `.src_uid`; this means also removing the `.msgdata` insertion from
  `pack_error()`; a patch to all internals is coming next obvi!

- don't pass `boxed_type` to `RemoteActorError.__init__()` from
  `unpack_error()` since it's now set directly via the
  `.msgdata["boxed_type_str"]`/`error_msg: dict` input , but in the case
  where **it is passed as an arg** (only for ctxc in `._rpc._invoke()`
  rn) make sure we only do the `.__init__()` insert when `boxed_type is
  not None`.
2024-03-19 14:20:59 -04:00
Tyler Goodlet 5fb5682269 First try "relayed boxed errors", or "inceptions"
Since adding more complex inter-peer (actor) testing scenarios, we
definitely have an immediate need for `trio`'s style of "inceptions" but
for nesting `RemoteActorError`s as they're relayed through multiple
actor-IPC hops. So for example, a remote error relayed "through" some
proxy actor to another ends up packing a `RemoteActorError` into another
one such that there are 2 layers of RAEs with the first
containing/boxing an original src actor error (type).

In support of this extension to `RemoteActorError` we add:

- `get_err_type()` error type resolver helper (factored fromthe
  body of `unpack_error()`) to be used whenever rendering
  `.src_type`/`.boxed_type`.

- `.src_type_str: str` which is pulled from `.msgdata` and holds the
  above (eventually when unpacked) type as `str`.
- `._src_type: BaseException|None` for the original
  "source" actor's error as unpacked in any remote (actor's) env and
  exposed as a readonly property `.src_type`.

- `.boxed_type_str: str` the same as above but for the "last" boxed
  error's type; when the RAE is unpacked at its first hop this will
  be **the same as** `.src_type_str`.
- `._boxed_type: BaseException` which now similarly should be "rendered"
  from the below type-`str` field instead of passed in as a error-type
  via `boxed_type` (though we still do for the ctxc case atm, see
  notes).
 |_ new sanity checks in `.__init__()` mostly as a reminder to handle
   that ^ ctxc case ^ more elegantly at some point..
 |_ obvi we discard the previous `suberror_type` input arg.

- fully remove the `.type`/`.type_str` properties instead expecting
  usage of `.boxed_/.src_` equivalents.
- start deprecation of `.src_actor_uid` and make it delegate to new
  `.src_uid`
- add `.relay_uid` propery for the last relay/hop's actor uid.
- add `.relay_path: list[str]` which holds the per-hop updated sequence
  of relay actor uid's which consecutively did boxing of an RAE.
- only include `.src_uid` and `.relay_path` in reprol() output.
- factor field-to-str rendering into a new `_mk_fields_str()`
  and use it in `.__repr__()`/`.reprol()`.
- add an `.unwrap()` to (attempt to) render the src error.

- rework `pack_error()` to handle inceptions including,
  - packing the correct field-values for the new `boxed_type_str`, `relay_uid`,
    `src_uid`, `src_type_str`.
  - always updating the `relay_path` sequence with the uid of the
    current actor.

- adjust `unpack_error()` to match all these changes,
  - pulling `boxed_type_str` and passing any resolved `boxed_type` to
    `RemoteActorError.__init__()`.
  - use the new `Context.maybe_raise()` convenience method.

Adjust `._rpc` packing to `ContextCancelled(boxed_type=trio.Cancelled)`
and tweak some more log msg formats.
2024-03-18 14:28:24 -04:00
10 changed files with 437 additions and 184 deletions

View File

@ -32,7 +32,7 @@ async def main():
try: try:
await p1.run(name_error) await p1.run(name_error)
except tractor.RemoteActorError as rae: except tractor.RemoteActorError as rae:
assert rae.type is NameError assert rae.boxed_type is NameError
async for i in stream: async for i in stream:

View File

@ -77,7 +77,7 @@ def test_remote_error(reg_addr, args_err):
# of this actor nursery. # of this actor nursery.
await portal.result() await portal.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == errtype assert err.boxed_type == errtype
print("Look Maa that actor failed hard, hehh") print("Look Maa that actor failed hard, hehh")
raise raise
@ -86,7 +86,7 @@ def test_remote_error(reg_addr, args_err):
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
assert excinfo.value.type == errtype assert excinfo.value.boxed_type == errtype
else: else:
# the root task will also error on the `.result()` call # the root task will also error on the `.result()` call
@ -96,7 +96,7 @@ def test_remote_error(reg_addr, args_err):
# ensure boxed errors # ensure boxed errors
for exc in excinfo.value.exceptions: for exc in excinfo.value.exceptions:
assert exc.type == errtype assert exc.boxed_type == errtype
def test_multierror(reg_addr): def test_multierror(reg_addr):
@ -117,7 +117,7 @@ def test_multierror(reg_addr):
try: try:
await portal2.result() await portal2.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == AssertionError assert err.boxed_type == AssertionError
print("Look Maa that first actor failed hard, hehh") print("Look Maa that first actor failed hard, hehh")
raise raise
@ -169,7 +169,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
for exc in exceptions: for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError) assert isinstance(exc, tractor.RemoteActorError)
assert exc.type == AssertionError assert exc.boxed_type == AssertionError
async def do_nothing(): async def do_nothing():
@ -310,7 +310,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
await portal.run(func, **kwargs) await portal.run(func, **kwargs)
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == err_type assert err.boxed_type == err_type
# we only expect this first error to propogate # we only expect this first error to propogate
# (all other daemons are cancelled before they # (all other daemons are cancelled before they
# can be scheduled) # can be scheduled)
@ -329,11 +329,11 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
assert len(err.exceptions) == num_actors assert len(err.exceptions) == num_actors
for exc in err.exceptions: for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError): if isinstance(exc, tractor.RemoteActorError):
assert exc.type == err_type assert exc.boxed_type == err_type
else: else:
assert isinstance(exc, trio.Cancelled) assert isinstance(exc, trio.Cancelled)
elif isinstance(err, tractor.RemoteActorError): elif isinstance(err, tractor.RemoteActorError):
assert err.type == err_type assert err.boxed_type == err_type
assert n.cancelled is True assert n.cancelled is True
assert not n._children assert not n._children
@ -412,7 +412,7 @@ async def test_nested_multierrors(loglevel, start_method):
elif isinstance(subexc, tractor.RemoteActorError): elif isinstance(subexc, tractor.RemoteActorError):
# on windows it seems we can't exactly be sure wtf # on windows it seems we can't exactly be sure wtf
# will happen.. # will happen..
assert subexc.type in ( assert subexc.boxed_type in (
tractor.RemoteActorError, tractor.RemoteActorError,
trio.Cancelled, trio.Cancelled,
BaseExceptionGroup, BaseExceptionGroup,
@ -422,7 +422,7 @@ async def test_nested_multierrors(loglevel, start_method):
for subsub in subexc.exceptions: for subsub in subexc.exceptions:
if subsub in (tractor.RemoteActorError,): if subsub in (tractor.RemoteActorError,):
subsub = subsub.type subsub = subsub.boxed_type
assert type(subsub) in ( assert type(subsub) in (
trio.Cancelled, trio.Cancelled,
@ -437,16 +437,16 @@ async def test_nested_multierrors(loglevel, start_method):
# we get back the (sent) cancel signal instead # we get back the (sent) cancel signal instead
if is_win(): if is_win():
if isinstance(subexc, tractor.RemoteActorError): if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in ( assert subexc.boxed_type in (
BaseExceptionGroup, BaseExceptionGroup,
tractor.RemoteActorError tractor.RemoteActorError
) )
else: else:
assert isinstance(subexc, BaseExceptionGroup) assert isinstance(subexc, BaseExceptionGroup)
else: else:
assert subexc.type is ExceptionGroup assert subexc.boxed_type is ExceptionGroup
else: else:
assert subexc.type in ( assert subexc.boxed_type in (
tractor.RemoteActorError, tractor.RemoteActorError,
trio.Cancelled trio.Cancelled
) )

View File

@ -171,4 +171,4 @@ def test_actor_managed_trio_nursery_task_error_cancels_aio(
# verify boxed error # verify boxed error
err = excinfo.value err = excinfo.value
assert isinstance(err.type(), NameError) assert err.boxed_type is NameError

View File

@ -795,7 +795,7 @@ async def test_callee_cancels_before_started(
# raises a special cancel signal # raises a special cancel signal
except tractor.ContextCancelled as ce: except tractor.ContextCancelled as ce:
ce.type == trio.Cancelled ce.boxed_type == trio.Cancelled
# the traceback should be informative # the traceback should be informative
assert 'itself' in ce.msgdata['tb_str'] assert 'itself' in ce.msgdata['tb_str']
@ -903,7 +903,7 @@ def test_one_end_stream_not_opened(
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
assert excinfo.value.type == StreamOverrun assert excinfo.value.boxed_type == StreamOverrun
elif overrunner == 'callee': elif overrunner == 'callee':
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
@ -912,7 +912,7 @@ def test_one_end_stream_not_opened(
# TODO: embedded remote errors so that we can verify the source # TODO: embedded remote errors so that we can verify the source
# error? the callee delivers an error which is an overrun # error? the callee delivers an error which is an overrun
# wrapped in a remote actor error. # wrapped in a remote actor error.
assert excinfo.value.type == tractor.RemoteActorError assert excinfo.value.boxed_type == tractor.RemoteActorError
else: else:
trio.run(main) trio.run(main)
@ -1131,7 +1131,7 @@ def test_maybe_allow_overruns_stream(
# NOTE: i tried to isolate to a deterministic case here # NOTE: i tried to isolate to a deterministic case here
# based on timeing, but i was kinda wasted, and i don't # based on timeing, but i was kinda wasted, and i don't
# think it's sane to catch them.. # think it's sane to catch them..
assert err.type in ( assert err.boxed_type in (
tractor.RemoteActorError, tractor.RemoteActorError,
StreamOverrun, StreamOverrun,
) )
@ -1139,10 +1139,10 @@ def test_maybe_allow_overruns_stream(
elif ( elif (
slow_side == 'child' slow_side == 'child'
): ):
assert err.type == StreamOverrun assert err.boxed_type == StreamOverrun
elif slow_side == 'parent': elif slow_side == 'parent':
assert err.type == tractor.RemoteActorError assert err.boxed_type == tractor.RemoteActorError
assert 'StreamOverrun' in err.msgdata['tb_str'] assert 'StreamOverrun' in err.msgdata['tb_str']
else: else:

View File

@ -128,7 +128,7 @@ def test_aio_simple_error(reg_addr):
assert err assert err
assert isinstance(err, RemoteActorError) assert isinstance(err, RemoteActorError)
assert err.type == AssertionError assert err.boxed_type == AssertionError
def test_tractor_cancels_aio(reg_addr): def test_tractor_cancels_aio(reg_addr):
@ -272,7 +272,7 @@ def test_context_spawns_aio_task_that_errors(
err = excinfo.value err = excinfo.value
assert isinstance(err, expect) assert isinstance(err, expect)
assert err.type == AssertionError assert err.boxed_type == AssertionError
async def aio_cancel(): async def aio_cancel():
@ -314,7 +314,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
assert err assert err
# ensure boxed error is correct # ensure boxed error is correct
assert err.type == to_asyncio.AsyncioCancelled assert err.boxed_type == to_asyncio.AsyncioCancelled
# TODO: verify open_channel_from will fail on this.. # TODO: verify open_channel_from will fail on this..
@ -466,7 +466,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
# ensure boxed errors # ensure boxed errors
for exc in excinfo.value.exceptions: for exc in excinfo.value.exceptions:
assert exc.type == Exception assert exc.boxed_type == Exception
def test_trio_closes_early_and_channel_exits(reg_addr): def test_trio_closes_early_and_channel_exits(reg_addr):
@ -500,7 +500,7 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
# ensure boxed errors # ensure boxed errors
for exc in excinfo.value.exceptions: for exc in excinfo.value.exceptions:
assert exc.type == Exception assert exc.boxed_type == Exception
@tractor.context @tractor.context

View File

@ -36,7 +36,7 @@ async def sleep_back_actor(
if not exposed_mods: if not exposed_mods:
expect = tractor.ModuleNotExposed expect = tractor.ModuleNotExposed
assert err.type is expect assert err.boxed_type is expect
raise raise
else: else:
await trio.sleep(float('inf')) await trio.sleep(float('inf'))
@ -150,4 +150,4 @@ def test_rpc_errors(
)) ))
if getattr(value, 'type', None): if getattr(value, 'type', None):
assert value.type is inside_err assert value.boxed_type is inside_err

View File

@ -169,8 +169,7 @@ async def _drain_to_final_msg(
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's
# cancellation. # cancellation.
if re := ctx._remote_error: ctx.maybe_raise()
ctx._maybe_raise_remote_err(re)
# CASE 1: we DID request the cancel we simply # CASE 1: we DID request the cancel we simply
# continue to bubble up as normal. # continue to bubble up as normal.
@ -257,6 +256,13 @@ async def _drain_to_final_msg(
) )
# XXX fallthrough to handle expected error XXX # XXX fallthrough to handle expected error XXX
# TODO: replace this with `ctx.maybe_raise()`
#
# TODO: would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
re: Exception|None = ctx._remote_error re: Exception|None = ctx._remote_error
if re: if re:
log.critical( log.critical(
@ -595,7 +601,7 @@ class Context:
if not re: if not re:
return False return False
if from_uid := re.src_actor_uid: if from_uid := re.src_uid:
from_uid: tuple = tuple(from_uid) from_uid: tuple = tuple(from_uid)
our_uid: tuple = self._actor.uid our_uid: tuple = self._actor.uid
@ -825,7 +831,7 @@ class Context:
# cancellation. # cancellation.
maybe_error_src: tuple = getattr( maybe_error_src: tuple = getattr(
error, error,
'src_actor_uid', 'src_uid',
None, None,
) )
self._canceller = ( self._canceller = (
@ -1030,8 +1036,8 @@ class Context:
@acm @acm
async def open_stream( async def open_stream(
self, self,
allow_overruns: bool | None = False, allow_overruns: bool|None = False,
msg_buffer_size: int | None = None, msg_buffer_size: int|None = None,
) -> AsyncGenerator[MsgStream, None]: ) -> AsyncGenerator[MsgStream, None]:
''' '''
@ -1071,13 +1077,16 @@ class Context:
# absorbed there (silently) and we DO NOT want to # absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already # actually try to stream - a cancel msg was already
# sent to the other side! # sent to the other side!
if self._remote_error: self.maybe_raise(
# NOTE: this is diff then calling raise_ctxc_from_self_call=True,
# `._maybe_raise_remote_err()` specifically )
# because any task entering this `.open_stream()` # NOTE: this is diff then calling
# AFTER cancellation has already been requested, # `._maybe_raise_remote_err()` specifically
# we DO NOT want to absorb any ctxc ACK silently! # because we want to raise a ctxc on any task entering this `.open_stream()`
raise self._remote_error # AFTER cancellation was already been requested,
# we DO NOT want to absorb any ctxc ACK silently!
# if self._remote_error:
# raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded # XXX NOTE: if no `ContextCancelled` has been responded
# back from the other side (yet), we raise a different # back from the other side (yet), we raise a different
@ -1158,7 +1167,6 @@ class Context:
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield stream yield stream
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside # context from the runtime msg loop otherwise inside
@ -1183,12 +1191,23 @@ class Context:
# #
# await stream.aclose() # await stream.aclose()
# if re := ctx._remote_error: # NOTE: absorb and do not raise any
# ctx._maybe_raise_remote_err( # EoC received from the other side such that
# re, # it is not raised inside the surrounding
# raise_ctxc_from_self_call=True, # context block's scope!
# ) except trio.EndOfChannel as eoc:
# await trio.lowlevel.checkpoint() if (
eoc
and stream.closed
):
# sanity, can remove?
assert eoc is stream._eoc
# from .devx import pause
# await pause()
log.warning(
'Stream was terminated by EoC\n\n'
f'{repr(eoc)}\n'
)
finally: finally:
if self._portal: if self._portal:
@ -1204,7 +1223,6 @@ class Context:
# TODO: replace all the instances of this!! XD # TODO: replace all the instances of this!! XD
def maybe_raise( def maybe_raise(
self, self,
hide_tb: bool = True, hide_tb: bool = True,
**kwargs, **kwargs,
@ -1388,33 +1406,41 @@ class Context:
f'{drained_msgs}' f'{drained_msgs}'
) )
if ( self.maybe_raise(
(re := self._remote_error) raise_overrun_from_self=(
# and self._result == res_placeholder raise_overrun
): and
self._maybe_raise_remote_err( # only when we ARE NOT the canceller
re, # should we raise overruns, bc ow we're
# NOTE: obvi we don't care if we # raising something we know might happen
# overran the far end if we're already # during cancellation ;)
# waiting on a final result (msg). (not self._cancel_called)
# raise_overrun_from_self=False,
raise_overrun_from_self=(
raise_overrun
and
# only when we ARE NOT the canceller
# should we raise overruns, bc ow we're
# raising something we know might happen
# during cancellation ;)
(not self._cancel_called)
),
) )
)
# if (
# (re := self._remote_error)
# # and self._result == res_placeholder
# ):
# self._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# # raise_overrun_from_self=False,
# raise_overrun_from_self=(
# raise_overrun
# and
# # only when we ARE NOT the canceller
# # should we raise overruns, bc ow we're
# # raising something we know might happen
# # during cancellation ;)
# (not self._cancel_called)
# ),
# )
# if maybe_err: # if maybe_err:
# self._result = maybe_err # self._result = maybe_err
return self.outcome return self.outcome
# None if self._result == res_placeholder
# else self._result
# )
# TODO: switch this with above which should be named # TODO: switch this with above which should be named
# `.wait_for_outcome()` and instead do # `.wait_for_outcome()` and instead do
@ -1863,8 +1889,9 @@ async def open_context_from_portal(
# TODO: if we set this the wrapping `@acm` body will # TODO: if we set this the wrapping `@acm` body will
# still be shown (awkwardly) on pdb REPL entry. Ideally # still be shown (awkwardly) on pdb REPL entry. Ideally
# we can similarly annotate that frame to NOT show? # we can similarly annotate that frame to NOT show? for now
hide_tb: bool = True, # we DO SHOW this frame since it's awkward ow..
hide_tb: bool = False,
# proxied to RPC # proxied to RPC
**kwargs, **kwargs,

View File

@ -58,16 +58,44 @@ class InternalError(RuntimeError):
''' '''
_body_fields: list[str] = [ _body_fields: list[str] = [
'src_actor_uid', 'boxed_type',
'src_type',
# TODO: format this better if we're going to include it.
# 'relay_path',
'src_uid',
# only in sub-types
'canceller', 'canceller',
'sender', 'sender',
] ]
_msgdata_keys: list[str] = [ _msgdata_keys: list[str] = [
'type_str', 'boxed_type_str',
] + _body_fields ] + _body_fields
def get_err_type(type_name: str) -> BaseException|None:
'''
Look up an exception type by name from the set of locally
known namespaces:
- `builtins`
- `tractor._exceptions`
- `trio`
'''
for ns in [
builtins,
_this_mod,
trio,
]:
if type_ref := getattr(
ns,
type_name,
False,
):
return type_ref
# TODO: rename to just `RemoteError`? # TODO: rename to just `RemoteError`?
class RemoteActorError(Exception): class RemoteActorError(Exception):
@ -81,13 +109,15 @@ class RemoteActorError(Exception):
''' '''
reprol_fields: list[str] = [ reprol_fields: list[str] = [
'src_actor_uid', 'src_uid',
'relay_path',
# 'relay_uid',
] ]
def __init__( def __init__(
self, self,
message: str, message: str,
suberror_type: Type[BaseException] | None = None, boxed_type: Type[BaseException]|None = None,
**msgdata **msgdata
) -> None: ) -> None:
@ -101,20 +131,112 @@ class RemoteActorError(Exception):
# - .remote_type # - .remote_type
# also pertains to our long long oustanding issue XD # also pertains to our long long oustanding issue XD
# https://github.com/goodboy/tractor/issues/5 # https://github.com/goodboy/tractor/issues/5
self.boxed_type: str = suberror_type #
# TODO: always set ._boxed_type` as `None` by default
# and instead render if from `.boxed_type_str`?
self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None
self.msgdata: dict[str, Any] = msgdata self.msgdata: dict[str, Any] = msgdata
@property # TODO: mask out eventually or place in `pack_error()`
def type(self) -> str: # pre-`return` lines?
return self.boxed_type # sanity on inceptions
if boxed_type is RemoteActorError:
assert self.src_type_str != 'RemoteActorError'
assert self.src_uid not in self.relay_path
# ensure type-str matches and round-tripping from that
# str results in same error type.
#
# TODO NOTE: this is currently exclusively for the
# `ContextCancelled(boxed_type=trio.Cancelled)` case as is
# used inside `._rpc._invoke()` atm though probably we
# should better emphasize that special (one off?) case
# either by customizing `ContextCancelled.__init__()` or
# through a special factor func?
elif boxed_type:
if not self.msgdata.get('boxed_type_str'):
self.msgdata['boxed_type_str'] = str(
type(boxed_type).__name__
)
assert self.boxed_type_str == self.msgdata['boxed_type_str']
assert self.boxed_type is boxed_type
@property @property
def type_str(self) -> str: def src_type_str(self) -> str:
return str(type(self.boxed_type).__name__) '''
String-name of the source error's type.
This should be the same as `.boxed_type_str` when unpacked
at the first relay/hop's receiving actor.
'''
return self.msgdata['src_type_str']
@property @property
def src_actor_uid(self) -> tuple[str, str]|None: def src_type(self) -> str:
return self.msgdata.get('src_actor_uid') '''
Error type raised by original remote faulting actor.
'''
if self._src_type is None:
self._src_type = get_err_type(
self.msgdata['src_type_str']
)
return self._src_type
@property
def boxed_type_str(self) -> str:
'''
String-name of the (last hop's) boxed error type.
'''
return self.msgdata['boxed_type_str']
@property
def boxed_type(self) -> str:
'''
Error type boxed by last actor IPC hop.
'''
if self._boxed_type is None:
self._boxed_type = get_err_type(
self.msgdata['boxed_type_str']
)
return self._boxed_type
@property
def relay_path(self) -> list[tuple]:
'''
Return the list of actors which consecutively relayed
a boxed `RemoteActorError` the src error up until THIS
actor's hop.
NOTE: a `list` field with the same name is expected to be
passed/updated in `.msgdata`.
'''
return self.msgdata['relay_path']
@property
def relay_uid(self) -> tuple[str, str]|None:
return tuple(
self.msgdata['relay_path'][-1]
)
@property
def src_uid(self) -> tuple[str, str]|None:
if src_uid := (
self.msgdata.get('src_uid')
):
return tuple(src_uid)
# TODO: use path lookup instead?
# return tuple(
# self.msgdata['relay_path'][0]
# )
@property @property
def tb_str( def tb_str(
@ -129,28 +251,56 @@ class RemoteActorError(Exception):
return '' return ''
def _mk_fields_str(
self,
fields: list[str],
end_char: str = '\n',
) -> str:
_repr: str = ''
for key in fields:
val: Any|None = (
getattr(self, key, None)
or
self.msgdata.get(key)
)
# TODO: for `.relay_path` on multiline?
# if not isinstance(val, str):
# val_str = pformat(val)
# else:
val_str: str = repr(val)
if val:
_repr += f'{key}={val_str}{end_char}'
return _repr
def reprol(self) -> str: def reprol(self) -> str:
''' '''
Represent this error for "one line" display, like in Represent this error for "one line" display, like in
a field of our `Context.__repr__()` output. a field of our `Context.__repr__()` output.
''' '''
_repr: str = f'{type(self).__name__}(' # TODO: use this matryoshka emjoi XD
for key in self.reprol_fields: # => 🪆
val: Any|None = self.msgdata.get(key) reprol_str: str = f'{type(self).__name__}('
if val: _repr: str = self._mk_fields_str(
_repr += f'{key}={repr(val)} ' self.reprol_fields,
end_char=' ',
return _repr )
return (
reprol_str
+
_repr
)
def __repr__(self) -> str: def __repr__(self) -> str:
'''
Nicely formatted boxed error meta data + traceback.
fields: str = '' '''
for key in _body_fields: fields: str = self._mk_fields_str(
val: str|None = self.msgdata.get(key) _body_fields,
if val: )
fields += f'{key}={val}\n'
fields: str = textwrap.indent( fields: str = textwrap.indent(
fields, fields,
# prefix=' '*2, # prefix=' '*2,
@ -165,8 +315,6 @@ class RemoteActorError(Exception):
f' ------ - ------\n' f' ------ - ------\n'
f' _|\n' f' _|\n'
) )
# f'|\n'
# f' |\n'
if indent: if indent:
body: str = textwrap.indent( body: str = textwrap.indent(
body, body,
@ -178,9 +326,47 @@ class RemoteActorError(Exception):
')>' ')>'
) )
# TODO: local recontruction of remote exception deats def unwrap(
self,
) -> BaseException:
'''
Unpack the inner-most source error from it's original IPC msg data.
We attempt to reconstruct (as best as we can) the original
`Exception` from as it would have been raised in the
failing actor's remote env.
'''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611
return src_type_ref(self.tb_str)
# TODO: local recontruction of nested inception for a given
# "hop" / relay-node in this error's relay_path?
# => so would render a `RAE[RAE[RAE[Exception]]]` instance
# with all inner errors unpacked?
# -[ ] if this is useful shouldn't be too hard to impl right?
# def unbox(self) -> BaseException: # def unbox(self) -> BaseException:
# ... # '''
# Unbox to the prior relays (aka last boxing actor's)
# inner error.
# '''
# if not self.relay_path:
# return self.unwrap()
# # TODO..
# # return self.boxed_type(
# # boxed_type=get_type_ref(..
# raise NotImplementedError
class InternalActorError(RemoteActorError): class InternalActorError(RemoteActorError):
@ -232,7 +418,7 @@ class ContextCancelled(RemoteActorError):
f'{self}' f'{self}'
) )
# to make `.__repr__()` work uniformly # TODO: to make `.__repr__()` work uniformly?
# src_actor_uid = canceller # src_actor_uid = canceller
@ -283,7 +469,8 @@ class MessagingError(Exception):
def pack_error( def pack_error(
exc: BaseException, exc: BaseException|RemoteActorError,
tb: str|None = None, tb: str|None = None,
cid: str|None = None, cid: str|None = None,
@ -300,27 +487,54 @@ def pack_error(
else: else:
tb_str = traceback.format_exc() tb_str = traceback.format_exc()
our_uid: tuple = current_actor().uid
error_msg: dict[ error_msg: dict[
str, str,
str | tuple[str, str] str | tuple[str, str]
] = { ] = {
'tb_str': tb_str, 'tb_str': tb_str,
'type_str': type(exc).__name__, 'relay_uid': our_uid,
'boxed_type': type(exc).__name__,
'src_actor_uid': current_actor().uid,
} }
# TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if ( if (
isinstance(exc, ContextCancelled) isinstance(exc, RemoteActorError)
or isinstance(exc, StreamOverrun)
): ):
error_msg.update(exc.msgdata) error_msg.update(exc.msgdata)
# an onion/inception we need to pack
if (
type(exc) is RemoteActorError
and (boxed := exc.boxed_type)
and boxed != RemoteActorError
):
# sanity on source error (if needed when tweaking this)
assert (src_type := exc.src_type) != RemoteActorError
assert error_msg['src_type_str'] != 'RemoteActorError'
assert error_msg['src_type_str'] == src_type.__name__
assert error_msg['src_uid'] != our_uid
# set the boxed type to be another boxed type thus
# creating an "inception" when unpacked by
# `unpack_error()` in another actor who gets "relayed"
# this error Bo
#
# NOTE on WHY: since we are re-boxing and already
# boxed src error, we want to overwrite the original
# `boxed_type_str` and instead set it to the type of
# the input `exc` type.
error_msg['boxed_type_str'] = 'RemoteActorError'
else:
error_msg['src_uid'] = our_uid
error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__
# XXX alawys append us the last relay in error propagation path
error_msg.setdefault(
'relay_path',
[],
).append(our_uid)
pkt: dict = {'error': error_msg} pkt: dict = {'error': error_msg}
if cid: if cid:
pkt['cid'] = cid pkt['cid'] = cid
@ -329,7 +543,6 @@ def pack_error(
def unpack_error( def unpack_error(
msg: dict[str, Any], msg: dict[str, Any],
chan: Channel|None = None, chan: Channel|None = None,
@ -357,35 +570,32 @@ def unpack_error(
# retrieve the remote error's msg encoded details # retrieve the remote error's msg encoded details
tb_str: str = error_dict.get('tb_str', '') tb_str: str = error_dict.get('tb_str', '')
message: str = f'{chan.uid}\n' + tb_str message: str = (
type_name: str = ( f'{chan.uid}\n'
error_dict.get('type_str') +
or error_dict['boxed_type'] tb_str
) )
suberror_type: Type[BaseException] = Exception
if type_name == 'ContextCancelled': # try to lookup a suitable error type from the local runtime
# env then use it to construct a local instance.
boxed_type_str: str = error_dict['boxed_type_str']
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
if boxed_type_str == 'ContextCancelled':
box_type = ContextCancelled box_type = ContextCancelled
suberror_type = box_type assert boxed_type is box_type
else: # try to lookup a suitable local error type # TODO: already included by `_this_mod` in else loop right?
for ns in [ #
builtins, # we have an inception/onion-error so ensure
_this_mod, # we include the relay_path info and the
trio, # original source error.
]: elif boxed_type_str == 'RemoteActorError':
if suberror_type := getattr( assert boxed_type is RemoteActorError
ns, assert len(error_dict['relay_path']) >= 1
type_name,
False,
):
break
exc = box_type( exc = box_type(
message, message,
suberror_type=suberror_type,
# unpack other fields into error type init
**error_dict, **error_dict,
) )
@ -501,6 +711,11 @@ def _raise_from_no_key_in_msg(
# destined for the `Context.result()` call during ctx-exit! # destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc stream._eoc: Exception = eoc
# in case there already is some underlying remote error
# that arrived which is probably the source of this stream
# closure
ctx.maybe_raise()
raise eoc from src_err raise eoc from src_err
if ( if (

View File

@ -273,7 +273,10 @@ async def _errors_relayed_via_ipc(
entered_debug = await _debug._maybe_enter_pm(err) entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug: if not entered_debug:
log.exception('Actor crashed:\n') log.exception(
'RPC task crashed\n'
f'|_{ctx}'
)
# always (try to) ship RPC errors back to caller # always (try to) ship RPC errors back to caller
if is_rpc: if is_rpc:
@ -613,7 +616,8 @@ async def _invoke(
# other side. # other side.
ctxc = ContextCancelled( ctxc = ContextCancelled(
msg, msg,
suberror_type=trio.Cancelled, boxed_type=trio.Cancelled,
# boxed_type_str='Cancelled',
canceller=canceller, canceller=canceller,
) )
# assign local error so that the `.outcome` # assign local error so that the `.outcome`
@ -671,7 +675,7 @@ async def _invoke(
f'`{repr(ctx.outcome)}`', f'`{repr(ctx.outcome)}`',
) )
) )
log.cancel( log.runtime(
f'IPC context terminated with a final {res_type_str}\n\n' f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}\n' f'{ctx}\n'
) )
@ -704,12 +708,6 @@ async def try_ship_error_to_remote(
# TODO: special tb fmting for ctxc cases? # TODO: special tb fmting for ctxc cases?
# tb=tb, # tb=tb,
) )
# NOTE: the src actor should always be packed into the
# error.. but how should we verify this?
# actor: Actor = _state.current_actor()
# assert err_msg['src_actor_uid']
# if not err_msg['error'].get('src_actor_uid'):
# import pdbp; pdbp.set_trace()
await channel.send(msg) await channel.send(msg)
# XXX NOTE XXX in SC terms this is one of the worst things # XXX NOTE XXX in SC terms this is one of the worst things

View File

@ -136,7 +136,7 @@ class MsgStream(trio.abc.Channel):
# return await self.receive() # return await self.receive()
# except trio.EndOfChannel: # except trio.EndOfChannel:
# raise StopAsyncIteration # raise StopAsyncIteration
#
# see ``.aclose()`` for notes on the old behaviour prior to # see ``.aclose()`` for notes on the old behaviour prior to
# introducing this # introducing this
if self._eoc: if self._eoc:
@ -152,7 +152,6 @@ class MsgStream(trio.abc.Channel):
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
# log.exception('GOT KEYERROR')
src_err = kerr src_err = kerr
# NOTE: may raise any of the below error types # NOTE: may raise any of the below error types
@ -166,30 +165,20 @@ class MsgStream(trio.abc.Channel):
stream=self, stream=self,
) )
# XXX: we close the stream on any of these error conditions: # XXX: the stream terminates on either of:
# - via `self._rx_chan.receive()` raising after manual closure
# by the rpc-runtime OR,
# - via a received `{'stop': ...}` msg from remote side.
# |_ NOTE: previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel inside
# `Actor._push_result()`, but now the 'stop' message handling
# has been put just above inside `_raise_from_no_key_in_msg()`.
except ( except (
# trio.ClosedResourceError, # by self._rx_chan trio.EndOfChannel,
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
) as eoc: ) as eoc:
# log.exception('GOT EOC')
src_err = eoc src_err = eoc
self._eoc = eoc self._eoc = eoc
# a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the
# runtime after the associated transport-channel
# disconnected or broke.
# an ``EndOfChannel`` indicates either the internal recv
# memchan exhausted **or** we raisesd it just above after
# receiving a `stop` message from the far end of the stream.
# Previously this was triggered by calling ``.aclose()`` on
# the send side of the channel inside
# ``Actor._push_result()`` (should still be commented code
# there - which should eventually get removed), but now the
# 'stop' message handling has been put just above.
# TODO: Locally, we want to close this stream gracefully, by # TODO: Locally, we want to close this stream gracefully, by
# terminating any local consumers tasks deterministically. # terminating any local consumers tasks deterministically.
# Once we have broadcast support, we **don't** want to be # Once we have broadcast support, we **don't** want to be
@ -210,8 +199,11 @@ class MsgStream(trio.abc.Channel):
# raise eoc # raise eoc
except trio.ClosedResourceError as cre: # by self._rx_chan # a ``ClosedResourceError`` indicates that the internal
# log.exception('GOT CRE') # feeder memory receive channel was closed likely by the
# runtime after the associated transport-channel
# disconnected or broke.
except trio.ClosedResourceError as cre: # by self._rx_chan.receive()
src_err = cre src_err = cre
log.warning( log.warning(
'`Context._rx_chan` was already closed?' '`Context._rx_chan` was already closed?'
@ -237,15 +229,30 @@ class MsgStream(trio.abc.Channel):
# over the end-of-stream connection error since likely # over the end-of-stream connection error since likely
# the remote error was the source cause? # the remote error was the source cause?
ctx: Context = self._ctx ctx: Context = self._ctx
if re := ctx._remote_error: ctx.maybe_raise(
ctx._maybe_raise_remote_err( raise_ctxc_from_self_call=True,
re, )
raise_ctxc_from_self_call=True,
)
# propagate any error but hide low-level frames from # propagate any error but hide low-level frame details
# caller by default. # from the caller by default for debug noise reduction.
if hide_tb: if (
hide_tb
# XXX NOTE XXX don't reraise on certain
# stream-specific internal error types like,
#
# - `trio.EoC` since we want to use the exact instance
# to ensure that it is the error that bubbles upward
# for silent absorption by `Context.open_stream()`.
and not self._eoc
# - `RemoteActorError` (or `ContextCancelled`) if it gets
# raised from `_raise_from_no_key_in_msg()` since we
# want the same (as the above bullet) for any
# `.open_context()` block bubbled error raised by
# any nearby ctx API remote-failures.
# and not isinstance(src_err, RemoteActorError)
):
raise type(src_err)(*src_err.args) from src_err raise type(src_err)(*src_err.args) from src_err
else: else:
raise src_err raise src_err
@ -370,6 +377,10 @@ class MsgStream(trio.abc.Channel):
# await rx_chan.aclose() # await rx_chan.aclose()
if not self._eoc: if not self._eoc:
log.cancel(
'Stream closed before it received an EoC?\n'
'Setting eoc manually..\n..'
)
self._eoc: bool = trio.EndOfChannel( self._eoc: bool = trio.EndOfChannel(
f'Context stream closed by {self._ctx.side}\n' f'Context stream closed by {self._ctx.side}\n'
f'|_{self}\n' f'|_{self}\n'
@ -414,13 +425,11 @@ class MsgStream(trio.abc.Channel):
@property @property
def closed(self) -> bool: def closed(self) -> bool:
if (
(rxc := self._rx_chan._closed) rxc: bool = self._rx_chan._closed
or _closed: bool|Exception = self._closed
(_closed := self._closed) _eoc: bool|trio.EndOfChannel = self._eoc
or if rxc or _closed or _eoc:
(_eoc := self._eoc)
):
log.runtime( log.runtime(
f'`MsgStream` is already closed\n' f'`MsgStream` is already closed\n'
f'{self}\n' f'{self}\n'
@ -496,7 +505,11 @@ class MsgStream(trio.abc.Channel):
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# raise any alreay known error immediately
self._ctx.maybe_raise() self._ctx.maybe_raise()
if self._eoc:
raise self._eoc
if self._closed: if self._closed:
raise self._closed raise self._closed