forked from goodboy/tractor
Tons of interpeer test cleanup
Drop all the nested `@acm` blocks and defunct comments from initial validations. Add some todos for cases that are still unclear such as whether the caller / streamer should have `.cancelled_caught == True` in it's teardown.multihomed
parent
ef0cfc4b20
commit
d651f3d8e9
|
@ -194,18 +194,9 @@ async def stream_from_peer(
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
tractor.wait_for_actor(peer_name) as peer,
|
tractor.wait_for_actor(peer_name) as peer,
|
||||||
# peer.open_context(stream_ints) as (peer_ctx, first),
|
|
||||||
# peer_ctx.open_stream() as stream,
|
|
||||||
):
|
|
||||||
async with (
|
|
||||||
peer.open_context(stream_ints) as (peer_ctx, first),
|
peer.open_context(stream_ints) as (peer_ctx, first),
|
||||||
# peer_ctx.open_stream() as stream,
|
|
||||||
):
|
|
||||||
# # try:
|
|
||||||
async with (
|
|
||||||
peer_ctx.open_stream() as stream,
|
peer_ctx.open_stream() as stream,
|
||||||
):
|
):
|
||||||
|
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
# XXX QUESTIONS & TODO: for further details around this
|
# XXX QUESTIONS & TODO: for further details around this
|
||||||
# in the longer run..
|
# in the longer run..
|
||||||
|
@ -222,7 +213,6 @@ async def stream_from_peer(
|
||||||
# - what about IPC-transport specific errors, should
|
# - what about IPC-transport specific errors, should
|
||||||
# they bubble from the async for and trigger
|
# they bubble from the async for and trigger
|
||||||
# other special cases?
|
# other special cases?
|
||||||
# try:
|
|
||||||
# NOTE: current ctl flow:
|
# NOTE: current ctl flow:
|
||||||
# - stream raises `trio.EndOfChannel` and
|
# - stream raises `trio.EndOfChannel` and
|
||||||
# exits the loop
|
# exits the loop
|
||||||
|
@ -231,22 +221,6 @@ async def stream_from_peer(
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
assert msg is not None
|
assert msg is not None
|
||||||
print(msg)
|
print(msg)
|
||||||
# finally:
|
|
||||||
# await trio.sleep(0.1)
|
|
||||||
# from tractor import pause
|
|
||||||
# await pause()
|
|
||||||
|
|
||||||
# except BaseException as berr:
|
|
||||||
# with trio.CancelScope(shield=True):
|
|
||||||
# await tractor.pause()
|
|
||||||
# raise
|
|
||||||
|
|
||||||
# except trio.Cancelled:
|
|
||||||
# with trio.CancelScope(shield=True):
|
|
||||||
# await tractor.pause()
|
|
||||||
# raise # XXX NEVER MASK IT
|
|
||||||
# from tractor import pause
|
|
||||||
# await pause()
|
|
||||||
|
|
||||||
# NOTE: cancellation of the (sleeper) peer should always
|
# NOTE: cancellation of the (sleeper) peer should always
|
||||||
# cause a `ContextCancelled` raise in this streaming
|
# cause a `ContextCancelled` raise in this streaming
|
||||||
|
@ -265,11 +239,10 @@ async def stream_from_peer(
|
||||||
|
|
||||||
# we never requested cancellation
|
# we never requested cancellation
|
||||||
assert not peer_ctx.cancel_called
|
assert not peer_ctx.cancel_called
|
||||||
# the `.open_context()` exit definitely
|
# the `.open_context()` exit definitely caught
|
||||||
# caught a cancellation in the internal `Context._scope`
|
# a cancellation in the internal `Context._scope` since
|
||||||
# since likely the runtime called `_deliver_msg()`
|
# likely the runtime called `_deliver_msg()` after
|
||||||
# after receiving the remote error from the streaming
|
# receiving the remote error from the streaming task.
|
||||||
# task.
|
|
||||||
assert peer_ctx.cancelled_caught
|
assert peer_ctx.cancelled_caught
|
||||||
|
|
||||||
# TODO / NOTE `.canceller` won't have been set yet
|
# TODO / NOTE `.canceller` won't have been set yet
|
||||||
|
@ -284,8 +257,9 @@ async def stream_from_peer(
|
||||||
assert not ctx.canceller
|
assert not ctx.canceller
|
||||||
assert 'canceller' in peer_ctx.canceller
|
assert 'canceller' in peer_ctx.canceller
|
||||||
|
|
||||||
|
raise
|
||||||
# TODO: IN THEORY we could have other cases depending on
|
# TODO: IN THEORY we could have other cases depending on
|
||||||
# who cancels first, the root actor or the canceller peer.
|
# who cancels first, the root actor or the canceller peer?.
|
||||||
#
|
#
|
||||||
# 1- when the peer request is first then the `.canceller`
|
# 1- when the peer request is first then the `.canceller`
|
||||||
# field should obvi be set to the 'canceller' uid,
|
# field should obvi be set to the 'canceller' uid,
|
||||||
|
@ -294,12 +268,12 @@ async def stream_from_peer(
|
||||||
# `trio.Cancelled` implicitly raised
|
# `trio.Cancelled` implicitly raised
|
||||||
# assert ctx.canceller[0] == 'root'
|
# assert ctx.canceller[0] == 'root'
|
||||||
# assert peer_ctx.canceller[0] == 'sleeper'
|
# assert peer_ctx.canceller[0] == 'sleeper'
|
||||||
raise
|
|
||||||
|
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
'peer never triggered local `ContextCancelled`?'
|
'peer never triggered local `ContextCancelled`?'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'error_during_ctxerr_handling',
|
'error_during_ctxerr_handling',
|
||||||
[False, True],
|
[False, True],
|
||||||
|
@ -361,6 +335,7 @@ def test_peer_canceller(
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
|
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
||||||
# debug_mode=True
|
# debug_mode=True
|
||||||
) as an:
|
) as an:
|
||||||
canceller: Portal = await an.start_actor(
|
canceller: Portal = await an.start_actor(
|
||||||
|
@ -402,7 +377,6 @@ def test_peer_canceller(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
print('PRE CONTEXT RESULT')
|
print('PRE CONTEXT RESULT')
|
||||||
# await tractor.pause()
|
|
||||||
await sleeper_ctx.result()
|
await sleeper_ctx.result()
|
||||||
|
|
||||||
# should never get here
|
# should never get here
|
||||||
|
@ -410,9 +384,8 @@ def test_peer_canceller(
|
||||||
'Context.result() did not raise ctx-cancelled?'
|
'Context.result() did not raise ctx-cancelled?'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: not sure why this isn't catching
|
# should always raise since this root task does
|
||||||
# but maybe we need an `ExceptionGroup` and
|
# not request the sleeper cancellation ;)
|
||||||
# the whole except *errs: thinger in 3.11?
|
|
||||||
except ContextCancelled as ctxerr:
|
except ContextCancelled as ctxerr:
|
||||||
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
|
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
|
||||||
|
|
||||||
|
@ -430,9 +403,6 @@ def test_peer_canceller(
|
||||||
# block it should be.
|
# block it should be.
|
||||||
assert not sleeper_ctx.cancelled_caught
|
assert not sleeper_ctx.cancelled_caught
|
||||||
|
|
||||||
# TODO: a test which ensures this error is
|
|
||||||
# bubbled and caught (NOT MASKED) by the
|
|
||||||
# runtime!!!
|
|
||||||
if error_during_ctxerr_handling:
|
if error_during_ctxerr_handling:
|
||||||
raise RuntimeError('Simulated error during teardown')
|
raise RuntimeError('Simulated error during teardown')
|
||||||
|
|
||||||
|
@ -458,6 +428,7 @@ def test_peer_canceller(
|
||||||
# - `.canceller` (uid of cancel-causing actor-task)
|
# - `.canceller` (uid of cancel-causing actor-task)
|
||||||
# - `._remote_error` (any `RemoteActorError`
|
# - `._remote_error` (any `RemoteActorError`
|
||||||
# instance from other side of context)
|
# instance from other side of context)
|
||||||
|
# TODO: are we really planning to use this tho?
|
||||||
# - `._cancel_msg` (any msg that caused the
|
# - `._cancel_msg` (any msg that caused the
|
||||||
# cancel)
|
# cancel)
|
||||||
|
|
||||||
|
@ -482,21 +453,33 @@ def test_peer_canceller(
|
||||||
ctx is sleeper_ctx
|
ctx is sleeper_ctx
|
||||||
or ctx is caller_ctx
|
or ctx is caller_ctx
|
||||||
):
|
):
|
||||||
assert re.canceller == canceller.channel.uid
|
assert (
|
||||||
|
re.canceller
|
||||||
|
==
|
||||||
|
ctx.canceller
|
||||||
|
==
|
||||||
|
canceller.channel.uid
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
assert re.canceller == root.uid
|
assert (
|
||||||
|
re.canceller
|
||||||
# each context should have received
|
==
|
||||||
# a silently absorbed context cancellation
|
ctx.canceller
|
||||||
# from its peer actor's task.
|
==
|
||||||
# assert ctx.chan.uid == ctx.canceller
|
root.uid
|
||||||
|
)
|
||||||
|
|
||||||
# CASE: standard teardown inside in `.open_context()` block
|
# CASE: standard teardown inside in `.open_context()` block
|
||||||
else:
|
else:
|
||||||
assert ctxerr.canceller == sleeper_ctx.canceller
|
assert ctxerr.canceller == sleeper_ctx.canceller
|
||||||
# assert ctxerr.canceller[0] == 'canceller'
|
assert (
|
||||||
# assert sleeper_ctx.canceller[0] == 'canceller'
|
ctxerr.canceller[0]
|
||||||
|
==
|
||||||
|
sleeper_ctx.canceller[0]
|
||||||
|
==
|
||||||
|
'canceller'
|
||||||
|
)
|
||||||
|
|
||||||
# the sleeper's remote error is the error bubbled
|
# the sleeper's remote error is the error bubbled
|
||||||
# out of the context-stack above!
|
# out of the context-stack above!
|
||||||
|
@ -509,21 +492,29 @@ def test_peer_canceller(
|
||||||
|
|
||||||
# root doesn't cancel sleeper since it's
|
# root doesn't cancel sleeper since it's
|
||||||
# cancelled by its peer.
|
# cancelled by its peer.
|
||||||
# match ctx:
|
|
||||||
# case sleeper_ctx:
|
|
||||||
if ctx is sleeper_ctx:
|
if ctx is sleeper_ctx:
|
||||||
assert not ctx.cancel_called
|
assert not ctx.cancel_called
|
||||||
# wait WHY?
|
# since sleeper_ctx.result() IS called
|
||||||
|
# above we should have (silently)
|
||||||
|
# absorbed the corresponding
|
||||||
|
# `ContextCancelled` for it and thus
|
||||||
|
# the logic inside `.cancelled_caught`
|
||||||
|
# should trigger!
|
||||||
assert ctx.cancelled_caught
|
assert ctx.cancelled_caught
|
||||||
|
|
||||||
elif ctx is caller_ctx:
|
elif ctx is caller_ctx:
|
||||||
# since its context was remotely
|
# since its context was remotely
|
||||||
# cancelled, we never needed to
|
# cancelled, we never needed to
|
||||||
# call `Context.cancel()` bc our
|
# call `Context.cancel()` bc it was
|
||||||
# context was already remotely
|
# done by the peer and also we never
|
||||||
# cancelled by the time we'd do it.
|
|
||||||
assert ctx.cancel_called
|
assert ctx.cancel_called
|
||||||
|
|
||||||
|
# TODO: figure out the details of
|
||||||
|
# this..
|
||||||
|
# if you look the `._local_error` here
|
||||||
|
# is a multi of ctxc + 2 Cancelleds?
|
||||||
|
# assert not ctx.cancelled_caught
|
||||||
|
|
||||||
else:
|
else:
|
||||||
assert ctx.cancel_called
|
assert ctx.cancel_called
|
||||||
assert not ctx.cancelled_caught
|
assert not ctx.cancelled_caught
|
||||||
|
@ -551,7 +542,6 @@ def test_peer_canceller(
|
||||||
# itself errors.
|
# itself errors.
|
||||||
assert sleeper_ctx.cancelled_caught
|
assert sleeper_ctx.cancelled_caught
|
||||||
|
|
||||||
# await tractor.pause()
|
|
||||||
raise # always to ensure teardown
|
raise # always to ensure teardown
|
||||||
|
|
||||||
if error_during_ctxerr_handling:
|
if error_during_ctxerr_handling:
|
||||||
|
|
Loading…
Reference in New Issue