forked from goodboy/tractor
1
0
Fork 0

Tweak inter-peer tests for new/refined semantics

Buncha subtle details changed mostly to do with when `Context.cancel()`
gets called on "real" remote errors vs. (peer requested) cancellation
and then local side handling of `ContextCancelled`.

Specific changes to make tests pass:
- due to raciness with `sleeper_ctx.result()` raising the ctxc locally
  vs. the child-peers receiving similar ctxcs themselves (and then
  erroring and propagating back to the root parent), we might not see
  `._remote_error` set during the sub-ctx loops (except for the sleeper
  itself obvi).
- do not expect `.cancel_called`/`.cancel_caught` to be set on any
  sub-ctx since currently `Context.cancel()` is only called non-shielded
  and thus is not in invoked when `._scope.cancel()` is called as part
  of each root-side ctx ref/block handling the inter-peer ctxc.
- do not expect `Context._scope.cancelled_caught` to be set in most cases
  (even the sleeper)

TODO Outstanding adjustments not fixed yet:
-[ ] `_scope.cancelled_caught` checks outside the `.open_context()`
  blocks.
modden_spawn_from_client_req
Tyler Goodlet 2024-03-06 10:13:41 -05:00
parent 7c22f76274
commit 9e3f41a5b1
1 changed files with 62 additions and 35 deletions

View File

@ -220,11 +220,12 @@ async def stream_from_peer(
# - what about IPC-transport specific errors, should # - what about IPC-transport specific errors, should
# they bubble from the async for and trigger # they bubble from the async for and trigger
# other special cases? # other special cases?
#
# NOTE: current ctl flow: # NOTE: current ctl flow:
# - stream raises `trio.EndOfChannel` and # - stream raises `trio.EndOfChannel` and
# exits the loop # exits the loop
# - `.open_context()` will raise the ctxcanc # - `.open_context()` will raise the ctxc received
# received from the sleeper. # from the sleeper.
async for msg in stream: async for msg in stream:
assert msg is not None assert msg is not None
print(msg) print(msg)
@ -383,11 +384,11 @@ def test_peer_canceller(
) as (canceller_ctx, sent), ) as (canceller_ctx, sent),
): ):
ctxs: list[Context] = [ ctxs: dict[str, Context] = {
sleeper_ctx, 'sleeper': sleeper_ctx,
caller_ctx, 'caller': caller_ctx,
canceller_ctx, 'canceller': canceller_ctx,
] }
try: try:
print('PRE CONTEXT RESULT') print('PRE CONTEXT RESULT')
@ -505,14 +506,17 @@ def test_peer_canceller(
# NOTE: this root actor task should have # NOTE: this root actor task should have
# called `Context.cancel()` on the # called `Context.cancel()` on the
# `.__aexit__()` to every opened ctx. # `.__aexit__()` to every opened ctx.
for ctx in ctxs: for name, ctx in ctxs.items():
assert ctx.cancel_called
# this root actor task should have # this root actor task should have
# cancelled all opened contexts except the # cancelled all opened contexts except the
# sleeper which is obvi by the "canceller" # sleeper which is obvi by the "canceller"
# peer. # peer.
re = ctx._remote_error re = ctx._remote_error
le = ctx._local_error
assert ctx.cancel_called
if ( if (
ctx is sleeper_ctx ctx is sleeper_ctx
or ctx is caller_ctx or ctx is caller_ctx
@ -566,32 +570,43 @@ def test_peer_canceller(
# the sleeper's remote error is the error bubbled # the sleeper's remote error is the error bubbled
# out of the context-stack above! # out of the context-stack above!
re = sleeper_ctx.outcome final_err = sleeper_ctx.outcome
assert ( assert (
re is loc_err final_err is loc_err
is sleeper_ctx.maybe_error is sleeper_ctx.maybe_error
is sleeper_ctx._remote_error is sleeper_ctx._remote_error
) )
for ctx in ctxs: for name, ctx in ctxs.items():
re: BaseException|None = ctx._remote_error re: BaseException|None = ctx._remote_error
re: BaseException|None = ctx.outcome le: BaseException|None = ctx._local_error
assert ( err = ctx.maybe_error
re and out = ctx.outcome
(
re is ctx.maybe_error # every ctx should error!
is ctx._remote_error assert out is err
)
) # the recorded local erro should always be
le: trio.MultiError = ctx._local_error # the same as the one raised by the
# `sleeper_ctx.result()` call
assert ( assert (
le le
and ctx._local_error and
le is loc_err
) )
# root doesn't cancel sleeper since it's # root doesn't cancel sleeper since it's
# cancelled by its peer. # cancelled by its peer.
if ctx is sleeper_ctx: if ctx is sleeper_ctx:
assert re
assert (
ctx._remote_error
is ctx.maybe_error
is ctx.outcome
is ctx._local_error
)
assert not ctx.cancel_called assert not ctx.cancel_called
assert not ctx.cancel_acked assert not ctx.cancel_acked
@ -601,21 +616,38 @@ def test_peer_canceller(
# `ContextCancelled` for it and thus # `ContextCancelled` for it and thus
# the logic inside `.cancelled_caught` # the logic inside `.cancelled_caught`
# should trigger! # should trigger!
assert ctx._scope.cancelled_caught assert not ctx._scope.cancelled_caught
elif ctx is caller_ctx: elif ctx in (
caller_ctx,
canceller_ctx,
):
assert not ctx._remote_error
# the `canceller_ctx` shouldn't
# have called `ctx.cancel()` either!
#
# since its context was remotely # since its context was remotely
# cancelled, we never needed to # cancelled, we never needed to call
# call `Context.cancel()` bc it was # `Context.cancel()` bc the far end
# done by the peer and also we never # task already done by the peer and
assert ctx.cancel_called # also we never
assert not ctx.cancel_called
# TODO: figure out the details of this..? # TODO: figure out the details of this..?
# if you look the `._local_error` here # if you look the `._local_error` here
# is a multi of ctxc + 2 Cancelleds? # is a multi of ctxc + 2 Cancelleds?
# assert not ctx.cancelled_caught # assert not ctx.cancelled_caught
elif ctx is canceller_ctx: assert (
not ctx.cancel_called
and not ctx.cancel_acked
)
assert not ctx._scope.cancelled_caught
# elif ctx is canceller_ctx:
# assert not ctx._remote_error
# XXX NOTE XXX: ONLY the canceller # XXX NOTE XXX: ONLY the canceller
# will get a self-cancelled outcome # will get a self-cancelled outcome
@ -626,11 +658,6 @@ def test_peer_canceller(
# .cancel() whenever an interpeer # .cancel() whenever an interpeer
# cancel takes place since each # cancel takes place since each
# reception of a ctxc # reception of a ctxc
assert (
ctx.cancel_called
and ctx.cancel_acked
)
assert not ctx._scope.cancelled_caught
else: else:
pytest.fail( pytest.fail(
@ -663,7 +690,7 @@ def test_peer_canceller(
# `.open_context()` block has exited and should be # `.open_context()` block has exited and should be
# set in both outcomes including the case where # set in both outcomes including the case where
# ctx-cancel handling itself errors. # ctx-cancel handling itself errors.
assert sleeper_ctx._scope.cancelled_caught assert not sleeper_ctx._scope.cancelled_caught
assert _loc_err is sleeper_ctx._local_error assert _loc_err is sleeper_ctx._local_error
assert ( assert (
sleeper_ctx.outcome sleeper_ctx.outcome