Merge pull request #373 from goodboy/remote_inceptions

Remote inceptions: improved `RemoteActorError` boxing of inter-actor exceptions
sc_super_proto_dgrams
goodboy 2025-03-20 22:37:00 -04:00 committed by GitHub
commit 819889702f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 543 additions and 164 deletions

View File

@ -32,7 +32,7 @@ async def main():
try: try:
await p1.run(name_error) await p1.run(name_error)
except tractor.RemoteActorError as rae: except tractor.RemoteActorError as rae:
assert rae.type is NameError assert rae.boxed_type is NameError
async for i in stream: async for i in stream:

View File

View File

@ -95,6 +95,7 @@ def test_ipc_channel_break_during_stream(
mod: ModuleType = import_path( mod: ModuleType = import_path(
examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py', examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py',
root=examples_dir(), root=examples_dir(),
consider_namespace_packages=False,
) )
# by def we expect KBI from user after a simulated "hang # by def we expect KBI from user after a simulated "hang

View File

@ -14,7 +14,7 @@ import tractor
from tractor._testing import ( from tractor._testing import (
tractor_test, tractor_test,
) )
from conftest import no_windows from .conftest import no_windows
def is_win(): def is_win():
@ -77,7 +77,7 @@ def test_remote_error(reg_addr, args_err):
# of this actor nursery. # of this actor nursery.
await portal.result() await portal.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == errtype assert err.boxed_type == errtype
print("Look Maa that actor failed hard, hehh") print("Look Maa that actor failed hard, hehh")
raise raise
@ -86,7 +86,7 @@ def test_remote_error(reg_addr, args_err):
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
assert excinfo.value.type == errtype assert excinfo.value.boxed_type == errtype
else: else:
# the root task will also error on the `.result()` call # the root task will also error on the `.result()` call
@ -96,7 +96,7 @@ def test_remote_error(reg_addr, args_err):
# ensure boxed errors # ensure boxed errors
for exc in excinfo.value.exceptions: for exc in excinfo.value.exceptions:
assert exc.type == errtype assert exc.boxed_type == errtype
def test_multierror(reg_addr): def test_multierror(reg_addr):
@ -117,7 +117,7 @@ def test_multierror(reg_addr):
try: try:
await portal2.result() await portal2.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == AssertionError assert err.boxed_type == AssertionError
print("Look Maa that first actor failed hard, hehh") print("Look Maa that first actor failed hard, hehh")
raise raise
@ -169,7 +169,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
for exc in exceptions: for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError) assert isinstance(exc, tractor.RemoteActorError)
assert exc.type == AssertionError assert exc.boxed_type == AssertionError
async def do_nothing(): async def do_nothing():
@ -310,7 +310,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
await portal.run(func, **kwargs) await portal.run(func, **kwargs)
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == err_type assert err.boxed_type == err_type
# we only expect this first error to propogate # we only expect this first error to propogate
# (all other daemons are cancelled before they # (all other daemons are cancelled before they
# can be scheduled) # can be scheduled)
@ -329,11 +329,11 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
assert len(err.exceptions) == num_actors assert len(err.exceptions) == num_actors
for exc in err.exceptions: for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError): if isinstance(exc, tractor.RemoteActorError):
assert exc.type == err_type assert exc.boxed_type == err_type
else: else:
assert isinstance(exc, trio.Cancelled) assert isinstance(exc, trio.Cancelled)
elif isinstance(err, tractor.RemoteActorError): elif isinstance(err, tractor.RemoteActorError):
assert err.type == err_type assert err.boxed_type == err_type
assert n.cancelled is True assert n.cancelled is True
assert not n._children assert not n._children
@ -412,7 +412,7 @@ async def test_nested_multierrors(loglevel, start_method):
elif isinstance(subexc, tractor.RemoteActorError): elif isinstance(subexc, tractor.RemoteActorError):
# on windows it seems we can't exactly be sure wtf # on windows it seems we can't exactly be sure wtf
# will happen.. # will happen..
assert subexc.type in ( assert subexc.boxed_type in (
tractor.RemoteActorError, tractor.RemoteActorError,
trio.Cancelled, trio.Cancelled,
BaseExceptionGroup, BaseExceptionGroup,
@ -422,7 +422,7 @@ async def test_nested_multierrors(loglevel, start_method):
for subsub in subexc.exceptions: for subsub in subexc.exceptions:
if subsub in (tractor.RemoteActorError,): if subsub in (tractor.RemoteActorError,):
subsub = subsub.type subsub = subsub.boxed_type
assert type(subsub) in ( assert type(subsub) in (
trio.Cancelled, trio.Cancelled,
@ -437,16 +437,16 @@ async def test_nested_multierrors(loglevel, start_method):
# we get back the (sent) cancel signal instead # we get back the (sent) cancel signal instead
if is_win(): if is_win():
if isinstance(subexc, tractor.RemoteActorError): if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in ( assert subexc.boxed_type in (
BaseExceptionGroup, BaseExceptionGroup,
tractor.RemoteActorError tractor.RemoteActorError
) )
else: else:
assert isinstance(subexc, BaseExceptionGroup) assert isinstance(subexc, BaseExceptionGroup)
else: else:
assert subexc.type is ExceptionGroup assert subexc.boxed_type is ExceptionGroup
else: else:
assert subexc.type in ( assert subexc.boxed_type in (
tractor.RemoteActorError, tractor.RemoteActorError,
trio.Cancelled trio.Cancelled
) )

View File

@ -171,4 +171,4 @@ def test_actor_managed_trio_nursery_task_error_cancels_aio(
# verify boxed error # verify boxed error
err = excinfo.value err = excinfo.value
assert isinstance(err.type(), NameError) assert err.boxed_type is NameError

View File

@ -795,7 +795,7 @@ async def test_callee_cancels_before_started(
# raises a special cancel signal # raises a special cancel signal
except tractor.ContextCancelled as ce: except tractor.ContextCancelled as ce:
ce.type == trio.Cancelled ce.boxed_type == trio.Cancelled
# the traceback should be informative # the traceback should be informative
assert 'itself' in ce.msgdata['tb_str'] assert 'itself' in ce.msgdata['tb_str']
@ -903,7 +903,7 @@ def test_one_end_stream_not_opened(
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
assert excinfo.value.type == StreamOverrun assert excinfo.value.boxed_type == StreamOverrun
elif overrunner == 'callee': elif overrunner == 'callee':
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
@ -912,7 +912,7 @@ def test_one_end_stream_not_opened(
# TODO: embedded remote errors so that we can verify the source # TODO: embedded remote errors so that we can verify the source
# error? the callee delivers an error which is an overrun # error? the callee delivers an error which is an overrun
# wrapped in a remote actor error. # wrapped in a remote actor error.
assert excinfo.value.type == tractor.RemoteActorError assert excinfo.value.boxed_type == tractor.RemoteActorError
else: else:
trio.run(main) trio.run(main)
@ -1131,7 +1131,7 @@ def test_maybe_allow_overruns_stream(
# NOTE: i tried to isolate to a deterministic case here # NOTE: i tried to isolate to a deterministic case here
# based on timeing, but i was kinda wasted, and i don't # based on timeing, but i was kinda wasted, and i don't
# think it's sane to catch them.. # think it's sane to catch them..
assert err.type in ( assert err.boxed_type in (
tractor.RemoteActorError, tractor.RemoteActorError,
StreamOverrun, StreamOverrun,
) )
@ -1139,10 +1139,10 @@ def test_maybe_allow_overruns_stream(
elif ( elif (
slow_side == 'child' slow_side == 'child'
): ):
assert err.type == StreamOverrun assert err.boxed_type == StreamOverrun
elif slow_side == 'parent': elif slow_side == 'parent':
assert err.type == tractor.RemoteActorError assert err.boxed_type == tractor.RemoteActorError
assert 'StreamOverrun' in err.msgdata['tb_str'] assert 'StreamOverrun' in err.msgdata['tb_str']
else: else:

View File

@ -203,7 +203,7 @@ def ctlc(
# XXX: disable pygments highlighting for auto-tests # XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle # since some envs (like actions CI) will struggle
# the the added color-char encoding.. # the the added color-char encoding..
from tractor._debug import TractorConfig from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False TractorConfig.use_pygements = False
yield use_ctlc yield use_ctlc
@ -685,7 +685,7 @@ def test_multi_daemon_subactors(
# now the root actor won't clobber the bp_forever child # now the root actor won't clobber the bp_forever child
# during it's first access to the debug lock, but will instead # during it's first access to the debug lock, but will instead
# wait for the lock to release, by the edge triggered # wait for the lock to release, by the edge triggered
# ``_debug.Lock.no_remote_has_tty`` event before sending cancel messages # ``devx._debug.Lock.no_remote_has_tty`` event before sending cancel messages
# (via portals) to its underlings B) # (via portals) to its underlings B)
# at some point here there should have been some warning msg from # at some point here there should have been some warning msg from

View File

@ -20,7 +20,7 @@ from tractor._testing import (
def run_example_in_subproc( def run_example_in_subproc(
loglevel: str, loglevel: str,
testdir, testdir,
arb_addr: tuple[str, int], reg_addr: tuple[str, int],
): ):
@contextmanager @contextmanager

View File

@ -128,7 +128,7 @@ def test_aio_simple_error(reg_addr):
assert err assert err
assert isinstance(err, RemoteActorError) assert isinstance(err, RemoteActorError)
assert err.type == AssertionError assert err.boxed_type == AssertionError
def test_tractor_cancels_aio(reg_addr): def test_tractor_cancels_aio(reg_addr):
@ -272,7 +272,7 @@ def test_context_spawns_aio_task_that_errors(
err = excinfo.value err = excinfo.value
assert isinstance(err, expect) assert isinstance(err, expect)
assert err.type == AssertionError assert err.boxed_type == AssertionError
async def aio_cancel(): async def aio_cancel():
@ -314,7 +314,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
assert err assert err
# ensure boxed error is correct # ensure boxed error is correct
assert err.type == to_asyncio.AsyncioCancelled assert err.boxed_type == to_asyncio.AsyncioCancelled
# TODO: verify open_channel_from will fail on this.. # TODO: verify open_channel_from will fail on this..
@ -466,7 +466,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
# ensure boxed errors # ensure boxed errors
for exc in excinfo.value.exceptions: for exc in excinfo.value.exceptions:
assert exc.type == Exception assert exc.boxed_type == Exception
def test_trio_closes_early_and_channel_exits(reg_addr): def test_trio_closes_early_and_channel_exits(reg_addr):
@ -500,7 +500,7 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
# ensure boxed errors # ensure boxed errors
for exc in excinfo.value.exceptions: for exc in excinfo.value.exceptions:
assert exc.type == Exception assert exc.boxed_type == Exception
@tractor.context @tractor.context

View File

@ -16,6 +16,11 @@ from tractor import ( # typing
Portal, Portal,
Context, Context,
ContextCancelled, ContextCancelled,
RemoteActorError,
)
from tractor._testing import (
# tractor_test,
expect_ctxc,
) )
# XXX TODO cases: # XXX TODO cases:
@ -156,10 +161,11 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
): ):
await trio.sleep_forever() await trio.sleep_forever()
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
assert excinfo.value.type == TypeError rae = excinfo.value
assert rae.boxed_type == TypeError
@tractor.context @tractor.context
@ -739,14 +745,16 @@ def test_peer_canceller(
with pytest.raises(ContextCancelled) as excinfo: with pytest.raises(ContextCancelled) as excinfo:
trio.run(main) trio.run(main)
assert excinfo.value.type == ContextCancelled assert excinfo.value.boxed_type == ContextCancelled
assert excinfo.value.canceller[0] == 'canceller' assert excinfo.value.canceller[0] == 'canceller'
@tractor.context @tractor.context
async def basic_echo_server( async def basic_echo_server(
ctx: Context, ctx: Context,
peer_name: str = 'stepbro', peer_name: str = 'wittle_bruv',
err_after: int|None = None,
) -> None: ) -> None:
''' '''
@ -774,17 +782,31 @@ async def basic_echo_server(
# assert 0 # assert 0
await ipc.send(resp) await ipc.send(resp)
if (
err_after
and i > err_after
):
raise RuntimeError(
f'Simulated error in `{peer_name}`'
)
@tractor.context @tractor.context
async def serve_subactors( async def serve_subactors(
ctx: Context, ctx: Context,
peer_name: str, peer_name: str,
debug_mode: bool,
) -> None: ) -> None:
async with open_nursery() as an: async with open_nursery() as an:
# sanity
if debug_mode:
assert tractor._state.debug_mode()
await ctx.started(peer_name) await ctx.started(peer_name)
async with ctx.open_stream() as reqs: async with ctx.open_stream() as ipc:
async for msg in reqs: async for msg in ipc:
peer_name: str = msg peer_name: str = msg
peer: Portal = await an.start_actor( peer: Portal = await an.start_actor(
name=peer_name, name=peer_name,
@ -795,7 +817,7 @@ async def serve_subactors(
f'{peer_name}\n' f'{peer_name}\n'
f'|_{peer}\n' f'|_{peer}\n'
) )
await reqs.send(( await ipc.send((
peer.chan.uid, peer.chan.uid,
peer.chan.raddr, peer.chan.raddr,
)) ))
@ -807,14 +829,20 @@ async def serve_subactors(
async def client_req_subactor( async def client_req_subactor(
ctx: Context, ctx: Context,
peer_name: str, peer_name: str,
debug_mode: bool,
# used to simulate a user causing an error to be raised # used to simulate a user causing an error to be raised
# directly in thread (like a KBI) to better replicate the # directly in thread (like a KBI) to better replicate the
# case where a `modden` CLI client would hang afer requesting # case where a `modden` CLI client would hang afer requesting
# a `Context.cancel()` to `bigd`'s wks spawner. # a `Context.cancel()` to `bigd`'s wks spawner.
reraise_on_cancel: str|None = None, reraise_on_cancel: str|None = None,
sub_err_after: int|None = None,
) -> None: ) -> None:
# sanity
if debug_mode:
assert tractor._state.debug_mode()
# TODO: other cases to do with sub lifetimes: # TODO: other cases to do with sub lifetimes:
# -[ ] test that we can have the server spawn a sub # -[ ] test that we can have the server spawn a sub
# that lives longer then ctx with this client. # that lives longer then ctx with this client.
@ -836,6 +864,7 @@ async def client_req_subactor(
spawner.open_context( spawner.open_context(
serve_subactors, serve_subactors,
peer_name=peer_name, peer_name=peer_name,
debug_mode=debug_mode,
) as (spawner_ctx, first), ) as (spawner_ctx, first),
): ):
assert first == peer_name assert first == peer_name
@ -857,6 +886,7 @@ async def client_req_subactor(
await tell_little_bro( await tell_little_bro(
actor_name=sub_uid[0], actor_name=sub_uid[0],
caller='client', caller='client',
err_after=sub_err_after,
) )
# TODO: test different scope-layers of # TODO: test different scope-layers of
@ -868,9 +898,7 @@ async def client_req_subactor(
# TODO: would be super nice to have a special injected # TODO: would be super nice to have a special injected
# cancel type here (maybe just our ctxc) but using # cancel type here (maybe just our ctxc) but using
# some native mechanism in `trio` :p # some native mechanism in `trio` :p
except ( except trio.Cancelled as err:
trio.Cancelled
) as err:
_err = err _err = err
if reraise_on_cancel: if reraise_on_cancel:
errtype = globals()['__builtins__'][reraise_on_cancel] errtype = globals()['__builtins__'][reraise_on_cancel]
@ -897,7 +925,9 @@ async def client_req_subactor(
async def tell_little_bro( async def tell_little_bro(
actor_name: str, actor_name: str,
caller: str = ''
caller: str = '',
err_after: int|None = None,
): ):
# contact target actor, do a stream dialog. # contact target actor, do a stream dialog.
async with ( async with (
@ -906,10 +936,12 @@ async def tell_little_bro(
) as lb, ) as lb,
lb.open_context( lb.open_context(
basic_echo_server, basic_echo_server,
# XXX proxy any delayed err condition
err_after=err_after,
) as (sub_ctx, first), ) as (sub_ctx, first),
sub_ctx.open_stream(
basic_echo_server, sub_ctx.open_stream() as echo_ipc,
) as echo_ipc,
): ):
actor: Actor = current_actor() actor: Actor = current_actor()
uid: tuple = actor.uid uid: tuple = actor.uid
@ -936,10 +968,15 @@ async def tell_little_bro(
'raise_client_error', 'raise_client_error',
[None, 'KeyboardInterrupt'], [None, 'KeyboardInterrupt'],
) )
@pytest.mark.parametrize(
'raise_sub_spawn_error_after',
[None, 50],
)
def test_peer_spawns_and_cancels_service_subactor( def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool, debug_mode: bool,
raise_client_error: str, raise_client_error: str,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
raise_sub_spawn_error_after: int|None,
): ):
# NOTE: this tests for the modden `mod wks open piker` bug # NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx # discovered as part of implementing workspace ctx
@ -953,6 +990,16 @@ def test_peer_spawns_and_cancels_service_subactor(
# and the server's spawned child should cancel and terminate! # and the server's spawned child should cancel and terminate!
peer_name: str = 'little_bro' peer_name: str = 'little_bro'
def check_inner_rte(rae: RemoteActorError):
'''
Validate the little_bro's relayed inception!
'''
assert rae.boxed_type is RemoteActorError
assert rae.src_type is RuntimeError
assert 'client' in rae.relay_uid
assert peer_name in rae.src_uid
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this. # NOTE: to halt the peer tasks on ctxc, uncomment this.
@ -976,14 +1023,24 @@ def test_peer_spawns_and_cancels_service_subactor(
server.open_context( server.open_context(
serve_subactors, serve_subactors,
peer_name=peer_name, peer_name=peer_name,
debug_mode=debug_mode,
) as (spawn_ctx, first), ) as (spawn_ctx, first),
client.open_context( client.open_context(
client_req_subactor, client_req_subactor,
peer_name=peer_name, peer_name=peer_name,
debug_mode=debug_mode,
reraise_on_cancel=raise_client_error, reraise_on_cancel=raise_client_error,
# trigger for error condition in sub
# during streaming.
sub_err_after=raise_sub_spawn_error_after,
) as (client_ctx, client_says), ) as (client_ctx, client_says),
): ):
root: Actor = current_actor()
spawner_uid: tuple = spawn_ctx.chan.uid
print( print(
f'Server says: {first}\n' f'Server says: {first}\n'
f'Client says: {client_says}\n' f'Client says: {client_says}\n'
@ -993,6 +1050,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# (grandchild of this root actor) "little_bro" # (grandchild of this root actor) "little_bro"
# and ensure we can also use it as an echo # and ensure we can also use it as an echo
# server. # server.
sub: Portal
async with tractor.wait_for_actor( async with tractor.wait_for_actor(
name=peer_name, name=peer_name,
) as sub: ) as sub:
@ -1004,56 +1062,139 @@ def test_peer_spawns_and_cancels_service_subactor(
f'.uid: {sub.actor.uid}\n' f'.uid: {sub.actor.uid}\n'
f'chan.raddr: {sub.chan.raddr}\n' f'chan.raddr: {sub.chan.raddr}\n'
) )
await tell_little_bro(
actor_name=peer_name,
caller='root',
)
# signal client to raise a KBI async with expect_ctxc(
await client_ctx.cancel() yay=raise_sub_spawn_error_after,
print('root cancelled client, checking that sub-spawn is down') reraise=False,
):
await tell_little_bro(
actor_name=peer_name,
caller='root',
)
async with tractor.find_actor( if not raise_sub_spawn_error_after:
name=peer_name,
) as sub:
assert not sub
print('root cancelling server/client sub-actors') # signal client to cancel and maybe raise a KBI
await client_ctx.cancel()
print(
'-> root cancelling client,\n'
'-> root checking `client_ctx.result()`,\n'
f'-> checking that sub-spawn {peer_name} is down\n'
)
# else:
# await tractor.pause() try:
res = await client_ctx.result(hide_tb=False) res = await client_ctx.result(hide_tb=False)
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked # in remote (relayed inception) error
assert res.canceller == current_actor().uid # case, we should error on the line above!
if raise_sub_spawn_error_after:
pytest.fail(
'Never rxed proxied `RemoteActorError[RuntimeError]` !?'
)
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked
assert res.canceller == root.uid
except RemoteActorError as rae:
_err = rae
assert raise_sub_spawn_error_after
# since this is a "relayed error" via the client
# sub-actor, it is expected to be
# a `RemoteActorError` boxing another
# `RemoteActorError` otherwise known as
# an "inception" (from `trio`'s parlance)
# ((or maybe a "Matryoshka" and/or "matron"
# in our own working parlance)) which
# contains the source error from the
# little_bro: a `RuntimeError`.
#
check_inner_rte(rae)
assert rae.relay_uid == client.chan.uid
assert rae.src_uid == sub.chan.uid
assert not client_ctx.cancel_acked
assert (
client_ctx.maybe_error
is client_ctx.outcome
is rae
)
raise
# await tractor.pause()
else:
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
await spawn_ctx.cancel()
# await server.cancel_actor() # await server.cancel_actor()
except RemoteActorError as rae:
# XXX more-or-less same as above handler
# this is just making sure the error bubbles out
# of the
_err = rae
assert raise_sub_spawn_error_after
raise
# since we called `.cancel_actor()`, `.cancel_ack` # since we called `.cancel_actor()`, `.cancel_ack`
# will not be set on the ctx bc `ctx.cancel()` was not # will not be set on the ctx bc `ctx.cancel()` was not
# called directly fot this confext. # called directly fot this confext.
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
print('caught ctxc from contexts!') _ctxc = ctxc
assert ctxc.canceller == current_actor().uid print(
f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n'
f'{repr(ctxc)}\n'
)
if not raise_sub_spawn_error_after:
assert ctxc.canceller == root.uid
else:
assert ctxc.canceller == spawner_uid
assert ctxc is spawn_ctx.outcome assert ctxc is spawn_ctx.outcome
assert ctxc is spawn_ctx.maybe_error assert ctxc is spawn_ctx.maybe_error
raise raise
# assert spawn_ctx.cancel_acked if raise_sub_spawn_error_after:
assert spawn_ctx.cancel_acked pytest.fail(
assert client_ctx.cancel_acked 'context block(s) in PARENT never raised?!?'
)
await client.cancel_actor() if not raise_sub_spawn_error_after:
await server.cancel_actor() # assert spawn_ctx.cancel_acked
assert spawn_ctx.cancel_acked
assert client_ctx.cancel_acked
# WOA WOA WOA! we need this to close..!!!?? await client.cancel_actor()
# that's super bad XD await server.cancel_actor()
# TODO: why isn't this working!?!? # WOA WOA WOA! we need this to close..!!!??
# we're now outside the `.open_context()` block so # that's super bad XD
# the internal `Context._scope: CancelScope` should be
# gracefully "closed" ;)
# assert spawn_ctx.cancelled_caught # TODO: why isn't this working!?!?
# we're now outside the `.open_context()` block so
# the internal `Context._scope: CancelScope` should be
# gracefully "closed" ;)
trio.run(main) # assert spawn_ctx.cancelled_caught
if raise_sub_spawn_error_after:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
rae: RemoteActorError = excinfo.value
check_inner_rte(rae)
else:
trio.run(main)

View File

@ -15,9 +15,19 @@ async def sleep_back_actor(
func_name, func_name,
func_defined, func_defined,
exposed_mods, exposed_mods,
*,
reg_addr: tuple,
): ):
if actor_name: if actor_name:
async with tractor.find_actor(actor_name) as portal: async with tractor.find_actor(
actor_name,
# NOTE: must be set manually since
# the subactor doesn't have the reg_addr
# fixture code run in it!
# TODO: maybe we should just set this once in the
# _state mod and derive to all children?
registry_addrs=[reg_addr],
) as portal:
try: try:
await portal.run(__name__, func_name) await portal.run(__name__, func_name)
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
@ -26,7 +36,7 @@ async def sleep_back_actor(
if not exposed_mods: if not exposed_mods:
expect = tractor.ModuleNotExposed expect = tractor.ModuleNotExposed
assert err.type is expect assert err.boxed_type is expect
raise raise
else: else:
await trio.sleep(float('inf')) await trio.sleep(float('inf'))
@ -52,11 +62,17 @@ async def short_sleep():
'fail_on_syntax', 'fail_on_syntax',
], ],
) )
def test_rpc_errors(reg_addr, to_call, testdir): def test_rpc_errors(
"""Test errors when making various RPC requests to an actor reg_addr,
to_call,
testdir,
):
'''
Test errors when making various RPC requests to an actor
that either doesn't have the requested module exposed or doesn't define that either doesn't have the requested module exposed or doesn't define
the named function. the named function.
"""
'''
exposed_mods, funcname, inside_err = to_call exposed_mods, funcname, inside_err = to_call
subactor_exposed_mods = [] subactor_exposed_mods = []
func_defined = globals().get(funcname, False) func_defined = globals().get(funcname, False)
@ -84,8 +100,13 @@ def test_rpc_errors(reg_addr, to_call, testdir):
# spawn a subactor which calls us back # spawn a subactor which calls us back
async with tractor.open_nursery( async with tractor.open_nursery(
arbiter_addr=reg_addr, registry_addrs=[reg_addr],
enable_modules=exposed_mods.copy(), enable_modules=exposed_mods.copy(),
# NOTE: will halt test in REPL if uncommented, so only
# do that if actually debugging subactor but keep it
# disabled for the test.
# debug_mode=True,
) as n: ) as n:
actor = tractor.current_actor() actor = tractor.current_actor()
@ -102,6 +123,7 @@ def test_rpc_errors(reg_addr, to_call, testdir):
exposed_mods=exposed_mods, exposed_mods=exposed_mods,
func_defined=True if func_defined else False, func_defined=True if func_defined else False,
enable_modules=subactor_exposed_mods, enable_modules=subactor_exposed_mods,
reg_addr=reg_addr,
) )
def run(): def run():
@ -128,4 +150,4 @@ def test_rpc_errors(reg_addr, to_call, testdir):
)) ))
if getattr(value, 'type', None): if getattr(value, 'type', None):
assert value.type is inside_err assert value.boxed_type is inside_err

View File

@ -32,8 +32,7 @@ async def spawn(
if actor.is_arbiter: if actor.is_arbiter:
async with tractor.open_nursery( async with tractor.open_nursery() as nursery:
) as nursery:
# forks here # forks here
portal = await nursery.run_in_actor( portal = await nursery.run_in_actor(
@ -55,7 +54,9 @@ async def spawn(
return 10 return 10
def test_local_arbiter_subactor_global_state(reg_addr): def test_local_arbiter_subactor_global_state(
reg_addr,
):
result = trio.run( result = trio.run(
spawn, spawn,
True, True,

View File

@ -58,16 +58,44 @@ class InternalError(RuntimeError):
''' '''
_body_fields: list[str] = [ _body_fields: list[str] = [
'src_actor_uid', 'boxed_type',
'src_type',
# TODO: format this better if we're going to include it.
# 'relay_path',
'src_uid',
# only in sub-types
'canceller', 'canceller',
'sender', 'sender',
] ]
_msgdata_keys: list[str] = [ _msgdata_keys: list[str] = [
'type_str', 'boxed_type_str',
] + _body_fields ] + _body_fields
def get_err_type(type_name: str) -> BaseException|None:
'''
Look up an exception type by name from the set of locally
known namespaces:
- `builtins`
- `tractor._exceptions`
- `trio`
'''
for ns in [
builtins,
_this_mod,
trio,
]:
if type_ref := getattr(
ns,
type_name,
False,
):
return type_ref
# TODO: rename to just `RemoteError`? # TODO: rename to just `RemoteError`?
class RemoteActorError(Exception): class RemoteActorError(Exception):
@ -81,13 +109,14 @@ class RemoteActorError(Exception):
''' '''
reprol_fields: list[str] = [ reprol_fields: list[str] = [
'src_actor_uid', 'src_uid',
'relay_path',
] ]
def __init__( def __init__(
self, self,
message: str, message: str,
suberror_type: Type[BaseException] | None = None, boxed_type: Type[BaseException]|None = None,
**msgdata **msgdata
) -> None: ) -> None:
@ -101,20 +130,112 @@ class RemoteActorError(Exception):
# - .remote_type # - .remote_type
# also pertains to our long long oustanding issue XD # also pertains to our long long oustanding issue XD
# https://github.com/goodboy/tractor/issues/5 # https://github.com/goodboy/tractor/issues/5
self.boxed_type: str = suberror_type #
# TODO: always set ._boxed_type` as `None` by default
# and instead render if from `.boxed_type_str`?
self._boxed_type: BaseException = boxed_type
self._src_type: BaseException|None = None
self.msgdata: dict[str, Any] = msgdata self.msgdata: dict[str, Any] = msgdata
@property # TODO: mask out eventually or place in `pack_error()`
def type(self) -> str: # pre-`return` lines?
return self.boxed_type # sanity on inceptions
if boxed_type is RemoteActorError:
assert self.src_type_str != 'RemoteActorError'
assert self.src_uid not in self.relay_path
# ensure type-str matches and round-tripping from that
# str results in same error type.
#
# TODO NOTE: this is currently exclusively for the
# `ContextCancelled(boxed_type=trio.Cancelled)` case as is
# used inside `._rpc._invoke()` atm though probably we
# should better emphasize that special (one off?) case
# either by customizing `ContextCancelled.__init__()` or
# through a special factor func?
elif boxed_type:
if not self.msgdata.get('boxed_type_str'):
self.msgdata['boxed_type_str'] = str(
type(boxed_type).__name__
)
assert self.boxed_type_str == self.msgdata['boxed_type_str']
assert self.boxed_type is boxed_type
@property @property
def type_str(self) -> str: def src_type_str(self) -> str:
return str(type(self.boxed_type).__name__) '''
String-name of the source error's type.
This should be the same as `.boxed_type_str` when unpacked
at the first relay/hop's receiving actor.
'''
return self.msgdata['src_type_str']
@property @property
def src_actor_uid(self) -> tuple[str, str]|None: def src_type(self) -> str:
return self.msgdata.get('src_actor_uid') '''
Error type raised by original remote faulting actor.
'''
if self._src_type is None:
self._src_type = get_err_type(
self.msgdata['src_type_str']
)
return self._src_type
@property
def boxed_type_str(self) -> str:
'''
String-name of the (last hop's) boxed error type.
'''
return self.msgdata['boxed_type_str']
@property
def boxed_type(self) -> str:
'''
Error type boxed by last actor IPC hop.
'''
if self._boxed_type is None:
self._boxed_type = get_err_type(
self.msgdata['boxed_type_str']
)
return self._boxed_type
@property
def relay_path(self) -> list[tuple]:
'''
Return the list of actors which consecutively relayed
a boxed `RemoteActorError` the src error up until THIS
actor's hop.
NOTE: a `list` field with the same name is expected to be
passed/updated in `.msgdata`.
'''
return self.msgdata['relay_path']
@property
def relay_uid(self) -> tuple[str, str]|None:
return tuple(
self.msgdata['relay_path'][-1]
)
@property
def src_uid(self) -> tuple[str, str]|None:
if src_uid := (
self.msgdata.get('src_uid')
):
return tuple(src_uid)
# TODO: use path lookup instead?
# return tuple(
# self.msgdata['relay_path'][0]
# )
@property @property
def tb_str( def tb_str(
@ -129,28 +250,56 @@ class RemoteActorError(Exception):
return '' return ''
def _mk_fields_str(
self,
fields: list[str],
end_char: str = '\n',
) -> str:
_repr: str = ''
for key in fields:
val: Any|None = (
getattr(self, key, None)
or
self.msgdata.get(key)
)
# TODO: for `.relay_path` on multiline?
# if not isinstance(val, str):
# val_str = pformat(val)
# else:
val_str: str = repr(val)
if val:
_repr += f'{key}={val_str}{end_char}'
return _repr
def reprol(self) -> str: def reprol(self) -> str:
''' '''
Represent this error for "one line" display, like in Represent this error for "one line" display, like in
a field of our `Context.__repr__()` output. a field of our `Context.__repr__()` output.
''' '''
_repr: str = f'{type(self).__name__}(' # TODO: use this matryoshka emjoi XD
for key in self.reprol_fields: # => 🪆
val: Any|None = self.msgdata.get(key) reprol_str: str = f'{type(self).__name__}('
if val: _repr: str = self._mk_fields_str(
_repr += f'{key}={repr(val)} ' self.reprol_fields,
end_char=' ',
return _repr )
return (
reprol_str
+
_repr
)
def __repr__(self) -> str: def __repr__(self) -> str:
'''
Nicely formatted boxed error meta data + traceback.
fields: str = '' '''
for key in _body_fields: fields: str = self._mk_fields_str(
val: str|None = self.msgdata.get(key) _body_fields,
if val: )
fields += f'{key}={val}\n'
fields: str = textwrap.indent( fields: str = textwrap.indent(
fields, fields,
# prefix=' '*2, # prefix=' '*2,
@ -165,8 +314,6 @@ class RemoteActorError(Exception):
f' ------ - ------\n' f' ------ - ------\n'
f' _|\n' f' _|\n'
) )
# f'|\n'
# f' |\n'
if indent: if indent:
body: str = textwrap.indent( body: str = textwrap.indent(
body, body,
@ -178,9 +325,47 @@ class RemoteActorError(Exception):
')>' ')>'
) )
# TODO: local recontruction of remote exception deats def unwrap(
self,
) -> BaseException:
'''
Unpack the inner-most source error from it's original IPC msg data.
We attempt to reconstruct (as best as we can) the original
`Exception` from as it would have been raised in the
failing actor's remote env.
'''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611
return src_type_ref(self.tb_str)
# TODO: local recontruction of nested inception for a given
# "hop" / relay-node in this error's relay_path?
# => so would render a `RAE[RAE[RAE[Exception]]]` instance
# with all inner errors unpacked?
# -[ ] if this is useful shouldn't be too hard to impl right?
# def unbox(self) -> BaseException: # def unbox(self) -> BaseException:
# ... # '''
# Unbox to the prior relays (aka last boxing actor's)
# inner error.
# '''
# if not self.relay_path:
# return self.unwrap()
# # TODO..
# # return self.boxed_type(
# # boxed_type=get_type_ref(..
# raise NotImplementedError
class InternalActorError(RemoteActorError): class InternalActorError(RemoteActorError):
@ -232,7 +417,7 @@ class ContextCancelled(RemoteActorError):
f'{self}' f'{self}'
) )
# to make `.__repr__()` work uniformly # TODO: to make `.__repr__()` work uniformly?
# src_actor_uid = canceller # src_actor_uid = canceller
@ -283,7 +468,8 @@ class MessagingError(Exception):
def pack_error( def pack_error(
exc: BaseException, exc: BaseException|RemoteActorError,
tb: str|None = None, tb: str|None = None,
cid: str|None = None, cid: str|None = None,
@ -300,27 +486,56 @@ def pack_error(
else: else:
tb_str = traceback.format_exc() tb_str = traceback.format_exc()
error_msg: dict[ error_msg: dict[ # for IPC
str, str,
str | tuple[str, str] str | tuple[str, str]
] = { ] = {}
'tb_str': tb_str, our_uid: tuple = current_actor().uid
'type_str': type(exc).__name__,
'boxed_type': type(exc).__name__,
'src_actor_uid': current_actor().uid,
}
# TODO: ?just wholesale proxy `.msgdata: dict`?
# XXX WARNING, when i swapped these ctx-semantics
# tests started hanging..???!!!???
# if msgdata := exc.getattr('msgdata', {}):
# error_msg.update(msgdata)
if ( if (
isinstance(exc, ContextCancelled) isinstance(exc, RemoteActorError)
or isinstance(exc, StreamOverrun)
): ):
error_msg.update(exc.msgdata) error_msg.update(exc.msgdata)
# an onion/inception we need to pack
if (
type(exc) is RemoteActorError
and (boxed := exc.boxed_type)
and boxed != RemoteActorError
):
# sanity on source error (if needed when tweaking this)
assert (src_type := exc.src_type) != RemoteActorError
assert error_msg['src_type_str'] != 'RemoteActorError'
assert error_msg['src_type_str'] == src_type.__name__
assert error_msg['src_uid'] != our_uid
# set the boxed type to be another boxed type thus
# creating an "inception" when unpacked by
# `unpack_error()` in another actor who gets "relayed"
# this error Bo
#
# NOTE on WHY: since we are re-boxing and already
# boxed src error, we want to overwrite the original
# `boxed_type_str` and instead set it to the type of
# the input `exc` type.
error_msg['boxed_type_str'] = 'RemoteActorError'
else:
error_msg['src_uid'] = our_uid
error_msg['src_type_str'] = type(exc).__name__
error_msg['boxed_type_str'] = type(exc).__name__
# XXX alawys append us the last relay in error propagation path
error_msg.setdefault(
'relay_path',
[],
).append(our_uid)
# XXX NOTE: always ensure the traceback-str is from the
# locally raised error (**not** the prior relay's boxed
# content's `.msgdata`).
error_msg['tb_str'] = tb_str
pkt: dict = {'error': error_msg} pkt: dict = {'error': error_msg}
if cid: if cid:
pkt['cid'] = cid pkt['cid'] = cid
@ -329,7 +544,6 @@ def pack_error(
def unpack_error( def unpack_error(
msg: dict[str, Any], msg: dict[str, Any],
chan: Channel|None = None, chan: Channel|None = None,
@ -357,35 +571,32 @@ def unpack_error(
# retrieve the remote error's msg encoded details # retrieve the remote error's msg encoded details
tb_str: str = error_dict.get('tb_str', '') tb_str: str = error_dict.get('tb_str', '')
message: str = f'{chan.uid}\n' + tb_str message: str = (
type_name: str = ( f'{chan.uid}\n'
error_dict.get('type_str') +
or error_dict['boxed_type'] tb_str
) )
suberror_type: Type[BaseException] = Exception
if type_name == 'ContextCancelled': # try to lookup a suitable error type from the local runtime
# env then use it to construct a local instance.
boxed_type_str: str = error_dict['boxed_type_str']
boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
if boxed_type_str == 'ContextCancelled':
box_type = ContextCancelled box_type = ContextCancelled
suberror_type = box_type assert boxed_type is box_type
else: # try to lookup a suitable local error type # TODO: already included by `_this_mod` in else loop right?
for ns in [ #
builtins, # we have an inception/onion-error so ensure
_this_mod, # we include the relay_path info and the
trio, # original source error.
]: elif boxed_type_str == 'RemoteActorError':
if suberror_type := getattr( assert boxed_type is RemoteActorError
ns, assert len(error_dict['relay_path']) >= 1
type_name,
False,
):
break
exc = box_type( exc = box_type(
message, message,
suberror_type=suberror_type,
# unpack other fields into error type init
**error_dict, **error_dict,
) )
@ -501,6 +712,11 @@ def _raise_from_no_key_in_msg(
# destined for the `Context.result()` call during ctx-exit! # destined for the `Context.result()` call during ctx-exit!
stream._eoc: Exception = eoc stream._eoc: Exception = eoc
# in case there already is some underlying remote error
# that arrived which is probably the source of this stream
# closure
ctx.maybe_raise()
raise eoc from src_err raise eoc from src_err
if ( if (

View File

@ -268,7 +268,10 @@ async def _errors_relayed_via_ipc(
entered_debug = await _debug._maybe_enter_pm(err) entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug: if not entered_debug:
log.exception('Actor crashed:\n') log.exception(
'RPC task crashed\n'
f'|_{ctx}'
)
# always (try to) ship RPC errors back to caller # always (try to) ship RPC errors back to caller
if is_rpc: if is_rpc:
@ -608,7 +611,8 @@ async def _invoke(
# other side. # other side.
ctxc = ContextCancelled( ctxc = ContextCancelled(
msg, msg,
suberror_type=trio.Cancelled, boxed_type=trio.Cancelled,
# boxed_type_str='Cancelled',
canceller=canceller, canceller=canceller,
) )
# assign local error so that the `.outcome` # assign local error so that the `.outcome`
@ -666,7 +670,7 @@ async def _invoke(
f'`{repr(ctx.outcome)}`', f'`{repr(ctx.outcome)}`',
) )
) )
log.cancel( log.runtime(
f'IPC context terminated with a final {res_type_str}\n\n' f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}\n' f'{ctx}\n'
) )
@ -699,12 +703,6 @@ async def try_ship_error_to_remote(
# TODO: special tb fmting for ctxc cases? # TODO: special tb fmting for ctxc cases?
# tb=tb, # tb=tb,
) )
# NOTE: the src actor should always be packed into the
# error.. but how should we verify this?
# actor: Actor = _state.current_actor()
# assert err_msg['src_actor_uid']
# if not err_msg['error'].get('src_actor_uid'):
# import pdbp; pdbp.set_trace()
await channel.send(msg) await channel.send(msg)
# XXX NOTE XXX in SC terms this is one of the worst things # XXX NOTE XXX in SC terms this is one of the worst things