Compare commits
10 Commits
131674eabd
...
3f15923537
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 3f15923537 | |
Tyler Goodlet | 87cd725adb | |
Tyler Goodlet | 48accbd28f | |
Tyler Goodlet | 227c9ea173 | |
Tyler Goodlet | d651f3d8e9 | |
Tyler Goodlet | ef0cfc4b20 | |
Tyler Goodlet | ecb525a2bc | |
Tyler Goodlet | b77d123edd | |
Tyler Goodlet | f4e63465de | |
Tyler Goodlet | df31047ecb |
|
@ -13,6 +13,11 @@ from typing import Optional
|
|||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import (
|
||||
Actor,
|
||||
Context,
|
||||
current_actor,
|
||||
)
|
||||
from tractor._exceptions import (
|
||||
StreamOverrun,
|
||||
ContextCancelled,
|
||||
|
@ -193,9 +198,6 @@ def test_simple_context(
|
|||
else:
|
||||
assert await ctx.result() == 'yo'
|
||||
|
||||
if not error_parent:
|
||||
await ctx.cancel()
|
||||
|
||||
if pointlessly_open_stream:
|
||||
async with ctx.open_stream():
|
||||
if error_parent:
|
||||
|
@ -208,10 +210,15 @@ def test_simple_context(
|
|||
# 'stop' msg to the far end which needs
|
||||
# to be ignored
|
||||
pass
|
||||
|
||||
else:
|
||||
if error_parent:
|
||||
raise error_parent
|
||||
|
||||
# cancel AFTER we open a stream
|
||||
# to avoid a cancel raised inside
|
||||
# `.open_stream()`
|
||||
await ctx.cancel()
|
||||
finally:
|
||||
|
||||
# after cancellation
|
||||
|
@ -276,7 +283,7 @@ def test_caller_cancels(
|
|||
assert (
|
||||
tuple(err.canceller)
|
||||
==
|
||||
tractor.current_actor().uid
|
||||
current_actor().uid
|
||||
)
|
||||
|
||||
async def main():
|
||||
|
@ -430,9 +437,11 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
|||
):
|
||||
'caller context closes without using stream'
|
||||
|
||||
async with tractor.open_nursery() as n:
|
||||
async with tractor.open_nursery() as an:
|
||||
|
||||
portal = await n.start_actor(
|
||||
root: Actor = current_actor()
|
||||
|
||||
portal = await an.start_actor(
|
||||
'ctx_cancelled',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
@ -440,10 +449,10 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
|||
async with portal.open_context(
|
||||
expect_cancelled,
|
||||
) as (ctx, sent):
|
||||
await portal.run(assert_state, value=True)
|
||||
|
||||
assert sent is None
|
||||
|
||||
await portal.run(assert_state, value=True)
|
||||
|
||||
# call cancel explicitly
|
||||
if use_ctx_cancel_method:
|
||||
|
||||
|
@ -454,8 +463,21 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
|||
async for msg in stream:
|
||||
pass
|
||||
|
||||
except tractor.ContextCancelled:
|
||||
raise # XXX: must be propagated to __aexit__
|
||||
except tractor.ContextCancelled as ctxc:
|
||||
# XXX: the cause is US since we call
|
||||
# `Context.cancel()` just above!
|
||||
assert (
|
||||
ctxc.canceller
|
||||
==
|
||||
current_actor().uid
|
||||
==
|
||||
root.uid
|
||||
)
|
||||
|
||||
# XXX: must be propagated to __aexit__
|
||||
# and should be silently absorbed there
|
||||
# since we called `.cancel()` just above ;)
|
||||
raise
|
||||
|
||||
else:
|
||||
assert 0, "Should have context cancelled?"
|
||||
|
@ -472,7 +494,13 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
|||
await ctx.result()
|
||||
assert 0, "Callee should have blocked!?"
|
||||
except trio.TooSlowError:
|
||||
# NO-OP -> since already called above
|
||||
await ctx.cancel()
|
||||
|
||||
# local scope should have absorbed the cancellation
|
||||
assert ctx.cancelled_caught
|
||||
assert ctx._remote_error is ctx._local_error
|
||||
|
||||
try:
|
||||
async with ctx.open_stream() as stream:
|
||||
async for msg in stream:
|
||||
|
@ -551,19 +579,25 @@ async def cancel_self(
|
|||
global _state
|
||||
_state = True
|
||||
|
||||
# since we call this the below `.open_stream()` should always
|
||||
# error!
|
||||
await ctx.cancel()
|
||||
|
||||
# should inline raise immediately
|
||||
try:
|
||||
async with ctx.open_stream():
|
||||
pass
|
||||
except tractor.ContextCancelled:
|
||||
# except tractor.ContextCancelled:
|
||||
except RuntimeError:
|
||||
# suppress for now so we can do checkpoint tests below
|
||||
pass
|
||||
print('Got expected runtime error for stream-after-cancel')
|
||||
|
||||
else:
|
||||
raise RuntimeError('Context didnt cancel itself?!')
|
||||
|
||||
# check a real ``trio.Cancelled`` is raised on a checkpoint
|
||||
# check that``trio.Cancelled`` is now raised on any further
|
||||
# checkpoints since the self cancel above will have cancelled
|
||||
# the `Context._scope.cancel_scope: trio.CancelScope`
|
||||
try:
|
||||
with trio.fail_after(0.1):
|
||||
await trio.sleep_forever()
|
||||
|
@ -574,6 +608,7 @@ async def cancel_self(
|
|||
# should never get here
|
||||
assert 0
|
||||
|
||||
raise RuntimeError('Context didnt cancel itself?!')
|
||||
|
||||
@tractor_test
|
||||
async def test_callee_cancels_before_started():
|
||||
|
@ -601,7 +636,7 @@ async def test_callee_cancels_before_started():
|
|||
ce.type == trio.Cancelled
|
||||
|
||||
# the traceback should be informative
|
||||
assert 'cancelled itself' in ce.msgdata['tb_str']
|
||||
assert 'itself' in ce.msgdata['tb_str']
|
||||
|
||||
# teardown the actor
|
||||
await portal.cancel_actor()
|
||||
|
@ -773,7 +808,7 @@ async def echo_back_sequence(
|
|||
|
||||
print(
|
||||
'EXITING CALLEEE:\n'
|
||||
f'{ctx.cancel_called_remote}'
|
||||
f'{ctx.canceller}'
|
||||
)
|
||||
return 'yo'
|
||||
|
||||
|
@ -871,7 +906,7 @@ def test_maybe_allow_overruns_stream(
|
|||
|
||||
if cancel_ctx:
|
||||
assert isinstance(res, ContextCancelled)
|
||||
assert tuple(res.canceller) == tractor.current_actor().uid
|
||||
assert tuple(res.canceller) == current_actor().uid
|
||||
|
||||
else:
|
||||
print(f'RX ROOT SIDE RESULT {res}')
|
||||
|
|
|
@ -225,7 +225,7 @@ def test_context_spawns_aio_task_that_errors(
|
|||
|
||||
await trio.sleep_forever()
|
||||
|
||||
return await ctx.result()
|
||||
return await ctx.result()
|
||||
|
||||
if parent_cancels:
|
||||
# bc the parent made the cancel request,
|
||||
|
|
|
@ -15,6 +15,26 @@ from tractor import ( # typing
|
|||
ContextCancelled,
|
||||
)
|
||||
|
||||
# XXX TODO cases:
|
||||
# - [ ] peer cancelled itself - so other peers should
|
||||
# get errors reflecting that the peer was itself the .canceller?
|
||||
|
||||
# - [x] WE cancelled the peer and thus should not see any raised
|
||||
# `ContextCancelled` as it should be reaped silently?
|
||||
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
||||
# already covers this case?
|
||||
|
||||
# - [x] INTER-PEER: some arbitrary remote peer cancels via
|
||||
# Portal.cancel_actor().
|
||||
# => all other connected peers should get that cancel requesting peer's
|
||||
# uid in the ctx-cancelled error msg raised in all open ctxs
|
||||
# with that peer.
|
||||
|
||||
# - [ ] PEER-FAILS-BY-CHILD-ERROR: peer spawned a sub-actor which
|
||||
# (also) spawned a failing task which was unhandled and
|
||||
# propagated up to the immediate parent - the peer to the actor
|
||||
# that also spawned a remote task task in that same peer-parent.
|
||||
|
||||
|
||||
# def test_self_cancel():
|
||||
# '''
|
||||
|
@ -29,14 +49,30 @@ from tractor import ( # typing
|
|||
@tractor.context
|
||||
async def sleep_forever(
|
||||
ctx: Context,
|
||||
expect_ctxc: bool = False,
|
||||
) -> None:
|
||||
'''
|
||||
Sync the context, open a stream then just sleep.
|
||||
|
||||
Allow checking for (context) cancellation locally.
|
||||
|
||||
'''
|
||||
await ctx.started()
|
||||
async with ctx.open_stream():
|
||||
await trio.sleep_forever()
|
||||
try:
|
||||
await ctx.started()
|
||||
async with ctx.open_stream():
|
||||
await trio.sleep_forever()
|
||||
|
||||
except BaseException as berr:
|
||||
|
||||
# TODO: it'd sure be nice to be able to inject our own
|
||||
# `ContextCancelled` here instead of of `trio.Cancelled`
|
||||
# so that our runtime can expect it and this "user code"
|
||||
# would be able to tell the diff between a generic trio
|
||||
# cancel and a tractor runtime-IPC cancel.
|
||||
if expect_ctxc:
|
||||
assert isinstance(berr, trio.Cancelled)
|
||||
|
||||
raise
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
@ -145,6 +181,7 @@ async def stream_ints(
|
|||
async with ctx.open_stream() as stream:
|
||||
for i in itertools.count():
|
||||
await stream.send(i)
|
||||
await trio.sleep(0.01)
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
@ -161,73 +198,81 @@ async def stream_from_peer(
|
|||
peer_ctx.open_stream() as stream,
|
||||
):
|
||||
await ctx.started()
|
||||
# XXX TODO: big set of questions for this
|
||||
# XXX QUESTIONS & TODO: for further details around this
|
||||
# in the longer run..
|
||||
# https://github.com/goodboy/tractor/issues/368
|
||||
# - should we raise `ContextCancelled` or `Cancelled` (rn
|
||||
# it does that) here?!
|
||||
# - test the `ContextCancelled` OUTSIDE the
|
||||
# `.open_context()` call?
|
||||
try:
|
||||
async for msg in stream:
|
||||
print(msg)
|
||||
|
||||
except trio.Cancelled:
|
||||
assert not ctx.cancel_called
|
||||
assert not ctx.cancelled_caught
|
||||
|
||||
assert not peer_ctx.cancel_called
|
||||
assert not peer_ctx.cancelled_caught
|
||||
|
||||
assert 'root' in ctx.cancel_called_remote
|
||||
|
||||
raise # XXX MUST NEVER MASK IT!!
|
||||
|
||||
with trio.CancelScope(shield=True):
|
||||
await tractor.pause()
|
||||
# pass
|
||||
# pytest.fail(
|
||||
raise RuntimeError(
|
||||
'peer never triggered local `[Context]Cancelled`?!?'
|
||||
)
|
||||
# it does latter) and should/could it be implemented
|
||||
# as a general injection override for `trio` such
|
||||
# that ANY next checkpoint would raise the "cancel
|
||||
# error type" of choice?
|
||||
# - should the `ContextCancelled` bubble from
|
||||
# all `Context` and `MsgStream` apis wherein it
|
||||
# prolly makes the most sense to make it
|
||||
# a `trio.Cancelled` subtype?
|
||||
# - what about IPC-transport specific errors, should
|
||||
# they bubble from the async for and trigger
|
||||
# other special cases?
|
||||
# NOTE: current ctl flow:
|
||||
# - stream raises `trio.EndOfChannel` and
|
||||
# exits the loop
|
||||
# - `.open_context()` will raise the ctxcanc
|
||||
# received from the sleeper.
|
||||
async for msg in stream:
|
||||
assert msg is not None
|
||||
print(msg)
|
||||
|
||||
# NOTE: cancellation of the (sleeper) peer should always
|
||||
# cause a `ContextCancelled` raise in this streaming
|
||||
# actor.
|
||||
except ContextCancelled as ctxerr:
|
||||
assert ctxerr.canceller == 'canceller'
|
||||
assert ctxerr._remote_error is ctxerr
|
||||
err = ctxerr
|
||||
assert peer_ctx._remote_error is ctxerr
|
||||
assert peer_ctx.canceller == ctxerr.canceller
|
||||
|
||||
# caller peer should not be the cancel requester
|
||||
assert not ctx.cancel_called
|
||||
# XXX can never be true since `._invoke` only
|
||||
# sets this AFTER the nursery block this task
|
||||
# was started in, exits.
|
||||
assert not ctx.cancelled_caught
|
||||
|
||||
# we never requested cancellation
|
||||
assert not peer_ctx.cancel_called
|
||||
# the `.open_context()` exit definitely caught
|
||||
# a cancellation in the internal `Context._scope` since
|
||||
# likely the runtime called `_deliver_msg()` after
|
||||
# receiving the remote error from the streaming task.
|
||||
assert peer_ctx.cancelled_caught
|
||||
|
||||
# TODO / NOTE `.canceller` won't have been set yet
|
||||
# here because that machinery is inside
|
||||
# `.open_context().__aexit__()` BUT, if we had
|
||||
# a way to know immediately (from the last
|
||||
# checkpoint) that cancellation was due to
|
||||
# a remote, we COULD assert this here..see,
|
||||
# https://github.com/goodboy/tractor/issues/368
|
||||
|
||||
# root/parent actor task should NEVER HAVE cancelled us!
|
||||
assert not ctx.canceller
|
||||
assert 'canceller' in peer_ctx.canceller
|
||||
|
||||
# CASE 1: we were cancelled by our parent, the root actor.
|
||||
# TODO: there are other cases depending on how the root
|
||||
# actor and it's caller side task are written:
|
||||
# - if the root does not req us to cancel then an
|
||||
# IPC-transport related error should bubble from the async
|
||||
# for loop and thus cause local cancellation both here
|
||||
# and in the root (since in that case this task cancels the
|
||||
# context with the root, not the other way around)
|
||||
assert ctx.cancel_called_remote[0] == 'root'
|
||||
raise
|
||||
# TODO: IN THEORY we could have other cases depending on
|
||||
# who cancels first, the root actor or the canceller peer?.
|
||||
#
|
||||
# 1- when the peer request is first then the `.canceller`
|
||||
# field should obvi be set to the 'canceller' uid,
|
||||
#
|
||||
# 2-if the root DOES req cancel then we should see the same
|
||||
# `trio.Cancelled` implicitly raised
|
||||
# assert ctx.canceller[0] == 'root'
|
||||
# assert peer_ctx.canceller[0] == 'sleeper'
|
||||
|
||||
# except BaseException as err:
|
||||
raise RuntimeError(
|
||||
'peer never triggered local `ContextCancelled`?'
|
||||
)
|
||||
|
||||
# raise
|
||||
|
||||
# cases:
|
||||
# - some arbitrary remote peer cancels via Portal.cancel_actor().
|
||||
# => all other connected peers should get that cancel requesting peer's
|
||||
# uid in the ctx-cancelled error msg.
|
||||
|
||||
# - peer spawned a sub-actor which (also) spawned a failing task
|
||||
# which was unhandled and propagated up to the immediate
|
||||
# parent, the peer to the actor that also spawned a remote task
|
||||
# task in that same peer-parent.
|
||||
|
||||
# - peer cancelled itself - so other peers should
|
||||
# get errors reflecting that the peer was itself the .canceller?
|
||||
|
||||
# - WE cancelled the peer and thus should not see any raised
|
||||
# `ContextCancelled` as it should be reaped silently?
|
||||
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
|
||||
# already covers this case?
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'error_during_ctxerr_handling',
|
||||
|
@ -251,8 +296,8 @@ def test_peer_canceller(
|
|||
line and be less indented.
|
||||
|
||||
.actor0> ()-> .actor1>
|
||||
a inter-actor task context opened (by `async with `Portal.open_context()`)
|
||||
from actor0 *into* actor1.
|
||||
a inter-actor task context opened (by `async with
|
||||
`Portal.open_context()`) from actor0 *into* actor1.
|
||||
|
||||
.actor0> ()<=> .actor1>
|
||||
a inter-actor task context opened (as above)
|
||||
|
@ -287,11 +332,12 @@ def test_peer_canceller(
|
|||
5. .canceller> ()-> .sleeper>
|
||||
- calls `Portal.cancel_actor()`
|
||||
|
||||
|
||||
'''
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery() as an:
|
||||
async with tractor.open_nursery(
|
||||
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
||||
# debug_mode=True
|
||||
) as an:
|
||||
canceller: Portal = await an.start_actor(
|
||||
'canceller',
|
||||
enable_modules=[__name__],
|
||||
|
@ -305,10 +351,13 @@ def test_peer_canceller(
|
|||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
root = tractor.current_actor()
|
||||
|
||||
try:
|
||||
async with (
|
||||
sleeper.open_context(
|
||||
sleep_forever,
|
||||
expect_ctxc=True,
|
||||
) as (sleeper_ctx, sent),
|
||||
|
||||
just_caller.open_context(
|
||||
|
@ -335,16 +384,15 @@ def test_peer_canceller(
|
|||
'Context.result() did not raise ctx-cancelled?'
|
||||
)
|
||||
|
||||
# TODO: not sure why this isn't catching
|
||||
# but maybe we need an `ExceptionGroup` and
|
||||
# the whole except *errs: thinger in 3.11?
|
||||
# should always raise since this root task does
|
||||
# not request the sleeper cancellation ;)
|
||||
except ContextCancelled as ctxerr:
|
||||
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
|
||||
|
||||
# canceller and caller peers should not
|
||||
# have been remotely cancelled.
|
||||
assert canceller_ctx.cancel_called_remote is None
|
||||
assert caller_ctx.cancel_called_remote is None
|
||||
assert canceller_ctx.canceller is None
|
||||
assert caller_ctx.canceller is None
|
||||
|
||||
assert ctxerr.canceller[0] == 'canceller'
|
||||
|
||||
|
@ -355,16 +403,14 @@ def test_peer_canceller(
|
|||
# block it should be.
|
||||
assert not sleeper_ctx.cancelled_caught
|
||||
|
||||
# TODO: a test which ensures this error is
|
||||
# bubbled and caught (NOT MASKED) by the
|
||||
# runtime!!!
|
||||
if error_during_ctxerr_handling:
|
||||
raise RuntimeError('Simulated error during teardown')
|
||||
|
||||
raise
|
||||
|
||||
# SHOULD NEVER GET HERE!
|
||||
except BaseException:
|
||||
# XXX SHOULD NEVER EVER GET HERE XXX
|
||||
except BaseException as berr:
|
||||
err = berr
|
||||
pytest.fail('did not rx ctx-cancelled error?')
|
||||
else:
|
||||
pytest.fail('did not rx ctx-cancelled error?')
|
||||
|
@ -375,6 +421,20 @@ def test_peer_canceller(
|
|||
)as ctxerr:
|
||||
_err = ctxerr
|
||||
|
||||
# NOTE: the main state to check on `Context` is:
|
||||
# - `.cancelled_caught` (maps to nursery cs)
|
||||
# - `.cancel_called` (bool of whether this side
|
||||
# requested)
|
||||
# - `.canceller` (uid of cancel-causing actor-task)
|
||||
# - `._remote_error` (any `RemoteActorError`
|
||||
# instance from other side of context)
|
||||
# TODO: are we really planning to use this tho?
|
||||
# - `._cancel_msg` (any msg that caused the
|
||||
# cancel)
|
||||
|
||||
# CASE: error raised during handling of
|
||||
# `ContextCancelled` inside `.open_context()`
|
||||
# block
|
||||
if error_during_ctxerr_handling:
|
||||
assert isinstance(ctxerr, RuntimeError)
|
||||
|
||||
|
@ -384,20 +444,42 @@ def test_peer_canceller(
|
|||
for ctx in ctxs:
|
||||
assert ctx.cancel_called
|
||||
|
||||
# each context should have received
|
||||
# a silently absorbed context cancellation
|
||||
# from its peer actor's task.
|
||||
assert ctx.chan.uid == ctx.cancel_called_remote
|
||||
|
||||
# this root actor task should have
|
||||
# cancelled all opened contexts except
|
||||
# the sleeper which is cancelled by its
|
||||
# peer "canceller"
|
||||
if ctx is not sleeper_ctx:
|
||||
assert ctx._remote_error.canceller[0] == 'root'
|
||||
# cancelled all opened contexts except the
|
||||
# sleeper which is obvi by the "canceller"
|
||||
# peer.
|
||||
re = ctx._remote_error
|
||||
if (
|
||||
ctx is sleeper_ctx
|
||||
or ctx is caller_ctx
|
||||
):
|
||||
assert (
|
||||
re.canceller
|
||||
==
|
||||
ctx.canceller
|
||||
==
|
||||
canceller.channel.uid
|
||||
)
|
||||
|
||||
else:
|
||||
assert (
|
||||
re.canceller
|
||||
==
|
||||
ctx.canceller
|
||||
==
|
||||
root.uid
|
||||
)
|
||||
|
||||
# CASE: standard teardown inside in `.open_context()` block
|
||||
else:
|
||||
assert ctxerr.canceller[0] == 'canceller'
|
||||
assert ctxerr.canceller == sleeper_ctx.canceller
|
||||
assert (
|
||||
ctxerr.canceller[0]
|
||||
==
|
||||
sleeper_ctx.canceller[0]
|
||||
==
|
||||
'canceller'
|
||||
)
|
||||
|
||||
# the sleeper's remote error is the error bubbled
|
||||
# out of the context-stack above!
|
||||
|
@ -405,18 +487,43 @@ def test_peer_canceller(
|
|||
assert re is ctxerr
|
||||
|
||||
for ctx in ctxs:
|
||||
re: BaseException | None = ctx._remote_error
|
||||
assert re
|
||||
|
||||
# root doesn't cancel sleeper since it's
|
||||
# cancelled by its peer.
|
||||
if ctx is sleeper_ctx:
|
||||
assert not ctx.cancel_called
|
||||
# since sleeper_ctx.result() IS called
|
||||
# above we should have (silently)
|
||||
# absorbed the corresponding
|
||||
# `ContextCancelled` for it and thus
|
||||
# the logic inside `.cancelled_caught`
|
||||
# should trigger!
|
||||
assert ctx.cancelled_caught
|
||||
|
||||
elif ctx is caller_ctx:
|
||||
# since its context was remotely
|
||||
# cancelled, we never needed to
|
||||
# call `Context.cancel()` bc it was
|
||||
# done by the peer and also we never
|
||||
assert ctx.cancel_called
|
||||
|
||||
# TODO: figure out the details of
|
||||
# this..
|
||||
# if you look the `._local_error` here
|
||||
# is a multi of ctxc + 2 Cancelleds?
|
||||
# assert not ctx.cancelled_caught
|
||||
|
||||
else:
|
||||
assert ctx.cancel_called
|
||||
assert not ctx.cancelled_caught
|
||||
|
||||
# each context should have received
|
||||
# TODO: do we even need this flag?
|
||||
# -> each context should have received
|
||||
# a silently absorbed context cancellation
|
||||
# from its peer actor's task.
|
||||
assert ctx.chan.uid == ctx.cancel_called_remote
|
||||
# in its remote nursery scope.
|
||||
# assert ctx.chan.uid == ctx.canceller
|
||||
|
||||
# NOTE: when an inter-peer cancellation
|
||||
# occurred, we DO NOT expect this
|
||||
|
@ -434,9 +541,7 @@ def test_peer_canceller(
|
|||
# including the case where ctx-cancel handling
|
||||
# itself errors.
|
||||
assert sleeper_ctx.cancelled_caught
|
||||
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
|
||||
|
||||
# await tractor.pause()
|
||||
raise # always to ensure teardown
|
||||
|
||||
if error_during_ctxerr_handling:
|
||||
|
|
|
@ -211,10 +211,14 @@ async def find_actor(
|
|||
# 'Gathered portals:\n'
|
||||
# f'{portals}'
|
||||
# )
|
||||
if not portals:
|
||||
# NOTE: `gather_contexts()` will return a
|
||||
# `tuple[None, None, ..., None]` if no contact
|
||||
# can be made with any regstrar at any of the
|
||||
# N provided addrs!
|
||||
if not any(portals):
|
||||
if raise_on_none:
|
||||
raise RuntimeError(
|
||||
f'No {name} found registered @ {registry_addrs}'
|
||||
f'No actor "{name}" found registered @ {registry_addrs}'
|
||||
)
|
||||
yield None
|
||||
return
|
||||
|
|
|
@ -294,9 +294,11 @@ class Channel:
|
|||
self._agen = self._aiter_recv()
|
||||
self._exc: Optional[Exception] = None # set if far end actor errors
|
||||
self._closed: bool = False
|
||||
# flag set on ``Portal.cancel_actor()`` indicating
|
||||
# remote (peer) cancellation of the far end actor runtime.
|
||||
self._cancel_called: bool = False # set on ``Portal.cancel_actor()``
|
||||
|
||||
# flag set by ``Portal.cancel_actor()`` indicating remote
|
||||
# (possibly peer) cancellation of the far end actor
|
||||
# runtime.
|
||||
self._cancel_called: bool = False
|
||||
|
||||
@classmethod
|
||||
def from_stream(
|
||||
|
|
|
@ -100,7 +100,7 @@ def parse_maddr(
|
|||
multiaddr: str,
|
||||
) -> dict[str, str | int | dict]:
|
||||
'''
|
||||
Parse a libp2p style "multiaddress" into it's distinct protocol
|
||||
Parse a libp2p style "multiaddress" into its distinct protocol
|
||||
segments where each segment is of the form:
|
||||
|
||||
`../<protocol>/<param0>/<param1>/../<paramN>`
|
||||
|
|
|
@ -48,6 +48,7 @@ from ._exceptions import (
|
|||
unpack_error,
|
||||
NoResult,
|
||||
ContextCancelled,
|
||||
MessagingError,
|
||||
)
|
||||
from ._context import Context
|
||||
from ._streaming import MsgStream
|
||||
|
@ -71,11 +72,6 @@ def _unwrap_msg(
|
|||
raise unpack_error(msg, channel) from None
|
||||
|
||||
|
||||
# TODO: maybe move this to ._exceptions?
|
||||
class MessagingError(Exception):
|
||||
'Some kind of unexpected SC messaging dialog issue'
|
||||
|
||||
|
||||
class Portal:
|
||||
'''
|
||||
A 'portal' to a memory-domain-separated `Actor`.
|
||||
|
@ -220,14 +216,18 @@ class Portal:
|
|||
|
||||
try:
|
||||
# send cancel cmd - might not get response
|
||||
# XXX: sure would be nice to make this work with a proper shield
|
||||
# XXX: sure would be nice to make this work with
|
||||
# a proper shield
|
||||
with trio.move_on_after(
|
||||
timeout
|
||||
or self.cancel_timeout
|
||||
) as cs:
|
||||
cs.shield = True
|
||||
|
||||
await self.run_from_ns('self', 'cancel')
|
||||
await self.run_from_ns(
|
||||
'self',
|
||||
'cancel',
|
||||
)
|
||||
return True
|
||||
|
||||
if cs.cancelled_caught:
|
||||
|
@ -462,10 +462,14 @@ class Portal:
|
|||
try:
|
||||
# the "first" value here is delivered by the callee's
|
||||
# ``Context.started()`` call.
|
||||
first = msg['started']
|
||||
first: Any = msg['started']
|
||||
ctx._started_called: bool = True
|
||||
|
||||
except KeyError:
|
||||
|
||||
# TODO: can we maybe factor this into the new raiser
|
||||
# `_streaming._raise_from_no_yield_msg()` and make that
|
||||
# helper more generic, say with a `_no_<blah>_msg()`?
|
||||
if not (cid := msg.get('cid')):
|
||||
raise MessagingError(
|
||||
'Received internal error at context?\n'
|
||||
|
@ -517,54 +521,102 @@ class Portal:
|
|||
# started in the ctx nursery.
|
||||
ctx._scope.cancel()
|
||||
|
||||
# XXX: (maybe) shield/mask context-cancellations that were
|
||||
# initiated by any of the context's 2 tasks. There are
|
||||
# subsequently 2 operating cases for a "graceful cancel"
|
||||
# of a `Context`:
|
||||
#
|
||||
# 1.*this* side's task called `Context.cancel()`, in
|
||||
# which case we mask the `ContextCancelled` from bubbling
|
||||
# to the opener (much like how `trio.Nursery` swallows
|
||||
# any `trio.Cancelled` bubbled by a call to
|
||||
# `Nursery.cancel_scope.cancel()`)
|
||||
# XXX NOTE XXX: maybe shield against
|
||||
# self-context-cancellation (which raises a local
|
||||
# `ContextCancelled`) when requested (via
|
||||
# `Context.cancel()`) by the same task (tree) which entered
|
||||
# THIS `.open_context()`.
|
||||
#
|
||||
# 2.*the other* side's (callee/spawned) task cancelled due
|
||||
# to a self or peer cancellation request in which case we
|
||||
# DO let the error bubble to the opener.
|
||||
# NOTE: There are 2 operating cases for a "graceful cancel"
|
||||
# of a `Context`. In both cases any `ContextCancelled`
|
||||
# raised in this scope-block came from a transport msg
|
||||
# relayed from some remote-actor-task which our runtime set
|
||||
# as a `Context._remote_error`
|
||||
#
|
||||
# the CASES:
|
||||
#
|
||||
# - if that context IS THE SAME ONE that called
|
||||
# `Context.cancel()`, we want to absorb the error
|
||||
# silently and let this `.open_context()` block to exit
|
||||
# without raising.
|
||||
#
|
||||
# - if it is from some OTHER context (we did NOT call
|
||||
# `.cancel()`), we want to re-RAISE IT whilst also
|
||||
# setting our own ctx's "reason for cancel" to be that
|
||||
# other context's cancellation condition; we set our
|
||||
# `.canceller: tuple[str, str]` to be same value as
|
||||
# caught here in a `ContextCancelled.canceller`.
|
||||
#
|
||||
# Again, there are 2 cases:
|
||||
#
|
||||
# 1-some other context opened in this `.open_context()`
|
||||
# block cancelled due to a self or peer cancellation
|
||||
# request in which case we DO let the error bubble to the
|
||||
# opener.
|
||||
#
|
||||
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
|
||||
# and received a `ContextCanclled` from the "callee"
|
||||
# task, in which case we mask the `ContextCancelled` from
|
||||
# bubbling to this "caller" (much like how `trio.Nursery`
|
||||
# swallows any `trio.Cancelled` bubbled by a call to
|
||||
# `Nursery.cancel_scope.cancel()`)
|
||||
except ContextCancelled as ctxc:
|
||||
scope_err = ctxc
|
||||
|
||||
# CASE 1: this context was never cancelled
|
||||
# via a local task's call to `Context.cancel()`.
|
||||
if not ctx._cancel_called:
|
||||
raise
|
||||
|
||||
# CASE 2: context was cancelled by local task calling
|
||||
# `.cancel()`, we don't raise and the exit block should
|
||||
# exit silently.
|
||||
else:
|
||||
if (
|
||||
ctx._cancel_called
|
||||
and (
|
||||
ctxc is ctx._remote_error
|
||||
or
|
||||
ctxc.canceller is self.canceller
|
||||
)
|
||||
):
|
||||
log.debug(
|
||||
f'Context {ctx} cancelled gracefully with:\n'
|
||||
f'{ctxc}'
|
||||
)
|
||||
# CASE 1: this context was never cancelled via a local
|
||||
# task (tree) having called `Context.cancel()`, raise
|
||||
# the error since it was caused by someone else!
|
||||
else:
|
||||
raise
|
||||
|
||||
# the above `._scope` can be cancelled due to:
|
||||
# 1. an explicit self cancel via `Context.cancel()` or
|
||||
# `Actor.cancel()`,
|
||||
# 2. any "callee"-side remote error, possibly also a cancellation
|
||||
# request by some peer,
|
||||
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
|
||||
except (
|
||||
# - a standard error in the caller/yieldee
|
||||
# CASE 3: standard local error in this caller/yieldee
|
||||
Exception,
|
||||
|
||||
# - a runtime teardown exception-group and/or
|
||||
# cancellation request from a caller task.
|
||||
BaseExceptionGroup,
|
||||
trio.Cancelled,
|
||||
# CASES 1 & 2: normally manifested as
|
||||
# a `Context._scope_nursery` raised
|
||||
# exception-group of,
|
||||
# 1.-`trio.Cancelled`s, since
|
||||
# `._scope.cancel()` will have been called and any
|
||||
# `ContextCancelled` absorbed and thus NOT RAISED in
|
||||
# any `Context._maybe_raise_remote_err()`,
|
||||
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
|
||||
# from any error raised in the "callee" side with
|
||||
# a group only raised if there was any more then one
|
||||
# task started here in the "caller" in the
|
||||
# `yield`-ed to task.
|
||||
BaseExceptionGroup, # since overrun handler tasks may have been spawned
|
||||
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
|
||||
KeyboardInterrupt,
|
||||
|
||||
) as err:
|
||||
scope_err = err
|
||||
|
||||
# XXX: request cancel of this context on any error.
|
||||
# NOTE: `Context.cancel()` is conversely NOT called in
|
||||
# the `ContextCancelled` "cancellation requested" case
|
||||
# above.
|
||||
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
|
||||
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
|
||||
# the `ContextCancelled` "self cancellation absorbed" case
|
||||
# handled in the block above!
|
||||
log.cancel(
|
||||
'Context cancelled for task due to\n'
|
||||
f'{err}\n'
|
||||
|
@ -583,7 +635,7 @@ class Portal:
|
|||
|
||||
raise # duh
|
||||
|
||||
# no scope error case
|
||||
# no local scope error, the "clean exit with a result" case.
|
||||
else:
|
||||
if ctx.chan.connected():
|
||||
log.info(
|
||||
|
@ -597,15 +649,27 @@ class Portal:
|
|||
# `Context._maybe_raise_remote_err()`) IFF
|
||||
# a `Context._remote_error` was set by the runtime
|
||||
# via a call to
|
||||
# `Context._maybe_cancel_and_set_remote_error()`
|
||||
# which IS SET any time the far end fails and
|
||||
# causes "caller side" cancellation via
|
||||
# a `ContextCancelled` here.
|
||||
result = await ctx.result()
|
||||
log.runtime(
|
||||
f'Context {fn_name} returned value from callee:\n'
|
||||
f'`{result}`'
|
||||
)
|
||||
# `Context._maybe_cancel_and_set_remote_error()`.
|
||||
# As per `Context._deliver_msg()`, that error IS
|
||||
# ALWAYS SET any time "callee" side fails and causes "caller
|
||||
# side" cancellation via a `ContextCancelled` here.
|
||||
# result = await ctx.result()
|
||||
try:
|
||||
result = await ctx.result()
|
||||
log.runtime(
|
||||
f'Context {fn_name} returned value from callee:\n'
|
||||
f'`{result}`'
|
||||
)
|
||||
except BaseException as berr:
|
||||
# on normal teardown, if we get some error
|
||||
# raised in `Context.result()` we still want to
|
||||
# save that error on the ctx's state to
|
||||
# determine things like `.cancelled_caught` for
|
||||
# cases where there was remote cancellation but
|
||||
# this task didn't know until final teardown
|
||||
# / value collection.
|
||||
scope_err = berr
|
||||
raise
|
||||
|
||||
finally:
|
||||
# though it should be impossible for any tasks
|
||||
|
@ -655,12 +719,14 @@ class Portal:
|
|||
with trio.CancelScope(shield=True):
|
||||
await ctx._recv_chan.aclose()
|
||||
|
||||
# XXX: since we always (maybe) re-raise (and thus also
|
||||
# mask runtime machinery related
|
||||
# multi-`trio.Cancelled`s) any scope error which was
|
||||
# the underlying cause of this context's exit, add
|
||||
# different log msgs for each of the (2) cases.
|
||||
# XXX: we always raise remote errors locally and
|
||||
# generally speaking mask runtime-machinery related
|
||||
# multi-`trio.Cancelled`s. As such, any `scope_error`
|
||||
# which was the underlying cause of this context's exit
|
||||
# should be stored as the `Context._local_error` and
|
||||
# used in determining `Context.cancelled_caught: bool`.
|
||||
if scope_err is not None:
|
||||
ctx._local_error: BaseException = scope_err
|
||||
etype: Type[BaseException] = type(scope_err)
|
||||
|
||||
# CASE 2
|
||||
|
@ -690,7 +756,7 @@ class Portal:
|
|||
await maybe_wait_for_debugger()
|
||||
|
||||
# FINALLY, remove the context from runtime tracking and
|
||||
# exit Bo
|
||||
# exit!
|
||||
self.actor._contexts.pop(
|
||||
(self.channel.uid, ctx.cid),
|
||||
None,
|
||||
|
|
|
@ -85,6 +85,10 @@ async def open_root_actor(
|
|||
enable_modules: list | None = None,
|
||||
rpc_module_paths: list | None = None,
|
||||
|
||||
# NOTE: allow caller to ensure that only one registry exists
|
||||
# and that this call creates it.
|
||||
ensure_registry: bool = False,
|
||||
|
||||
) -> Actor:
|
||||
'''
|
||||
Runtime init entry point for ``tractor``.
|
||||
|
@ -206,6 +210,12 @@ async def open_root_actor(
|
|||
# REGISTRAR
|
||||
if ponged_addrs:
|
||||
|
||||
if ensure_registry:
|
||||
raise RuntimeError(
|
||||
f'Failed to open `{name}`@{ponged_addrs}: '
|
||||
'registry socket(s) already bound'
|
||||
)
|
||||
|
||||
# we were able to connect to an arbiter
|
||||
logger.info(
|
||||
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
|
||||
|
|
|
@ -204,6 +204,21 @@ async def do_hard_kill(
|
|||
# terminate_after: int = 99999,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Un-gracefully terminate an OS level `trio.Process` after timeout.
|
||||
|
||||
Used in 2 main cases:
|
||||
|
||||
- "unknown remote runtime state": a hanging/stalled actor that
|
||||
isn't responding after sending a (graceful) runtime cancel
|
||||
request via an IPC msg.
|
||||
- "cancelled during spawn": a process who's actor runtime was
|
||||
cancelled before full startup completed (such that
|
||||
cancel-request-handling machinery was never fully
|
||||
initialized) and thus a "cancel request msg" is never going
|
||||
to be handled.
|
||||
|
||||
'''
|
||||
# NOTE: this timeout used to do nothing since we were shielding
|
||||
# the ``.wait()`` inside ``new_proc()`` which will pretty much
|
||||
# never release until the process exits, now it acts as
|
||||
|
@ -219,6 +234,9 @@ async def do_hard_kill(
|
|||
# and wait for it to exit. If cancelled, kills the process and
|
||||
# waits for it to finish exiting before propagating the
|
||||
# cancellation.
|
||||
#
|
||||
# This code was originally triggred by ``proc.__aexit__()``
|
||||
# but now must be called manually.
|
||||
with trio.CancelScope(shield=True):
|
||||
if proc.stdin is not None:
|
||||
await proc.stdin.aclose()
|
||||
|
@ -234,10 +252,14 @@ async def do_hard_kill(
|
|||
with trio.CancelScope(shield=True):
|
||||
await proc.wait()
|
||||
|
||||
# XXX NOTE XXX: zombie squad dispatch:
|
||||
# (should ideally never, but) If we do get here it means
|
||||
# graceful termination of a process failed and we need to
|
||||
# resort to OS level signalling to interrupt and cancel the
|
||||
# (presumably stalled or hung) actor. Since we never allow
|
||||
# zombies (as a feature) we ask the OS to do send in the
|
||||
# removal swad as the last resort.
|
||||
if cs.cancelled_caught:
|
||||
# XXX: should pretty much never get here unless we have
|
||||
# to move the bits from ``proc.__aexit__()`` out and
|
||||
# into here.
|
||||
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
|
||||
proc.kill()
|
||||
|
||||
|
@ -252,10 +274,13 @@ async def soft_wait(
|
|||
portal: Portal,
|
||||
|
||||
) -> None:
|
||||
# Wait for proc termination but **dont' yet** call
|
||||
# ``trio.Process.__aexit__()`` (it tears down stdio
|
||||
# which will kill any waiting remote pdb trace).
|
||||
# This is a "soft" (cancellable) join/reap.
|
||||
'''
|
||||
Wait for proc termination but **dont' yet** teardown
|
||||
std-streams (since it will clobber any ongoing pdb REPL
|
||||
session). This is our "soft" (and thus itself cancellable)
|
||||
join/reap on an actor-runtime-in-process.
|
||||
|
||||
'''
|
||||
uid = portal.channel.uid
|
||||
try:
|
||||
log.cancel(f'Soft waiting on actor:\n{uid}')
|
||||
|
@ -278,7 +303,13 @@ async def soft_wait(
|
|||
await wait_func(proc)
|
||||
n.cancel_scope.cancel()
|
||||
|
||||
# start a task to wait on the termination of the
|
||||
# process by itself waiting on a (caller provided) wait
|
||||
# function which should unblock when the target process
|
||||
# has terminated.
|
||||
n.start_soon(cancel_on_proc_deth)
|
||||
|
||||
# send the actor-runtime a cancel request.
|
||||
await portal.cancel_actor()
|
||||
|
||||
if proc.poll() is None: # type: ignore
|
||||
|
|
Loading…
Reference in New Issue