Compare commits

..

No commits in common. "668016d37b89d3e140d45e11914d606bc371f4f0" and "71de56b09a73dcaf14d5222f49bc68bedcfcb4c9" have entirely different histories.

10 changed files with 184 additions and 437 deletions

View File

@ -32,7 +32,7 @@ async def main():
try: try:
await p1.run(name_error) await p1.run(name_error)
except tractor.RemoteActorError as rae: except tractor.RemoteActorError as rae:
assert rae.boxed_type is NameError assert rae.type is NameError
async for i in stream: async for i in stream:

View File

@ -77,7 +77,7 @@ def test_remote_error(reg_addr, args_err):
# of this actor nursery. # of this actor nursery.
await portal.result() await portal.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.boxed_type == errtype assert err.type == errtype
print("Look Maa that actor failed hard, hehh") print("Look Maa that actor failed hard, hehh")
raise raise
@ -86,7 +86,7 @@ def test_remote_error(reg_addr, args_err):
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
assert excinfo.value.boxed_type == errtype assert excinfo.value.type == errtype
else: else:
# the root task will also error on the `.result()` call # the root task will also error on the `.result()` call
@ -96,7 +96,7 @@ def test_remote_error(reg_addr, args_err):
# ensure boxed errors # ensure boxed errors
for exc in excinfo.value.exceptions: for exc in excinfo.value.exceptions:
assert exc.boxed_type == errtype assert exc.type == errtype
def test_multierror(reg_addr): def test_multierror(reg_addr):
@ -117,7 +117,7 @@ def test_multierror(reg_addr):
try: try:
await portal2.result() await portal2.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.boxed_type == AssertionError assert err.type == AssertionError
print("Look Maa that first actor failed hard, hehh") print("Look Maa that first actor failed hard, hehh")
raise raise
@ -169,7 +169,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
for exc in exceptions: for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError) assert isinstance(exc, tractor.RemoteActorError)
assert exc.boxed_type == AssertionError assert exc.type == AssertionError
async def do_nothing(): async def do_nothing():
@ -310,7 +310,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
await portal.run(func, **kwargs) await portal.run(func, **kwargs)
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.boxed_type == err_type assert err.type == err_type
# we only expect this first error to propogate # we only expect this first error to propogate
# (all other daemons are cancelled before they # (all other daemons are cancelled before they
# can be scheduled) # can be scheduled)
@ -329,11 +329,11 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
assert len(err.exceptions) == num_actors assert len(err.exceptions) == num_actors
for exc in err.exceptions: for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError): if isinstance(exc, tractor.RemoteActorError):
assert exc.boxed_type == err_type assert exc.type == err_type
else: else:
assert isinstance(exc, trio.Cancelled) assert isinstance(exc, trio.Cancelled)
elif isinstance(err, tractor.RemoteActorError): elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type assert err.type == err_type
assert n.cancelled is True assert n.cancelled is True
assert not n._children assert not n._children
@ -412,7 +412,7 @@ async def test_nested_multierrors(loglevel, start_method):
elif isinstance(subexc, tractor.RemoteActorError): elif isinstance(subexc, tractor.RemoteActorError):
# on windows it seems we can't exactly be sure wtf # on windows it seems we can't exactly be sure wtf
# will happen.. # will happen..
assert subexc.boxed_type in ( assert subexc.type in (
tractor.RemoteActorError, tractor.RemoteActorError,
trio.Cancelled, trio.Cancelled,
BaseExceptionGroup, BaseExceptionGroup,
@ -422,7 +422,7 @@ async def test_nested_multierrors(loglevel, start_method):
for subsub in subexc.exceptions: for subsub in subexc.exceptions:
if subsub in (tractor.RemoteActorError,): if subsub in (tractor.RemoteActorError,):
subsub = subsub.boxed_type subsub = subsub.type
assert type(subsub) in ( assert type(subsub) in (
trio.Cancelled, trio.Cancelled,
@ -437,16 +437,16 @@ async def test_nested_multierrors(loglevel, start_method):
# we get back the (sent) cancel signal instead # we get back the (sent) cancel signal instead
if is_win(): if is_win():
if isinstance(subexc, tractor.RemoteActorError): if isinstance(subexc, tractor.RemoteActorError):
assert subexc.boxed_type in ( assert subexc.type in (
BaseExceptionGroup, BaseExceptionGroup,
tractor.RemoteActorError tractor.RemoteActorError
) )
else: else:
assert isinstance(subexc, BaseExceptionGroup) assert isinstance(subexc, BaseExceptionGroup)
else: else:
assert subexc.boxed_type is ExceptionGroup assert subexc.type is ExceptionGroup
else: else:
assert subexc.boxed_type in ( assert subexc.type in (
tractor.RemoteActorError, tractor.RemoteActorError,
trio.Cancelled trio.Cancelled
) )

View File

@ -171,4 +171,4 @@ def test_actor_managed_trio_nursery_task_error_cancels_aio(
# verify boxed error # verify boxed error
err = excinfo.value err = excinfo.value
assert err.boxed_type is NameError assert isinstance(err.type(), NameError)

View File

@ -795,7 +795,7 @@ async def test_callee_cancels_before_started(
# raises a special cancel signal # raises a special cancel signal
except tractor.ContextCancelled as ce: except tractor.ContextCancelled as ce:
ce.boxed_type == trio.Cancelled ce.type == trio.Cancelled
# the traceback should be informative # the traceback should be informative
assert 'itself' in ce.msgdata['tb_str'] assert 'itself' in ce.msgdata['tb_str']
@ -903,7 +903,7 @@ def test_one_end_stream_not_opened(
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
assert excinfo.value.boxed_type == StreamOverrun assert excinfo.value.type == StreamOverrun
elif overrunner == 'callee': elif overrunner == 'callee':
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
@ -912,7 +912,7 @@ def test_one_end_stream_not_opened(
# TODO: embedded remote errors so that we can verify the source # TODO: embedded remote errors so that we can verify the source
# error? the callee delivers an error which is an overrun # error? the callee delivers an error which is an overrun
# wrapped in a remote actor error. # wrapped in a remote actor error.
assert excinfo.value.boxed_type == tractor.RemoteActorError assert excinfo.value.type == tractor.RemoteActorError
else: else:
trio.run(main) trio.run(main)
@ -1131,7 +1131,7 @@ def test_maybe_allow_overruns_stream(
# NOTE: i tried to isolate to a deterministic case here # NOTE: i tried to isolate to a deterministic case here
# based on timeing, but i was kinda wasted, and i don't # based on timeing, but i was kinda wasted, and i don't
# think it's sane to catch them.. # think it's sane to catch them..
assert err.boxed_type in ( assert err.type in (
tractor.RemoteActorError, tractor.RemoteActorError,
StreamOverrun, StreamOverrun,
) )
@ -1139,10 +1139,10 @@ def test_maybe_allow_overruns_stream(
elif ( elif (
slow_side == 'child' slow_side == 'child'
): ):
assert err.boxed_type == StreamOverrun assert err.type == StreamOverrun
elif slow_side == 'parent': elif slow_side == 'parent':
assert err.boxed_type == tractor.RemoteActorError assert err.type == tractor.RemoteActorError
assert 'StreamOverrun' in err.msgdata['tb_str'] assert 'StreamOverrun' in err.msgdata['tb_str']
else: else:

View File

@ -128,7 +128,7 @@ def test_aio_simple_error(reg_addr):
assert err assert err
assert isinstance(err, RemoteActorError) assert isinstance(err, RemoteActorError)
assert err.boxed_type == AssertionError assert err.type == AssertionError
def test_tractor_cancels_aio(reg_addr): def test_tractor_cancels_aio(reg_addr):
@ -272,7 +272,7 @@ def test_context_spawns_aio_task_that_errors(
err = excinfo.value err = excinfo.value
assert isinstance(err, expect) assert isinstance(err, expect)
assert err.boxed_type == AssertionError assert err.type == AssertionError
async def aio_cancel(): async def aio_cancel():
@ -314,7 +314,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
assert err assert err
# ensure boxed error is correct # ensure boxed error is correct
assert err.boxed_type == to_asyncio.AsyncioCancelled assert err.type == to_asyncio.AsyncioCancelled
# TODO: verify open_channel_from will fail on this.. # TODO: verify open_channel_from will fail on this..
@ -466,7 +466,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
# ensure boxed errors # ensure boxed errors
for exc in excinfo.value.exceptions: for exc in excinfo.value.exceptions:
assert exc.boxed_type == Exception assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(reg_addr): def test_trio_closes_early_and_channel_exits(reg_addr):
@ -500,7 +500,7 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
# ensure boxed errors # ensure boxed errors
for exc in excinfo.value.exceptions: for exc in excinfo.value.exceptions:
assert exc.boxed_type == Exception assert exc.type == Exception
@tractor.context @tractor.context

View File

@ -36,7 +36,7 @@ async def sleep_back_actor(
if not exposed_mods: if not exposed_mods:
expect = tractor.ModuleNotExposed expect = tractor.ModuleNotExposed
assert err.boxed_type is expect assert err.type is expect
raise raise
else: else:
await trio.sleep(float('inf')) await trio.sleep(float('inf'))
@ -150,4 +150,4 @@ def test_rpc_errors(
)) ))
if getattr(value, 'type', None): if getattr(value, 'type', None):
assert value.boxed_type is inside_err assert value.type is inside_err

View File

@ -169,7 +169,8 @@ async def _drain_to_final_msg(
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's
# cancellation. # cancellation.
ctx.maybe_raise() if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
# CASE 1: we DID request the cancel we simply # CASE 1: we DID request the cancel we simply
# continue to bubble up as normal. # continue to bubble up as normal.
@ -256,13 +257,6 @@ async def _drain_to_final_msg(
) )
# XXX fallthrough to handle expected error XXX # XXX fallthrough to handle expected error XXX
# TODO: replace this with `ctx.maybe_raise()`
#
# TODO: would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
re: Exception|None = ctx._remote_error re: Exception|None = ctx._remote_error
if re: if re:
log.critical( log.critical(
@ -601,7 +595,7 @@ class Context:
if not re: if not re:
return False return False
if from_uid := re.src_uid: if from_uid := re.src_actor_uid:
from_uid: tuple = tuple(from_uid) from_uid: tuple = tuple(from_uid)
our_uid: tuple = self._actor.uid our_uid: tuple = self._actor.uid
@ -831,7 +825,7 @@ class Context:
# cancellation. # cancellation.
maybe_error_src: tuple = getattr( maybe_error_src: tuple = getattr(
error, error,
'src_uid', 'src_actor_uid',
None, None,
) )
self._canceller = ( self._canceller = (
@ -1036,8 +1030,8 @@ class Context:
@acm @acm
async def open_stream( async def open_stream(
self, self,
allow_overruns: bool|None = False, allow_overruns: bool | None = False,
msg_buffer_size: int|None = None, msg_buffer_size: int | None = None,
) -> AsyncGenerator[MsgStream, None]: ) -> AsyncGenerator[MsgStream, None]:
''' '''
@ -1077,16 +1071,13 @@ class Context:
# absorbed there (silently) and we DO NOT want to # absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already # actually try to stream - a cancel msg was already
# sent to the other side! # sent to the other side!
self.maybe_raise( if self._remote_error:
raise_ctxc_from_self_call=True, # NOTE: this is diff then calling
) # `._maybe_raise_remote_err()` specifically
# NOTE: this is diff then calling # because any task entering this `.open_stream()`
# `._maybe_raise_remote_err()` specifically # AFTER cancellation has already been requested,
# because we want to raise a ctxc on any task entering this `.open_stream()` # we DO NOT want to absorb any ctxc ACK silently!
# AFTER cancellation was already been requested, raise self._remote_error
# we DO NOT want to absorb any ctxc ACK silently!
# if self._remote_error:
# raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded # XXX NOTE: if no `ContextCancelled` has been responded
# back from the other side (yet), we raise a different # back from the other side (yet), we raise a different
@ -1167,6 +1158,7 @@ class Context:
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield stream yield stream
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside # context from the runtime msg loop otherwise inside
@ -1191,23 +1183,12 @@ class Context:
# #
# await stream.aclose() # await stream.aclose()
# NOTE: absorb and do not raise any # if re := ctx._remote_error:
# EoC received from the other side such that # ctx._maybe_raise_remote_err(
# it is not raised inside the surrounding # re,
# context block's scope! # raise_ctxc_from_self_call=True,
except trio.EndOfChannel as eoc: # )
if ( # await trio.lowlevel.checkpoint()
eoc
and stream.closed
):
# sanity, can remove?
assert eoc is stream._eoc
# from .devx import pause
# await pause()
log.warning(
'Stream was terminated by EoC\n\n'
f'{repr(eoc)}\n'
)
finally: finally:
if self._portal: if self._portal:
@ -1223,6 +1204,7 @@ class Context:
# TODO: replace all the instances of this!! XD # TODO: replace all the instances of this!! XD
def maybe_raise( def maybe_raise(
self, self,
hide_tb: bool = True, hide_tb: bool = True,
**kwargs, **kwargs,
@ -1406,41 +1388,33 @@ class Context:
f'{drained_msgs}' f'{drained_msgs}'
) )
self.maybe_raise( if (
raise_overrun_from_self=( (re := self._remote_error)
raise_overrun # and self._result == res_placeholder
and ):
# only when we ARE NOT the canceller self._maybe_raise_remote_err(
# should we raise overruns, bc ow we're re,
# raising something we know might happen # NOTE: obvi we don't care if we
# during cancellation ;) # overran the far end if we're already
(not self._cancel_called) # waiting on a final result (msg).
# raise_overrun_from_self=False,
raise_overrun_from_self=(
raise_overrun
and
# only when we ARE NOT the canceller
# should we raise overruns, bc ow we're
# raising something we know might happen
# during cancellation ;)
(not self._cancel_called)
),
) )
)
# if (
# (re := self._remote_error)
# # and self._result == res_placeholder
# ):
# self._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# # raise_overrun_from_self=False,
# raise_overrun_from_self=(
# raise_overrun
# and
# # only when we ARE NOT the canceller
# # should we raise overruns, bc ow we're
# # raising something we know might happen
# # during cancellation ;)
# (not self._cancel_called)
# ),
# )
# if maybe_err: # if maybe_err:
# self._result = maybe_err # self._result = maybe_err
return self.outcome return self.outcome
# None if self._result == res_placeholder
# else self._result
# )
# TODO: switch this with above which should be named # TODO: switch this with above which should be named
# `.wait_for_outcome()` and instead do # `.wait_for_outcome()` and instead do
@ -1889,9 +1863,8 @@ async def open_context_from_portal(
# TODO: if we set this the wrapping `@acm` body will # TODO: if we set this the wrapping `@acm` body will
# still be shown (awkwardly) on pdb REPL entry. Ideally # still be shown (awkwardly) on pdb REPL entry. Ideally
# we can similarly annotate that frame to NOT show? for now # we can similarly annotate that frame to NOT show?
# we DO SHOW this frame since it's awkward ow.. hide_tb: bool = True,
hide_tb: bool = False,
# proxied to RPC # proxied to RPC
**kwargs, **kwargs,

View File

@ -58,44 +58,16 @@ class InternalError(RuntimeError):
''' '''
_body_fields: list[str] = [ _body_fields: list[str] = [
'boxed_type', 'src_actor_uid',
'src_type',
# TODO: format this better if we're going to include it.
# 'relay_path',
'src_uid',
# only in sub-types
'canceller', 'canceller',
'sender', 'sender',
] ]
_msgdata_keys: list[str] = [ _msgdata_keys: list[str] = [
'boxed_type_str', 'type_str',
] + _body_fields ] + _body_fields
def get_err_type(type_name: str) -> BaseException|None:
'''
Look up an exception type by name from the set of locally
known namespaces:
- `builtins`
- `tractor._exceptions`
- `trio`
'''
for ns in [
builtins,
_this_mod,
trio,
]:
if type_ref := getattr(
ns,
type_name,
False,
):
return type_ref
# TODO: rename to just `RemoteError`? # TODO: rename to just `RemoteError`?
class RemoteActorError(Exception): class RemoteActorError(Exception):
@ -109,15 +81,13 @@ class RemoteActorError(Exception):
''' '''
reprol_fields: list[str] = [ reprol_fields: list[str] = [
'src_uid', 'src_actor_uid',
'relay_path',
# 'relay_uid',
] ]
def __init__( def __init__(
self, self,
message: str, message: str,
boxed_type: Type[BaseException]|None = None, suberror_type: Type[BaseException] | None = None,
**msgdata **msgdata
) -> None: ) -> None:
@ -131,112 +101,20 @@ class RemoteActorError(Exception):
# - .remote_type # - .remote_type
# also pertains to our long long oustanding issue XD # also pertains to our long long oustanding issue XD
# https://github.com/goodboy/tractor/issues/5 # https://github.com/goodboy/tractor/issues/5
# self.boxed_type: str = suberror_type
# TODO: always set ._boxed_type` as `None` by default
# and instead render if from `.boxed_type_str`?
self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None
self.msgdata: dict[str, Any] = msgdata self.msgdata: dict[str, Any] = msgdata
# TODO: mask out eventually or place in `pack_error()` @property
# pre-`return` lines? def type(self) -> str:
# sanity on inceptions return self.boxed_type
if boxed_type is RemoteActorError:
assert self.src_type_str != 'RemoteActorError'
assert self.src_uid not in self.relay_path
# ensure type-str matches and round-tripping from that
# str results in same error type.
#
# TODO NOTE: this is currently exclusively for the
# `ContextCancelled(boxed_type=trio.Cancelled)` case as is
# used inside `._rpc._invoke()` atm though probably we
# should better emphasize that special (one off?) case
# either by customizing `ContextCancelled.__init__()` or
# through a special factor func?
elif boxed_type:
if not self.msgdata.get('boxed_type_str'):
self.msgdata['boxed_type_str'] = str(
type(boxed_type).__name__
)
assert self.boxed_type_str == self.msgdata['boxed_type_str']
assert self.boxed_type is boxed_type
@property @property
def src_type_str(self) -> str: def type_str(self) -> str:
''' return str(type(self.boxed_type).__name__)
String-name of the source error's type.
This should be the same as `.boxed_type_str` when unpacked
at the first relay/hop's receiving actor.
'''
return self.msgdata['src_type_str']
@property @property
def src_type(self) -> str: def src_actor_uid(self) -> tuple[str, str]|None:
''' return self.msgdata.get('src_actor_uid')
Error type raised by original remote faulting actor.
'''
if self._src_type is None:
self._src_type = get_err_type(
self.msgdata['src_type_str']
)
return self._src_type
@property
def boxed_type_str(self) -> str:
'''
String-name of the (last hop's) boxed error type.
'''
return self.msgdata['boxed_type_str']
@property
def boxed_type(self) -> str:
'''
Error type boxed by last actor IPC hop.
'''
if self._boxed_type is None:
self._boxed_type = get_err_type(
self.msgdata['boxed_type_str']
)
return self._boxed_type
@property
def relay_path(self) -> list[tuple]:
'''
Return the list of actors which consecutively relayed
a boxed `RemoteActorError` the src error up until THIS
actor's hop.
NOTE: a `list` field with the same name is expected to be
passed/updated in `.msgdata`.
'''
return self.msgdata['relay_path']
@property
def relay_uid(self) -> tuple[str, str]|None:
return tuple(
self.msgdata['relay_path'][-1]
)
@property
def src_uid(self) -> tuple[str, str]|None:
if src_uid := (
self.msgdata.get('src_uid')
):
return tuple(src_uid)
# TODO: use path lookup instead?
# return tuple(
# self.msgdata['relay_path'][0]
# )
@property @property
def tb_str( def tb_str(
@ -251,56 +129,28 @@ class RemoteActorError(Exception):
return '' return ''
def _mk_fields_str(
self,
fields: list[str],
end_char: str = '\n',
) -> str:
_repr: str = ''
for key in fields:
val: Any|None = (
getattr(self, key, None)
or
self.msgdata.get(key)
)
# TODO: for `.relay_path` on multiline?
# if not isinstance(val, str):
# val_str = pformat(val)
# else:
val_str: str = repr(val)
if val:
_repr += f'{key}={val_str}{end_char}'
return _repr
def reprol(self) -> str: def reprol(self) -> str:
''' '''
Represent this error for "one line" display, like in Represent this error for "one line" display, like in
a field of our `Context.__repr__()` output. a field of our `Context.__repr__()` output.
''' '''
# TODO: use this matryoshka emjoi XD _repr: str = f'{type(self).__name__}('
# => 🪆 for key in self.reprol_fields:
reprol_str: str = f'{type(self).__name__}(' val: Any|None = self.msgdata.get(key)
_repr: str = self._mk_fields_str( if val:
self.reprol_fields, _repr += f'{key}={repr(val)} '
end_char=' ',
) return _repr
return (
reprol_str
+
_repr
)
def __repr__(self) -> str: def __repr__(self) -> str:
'''
Nicely formatted boxed error meta data + traceback.
''' fields: str = ''
fields: str = self._mk_fields_str( for key in _body_fields:
_body_fields, val: str|None = self.msgdata.get(key)
) if val:
fields += f'{key}={val}\n'
fields: str = textwrap.indent( fields: str = textwrap.indent(
fields, fields,
# prefix=' '*2, # prefix=' '*2,
@ -315,6 +165,8 @@ class RemoteActorError(Exception):
f' ------ - ------\n' f' ------ - ------\n'
f' _|\n' f' _|\n'
) )
# f'|\n'
# f' |\n'
if indent: if indent:
body: str = textwrap.indent( body: str = textwrap.indent(
body, body,
@ -326,47 +178,9 @@ class RemoteActorError(Exception):
')>' ')>'
) )
def unwrap( # TODO: local recontruction of remote exception deats
self,
) -> BaseException:
'''
Unpack the inner-most source error from it's original IPC msg data.
We attempt to reconstruct (as best as we can) the original
`Exception` from as it would have been raised in the
failing actor's remote env.
'''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611
return src_type_ref(self.tb_str)
# TODO: local recontruction of nested inception for a given
# "hop" / relay-node in this error's relay_path?
# => so would render a `RAE[RAE[RAE[Exception]]]` instance
# with all inner errors unpacked?
# -[ ] if this is useful shouldn't be too hard to impl right?
# def unbox(self) -> BaseException: # def unbox(self) -> BaseException:
# ''' # ...
# Unbox to the prior relays (aka last boxing actor's)
# inner error.
# '''
# if not self.relay_path:
# return self.unwrap()
# # TODO..
# # return self.boxed_type(
# # boxed_type=get_type_ref(..
# raise NotImplementedError
class InternalActorError(RemoteActorError): class InternalActorError(RemoteActorError):
@ -418,7 +232,7 @@ class ContextCancelled(RemoteActorError):
f'{self}' f'{self}'
) )
# TODO: to make `.__repr__()` work uniformly? # to make `.__repr__()` work uniformly
# src_actor_uid = canceller # src_actor_uid = canceller
@ -469,8 +283,7 @@ class MessagingError(Exception):
def pack_error( def pack_error(
exc: BaseException|RemoteActorError, exc: BaseException,
tb: str|None = None, tb: str|None = None,
cid: str|None = None, cid: str|None = None,
@ -487,54 +300,27 @@ def pack_error(
else: else:
tb_str = traceback.format_exc() tb_str = traceback.format_exc()
our_uid: tuple = current_actor().uid
error_msg: dict[ error_msg: dict[
str, str,
str | tuple[str, str] str | tuple[str, str]
] = { ] = {
'tb_str': tb_str, 'tb_str': tb_str,
'relay_uid': our_uid, 'type_str': type(exc).__name__,
'boxed_type': type(exc).__name__,
'src_actor_uid': current_actor().uid,
} }
# TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if ( if (
isinstance(exc, RemoteActorError) isinstance(exc, ContextCancelled)
or isinstance(exc, StreamOverrun)
): ):
error_msg.update(exc.msgdata) error_msg.update(exc.msgdata)
# an onion/inception we need to pack
if (
type(exc) is RemoteActorError
and (boxed := exc.boxed_type)
and boxed != RemoteActorError
):
# sanity on source error (if needed when tweaking this)
assert (src_type := exc.src_type) != RemoteActorError
assert error_msg['src_type_str'] != 'RemoteActorError'
assert error_msg['src_type_str'] == src_type.__name__
assert error_msg['src_uid'] != our_uid
# set the boxed type to be another boxed type thus
# creating an "inception" when unpacked by
# `unpack_error()` in another actor who gets "relayed"
# this error Bo
#
# NOTE on WHY: since we are re-boxing and already
# boxed src error, we want to overwrite the original
# `boxed_type_str` and instead set it to the type of
# the input `exc` type.
error_msg['boxed_type_str'] = 'RemoteActorError'
else:
error_msg['src_uid'] = our_uid
error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__
# XXX alawys append us the last relay in error propagation path
error_msg.setdefault(
'relay_path',
[],
).append(our_uid)
pkt: dict = {'error': error_msg} pkt: dict = {'error': error_msg}
if cid: if cid:
pkt['cid'] = cid pkt['cid'] = cid
@ -543,6 +329,7 @@ def pack_error(
def unpack_error( def unpack_error(
msg: dict[str, Any], msg: dict[str, Any],
chan: Channel|None = None, chan: Channel|None = None,
@ -570,32 +357,35 @@ def unpack_error(
# retrieve the remote error's msg encoded details # retrieve the remote error's msg encoded details
tb_str: str = error_dict.get('tb_str', '') tb_str: str = error_dict.get('tb_str', '')
message: str = ( message: str = f'{chan.uid}\n' + tb_str
f'{chan.uid}\n' type_name: str = (
+ error_dict.get('type_str')
tb_str or error_dict['boxed_type']
) )
suberror_type: Type[BaseException] = Exception
# try to lookup a suitable error type from the local runtime if type_name == 'ContextCancelled':
# env then use it to construct a local instance.
boxed_type_str: str = error_dict['boxed_type_str']
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
if boxed_type_str == 'ContextCancelled':
box_type = ContextCancelled box_type = ContextCancelled
assert boxed_type is box_type suberror_type = box_type
# TODO: already included by `_this_mod` in else loop right? else: # try to lookup a suitable local error type
# for ns in [
# we have an inception/onion-error so ensure builtins,
# we include the relay_path info and the _this_mod,
# original source error. trio,
elif boxed_type_str == 'RemoteActorError': ]:
assert boxed_type is RemoteActorError if suberror_type := getattr(
assert len(error_dict['relay_path']) >= 1 ns,
type_name,
False,
):
break
exc = box_type( exc = box_type(
message, message,
suberror_type=suberror_type,
# unpack other fields into error type init
**error_dict, **error_dict,
) )
@ -711,11 +501,6 @@ def _raise_from_no_key_in_msg(
# destined for the `Context.result()` call during ctx-exit! # destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc stream._eoc: Exception = eoc
# in case there already is some underlying remote error
# that arrived which is probably the source of this stream
# closure
ctx.maybe_raise()
raise eoc from src_err raise eoc from src_err
if ( if (

View File

@ -273,10 +273,7 @@ async def _errors_relayed_via_ipc(
entered_debug = await _debug._maybe_enter_pm(err) entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug: if not entered_debug:
log.exception( log.exception('Actor crashed:\n')
'RPC task crashed\n'
f'|_{ctx}'
)
# always (try to) ship RPC errors back to caller # always (try to) ship RPC errors back to caller
if is_rpc: if is_rpc:
@ -616,8 +613,7 @@ async def _invoke(
# other side. # other side.
ctxc = ContextCancelled( ctxc = ContextCancelled(
msg, msg,
boxed_type=trio.Cancelled, suberror_type=trio.Cancelled,
# boxed_type_str='Cancelled',
canceller=canceller, canceller=canceller,
) )
# assign local error so that the `.outcome` # assign local error so that the `.outcome`
@ -675,7 +671,7 @@ async def _invoke(
f'`{repr(ctx.outcome)}`', f'`{repr(ctx.outcome)}`',
) )
) )
log.runtime( log.cancel(
f'IPC context terminated with a final {res_type_str}\n\n' f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}\n' f'{ctx}\n'
) )
@ -708,6 +704,12 @@ async def try_ship_error_to_remote(
# TODO: special tb fmting for ctxc cases? # TODO: special tb fmting for ctxc cases?
# tb=tb, # tb=tb,
) )
# NOTE: the src actor should always be packed into the
# error.. but how should we verify this?
# actor: Actor = _state.current_actor()
# assert err_msg['src_actor_uid']
# if not err_msg['error'].get('src_actor_uid'):
# import pdbp; pdbp.set_trace()
await channel.send(msg) await channel.send(msg)
# XXX NOTE XXX in SC terms this is one of the worst things # XXX NOTE XXX in SC terms this is one of the worst things

View File

@ -136,7 +136,7 @@ class MsgStream(trio.abc.Channel):
# return await self.receive() # return await self.receive()
# except trio.EndOfChannel: # except trio.EndOfChannel:
# raise StopAsyncIteration # raise StopAsyncIteration
#
# see ``.aclose()`` for notes on the old behaviour prior to # see ``.aclose()`` for notes on the old behaviour prior to
# introducing this # introducing this
if self._eoc: if self._eoc:
@ -152,6 +152,7 @@ class MsgStream(trio.abc.Channel):
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
# log.exception('GOT KEYERROR')
src_err = kerr src_err = kerr
# NOTE: may raise any of the below error types # NOTE: may raise any of the below error types
@ -165,20 +166,30 @@ class MsgStream(trio.abc.Channel):
stream=self, stream=self,
) )
# XXX: the stream terminates on either of: # XXX: we close the stream on any of these error conditions:
# - via `self._rx_chan.receive()` raising after manual closure
# by the rpc-runtime OR,
# - via a received `{'stop': ...}` msg from remote side.
# |_ NOTE: previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel inside
# `Actor._push_result()`, but now the 'stop' message handling
# has been put just above inside `_raise_from_no_key_in_msg()`.
except ( except (
trio.EndOfChannel, # trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
) as eoc: ) as eoc:
# log.exception('GOT EOC')
src_err = eoc src_err = eoc
self._eoc = eoc self._eoc = eoc
# a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the
# runtime after the associated transport-channel
# disconnected or broke.
# an ``EndOfChannel`` indicates either the internal recv
# memchan exhausted **or** we raisesd it just above after
# receiving a `stop` message from the far end of the stream.
# Previously this was triggered by calling ``.aclose()`` on
# the send side of the channel inside
# ``Actor._push_result()`` (should still be commented code
# there - which should eventually get removed), but now the
# 'stop' message handling has been put just above.
# TODO: Locally, we want to close this stream gracefully, by # TODO: Locally, we want to close this stream gracefully, by
# terminating any local consumers tasks deterministically. # terminating any local consumers tasks deterministically.
# Once we have broadcast support, we **don't** want to be # Once we have broadcast support, we **don't** want to be
@ -199,11 +210,8 @@ class MsgStream(trio.abc.Channel):
# raise eoc # raise eoc
# a ``ClosedResourceError`` indicates that the internal except trio.ClosedResourceError as cre: # by self._rx_chan
# feeder memory receive channel was closed likely by the # log.exception('GOT CRE')
# runtime after the associated transport-channel
# disconnected or broke.
except trio.ClosedResourceError as cre: # by self._rx_chan.receive()
src_err = cre src_err = cre
log.warning( log.warning(
'`Context._rx_chan` was already closed?' '`Context._rx_chan` was already closed?'
@ -229,30 +237,15 @@ class MsgStream(trio.abc.Channel):
# over the end-of-stream connection error since likely # over the end-of-stream connection error since likely
# the remote error was the source cause? # the remote error was the source cause?
ctx: Context = self._ctx ctx: Context = self._ctx
ctx.maybe_raise( if re := ctx._remote_error:
raise_ctxc_from_self_call=True, ctx._maybe_raise_remote_err(
) re,
raise_ctxc_from_self_call=True,
)
# propagate any error but hide low-level frame details # propagate any error but hide low-level frames from
# from the caller by default for debug noise reduction. # caller by default.
if ( if hide_tb:
hide_tb
# XXX NOTE XXX don't reraise on certain
# stream-specific internal error types like,
#
# - `trio.EoC` since we want to use the exact instance
# to ensure that it is the error that bubbles upward
# for silent absorption by `Context.open_stream()`.
and not self._eoc
# - `RemoteActorError` (or `ContextCancelled`) if it gets
# raised from `_raise_from_no_key_in_msg()` since we
# want the same (as the above bullet) for any
# `.open_context()` block bubbled error raised by
# any nearby ctx API remote-failures.
# and not isinstance(src_err, RemoteActorError)
):
raise type(src_err)(*src_err.args) from src_err raise type(src_err)(*src_err.args) from src_err
else: else:
raise src_err raise src_err
@ -377,10 +370,6 @@ class MsgStream(trio.abc.Channel):
# await rx_chan.aclose() # await rx_chan.aclose()
if not self._eoc: if not self._eoc:
log.cancel(
'Stream closed before it received an EoC?\n'
'Setting eoc manually..\n..'
)
self._eoc: bool = trio.EndOfChannel( self._eoc: bool = trio.EndOfChannel(
f'Context stream closed by {self._ctx.side}\n' f'Context stream closed by {self._ctx.side}\n'
f'|_{self}\n' f'|_{self}\n'
@ -425,11 +414,13 @@ class MsgStream(trio.abc.Channel):
@property @property
def closed(self) -> bool: def closed(self) -> bool:
if (
rxc: bool = self._rx_chan._closed (rxc := self._rx_chan._closed)
_closed: bool|Exception = self._closed or
_eoc: bool|trio.EndOfChannel = self._eoc (_closed := self._closed)
if rxc or _closed or _eoc: or
(_eoc := self._eoc)
):
log.runtime( log.runtime(
f'`MsgStream` is already closed\n' f'`MsgStream` is already closed\n'
f'{self}\n' f'{self}\n'
@ -505,11 +496,7 @@ class MsgStream(trio.abc.Channel):
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# raise any alreay known error immediately
self._ctx.maybe_raise() self._ctx.maybe_raise()
if self._eoc:
raise self._eoc
if self._closed: if self._closed:
raise self._closed raise self._closed