Compare commits

..

No commits in common. "3f159235374b960d120c55a1be844cd1434ca48b" and "131674eabd76a5b73d45259c9af4bd3d03832133" have entirely different histories.

9 changed files with 173 additions and 426 deletions

View File

@ -13,11 +13,6 @@ from typing import Optional
import pytest import pytest
import trio import trio
import tractor import tractor
from tractor import (
Actor,
Context,
current_actor,
)
from tractor._exceptions import ( from tractor._exceptions import (
StreamOverrun, StreamOverrun,
ContextCancelled, ContextCancelled,
@ -198,6 +193,9 @@ def test_simple_context(
else: else:
assert await ctx.result() == 'yo' assert await ctx.result() == 'yo'
if not error_parent:
await ctx.cancel()
if pointlessly_open_stream: if pointlessly_open_stream:
async with ctx.open_stream(): async with ctx.open_stream():
if error_parent: if error_parent:
@ -210,15 +208,10 @@ def test_simple_context(
# 'stop' msg to the far end which needs # 'stop' msg to the far end which needs
# to be ignored # to be ignored
pass pass
else: else:
if error_parent: if error_parent:
raise error_parent raise error_parent
# cancel AFTER we open a stream
# to avoid a cancel raised inside
# `.open_stream()`
await ctx.cancel()
finally: finally:
# after cancellation # after cancellation
@ -283,7 +276,7 @@ def test_caller_cancels(
assert ( assert (
tuple(err.canceller) tuple(err.canceller)
== ==
current_actor().uid tractor.current_actor().uid
) )
async def main(): async def main():
@ -437,11 +430,9 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
): ):
'caller context closes without using stream' 'caller context closes without using stream'
async with tractor.open_nursery() as an: async with tractor.open_nursery() as n:
root: Actor = current_actor() portal = await n.start_actor(
portal = await an.start_actor(
'ctx_cancelled', 'ctx_cancelled',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -449,10 +440,10 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with portal.open_context( async with portal.open_context(
expect_cancelled, expect_cancelled,
) as (ctx, sent): ) as (ctx, sent):
assert sent is None
await portal.run(assert_state, value=True) await portal.run(assert_state, value=True)
assert sent is None
# call cancel explicitly # call cancel explicitly
if use_ctx_cancel_method: if use_ctx_cancel_method:
@ -463,21 +454,8 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async for msg in stream: async for msg in stream:
pass pass
except tractor.ContextCancelled as ctxc: except tractor.ContextCancelled:
# XXX: the cause is US since we call raise # XXX: must be propagated to __aexit__
# `Context.cancel()` just above!
assert (
ctxc.canceller
==
current_actor().uid
==
root.uid
)
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
raise
else: else:
assert 0, "Should have context cancelled?" assert 0, "Should have context cancelled?"
@ -494,13 +472,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
await ctx.result() await ctx.result()
assert 0, "Callee should have blocked!?" assert 0, "Callee should have blocked!?"
except trio.TooSlowError: except trio.TooSlowError:
# NO-OP -> since already called above
await ctx.cancel() await ctx.cancel()
# local scope should have absorbed the cancellation
assert ctx.cancelled_caught
assert ctx._remote_error is ctx._local_error
try: try:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for msg in stream: async for msg in stream:
@ -579,25 +551,19 @@ async def cancel_self(
global _state global _state
_state = True _state = True
# since we call this the below `.open_stream()` should always
# error!
await ctx.cancel() await ctx.cancel()
# should inline raise immediately # should inline raise immediately
try: try:
async with ctx.open_stream(): async with ctx.open_stream():
pass pass
# except tractor.ContextCancelled: except tractor.ContextCancelled:
except RuntimeError:
# suppress for now so we can do checkpoint tests below # suppress for now so we can do checkpoint tests below
print('Got expected runtime error for stream-after-cancel') pass
else: else:
raise RuntimeError('Context didnt cancel itself?!') raise RuntimeError('Context didnt cancel itself?!')
# check that``trio.Cancelled`` is now raised on any further # check a real ``trio.Cancelled`` is raised on a checkpoint
# checkpoints since the self cancel above will have cancelled
# the `Context._scope.cancel_scope: trio.CancelScope`
try: try:
with trio.fail_after(0.1): with trio.fail_after(0.1):
await trio.sleep_forever() await trio.sleep_forever()
@ -608,7 +574,6 @@ async def cancel_self(
# should never get here # should never get here
assert 0 assert 0
raise RuntimeError('Context didnt cancel itself?!')
@tractor_test @tractor_test
async def test_callee_cancels_before_started(): async def test_callee_cancels_before_started():
@ -636,7 +601,7 @@ async def test_callee_cancels_before_started():
ce.type == trio.Cancelled ce.type == trio.Cancelled
# the traceback should be informative # the traceback should be informative
assert 'itself' in ce.msgdata['tb_str'] assert 'cancelled itself' in ce.msgdata['tb_str']
# teardown the actor # teardown the actor
await portal.cancel_actor() await portal.cancel_actor()
@ -808,7 +773,7 @@ async def echo_back_sequence(
print( print(
'EXITING CALLEEE:\n' 'EXITING CALLEEE:\n'
f'{ctx.canceller}' f'{ctx.cancel_called_remote}'
) )
return 'yo' return 'yo'
@ -906,7 +871,7 @@ def test_maybe_allow_overruns_stream(
if cancel_ctx: if cancel_ctx:
assert isinstance(res, ContextCancelled) assert isinstance(res, ContextCancelled)
assert tuple(res.canceller) == current_actor().uid assert tuple(res.canceller) == tractor.current_actor().uid
else: else:
print(f'RX ROOT SIDE RESULT {res}') print(f'RX ROOT SIDE RESULT {res}')

View File

@ -225,7 +225,7 @@ def test_context_spawns_aio_task_that_errors(
await trio.sleep_forever() await trio.sleep_forever()
return await ctx.result() return await ctx.result()
if parent_cancels: if parent_cancels:
# bc the parent made the cancel request, # bc the parent made the cancel request,

View File

@ -15,26 +15,6 @@ from tractor import ( # typing
ContextCancelled, ContextCancelled,
) )
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg raised in all open ctxs
# with that peer.
# - [ ] PEER-FAILS-BY-CHILD-ERROR: peer spawned a sub-actor which
# (also) spawned a failing task which was unhandled and
# propagated up to the immediate parent - the peer to the actor
# that also spawned a remote task task in that same peer-parent.
# def test_self_cancel(): # def test_self_cancel():
# ''' # '''
@ -49,30 +29,14 @@ from tractor import ( # typing
@tractor.context @tractor.context
async def sleep_forever( async def sleep_forever(
ctx: Context, ctx: Context,
expect_ctxc: bool = False,
) -> None: ) -> None:
''' '''
Sync the context, open a stream then just sleep. Sync the context, open a stream then just sleep.
Allow checking for (context) cancellation locally.
''' '''
try: await ctx.started()
await ctx.started() async with ctx.open_stream():
async with ctx.open_stream(): await trio.sleep_forever()
await trio.sleep_forever()
except BaseException as berr:
# TODO: it'd sure be nice to be able to inject our own
# `ContextCancelled` here instead of of `trio.Cancelled`
# so that our runtime can expect it and this "user code"
# would be able to tell the diff between a generic trio
# cancel and a tractor runtime-IPC cancel.
if expect_ctxc:
assert isinstance(berr, trio.Cancelled)
raise
@tractor.context @tractor.context
@ -181,7 +145,6 @@ async def stream_ints(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
for i in itertools.count(): for i in itertools.count():
await stream.send(i) await stream.send(i)
await trio.sleep(0.01)
@tractor.context @tractor.context
@ -198,81 +161,73 @@ async def stream_from_peer(
peer_ctx.open_stream() as stream, peer_ctx.open_stream() as stream,
): ):
await ctx.started() await ctx.started()
# XXX QUESTIONS & TODO: for further details around this # XXX TODO: big set of questions for this
# in the longer run..
# https://github.com/goodboy/tractor/issues/368
# - should we raise `ContextCancelled` or `Cancelled` (rn # - should we raise `ContextCancelled` or `Cancelled` (rn
# it does latter) and should/could it be implemented # it does that) here?!
# as a general injection override for `trio` such # - test the `ContextCancelled` OUTSIDE the
# that ANY next checkpoint would raise the "cancel # `.open_context()` call?
# error type" of choice? try:
# - should the `ContextCancelled` bubble from async for msg in stream:
# all `Context` and `MsgStream` apis wherein it print(msg)
# prolly makes the most sense to make it
# a `trio.Cancelled` subtype? except trio.Cancelled:
# - what about IPC-transport specific errors, should assert not ctx.cancel_called
# they bubble from the async for and trigger assert not ctx.cancelled_caught
# other special cases?
# NOTE: current ctl flow: assert not peer_ctx.cancel_called
# - stream raises `trio.EndOfChannel` and assert not peer_ctx.cancelled_caught
# exits the loop
# - `.open_context()` will raise the ctxcanc assert 'root' in ctx.cancel_called_remote
# received from the sleeper.
async for msg in stream: raise # XXX MUST NEVER MASK IT!!
assert msg is not None
print(msg) with trio.CancelScope(shield=True):
await tractor.pause()
# pass
# pytest.fail(
raise RuntimeError(
'peer never triggered local `[Context]Cancelled`?!?'
)
# NOTE: cancellation of the (sleeper) peer should always # NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming # cause a `ContextCancelled` raise in this streaming
# actor. # actor.
except ContextCancelled as ctxerr: except ContextCancelled as ctxerr:
err = ctxerr assert ctxerr.canceller == 'canceller'
assert peer_ctx._remote_error is ctxerr assert ctxerr._remote_error is ctxerr
assert peer_ctx.canceller == ctxerr.canceller
# caller peer should not be the cancel requester
assert not ctx.cancel_called
# XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task
# was started in, exits.
assert not ctx.cancelled_caught
# we never requested cancellation
assert not peer_ctx.cancel_called
# the `.open_context()` exit definitely caught
# a cancellation in the internal `Context._scope` since
# likely the runtime called `_deliver_msg()` after
# receiving the remote error from the streaming task.
assert peer_ctx.cancelled_caught
# TODO / NOTE `.canceller` won't have been set yet
# here because that machinery is inside
# `.open_context().__aexit__()` BUT, if we had
# a way to know immediately (from the last
# checkpoint) that cancellation was due to
# a remote, we COULD assert this here..see,
# https://github.com/goodboy/tractor/issues/368
# root/parent actor task should NEVER HAVE cancelled us!
assert not ctx.canceller
assert 'canceller' in peer_ctx.canceller
# CASE 1: we were cancelled by our parent, the root actor.
# TODO: there are other cases depending on how the root
# actor and it's caller side task are written:
# - if the root does not req us to cancel then an
# IPC-transport related error should bubble from the async
# for loop and thus cause local cancellation both here
# and in the root (since in that case this task cancels the
# context with the root, not the other way around)
assert ctx.cancel_called_remote[0] == 'root'
raise raise
# TODO: IN THEORY we could have other cases depending on
# who cancels first, the root actor or the canceller peer?.
#
# 1- when the peer request is first then the `.canceller`
# field should obvi be set to the 'canceller' uid,
#
# 2-if the root DOES req cancel then we should see the same
# `trio.Cancelled` implicitly raised
# assert ctx.canceller[0] == 'root'
# assert peer_ctx.canceller[0] == 'sleeper'
raise RuntimeError( # except BaseException as err:
'peer never triggered local `ContextCancelled`?'
)
# raise
# cases:
# - some arbitrary remote peer cancels via Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg.
# - peer spawned a sub-actor which (also) spawned a failing task
# which was unhandled and propagated up to the immediate
# parent, the peer to the actor that also spawned a remote task
# task in that same peer-parent.
# - peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
@pytest.mark.parametrize( @pytest.mark.parametrize(
'error_during_ctxerr_handling', 'error_during_ctxerr_handling',
@ -296,8 +251,8 @@ def test_peer_canceller(
line and be less indented. line and be less indented.
.actor0> ()-> .actor1> .actor0> ()-> .actor1>
a inter-actor task context opened (by `async with a inter-actor task context opened (by `async with `Portal.open_context()`)
`Portal.open_context()`) from actor0 *into* actor1. from actor0 *into* actor1.
.actor0> ()<=> .actor1> .actor0> ()<=> .actor1>
a inter-actor task context opened (as above) a inter-actor task context opened (as above)
@ -332,12 +287,11 @@ def test_peer_canceller(
5. .canceller> ()-> .sleeper> 5. .canceller> ()-> .sleeper>
- calls `Portal.cancel_actor()` - calls `Portal.cancel_actor()`
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery() as an:
# NOTE: to halt the peer tasks on ctxc, uncomment this.
# debug_mode=True
) as an:
canceller: Portal = await an.start_actor( canceller: Portal = await an.start_actor(
'canceller', 'canceller',
enable_modules=[__name__], enable_modules=[__name__],
@ -351,13 +305,10 @@ def test_peer_canceller(
enable_modules=[__name__], enable_modules=[__name__],
) )
root = tractor.current_actor()
try: try:
async with ( async with (
sleeper.open_context( sleeper.open_context(
sleep_forever, sleep_forever,
expect_ctxc=True,
) as (sleeper_ctx, sent), ) as (sleeper_ctx, sent),
just_caller.open_context( just_caller.open_context(
@ -384,15 +335,16 @@ def test_peer_canceller(
'Context.result() did not raise ctx-cancelled?' 'Context.result() did not raise ctx-cancelled?'
) )
# should always raise since this root task does # TODO: not sure why this isn't catching
# not request the sleeper cancellation ;) # but maybe we need an `ExceptionGroup` and
# the whole except *errs: thinger in 3.11?
except ContextCancelled as ctxerr: except ContextCancelled as ctxerr:
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}') print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
# canceller and caller peers should not # canceller and caller peers should not
# have been remotely cancelled. # have been remotely cancelled.
assert canceller_ctx.canceller is None assert canceller_ctx.cancel_called_remote is None
assert caller_ctx.canceller is None assert caller_ctx.cancel_called_remote is None
assert ctxerr.canceller[0] == 'canceller' assert ctxerr.canceller[0] == 'canceller'
@ -403,14 +355,16 @@ def test_peer_canceller(
# block it should be. # block it should be.
assert not sleeper_ctx.cancelled_caught assert not sleeper_ctx.cancelled_caught
# TODO: a test which ensures this error is
# bubbled and caught (NOT MASKED) by the
# runtime!!!
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown') raise RuntimeError('Simulated error during teardown')
raise raise
# XXX SHOULD NEVER EVER GET HERE XXX # SHOULD NEVER GET HERE!
except BaseException as berr: except BaseException:
err = berr
pytest.fail('did not rx ctx-cancelled error?') pytest.fail('did not rx ctx-cancelled error?')
else: else:
pytest.fail('did not rx ctx-cancelled error?') pytest.fail('did not rx ctx-cancelled error?')
@ -421,20 +375,6 @@ def test_peer_canceller(
)as ctxerr: )as ctxerr:
_err = ctxerr _err = ctxerr
# NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs)
# - `.cancel_called` (bool of whether this side
# requested)
# - `.canceller` (uid of cancel-causing actor-task)
# - `._remote_error` (any `RemoteActorError`
# instance from other side of context)
# TODO: are we really planning to use this tho?
# - `._cancel_msg` (any msg that caused the
# cancel)
# CASE: error raised during handling of
# `ContextCancelled` inside `.open_context()`
# block
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
assert isinstance(ctxerr, RuntimeError) assert isinstance(ctxerr, RuntimeError)
@ -444,42 +384,20 @@ def test_peer_canceller(
for ctx in ctxs: for ctx in ctxs:
assert ctx.cancel_called assert ctx.cancel_called
# each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# this root actor task should have # this root actor task should have
# cancelled all opened contexts except the # cancelled all opened contexts except
# sleeper which is obvi by the "canceller" # the sleeper which is cancelled by its
# peer. # peer "canceller"
re = ctx._remote_error if ctx is not sleeper_ctx:
if ( assert ctx._remote_error.canceller[0] == 'root'
ctx is sleeper_ctx
or ctx is caller_ctx
):
assert (
re.canceller
==
ctx.canceller
==
canceller.channel.uid
)
else:
assert (
re.canceller
==
ctx.canceller
==
root.uid
)
# CASE: standard teardown inside in `.open_context()` block
else: else:
assert ctxerr.canceller == sleeper_ctx.canceller assert ctxerr.canceller[0] == 'canceller'
assert (
ctxerr.canceller[0]
==
sleeper_ctx.canceller[0]
==
'canceller'
)
# the sleeper's remote error is the error bubbled # the sleeper's remote error is the error bubbled
# out of the context-stack above! # out of the context-stack above!
@ -487,43 +405,18 @@ def test_peer_canceller(
assert re is ctxerr assert re is ctxerr
for ctx in ctxs: for ctx in ctxs:
re: BaseException | None = ctx._remote_error
assert re
# root doesn't cancel sleeper since it's
# cancelled by its peer.
if ctx is sleeper_ctx: if ctx is sleeper_ctx:
assert not ctx.cancel_called assert not ctx.cancel_called
# since sleeper_ctx.result() IS called
# above we should have (silently)
# absorbed the corresponding
# `ContextCancelled` for it and thus
# the logic inside `.cancelled_caught`
# should trigger!
assert ctx.cancelled_caught assert ctx.cancelled_caught
elif ctx is caller_ctx:
# since its context was remotely
# cancelled, we never needed to
# call `Context.cancel()` bc it was
# done by the peer and also we never
assert ctx.cancel_called
# TODO: figure out the details of
# this..
# if you look the `._local_error` here
# is a multi of ctxc + 2 Cancelleds?
# assert not ctx.cancelled_caught
else: else:
assert ctx.cancel_called assert ctx.cancel_called
assert not ctx.cancelled_caught assert not ctx.cancelled_caught
# TODO: do we even need this flag? # each context should have received
# -> each context should have received
# a silently absorbed context cancellation # a silently absorbed context cancellation
# in its remote nursery scope. # from its peer actor's task.
# assert ctx.chan.uid == ctx.canceller assert ctx.chan.uid == ctx.cancel_called_remote
# NOTE: when an inter-peer cancellation # NOTE: when an inter-peer cancellation
# occurred, we DO NOT expect this # occurred, we DO NOT expect this
@ -541,7 +434,9 @@ def test_peer_canceller(
# including the case where ctx-cancel handling # including the case where ctx-cancel handling
# itself errors. # itself errors.
assert sleeper_ctx.cancelled_caught assert sleeper_ctx.cancelled_caught
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
# await tractor.pause()
raise # always to ensure teardown raise # always to ensure teardown
if error_during_ctxerr_handling: if error_during_ctxerr_handling:

View File

@ -211,14 +211,10 @@ async def find_actor(
# 'Gathered portals:\n' # 'Gathered portals:\n'
# f'{portals}' # f'{portals}'
# ) # )
# NOTE: `gather_contexts()` will return a if not portals:
# `tuple[None, None, ..., None]` if no contact
# can be made with any regstrar at any of the
# N provided addrs!
if not any(portals):
if raise_on_none: if raise_on_none:
raise RuntimeError( raise RuntimeError(
f'No actor "{name}" found registered @ {registry_addrs}' f'No {name} found registered @ {registry_addrs}'
) )
yield None yield None
return return

View File

@ -294,11 +294,9 @@ class Channel:
self._agen = self._aiter_recv() self._agen = self._aiter_recv()
self._exc: Optional[Exception] = None # set if far end actor errors self._exc: Optional[Exception] = None # set if far end actor errors
self._closed: bool = False self._closed: bool = False
# flag set on ``Portal.cancel_actor()`` indicating
# flag set by ``Portal.cancel_actor()`` indicating remote # remote (peer) cancellation of the far end actor runtime.
# (possibly peer) cancellation of the far end actor self._cancel_called: bool = False # set on ``Portal.cancel_actor()``
# runtime.
self._cancel_called: bool = False
@classmethod @classmethod
def from_stream( def from_stream(

View File

@ -100,7 +100,7 @@ def parse_maddr(
multiaddr: str, multiaddr: str,
) -> dict[str, str | int | dict]: ) -> dict[str, str | int | dict]:
''' '''
Parse a libp2p style "multiaddress" into its distinct protocol Parse a libp2p style "multiaddress" into it's distinct protocol
segments where each segment is of the form: segments where each segment is of the form:
`../<protocol>/<param0>/<param1>/../<paramN>` `../<protocol>/<param0>/<param1>/../<paramN>`

View File

@ -48,7 +48,6 @@ from ._exceptions import (
unpack_error, unpack_error,
NoResult, NoResult,
ContextCancelled, ContextCancelled,
MessagingError,
) )
from ._context import Context from ._context import Context
from ._streaming import MsgStream from ._streaming import MsgStream
@ -72,6 +71,11 @@ def _unwrap_msg(
raise unpack_error(msg, channel) from None raise unpack_error(msg, channel) from None
# TODO: maybe move this to ._exceptions?
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
class Portal: class Portal:
''' '''
A 'portal' to a memory-domain-separated `Actor`. A 'portal' to a memory-domain-separated `Actor`.
@ -216,18 +220,14 @@ class Portal:
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
# XXX: sure would be nice to make this work with # XXX: sure would be nice to make this work with a proper shield
# a proper shield
with trio.move_on_after( with trio.move_on_after(
timeout timeout
or self.cancel_timeout or self.cancel_timeout
) as cs: ) as cs:
cs.shield = True cs.shield = True
await self.run_from_ns( await self.run_from_ns('self', 'cancel')
'self',
'cancel',
)
return True return True
if cs.cancelled_caught: if cs.cancelled_caught:
@ -462,14 +462,10 @@ class Portal:
try: try:
# the "first" value here is delivered by the callee's # the "first" value here is delivered by the callee's
# ``Context.started()`` call. # ``Context.started()`` call.
first: Any = msg['started'] first = msg['started']
ctx._started_called: bool = True ctx._started_called: bool = True
except KeyError: except KeyError:
# TODO: can we maybe factor this into the new raiser
# `_streaming._raise_from_no_yield_msg()` and make that
# helper more generic, say with a `_no_<blah>_msg()`?
if not (cid := msg.get('cid')): if not (cid := msg.get('cid')):
raise MessagingError( raise MessagingError(
'Received internal error at context?\n' 'Received internal error at context?\n'
@ -521,102 +517,54 @@ class Portal:
# started in the ctx nursery. # started in the ctx nursery.
ctx._scope.cancel() ctx._scope.cancel()
# XXX NOTE XXX: maybe shield against # XXX: (maybe) shield/mask context-cancellations that were
# self-context-cancellation (which raises a local # initiated by any of the context's 2 tasks. There are
# `ContextCancelled`) when requested (via # subsequently 2 operating cases for a "graceful cancel"
# `Context.cancel()`) by the same task (tree) which entered # of a `Context`:
# THIS `.open_context()`.
# #
# NOTE: There are 2 operating cases for a "graceful cancel" # 1.*this* side's task called `Context.cancel()`, in
# of a `Context`. In both cases any `ContextCancelled` # which case we mask the `ContextCancelled` from bubbling
# raised in this scope-block came from a transport msg # to the opener (much like how `trio.Nursery` swallows
# relayed from some remote-actor-task which our runtime set # any `trio.Cancelled` bubbled by a call to
# as a `Context._remote_error`
#
# the CASES:
#
# - if that context IS THE SAME ONE that called
# `Context.cancel()`, we want to absorb the error
# silently and let this `.open_context()` block to exit
# without raising.
#
# - if it is from some OTHER context (we did NOT call
# `.cancel()`), we want to re-RAISE IT whilst also
# setting our own ctx's "reason for cancel" to be that
# other context's cancellation condition; we set our
# `.canceller: tuple[str, str]` to be same value as
# caught here in a `ContextCancelled.canceller`.
#
# Again, there are 2 cases:
#
# 1-some other context opened in this `.open_context()`
# block cancelled due to a self or peer cancellation
# request in which case we DO let the error bubble to the
# opener.
#
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCanclled` from the "callee"
# task, in which case we mask the `ContextCancelled` from
# bubbling to this "caller" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`) # `Nursery.cancel_scope.cancel()`)
#
# 2.*the other* side's (callee/spawned) task cancelled due
# to a self or peer cancellation request in which case we
# DO let the error bubble to the opener.
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
scope_err = ctxc scope_err = ctxc
# CASE 1: this context was never cancelled
# via a local task's call to `Context.cancel()`.
if not ctx._cancel_called:
raise
# CASE 2: context was cancelled by local task calling # CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should # `.cancel()`, we don't raise and the exit block should
# exit silently. # exit silently.
if ( else:
ctx._cancel_called
and (
ctxc is ctx._remote_error
or
ctxc.canceller is self.canceller
)
):
log.debug( log.debug(
f'Context {ctx} cancelled gracefully with:\n' f'Context {ctx} cancelled gracefully with:\n'
f'{ctxc}' f'{ctxc}'
) )
# CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else!
else:
raise
# the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`,
# 2. any "callee"-side remote error, possibly also a cancellation
# request by some peer,
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
except ( except (
# CASE 3: standard local error in this caller/yieldee # - a standard error in the caller/yieldee
Exception, Exception,
# CASES 1 & 2: normally manifested as # - a runtime teardown exception-group and/or
# a `Context._scope_nursery` raised # cancellation request from a caller task.
# exception-group of, BaseExceptionGroup,
# 1.-`trio.Cancelled`s, since trio.Cancelled,
# `._scope.cancel()` will have been called and any
# `ContextCancelled` absorbed and thus NOT RAISED in
# any `Context._maybe_raise_remote_err()`,
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error raised in the "callee" side with
# a group only raised if there was any more then one
# task started here in the "caller" in the
# `yield`-ed to task.
BaseExceptionGroup, # since overrun handler tasks may have been spawned
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt, KeyboardInterrupt,
) as err: ) as err:
scope_err = err scope_err = err
# XXX: ALWAYS request the context to CANCEL ON any ERROR. # XXX: request cancel of this context on any error.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in # NOTE: `Context.cancel()` is conversely NOT called in
# the `ContextCancelled` "self cancellation absorbed" case # the `ContextCancelled` "cancellation requested" case
# handled in the block above! # above.
log.cancel( log.cancel(
'Context cancelled for task due to\n' 'Context cancelled for task due to\n'
f'{err}\n' f'{err}\n'
@ -635,7 +583,7 @@ class Portal:
raise # duh raise # duh
# no local scope error, the "clean exit with a result" case. # no scope error case
else: else:
if ctx.chan.connected(): if ctx.chan.connected():
log.info( log.info(
@ -649,27 +597,15 @@ class Portal:
# `Context._maybe_raise_remote_err()`) IFF # `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime # a `Context._remote_error` was set by the runtime
# via a call to # via a call to
# `Context._maybe_cancel_and_set_remote_error()`. # `Context._maybe_cancel_and_set_remote_error()`
# As per `Context._deliver_msg()`, that error IS # which IS SET any time the far end fails and
# ALWAYS SET any time "callee" side fails and causes "caller # causes "caller side" cancellation via
# side" cancellation via a `ContextCancelled` here. # a `ContextCancelled` here.
# result = await ctx.result() result = await ctx.result()
try: log.runtime(
result = await ctx.result() f'Context {fn_name} returned value from callee:\n'
log.runtime( f'`{result}`'
f'Context {fn_name} returned value from callee:\n' )
f'`{result}`'
)
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
# determine things like `.cancelled_caught` for
# cases where there was remote cancellation but
# this task didn't know until final teardown
# / value collection.
scope_err = berr
raise
finally: finally:
# though it should be impossible for any tasks # though it should be impossible for any tasks
@ -719,14 +655,12 @@ class Portal:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose() await ctx._recv_chan.aclose()
# XXX: we always raise remote errors locally and # XXX: since we always (maybe) re-raise (and thus also
# generally speaking mask runtime-machinery related # mask runtime machinery related
# multi-`trio.Cancelled`s. As such, any `scope_error` # multi-`trio.Cancelled`s) any scope error which was
# which was the underlying cause of this context's exit # the underlying cause of this context's exit, add
# should be stored as the `Context._local_error` and # different log msgs for each of the (2) cases.
# used in determining `Context.cancelled_caught: bool`.
if scope_err is not None: if scope_err is not None:
ctx._local_error: BaseException = scope_err
etype: Type[BaseException] = type(scope_err) etype: Type[BaseException] = type(scope_err)
# CASE 2 # CASE 2
@ -756,7 +690,7 @@ class Portal:
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
# FINALLY, remove the context from runtime tracking and # FINALLY, remove the context from runtime tracking and
# exit! # exit Bo
self.actor._contexts.pop( self.actor._contexts.pop(
(self.channel.uid, ctx.cid), (self.channel.uid, ctx.cid),
None, None,

View File

@ -85,10 +85,6 @@ async def open_root_actor(
enable_modules: list | None = None, enable_modules: list | None = None,
rpc_module_paths: list | None = None, rpc_module_paths: list | None = None,
# NOTE: allow caller to ensure that only one registry exists
# and that this call creates it.
ensure_registry: bool = False,
) -> Actor: ) -> Actor:
''' '''
Runtime init entry point for ``tractor``. Runtime init entry point for ``tractor``.
@ -210,12 +206,6 @@ async def open_root_actor(
# REGISTRAR # REGISTRAR
if ponged_addrs: if ponged_addrs:
if ensure_registry:
raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: '
'registry socket(s) already bound'
)
# we were able to connect to an arbiter # we were able to connect to an arbiter
logger.info( logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}' f'Registry(s) seem(s) to exist @ {ponged_addrs}'

View File

@ -204,21 +204,6 @@ async def do_hard_kill(
# terminate_after: int = 99999, # terminate_after: int = 99999,
) -> None: ) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
Used in 2 main cases:
- "unknown remote runtime state": a hanging/stalled actor that
isn't responding after sending a (graceful) runtime cancel
request via an IPC msg.
- "cancelled during spawn": a process who's actor runtime was
cancelled before full startup completed (such that
cancel-request-handling machinery was never fully
initialized) and thus a "cancel request msg" is never going
to be handled.
'''
# NOTE: this timeout used to do nothing since we were shielding # NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much # the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as # never release until the process exits, now it acts as
@ -234,9 +219,6 @@ async def do_hard_kill(
# and wait for it to exit. If cancelled, kills the process and # and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the # waits for it to finish exiting before propagating the
# cancellation. # cancellation.
#
# This code was originally triggred by ``proc.__aexit__()``
# but now must be called manually.
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
if proc.stdin is not None: if proc.stdin is not None:
await proc.stdin.aclose() await proc.stdin.aclose()
@ -252,14 +234,10 @@ async def do_hard_kill(
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await proc.wait() await proc.wait()
# XXX NOTE XXX: zombie squad dispatch:
# (should ideally never, but) If we do get here it means
# graceful termination of a process failed and we need to
# resort to OS level signalling to interrupt and cancel the
# (presumably stalled or hung) actor. Since we never allow
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and
# into here.
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}") log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
proc.kill() proc.kill()
@ -274,13 +252,10 @@ async def soft_wait(
portal: Portal, portal: Portal,
) -> None: ) -> None:
''' # Wait for proc termination but **dont' yet** call
Wait for proc termination but **dont' yet** teardown # ``trio.Process.__aexit__()`` (it tears down stdio
std-streams (since it will clobber any ongoing pdb REPL # which will kill any waiting remote pdb trace).
session). This is our "soft" (and thus itself cancellable) # This is a "soft" (cancellable) join/reap.
join/reap on an actor-runtime-in-process.
'''
uid = portal.channel.uid uid = portal.channel.uid
try: try:
log.cancel(f'Soft waiting on actor:\n{uid}') log.cancel(f'Soft waiting on actor:\n{uid}')
@ -303,13 +278,7 @@ async def soft_wait(
await wait_func(proc) await wait_func(proc)
n.cancel_scope.cancel() n.cancel_scope.cancel()
# start a task to wait on the termination of the
# process by itself waiting on a (caller provided) wait
# function which should unblock when the target process
# has terminated.
n.start_soon(cancel_on_proc_deth) n.start_soon(cancel_on_proc_deth)
# send the actor-runtime a cancel request.
await portal.cancel_actor() await portal.cancel_actor()
if proc.poll() is None: # type: ignore if proc.poll() is None: # type: ignore