forked from goodboy/tractor
Get inter-peer suite passing with all `Context` state checks!
Definitely needs some cleaning and refinement but this gets us to stage
1 of being pretty frickin correct i'd say 💃
multihomed
parent
ecb525a2bc
commit
ef0cfc4b20
|
@ -15,6 +15,26 @@ from tractor import ( # typing
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# XXX TODO cases:
|
||||||
|
# - [ ] peer cancelled itself - so other peers should
|
||||||
|
# get errors reflecting that the peer was itself the .canceller?
|
||||||
|
|
||||||
|
# - [x] WE cancelled the peer and thus should not see any raised
|
||||||
|
# `ContextCancelled` as it should be reaped silently?
|
||||||
|
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
||||||
|
# already covers this case?
|
||||||
|
|
||||||
|
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
||||||
|
# Portal.cancel_actor().
|
||||||
|
# => all other connected peers should get that cancel requesting peer's
|
||||||
|
# uid in the ctx-cancelled error msg raised in all open ctxs
|
||||||
|
# with that peer.
|
||||||
|
|
||||||
|
# - [ ] PEER-FAILS-BY-CHILD-ERROR: peer spawned a sub-actor which
|
||||||
|
# (also) spawned a failing task which was unhandled and
|
||||||
|
# propagated up to the immediate parent - the peer to the actor
|
||||||
|
# that also spawned a remote task task in that same peer-parent.
|
||||||
|
|
||||||
|
|
||||||
# def test_self_cancel():
|
# def test_self_cancel():
|
||||||
# '''
|
# '''
|
||||||
|
@ -29,14 +49,30 @@ from tractor import ( # typing
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def sleep_forever(
|
async def sleep_forever(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
expect_ctxc: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Sync the context, open a stream then just sleep.
|
Sync the context, open a stream then just sleep.
|
||||||
|
|
||||||
|
Allow checking for (context) cancellation locally.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
await ctx.started()
|
try:
|
||||||
async with ctx.open_stream():
|
await ctx.started()
|
||||||
await trio.sleep_forever()
|
async with ctx.open_stream():
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
except BaseException as berr:
|
||||||
|
|
||||||
|
# TODO: it'd sure be nice to be able to inject our own
|
||||||
|
# `ContextCancelled` here instead of of `trio.Cancelled`
|
||||||
|
# so that our runtime can expect it and this "user code"
|
||||||
|
# would be able to tell the diff between a generic trio
|
||||||
|
# cancel and a tractor runtime-IPC cancel.
|
||||||
|
if expect_ctxc:
|
||||||
|
assert isinstance(berr, trio.Cancelled)
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -145,6 +181,7 @@ async def stream_ints(
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
for i in itertools.count():
|
for i in itertools.count():
|
||||||
await stream.send(i)
|
await stream.send(i)
|
||||||
|
await trio.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -157,77 +194,111 @@ async def stream_from_peer(
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
tractor.wait_for_actor(peer_name) as peer,
|
tractor.wait_for_actor(peer_name) as peer,
|
||||||
peer.open_context(stream_ints) as (peer_ctx, first),
|
# peer.open_context(stream_ints) as (peer_ctx, first),
|
||||||
peer_ctx.open_stream() as stream,
|
# peer_ctx.open_stream() as stream,
|
||||||
):
|
):
|
||||||
await ctx.started()
|
async with (
|
||||||
# XXX TODO: big set of questions for this
|
peer.open_context(stream_ints) as (peer_ctx, first),
|
||||||
# - should we raise `ContextCancelled` or `Cancelled` (rn
|
# peer_ctx.open_stream() as stream,
|
||||||
# it does that) here?!
|
):
|
||||||
# - test the `ContextCancelled` OUTSIDE the
|
# # try:
|
||||||
# `.open_context()` call?
|
async with (
|
||||||
try:
|
peer_ctx.open_stream() as stream,
|
||||||
async for msg in stream:
|
):
|
||||||
print(msg)
|
|
||||||
|
|
||||||
except trio.Cancelled:
|
await ctx.started()
|
||||||
assert not ctx.cancel_called
|
# XXX QUESTIONS & TODO: for further details around this
|
||||||
assert not ctx.cancelled_caught
|
# in the longer run..
|
||||||
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
|
# - should we raise `ContextCancelled` or `Cancelled` (rn
|
||||||
|
# it does latter) and should/could it be implemented
|
||||||
|
# as a general injection override for `trio` such
|
||||||
|
# that ANY next checkpoint would raise the "cancel
|
||||||
|
# error type" of choice?
|
||||||
|
# - should the `ContextCancelled` bubble from
|
||||||
|
# all `Context` and `MsgStream` apis wherein it
|
||||||
|
# prolly makes the most sense to make it
|
||||||
|
# a `trio.Cancelled` subtype?
|
||||||
|
# - what about IPC-transport specific errors, should
|
||||||
|
# they bubble from the async for and trigger
|
||||||
|
# other special cases?
|
||||||
|
# try:
|
||||||
|
# NOTE: current ctl flow:
|
||||||
|
# - stream raises `trio.EndOfChannel` and
|
||||||
|
# exits the loop
|
||||||
|
# - `.open_context()` will raise the ctxcanc
|
||||||
|
# received from the sleeper.
|
||||||
|
async for msg in stream:
|
||||||
|
assert msg is not None
|
||||||
|
print(msg)
|
||||||
|
# finally:
|
||||||
|
# await trio.sleep(0.1)
|
||||||
|
# from tractor import pause
|
||||||
|
# await pause()
|
||||||
|
|
||||||
assert not peer_ctx.cancel_called
|
# except BaseException as berr:
|
||||||
assert not peer_ctx.cancelled_caught
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await tractor.pause()
|
||||||
|
# raise
|
||||||
|
|
||||||
assert 'root' in ctx.cancel_called_remote
|
# except trio.Cancelled:
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
raise # XXX MUST NEVER MASK IT!!
|
# await tractor.pause()
|
||||||
|
# raise # XXX NEVER MASK IT
|
||||||
with trio.CancelScope(shield=True):
|
# from tractor import pause
|
||||||
await tractor.pause()
|
# await pause()
|
||||||
# pass
|
|
||||||
# pytest.fail(
|
|
||||||
raise RuntimeError(
|
|
||||||
'peer never triggered local `[Context]Cancelled`?!?'
|
|
||||||
)
|
|
||||||
|
|
||||||
# NOTE: cancellation of the (sleeper) peer should always
|
# NOTE: cancellation of the (sleeper) peer should always
|
||||||
# cause a `ContextCancelled` raise in this streaming
|
# cause a `ContextCancelled` raise in this streaming
|
||||||
# actor.
|
# actor.
|
||||||
except ContextCancelled as ctxerr:
|
except ContextCancelled as ctxerr:
|
||||||
assert ctxerr.canceller == 'canceller'
|
err = ctxerr
|
||||||
assert ctxerr._remote_error is ctxerr
|
assert peer_ctx._remote_error is ctxerr
|
||||||
|
assert peer_ctx.canceller == ctxerr.canceller
|
||||||
|
|
||||||
# CASE 1: we were cancelled by our parent, the root actor.
|
# caller peer should not be the cancel requester
|
||||||
# TODO: there are other cases depending on how the root
|
assert not ctx.cancel_called
|
||||||
# actor and it's caller side task are written:
|
# XXX can never be true since `._invoke` only
|
||||||
# - if the root does not req us to cancel then an
|
# sets this AFTER the nursery block this task
|
||||||
# IPC-transport related error should bubble from the async
|
# was started in, exits.
|
||||||
# for loop and thus cause local cancellation both here
|
assert not ctx.cancelled_caught
|
||||||
# and in the root (since in that case this task cancels the
|
|
||||||
# context with the root, not the other way around)
|
# we never requested cancellation
|
||||||
assert ctx.cancel_called_remote[0] == 'root'
|
assert not peer_ctx.cancel_called
|
||||||
|
# the `.open_context()` exit definitely
|
||||||
|
# caught a cancellation in the internal `Context._scope`
|
||||||
|
# since likely the runtime called `_deliver_msg()`
|
||||||
|
# after receiving the remote error from the streaming
|
||||||
|
# task.
|
||||||
|
assert peer_ctx.cancelled_caught
|
||||||
|
|
||||||
|
# TODO / NOTE `.canceller` won't have been set yet
|
||||||
|
# here because that machinery is inside
|
||||||
|
# `.open_context().__aexit__()` BUT, if we had
|
||||||
|
# a way to know immediately (from the last
|
||||||
|
# checkpoint) that cancellation was due to
|
||||||
|
# a remote, we COULD assert this here..see,
|
||||||
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
|
|
||||||
|
# root/parent actor task should NEVER HAVE cancelled us!
|
||||||
|
assert not ctx.canceller
|
||||||
|
assert 'canceller' in peer_ctx.canceller
|
||||||
|
|
||||||
|
# TODO: IN THEORY we could have other cases depending on
|
||||||
|
# who cancels first, the root actor or the canceller peer.
|
||||||
|
#
|
||||||
|
# 1- when the peer request is first then the `.canceller`
|
||||||
|
# field should obvi be set to the 'canceller' uid,
|
||||||
|
#
|
||||||
|
# 2-if the root DOES req cancel then we should see the same
|
||||||
|
# `trio.Cancelled` implicitly raised
|
||||||
|
# assert ctx.canceller[0] == 'root'
|
||||||
|
# assert peer_ctx.canceller[0] == 'sleeper'
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# except BaseException as err:
|
raise RuntimeError(
|
||||||
|
'peer never triggered local `ContextCancelled`?'
|
||||||
# raise
|
)
|
||||||
|
|
||||||
# cases:
|
|
||||||
# - some arbitrary remote peer cancels via Portal.cancel_actor().
|
|
||||||
# => all other connected peers should get that cancel requesting peer's
|
|
||||||
# uid in the ctx-cancelled error msg.
|
|
||||||
|
|
||||||
# - peer spawned a sub-actor which (also) spawned a failing task
|
|
||||||
# which was unhandled and propagated up to the immediate
|
|
||||||
# parent, the peer to the actor that also spawned a remote task
|
|
||||||
# task in that same peer-parent.
|
|
||||||
|
|
||||||
# - peer cancelled itself - so other peers should
|
|
||||||
# get errors reflecting that the peer was itself the .canceller?
|
|
||||||
|
|
||||||
# - WE cancelled the peer and thus should not see any raised
|
|
||||||
# `ContextCancelled` as it should be reaped silently?
|
|
||||||
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
|
||||||
# already covers this case?
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'error_during_ctxerr_handling',
|
'error_during_ctxerr_handling',
|
||||||
|
@ -251,8 +322,8 @@ def test_peer_canceller(
|
||||||
line and be less indented.
|
line and be less indented.
|
||||||
|
|
||||||
.actor0> ()-> .actor1>
|
.actor0> ()-> .actor1>
|
||||||
a inter-actor task context opened (by `async with `Portal.open_context()`)
|
a inter-actor task context opened (by `async with
|
||||||
from actor0 *into* actor1.
|
`Portal.open_context()`) from actor0 *into* actor1.
|
||||||
|
|
||||||
.actor0> ()<=> .actor1>
|
.actor0> ()<=> .actor1>
|
||||||
a inter-actor task context opened (as above)
|
a inter-actor task context opened (as above)
|
||||||
|
@ -287,11 +358,11 @@ def test_peer_canceller(
|
||||||
5. .canceller> ()-> .sleeper>
|
5. .canceller> ()-> .sleeper>
|
||||||
- calls `Portal.cancel_actor()`
|
- calls `Portal.cancel_actor()`
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as an:
|
async with tractor.open_nursery(
|
||||||
|
# debug_mode=True
|
||||||
|
) as an:
|
||||||
canceller: Portal = await an.start_actor(
|
canceller: Portal = await an.start_actor(
|
||||||
'canceller',
|
'canceller',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
|
@ -305,10 +376,13 @@ def test_peer_canceller(
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
root = tractor.current_actor()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
sleeper.open_context(
|
sleeper.open_context(
|
||||||
sleep_forever,
|
sleep_forever,
|
||||||
|
expect_ctxc=True,
|
||||||
) as (sleeper_ctx, sent),
|
) as (sleeper_ctx, sent),
|
||||||
|
|
||||||
just_caller.open_context(
|
just_caller.open_context(
|
||||||
|
@ -328,6 +402,7 @@ def test_peer_canceller(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
print('PRE CONTEXT RESULT')
|
print('PRE CONTEXT RESULT')
|
||||||
|
# await tractor.pause()
|
||||||
await sleeper_ctx.result()
|
await sleeper_ctx.result()
|
||||||
|
|
||||||
# should never get here
|
# should never get here
|
||||||
|
@ -343,8 +418,8 @@ def test_peer_canceller(
|
||||||
|
|
||||||
# canceller and caller peers should not
|
# canceller and caller peers should not
|
||||||
# have been remotely cancelled.
|
# have been remotely cancelled.
|
||||||
assert canceller_ctx.cancel_called_remote is None
|
assert canceller_ctx.canceller is None
|
||||||
assert caller_ctx.cancel_called_remote is None
|
assert caller_ctx.canceller is None
|
||||||
|
|
||||||
assert ctxerr.canceller[0] == 'canceller'
|
assert ctxerr.canceller[0] == 'canceller'
|
||||||
|
|
||||||
|
@ -363,8 +438,9 @@ def test_peer_canceller(
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# SHOULD NEVER GET HERE!
|
# XXX SHOULD NEVER EVER GET HERE XXX
|
||||||
except BaseException:
|
except BaseException as berr:
|
||||||
|
err = berr
|
||||||
pytest.fail('did not rx ctx-cancelled error?')
|
pytest.fail('did not rx ctx-cancelled error?')
|
||||||
else:
|
else:
|
||||||
pytest.fail('did not rx ctx-cancelled error?')
|
pytest.fail('did not rx ctx-cancelled error?')
|
||||||
|
@ -375,6 +451,19 @@ def test_peer_canceller(
|
||||||
)as ctxerr:
|
)as ctxerr:
|
||||||
_err = ctxerr
|
_err = ctxerr
|
||||||
|
|
||||||
|
# NOTE: the main state to check on `Context` is:
|
||||||
|
# - `.cancelled_caught` (maps to nursery cs)
|
||||||
|
# - `.cancel_called` (bool of whether this side
|
||||||
|
# requested)
|
||||||
|
# - `.canceller` (uid of cancel-causing actor-task)
|
||||||
|
# - `._remote_error` (any `RemoteActorError`
|
||||||
|
# instance from other side of context)
|
||||||
|
# - `._cancel_msg` (any msg that caused the
|
||||||
|
# cancel)
|
||||||
|
|
||||||
|
# CASE: error raised during handling of
|
||||||
|
# `ContextCancelled` inside `.open_context()`
|
||||||
|
# block
|
||||||
if error_during_ctxerr_handling:
|
if error_during_ctxerr_handling:
|
||||||
assert isinstance(ctxerr, RuntimeError)
|
assert isinstance(ctxerr, RuntimeError)
|
||||||
|
|
||||||
|
@ -384,20 +473,30 @@ def test_peer_canceller(
|
||||||
for ctx in ctxs:
|
for ctx in ctxs:
|
||||||
assert ctx.cancel_called
|
assert ctx.cancel_called
|
||||||
|
|
||||||
|
# this root actor task should have
|
||||||
|
# cancelled all opened contexts except the
|
||||||
|
# sleeper which is obvi by the "canceller"
|
||||||
|
# peer.
|
||||||
|
re = ctx._remote_error
|
||||||
|
if (
|
||||||
|
ctx is sleeper_ctx
|
||||||
|
or ctx is caller_ctx
|
||||||
|
):
|
||||||
|
assert re.canceller == canceller.channel.uid
|
||||||
|
|
||||||
|
else:
|
||||||
|
assert re.canceller == root.uid
|
||||||
|
|
||||||
# each context should have received
|
# each context should have received
|
||||||
# a silently absorbed context cancellation
|
# a silently absorbed context cancellation
|
||||||
# from its peer actor's task.
|
# from its peer actor's task.
|
||||||
assert ctx.chan.uid == ctx.cancel_called_remote
|
# assert ctx.chan.uid == ctx.canceller
|
||||||
|
|
||||||
# this root actor task should have
|
|
||||||
# cancelled all opened contexts except
|
|
||||||
# the sleeper which is cancelled by its
|
|
||||||
# peer "canceller"
|
|
||||||
if ctx is not sleeper_ctx:
|
|
||||||
assert ctx._remote_error.canceller[0] == 'root'
|
|
||||||
|
|
||||||
|
# CASE: standard teardown inside in `.open_context()` block
|
||||||
else:
|
else:
|
||||||
assert ctxerr.canceller[0] == 'canceller'
|
assert ctxerr.canceller == sleeper_ctx.canceller
|
||||||
|
# assert ctxerr.canceller[0] == 'canceller'
|
||||||
|
# assert sleeper_ctx.canceller[0] == 'canceller'
|
||||||
|
|
||||||
# the sleeper's remote error is the error bubbled
|
# the sleeper's remote error is the error bubbled
|
||||||
# out of the context-stack above!
|
# out of the context-stack above!
|
||||||
|
@ -405,18 +504,35 @@ def test_peer_canceller(
|
||||||
assert re is ctxerr
|
assert re is ctxerr
|
||||||
|
|
||||||
for ctx in ctxs:
|
for ctx in ctxs:
|
||||||
|
re: BaseException | None = ctx._remote_error
|
||||||
|
assert re
|
||||||
|
|
||||||
|
# root doesn't cancel sleeper since it's
|
||||||
|
# cancelled by its peer.
|
||||||
|
# match ctx:
|
||||||
|
# case sleeper_ctx:
|
||||||
if ctx is sleeper_ctx:
|
if ctx is sleeper_ctx:
|
||||||
assert not ctx.cancel_called
|
assert not ctx.cancel_called
|
||||||
|
# wait WHY?
|
||||||
assert ctx.cancelled_caught
|
assert ctx.cancelled_caught
|
||||||
|
|
||||||
|
elif ctx is caller_ctx:
|
||||||
|
# since its context was remotely
|
||||||
|
# cancelled, we never needed to
|
||||||
|
# call `Context.cancel()` bc our
|
||||||
|
# context was already remotely
|
||||||
|
# cancelled by the time we'd do it.
|
||||||
|
assert ctx.cancel_called
|
||||||
|
|
||||||
else:
|
else:
|
||||||
assert ctx.cancel_called
|
assert ctx.cancel_called
|
||||||
assert not ctx.cancelled_caught
|
assert not ctx.cancelled_caught
|
||||||
|
|
||||||
# each context should have received
|
# TODO: do we even need this flag?
|
||||||
|
# -> each context should have received
|
||||||
# a silently absorbed context cancellation
|
# a silently absorbed context cancellation
|
||||||
# from its peer actor's task.
|
# in its remote nursery scope.
|
||||||
assert ctx.chan.uid == ctx.cancel_called_remote
|
# assert ctx.chan.uid == ctx.canceller
|
||||||
|
|
||||||
# NOTE: when an inter-peer cancellation
|
# NOTE: when an inter-peer cancellation
|
||||||
# occurred, we DO NOT expect this
|
# occurred, we DO NOT expect this
|
||||||
|
@ -434,7 +550,6 @@ def test_peer_canceller(
|
||||||
# including the case where ctx-cancel handling
|
# including the case where ctx-cancel handling
|
||||||
# itself errors.
|
# itself errors.
|
||||||
assert sleeper_ctx.cancelled_caught
|
assert sleeper_ctx.cancelled_caught
|
||||||
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
|
|
||||||
|
|
||||||
# await tractor.pause()
|
# await tractor.pause()
|
||||||
raise # always to ensure teardown
|
raise # always to ensure teardown
|
||||||
|
|
Loading…
Reference in New Issue