Compare commits

8e66f45e2386efacb344329a05c341343f9c0659..668016d37b89d3e140d45e11914d606bc371f4f0

No commits in common. "8e66f45e2386efacb344329a05c341343f9c0659" and "668016d37b89d3e140d45e11914d606bc371f4f0" have entirely different histories.

5 changed files with 138 additions and 319 deletions

View File

@@ -6,7 +6,6 @@ been an outage) and we want to ensure that despite being in debug mode
 actor tree will eventually be cancelled without leaving any zombies.
 '''
-from contextlib import asynccontextmanager as acm
 from functools import partial
 from tractor import (
@@ -18,7 +17,6 @@ from tractor import (
     _testing,
 )
 import trio
-import pytest


 async def break_ipc(
@@ -43,13 +41,6 @@ async def break_ipc(
         await stream.aclose()

     method: str = method or def_method
-    print(
-        '#################################\n'
-        'Simulating CHILD-side IPC BREAK!\n'
-        f'method: {method}\n'
-        f'pre `.aclose()`: {pre_close}\n'
-        '#################################\n'
-    )

     match method:
         case 'trans_aclose':
@@ -89,17 +80,17 @@ async def break_ipc_then_error(
     break_ipc_with: str|None = None,
     pre_close: bool = False,
 ):
-    await break_ipc(
-        stream=stream,
-        method=break_ipc_with,
-        pre_close=pre_close,
-    )
     async for msg in stream:
         await stream.send(msg)
-    assert 0
+        await break_ipc(
+            stream=stream,
+            method=break_ipc_with,
+            pre_close=pre_close,
+        )
+        assert 0


+# async def close_stream_and_error(
 async def iter_ipc_stream(
     stream: MsgStream,
     break_ipc_with: str|None = None,
@@ -108,6 +99,20 @@ async def iter_ipc_stream(
     async for msg in stream:
         await stream.send(msg)

+    # wipe out channel right before raising
+    # await break_ipc(
+    #     stream=stream,
+    #     method=break_ipc_with,
+    #     pre_close=pre_close,
+    # )
+
+    # send channel close msg at SC-prot level
+    #
+    # TODO: what should get raised here if anything?
+    # await stream.aclose()
+
+    # assert 0


 @context
 async def recv_and_spawn_net_killers(
@@ -129,16 +134,14 @@ async def recv_and_spawn_net_killers(
         async for i in stream:
             print(f'child echoing {i}')
             await stream.send(i)
             if (
                 break_ipc_after
                 and
-                i >= break_ipc_after
+                i > break_ipc_after
             ):
-                n.start_soon(
-                    iter_ipc_stream,
-                    stream,
-                )
+                '#################################\n'
+                'Simulating CHILD-side IPC BREAK!\n'
+                '#################################\n'
                 n.start_soon(
                     partial(
                         break_ipc_then_error,
@@ -146,23 +149,10 @@ async def recv_and_spawn_net_killers(
                         pre_close=pre_close,
                     )
                 )
+                n.start_soon(
+                    iter_ipc_stream,
+                    stream,
+                )


-@acm
-async def stuff_hangin_ctlc(timeout: float = 1) -> None:
-
-    with trio.move_on_after(timeout) as cs:
-        yield timeout
-        if cs.cancelled_caught:
-            # pretend to be a user seeing no streaming action
-            # thinking it's a hang, and then hitting ctl-c..
-            print(
-                f"i'm a user on the PARENT side and thingz hangin "
-                f'after timeout={timeout} ???\n\n'
-                'MASHING CTlR-C..!?\n'
-            )
-            raise KeyboardInterrupt


 async def main(
@@ -179,6 +169,9 @@ async def main(
 ) -> None:

+    # from tractor._state import _runtime_vars as rtv
+    # rtv['_debug_mode'] = debug_mode
+
     async with (
         open_nursery(
             start_method=start_method,
@@ -197,11 +190,10 @@ async def main(
         )

         async with (
-            stuff_hangin_ctlc(timeout=2) as timeout,
             _testing.expect_ctxc(
                 yay=(
                     break_parent_ipc_after
-                    or break_child_ipc_after
+                    or break_child_ipc_after,
                 ),
                 # TODO: we CAN'T remove this right?
                 # since we need the ctxc to bubble up from either
@@ -213,14 +205,12 @@ async def main(
                 # and KBI in an eg?
                 reraise=True,
             ),
             portal.open_context(
                 recv_and_spawn_net_killers,
                 break_ipc_after=break_child_ipc_after,
                 pre_close=pre_close,
             ) as (ctx, sent),
         ):
-            rx_eoc: bool = False
             ipc_break_sent: bool = False
             async with ctx.open_stream() as stream:
                 for i in range(1000):
@@ -238,7 +228,6 @@ async def main(
                             '#################################\n'
                         )

-                        # TODO: other methods? see break func above.
                         # await stream._ctx.chan.send(None)
                         # await stream._ctx.chan.transport.stream.send_eof()
                         await stream._ctx.chan.transport.stream.aclose()
@@ -262,12 +251,10 @@ async def main(
                         # TODO: is this needed or no?
                         raise

-            # timeout: int = 1
-            # with trio.move_on_after(timeout) as cs:
-            async with stuff_hangin_ctlc() as timeout:
-                print(
-                    f'PARENT `stream.receive()` with timeout={timeout}\n'
-                )
+            timeout: int = 1
+            print(f'Entering `stream.receive()` with timeout={timeout}\n')
+            with trio.move_on_after(timeout) as cs:
                 # NOTE: in the parent side IPC failure case this
                 # will raise an ``EndOfChannel`` after the child
                 # is killed and sends a stop msg back to it's
@@ -279,30 +266,23 @@ async def main(
                         f'{rx}\n'
                     )
                 except trio.EndOfChannel:
-                    rx_eoc: bool = True
                     print('MsgStream got EoC for PARENT')
                     raise

-            print(
-                'Streaming finished and we got Eoc.\n'
-                'Canceling `.open_context()` in root with\n'
-                'CTlR-C..'
-            )
-            if rx_eoc:
-                assert stream.closed
-                try:
-                    await stream.send(i)
-                    pytest.fail('stream not closed?')
-                except (
-                    trio.ClosedResourceError,
-                    trio.EndOfChannel,
-                ) as send_err:
-                    if rx_eoc:
-                        assert send_err is stream._eoc
-                    else:
-                        assert send_err is stream._closed
-
-            raise KeyboardInterrupt
+            if cs.cancelled_caught:
+                # pretend to be a user seeing no streaming action
+                # thinking it's a hang, and then hitting ctl-c..
+                print(
+                    f"YOO i'm a PARENT user anddd thingz hangin..\n"
+                    f'after timeout={timeout}\n'
+                )
+
+            print(
+                "YOO i'm mad!\n"
+                'The send side is dun but thingz hangin..\n'
+                'MASHING CTlR-C Ctl-c..'
+            )
+            raise KeyboardInterrupt


 if __name__ == '__main__':
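
Reader's note: the `stuff_hangin_ctlc()` helper that appears only on the `-` side above is just an async context manager that bounds its body with `trio.move_on_after()` and turns a timeout into a simulated user ctl-c (a raised `KeyboardInterrupt`). A minimal standalone sketch of that pattern follows; the names here are illustrative and not part of the diff:

    from contextlib import asynccontextmanager
    import trio

    @asynccontextmanager
    async def pretend_user_ctlc(timeout: float = 1):
        # bound the wrapped block: if it does not finish within `timeout`
        # seconds, behave like a user who assumed a hang and hit ctl-c
        with trio.move_on_after(timeout) as cs:
            yield timeout
        if cs.cancelled_caught:
            raise KeyboardInterrupt

    async def _demo():
        try:
            async with pretend_user_ctlc(timeout=0.1):
                await trio.sleep_forever()  # simulated hang
        except KeyboardInterrupt:
            print('timed out -> simulated ctl-c')

    trio.run(_demo)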

View File

@@ -85,8 +85,8 @@ def test_ipc_channel_break_during_stream(
     '''
     if spawn_backend != 'trio':
-        if debug_mode:
-            pytest.skip('`debug_mode` only supported on `trio` spawner')
+        # if debug_mode:
+        #     pytest.skip('`debug_mode` only supported on `trio` spawner')

         # non-`trio` spawners should never hit the hang condition that
         # requires the user to do ctl-c to cancel the actor tree.
@@ -107,10 +107,7 @@ def test_ipc_channel_break_during_stream(
         # AND we tell the child to call `MsgStream.aclose()`.
         and pre_aclose_msgstream
     ):
-        # expect_final_exc = trio.EndOfChannel
-        # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
-        # gracefully!
-        expect_final_exc = KeyboardInterrupt
+        expect_final_exc = trio.EndOfChannel

     # NOTE when ONLY the child breaks or it breaks BEFORE the
     # parent we expect the parent to get a closed resource error
@@ -123,25 +120,11 @@ def test_ipc_channel_break_during_stream(
         and
         ipc_break['break_parent_ipc_after'] is False
     ):
-        # NOTE: we DO NOT expect this any more since
-        # the child side's channel will be broken silently
-        # and nothing on the parent side will indicate this!
-        # expect_final_exc = trio.ClosedResourceError
+        expect_final_exc = trio.ClosedResourceError

-        # NOTE: child will send a 'stop' msg before it breaks
-        # the transport channel BUT, that will be absorbed by the
-        # `ctx.open_stream()` block and thus the `.open_context()`
-        # should hang, after which the test script simulates
-        # a user sending ctl-c by raising a KBI.
+        # if child calls `MsgStream.aclose()` then expect EoC.
         if pre_aclose_msgstream:
-            expect_final_exc = KeyboardInterrupt
-
-            # XXX OLD XXX
-            # if child calls `MsgStream.aclose()` then expect EoC.
-            # ^ XXX not any more ^ since eoc is always absorbed
-            # gracefully and NOT bubbled to the `.open_context()`
-            # block!
-            # expect_final_exc = trio.EndOfChannel
+            expect_final_exc = trio.EndOfChannel

     # BOTH but, CHILD breaks FIRST
     elif (
@@ -151,8 +134,12 @@ def test_ipc_channel_break_during_stream(
             > ipc_break['break_child_ipc_after']
         )
     ):
+        expect_final_exc = trio.ClosedResourceError
+
+        # child will send a 'stop' msg before it breaks
+        # the transport channel.
         if pre_aclose_msgstream:
-            expect_final_exc = KeyboardInterrupt
+            expect_final_exc = trio.EndOfChannel

     # NOTE when the parent IPC side dies (even if the child's does as well
     # but the child fails BEFORE the parent) we always expect the
@@ -173,8 +160,7 @@ def test_ipc_channel_break_during_stream(
         ipc_break['break_parent_ipc_after'] is not False
         and (
             ipc_break['break_child_ipc_after']
-            >
-            ipc_break['break_parent_ipc_after']
+            > ipc_break['break_parent_ipc_after']
         )
     ):
         expect_final_exc = trio.ClosedResourceError
@@ -238,29 +224,25 @@ def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
     '''
     async def main():
-        with trio.fail_after(3):
-            async with tractor.open_nursery() as n:
-                portal = await n.start_actor(
-                    'ipc_breaker',
-                    enable_modules=[__name__],
-                )
-
-                with trio.move_on_after(1):
-                    async with (
-                        portal.open_context(
-                            break_ipc_after_started
-                        ) as (ctx, sent),
-                    ):
-                        async with ctx.open_stream():
-                            await trio.sleep(0.5)
-
-                        print('parent waiting on context')
-
-                    print(
-                        'parent exited context\n'
-                        'parent raising KBI..\n'
-                    )
-                    raise KeyboardInterrupt
+        async with tractor.open_nursery() as n:
+            portal = await n.start_actor(
+                'ipc_breaker',
+                enable_modules=[__name__],
+            )
+
+            with trio.move_on_after(1):
+                async with (
+                    portal.open_context(
+                        break_ipc_after_started
+                    ) as (ctx, sent),
+                ):
+                    async with ctx.open_stream():
+                        await trio.sleep(0.5)
+
+                    print('parent waiting on context')
+
+                print('parent exited context')
+                raise KeyboardInterrupt

     with pytest.raises(KeyboardInterrupt):
         trio.run(main)
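
As context for the `expect_final_exc` switches in the hunks above: the test ultimately runs the example script under `trio.run()` and asserts that it terminates by raising the expected exception type. A minimal sketch of that assertion pattern (the helper name and toy `main()` below are illustrative only):

    import pytest
    import trio

    def assert_final_exc(main, expect_final_exc: type[BaseException]):
        # run the async entrypoint and require that it ends by raising
        # exactly the expected exception type out of `trio.run()`
        with pytest.raises(expect_final_exc):
            trio.run(main)

    async def _toy_main():
        # stand-in for the example script: pretend the stream ended
        raise trio.EndOfChannel

    def test_toy_stream_break():
        assert_final_exc(_toy_main, trio.EndOfChannel)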

View File

@@ -16,11 +16,6 @@ from tractor import ( # typing
     Portal,
     Context,
     ContextCancelled,
-    RemoteActorError,
-)
-from tractor._testing import (
-    # tractor_test,
-    expect_ctxc,
 )

 # XXX TODO cases:
@@ -161,11 +156,10 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
     ):
         await trio.sleep_forever()

-    with pytest.raises(RemoteActorError) as excinfo:
+    with pytest.raises(tractor.RemoteActorError) as excinfo:
         trio.run(main)

-    rae = excinfo.value
-    assert rae.boxed_type == TypeError
+    assert excinfo.value.type == TypeError


 @tractor.context
@@ -745,16 +739,14 @@ def test_peer_canceller(
     with pytest.raises(ContextCancelled) as excinfo:
         trio.run(main)

-    assert excinfo.value.boxed_type == ContextCancelled
+    assert excinfo.value.type == ContextCancelled
     assert excinfo.value.canceller[0] == 'canceller'


 @tractor.context
 async def basic_echo_server(
     ctx: Context,
-    peer_name: str = 'wittle_bruv',
-
-    err_after: int|None = None,
+    peer_name: str = 'stepbro',

 ) -> None:
     '''
@@ -782,31 +774,17 @@ async def basic_echo_server(
             # assert 0
             await ipc.send(resp)

-            if (
-                err_after
-                and i > err_after
-            ):
-                raise RuntimeError(
-                    f'Simulated error in `{peer_name}`'
-                )


 @tractor.context
 async def serve_subactors(
     ctx: Context,
     peer_name: str,
-    debug_mode: bool,

 ) -> None:
     async with open_nursery() as an:
-
-        # sanity
-        if debug_mode:
-            assert tractor._state.debug_mode()
-
         await ctx.started(peer_name)
-        async with ctx.open_stream() as ipc:
-            async for msg in ipc:
+        async with ctx.open_stream() as reqs:
+            async for msg in reqs:
                 peer_name: str = msg
                 peer: Portal = await an.start_actor(
                     name=peer_name,
@@ -817,7 +795,7 @@ async def serve_subactors(
                     f'{peer_name}\n'
                     f'|_{peer}\n'
                 )
-                await ipc.send((
+                await reqs.send((
                     peer.chan.uid,
                     peer.chan.raddr,
                 ))
@@ -829,20 +807,14 @@
 async def client_req_subactor(
     ctx: Context,
     peer_name: str,
-    debug_mode: bool,

     # used to simulate a user causing an error to be raised
     # directly in thread (like a KBI) to better replicate the
     # case where a `modden` CLI client would hang afer requesting
     # a `Context.cancel()` to `bigd`'s wks spawner.
     reraise_on_cancel: str|None = None,
-    sub_err_after: int|None = None,

 ) -> None:
-    # sanity
-    if debug_mode:
-        assert tractor._state.debug_mode()
-
     # TODO: other cases to do with sub lifetimes:
     # -[ ] test that we can have the server spawn a sub
     # that lives longer then ctx with this client.
@@ -864,7 +836,6 @@ async def client_req_subactor(
             spawner.open_context(
                 serve_subactors,
                 peer_name=peer_name,
-                debug_mode=debug_mode,
             ) as (spawner_ctx, first),
         ):
             assert first == peer_name
@@ -886,7 +857,6 @@
             await tell_little_bro(
                 actor_name=sub_uid[0],
                 caller='client',
-                err_after=sub_err_after,
             )

             # TODO: test different scope-layers of
@@ -898,7 +868,9 @@
     # TODO: would be super nice to have a special injected
     # cancel type here (maybe just our ctxc) but using
     # some native mechanism in `trio` :p
-    except trio.Cancelled as err:
+    except (
+        trio.Cancelled
+    ) as err:
         _err = err
         if reraise_on_cancel:
             errtype = globals()['__builtins__'][reraise_on_cancel]
@@ -925,9 +897,7 @@
 async def tell_little_bro(
     actor_name: str,
-
-    caller: str = '',
-    err_after: int|None = None,
+    caller: str = ''
 ):
     # contact target actor, do a stream dialog.
     async with (
@@ -936,12 +906,10 @@ async def tell_little_bro(
         ) as lb,
         lb.open_context(
             basic_echo_server,
-
-            # XXX proxy any delayed err condition
-            err_after=err_after,
         ) as (sub_ctx, first),
-
-        sub_ctx.open_stream() as echo_ipc,
+        sub_ctx.open_stream(
+            basic_echo_server,
+        ) as echo_ipc,
     ):
         actor: Actor = current_actor()
         uid: tuple = actor.uid
@@ -968,15 +936,10 @@
     'raise_client_error',
     [None, 'KeyboardInterrupt'],
 )
-@pytest.mark.parametrize(
-    'raise_sub_spawn_error_after',
-    [None, 50],
-)
 def test_peer_spawns_and_cancels_service_subactor(
     debug_mode: bool,
     raise_client_error: str,
     reg_addr: tuple[str, int],
-    raise_sub_spawn_error_after: int|None,
 ):
     # NOTE: this tests for the modden `mod wks open piker` bug
     # discovered as part of implementing workspace ctx
@@ -990,16 +953,6 @@ def test_peer_spawns_and_cancels_service_subactor(
     # and the server's spawned child should cancel and terminate!
     peer_name: str = 'little_bro'

-    def check_inner_rte(rae: RemoteActorError):
-        '''
-        Validate the little_bro's relayed inception!
-        '''
-        assert rae.boxed_type is RemoteActorError
-        assert rae.src_type is RuntimeError
-        assert 'client' in rae.relay_uid
-        assert peer_name in rae.src_uid
-
     async def main():
         async with tractor.open_nursery(
             # NOTE: to halt the peer tasks on ctxc, uncomment this.
@@ -1023,24 +976,14 @@ def test_peer_spawns_and_cancels_service_subactor(
                 server.open_context(
                     serve_subactors,
                     peer_name=peer_name,
-                    debug_mode=debug_mode,
                 ) as (spawn_ctx, first),

                 client.open_context(
                     client_req_subactor,
                     peer_name=peer_name,
-                    debug_mode=debug_mode,
                     reraise_on_cancel=raise_client_error,
-
-                    # trigger for error condition in sub
-                    # during streaming.
-                    sub_err_after=raise_sub_spawn_error_after,

                 ) as (client_ctx, client_says),
             ):
-                root: Actor = current_actor()
-                spawner_uid: tuple = spawn_ctx.chan.uid
-
                 print(
                     f'Server says: {first}\n'
                     f'Client says: {client_says}\n'
@@ -1050,7 +993,6 @@ def test_peer_spawns_and_cancels_service_subactor(
                 # (grandchild of this root actor) "little_bro"
                 # and ensure we can also use it as an echo
                 # server.
-                sub: Portal
                 async with tractor.wait_for_actor(
                     name=peer_name,
                 ) as sub:
@@ -1062,139 +1004,56 @@
                         f'.uid: {sub.actor.uid}\n'
                         f'chan.raddr: {sub.chan.raddr}\n'
                     )

-                async with expect_ctxc(
-                    yay=raise_sub_spawn_error_after,
-                    reraise=False,
-                ):
-                    await tell_little_bro(
-                        actor_name=peer_name,
-                        caller='root',
-                    )
-
-                if not raise_sub_spawn_error_after:
-
-                    # signal client to cancel and maybe raise a KBI
-                    await client_ctx.cancel()
-                    print(
-                        '-> root cancelling client,\n'
-                        '-> root checking `client_ctx.result()`,\n'
-                        f'-> checking that sub-spawn {peer_name} is down\n'
-                    )
-                # else:
-
-                try:
-                    res = await client_ctx.result(hide_tb=False)
-
-                    # in remote (relayed inception) error
-                    # case, we should error on the line above!
-                    if raise_sub_spawn_error_after:
-                        pytest.fail(
-                            'Never rxed proxied `RemoteActorError[RuntimeError]` !?'
-                        )
-
-                    assert isinstance(res, ContextCancelled)
-                    assert client_ctx.cancel_acked
-                    assert res.canceller == root.uid
-
-                except RemoteActorError as rae:
-                    _err = rae
-                    assert raise_sub_spawn_error_after
-
-                    # since this is a "relayed error" via the client
-                    # sub-actor, it is expected to be
-                    # a `RemoteActorError` boxing another
-                    # `RemoteActorError` otherwise known as
-                    # an "inception" (from `trio`'s parlance)
-                    # ((or maybe a "Matryoshka" and/or "matron"
-                    # in our own working parlance)) which
-                    # contains the source error from the
-                    # little_bro: a `RuntimeError`.
-                    #
-                    check_inner_rte(rae)
-                    assert rae.relay_uid == client.chan.uid
-                    assert rae.src_uid == sub.chan.uid
-
-                    assert not client_ctx.cancel_acked
-                    assert (
-                        client_ctx.maybe_error
-                        is client_ctx.outcome
-                        is rae
-                    )
-                    raise
-                    # await tractor.pause()
-
-                else:
-                    assert not raise_sub_spawn_error_after
-
-                    # cancelling the spawner sub should
-                    # transitively cancel it's sub, the little
-                    # bruv.
-                    print('root cancelling server/client sub-actors')
-                    await spawn_ctx.cancel()
-                    async with tractor.find_actor(
-                        name=peer_name,
-                    ) as sub:
-                        assert not sub
-
+                await tell_little_bro(
+                    actor_name=peer_name,
+                    caller='root',
+                )
+
+                # signal client to raise a KBI
+                await client_ctx.cancel()
+                print('root cancelled client, checking that sub-spawn is down')
+
+                async with tractor.find_actor(
+                    name=peer_name,
+                ) as sub:
+                    assert not sub
+
+                print('root cancelling server/client sub-actors')
+
+                # await tractor.pause()
+                res = await client_ctx.result(hide_tb=False)
+                assert isinstance(res, ContextCancelled)
+                assert client_ctx.cancel_acked
+                assert res.canceller == current_actor().uid
+
+                await spawn_ctx.cancel()
                 # await server.cancel_actor()

-        except RemoteActorError as rae:
-            # XXX more-or-less same as above handler
-            # this is just making sure the error bubbles out
-            # of the
-            _err = rae
-            assert raise_sub_spawn_error_after
-            raise
-
         # since we called `.cancel_actor()`, `.cancel_ack`
         # will not be set on the ctx bc `ctx.cancel()` was not
         # called directly fot this confext.
         except ContextCancelled as ctxc:
-            _ctxc = ctxc
-            print(
-                f'{root.uid} caught ctxc from ctx with {client_ctx.chan.uid}\n'
-                f'{repr(ctxc)}\n'
-            )
-
-            if not raise_sub_spawn_error_after:
-                assert ctxc.canceller == root.uid
-            else:
-                assert ctxc.canceller == spawner_uid
-
+            print('caught ctxc from contexts!')
+            assert ctxc.canceller == current_actor().uid
             assert ctxc is spawn_ctx.outcome
             assert ctxc is spawn_ctx.maybe_error
             raise

-        if raise_sub_spawn_error_after:
-            pytest.fail(
-                'context block(s) in PARENT never raised?!?'
-            )
-
-        if not raise_sub_spawn_error_after:
-            # assert spawn_ctx.cancel_acked
-            assert spawn_ctx.cancel_acked
-            assert client_ctx.cancel_acked
-
-            await client.cancel_actor()
-            await server.cancel_actor()
-
-            # WOA WOA WOA! we need this to close..!!!??
-            # that's super bad XD
-
-            # TODO: why isn't this working!?!?
-            # we're now outside the `.open_context()` block so
-            # the internal `Context._scope: CancelScope` should be
-            # gracefully "closed" ;)
-
-            # assert spawn_ctx.cancelled_caught
-
-    if raise_sub_spawn_error_after:
-        with pytest.raises(RemoteActorError) as excinfo:
-            trio.run(main)
-
-        rae: RemoteActorError = excinfo.value
-        check_inner_rte(rae)
-
-    else:
-        trio.run(main)
+        # assert spawn_ctx.cancel_acked
+        assert spawn_ctx.cancel_acked
+        assert client_ctx.cancel_acked
+
+        await client.cancel_actor()
+        await server.cancel_actor()
+
+        # WOA WOA WOA! we need this to close..!!!??
+        # that's super bad XD
+
+        # TODO: why isn't this working!?!?
+        # we're now outside the `.open_context()` block so
+        # the internal `Context._scope: CancelScope` should be
+        # gracefully "closed" ;)
+
+        # assert spawn_ctx.cancelled_caught
+
+    trio.run(main)
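
For orientation, the large `-` block above exercises what the comments call an error "inception": an error raised in the grandchild (`little_bro`) is boxed by the intermediate client actor before reaching the root, and the test checks `boxed_type`, `src_type`, `src_uid` and `relay_uid` to verify both hops. A standalone sketch of that kind of nested-boxing check, using a stand-in wrapper class rather than tractor's real `RemoteActorError`:

    class BoxedError(Exception):
        '''
        Stand-in for a remote-error wrapper: records who relayed it
        and what the innermost ("source") exception type was.
        '''
        def __init__(self, boxed_type, src_type, src_uid, relay_uid):
            self.boxed_type = boxed_type    # type boxed at this hop
            self.src_type = src_type        # original exception type
            self.src_uid = src_uid          # actor that raised it
            self.relay_uid = relay_uid      # actor that forwarded it
            super().__init__(f'{src_type.__name__} from {src_uid}')

    def check_inception(err: BoxedError, peer_name: str):
        # mirrors the removed `check_inner_rte()` assertions: the outer
        # box wraps another box whose source was a RuntimeError raised
        # by the peer and relayed through the 'client' actor
        assert err.boxed_type is BoxedError
        assert err.src_type is RuntimeError
        assert 'client' in err.relay_uid
        assert peer_name in err.src_uid

    # toy usage
    err = BoxedError(
        boxed_type=BoxedError,
        src_type=RuntimeError,
        src_uid=('little_bro', '1234'),
        relay_uid=('client', '5678'),
    )
    check_inception(err, peer_name='little_bro')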

View File

@@ -111,6 +111,7 @@ class RemoteActorError(Exception):
     reprol_fields: list[str] = [
         'src_uid',
         'relay_path',
+        # 'relay_uid',
     ]

     def __init__(
@@ -486,11 +487,14 @@ def pack_error(
     else:
         tb_str = traceback.format_exc()

-    error_msg: dict[  # for IPC
+    our_uid: tuple = current_actor().uid
+    error_msg: dict[
         str,
         str | tuple[str, str]
-    ] = {}
-    our_uid: tuple = current_actor().uid
+    ] = {
+        'tb_str': tb_str,
+        'relay_uid': our_uid,
+    }

     if (
         isinstance(exc, RemoteActorError)
@@ -531,11 +535,6 @@ def pack_error(
             [],
         ).append(our_uid)

-    # XXX NOTE: always ensure the traceback-str is from the
-    # locally raised error (**not** the prior relay's boxed
-    # content's `.msgdata`).
-    error_msg['tb_str'] = tb_str
-
     pkt: dict = {'error': error_msg}
     if cid:
         pkt['cid'] = cid
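
To make the `pack_error()` hunks easier to follow: both sides build a plain dict that is shipped over IPC, the difference being whether `tb_str`/`relay_uid` are filled in up front or at the end. A rough sketch of the resulting wire payload under that reading (field names taken from the diff, everything else illustrative):

    import traceback

    def pack_error_sketch(
        exc: BaseException,
        our_uid: tuple[str, str],
        cid: str | None = None,
    ) -> dict:
        # build the IPC-friendly error payload: a traceback string plus
        # the uid of the actor packing (relaying) the error
        error_msg: dict = {
            'tb_str': ''.join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ),
            'relay_uid': our_uid,
        }
        pkt: dict = {'error': error_msg}
        if cid:
            # tag with the originating inter-actor context id when known
            pkt['cid'] = cid
        return pkt

    # toy usage
    try:
        1 / 0
    except ZeroDivisionError as exc:
        print(pack_error_sketch(exc, our_uid=('root', 'abc123'), cid='42'))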

View File

@@ -715,12 +715,10 @@ class Actor:
                 f'|_{chan}\n'
             )
             try:
-                # send msg loop terminate sentinel which
-                # triggers cancellation of all remotely
-                # started tasks.
+                # send a msg loop terminate sentinel
                 await chan.send(None)

-                # XXX: do we want this? no right?
+                # XXX: do we want this?
                 # causes "[104] connection reset by peer" on other end
                 # await chan.aclose()
@@ -1210,10 +1208,10 @@ class Actor:
             # - callee self raises ctxc before caller send request,
             # - callee errors prior to cancel req.
             log.cancel(
-                'Cancel request invalid, RPC task already completed?\n\n'
+                'Cancel request invalid, RPC task already completed?\n'
                 f'<= canceller: {requesting_uid}\n\n'
-                f'=> {cid}@{parent_chan.uid}\n'
-                f' |_{parent_chan}\n'
+                f'=>{parent_chan}\n'
+                f' |_ctx-id: {cid}\n'
             )
             return True
@@ -1512,6 +1510,7 @@ async def async_main(
     ):
         accept_addrs = set_accept_addr_says_rent

     # The "root" nursery ensures the channel with the immediate
     # parent is kept alive as a resilient service until
     # cancellation steps have (mostly) occurred in
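
On the sentinel seen in the `Actor` hunk above (`await chan.send(None)`): a `None` message is used as an in-band signal telling the remote msg loop to shut down. The same idea can be shown with a plain trio memory channel; the names below are illustrative and not tractor's API:

    import trio

    async def msg_loop(recv: trio.MemoryReceiveChannel) -> None:
        # process messages until the in-band terminate sentinel (None)
        async for msg in recv:
            if msg is None:
                print('got terminate sentinel, shutting down msg loop')
                return
            print(f'handling msg: {msg!r}')

    async def main() -> None:
        send, recv = trio.open_memory_channel(0)
        async with trio.open_nursery() as n:
            n.start_soon(msg_loop, recv)
            await send.send({'cmd': 'echo', 'data': 1})
            # send the msg-loop terminate sentinel, like `chan.send(None)`
            await send.send(None)

    trio.run(main)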