Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 3f15923537 More thorough hard kill doc strings 2023-12-11 18:17:42 -05:00
Tyler Goodlet 87cd725adb Add `open_root_actor(ensure_registry: bool)`
Allows forcing the opened actor to either obtain the passed registry
addrs or raise a runtime error.
2023-11-07 16:45:24 -05:00
Tyler Goodlet 48accbd28f Fix doc string "its" typo.. 2023-11-06 15:44:21 -05:00
Tyler Goodlet 227c9ea173 Test with `any(portals)` since `gather_contexts()` will return `list[None | tuple]` 2023-11-06 15:43:43 -05:00
Tyler Goodlet d651f3d8e9 Tons of inter-peer test cleanup
Drop all the nested `@acm` blocks and defunct comments from initial
validations. Add some todos for cases that are still unclear such as
whether the caller / streamer should have `.cancelled_caught == True` in
its teardown.
2023-10-25 15:21:41 -04:00
Tyler Goodlet ef0cfc4b20 Get inter-peer suite passing with all `Context` state checks!
Definitely needs some cleaning and refinement but this gets us to stage
1 of being pretty frickin correct i'd say 💃
2023-10-23 18:24:23 -04:00
Tyler Goodlet ecb525a2bc Adjust test details where `Context.cancel()` is called
We can now make asserts on `.cancelled_caught` and `_remote_error` vs.
`_local_error`. Expect a runtime error when `Context.open_stream()` is
called AFTER `.cancel()` and the remote `ContextCancelled` hasn't
arrived (yet). Adjust to `'itself'` string in self-cancel case.
2023-10-23 17:49:02 -04:00
Tyler Goodlet b77d123edd Fix `Context.result()` call to be in runtime scope 2023-10-23 17:48:34 -04:00
Tyler Goodlet f4e63465de Tweak `Channel._cancel_called` comment 2023-10-23 17:47:55 -04:00
Tyler Goodlet df31047ecb Be ultra-correct in `Portal.open_context()`
This took way too long to get right but hopefully will give us grok-able
and correct context exit semantics going forward B)

The main fixes were:
- always shielding the `MsgStream.aclose()` call on teardown to avoid
  bubbling a `Cancelled`.
- properly absorbing any `ContextCancelled` in cases due to "self
  cancellation" using the new `Context.canceller` in the logic.
- capturing any error raised by the `Context.result()` call in the
  "normal exit, result received" case and setting it as the
  `Context._local_error` so that self-cancels can be easily measured via
  `Context.cancelled_caught` in the same way as remote-error-caused
  cancellations.
- extremely detailed comments around all of the cancellation-error cases
  to avoid ever getting confused about the control flow in the future XD
2023-10-23 17:34:28 -04:00
9 changed files with 426 additions and 173 deletions
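
A minimal sketch (hypothetical actor/function names, assuming `tractor`'s public `open_nursery()`/`open_context()` API) of the caller-side exit semantics commit df31047ecb above describes: a cancel requested by this same task is silently absorbed on exit, whereas a cancel caused by any *other* actor re-raises as `ContextCancelled`.

```python
import trio
import tractor


@tractor.context
async def sleeper(ctx: tractor.Context) -> None:
    # hypothetical callee: sync the context then wait to be cancelled
    await ctx.started()
    await trio.sleep_forever()


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'sleeper',
            enable_modules=[__name__],
        )
        async with portal.open_context(sleeper) as (ctx, first):
            # self-cancel: the remote `ContextCancelled` should be
            # absorbed so this block exits without raising
            await ctx.cancel()

        # per this change set self-cancels are measurable via the
        # local scope having "caught" the cancellation it requested
        assert ctx.cancelled_caught

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```

Had some other actor (a "peer") cancelled the `sleeper` context instead, the `ContextCancelled` would propagate out of the `async with` block with its `.canceller` set to that peer's uid.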

View File

@ -13,6 +13,11 @@ from typing import Optional
import pytest import pytest
import trio import trio
import tractor import tractor
from tractor import (
Actor,
Context,
current_actor,
)
from tractor._exceptions import ( from tractor._exceptions import (
StreamOverrun, StreamOverrun,
ContextCancelled, ContextCancelled,
@ -193,9 +198,6 @@ def test_simple_context(
else: else:
assert await ctx.result() == 'yo' assert await ctx.result() == 'yo'
if not error_parent:
await ctx.cancel()
if pointlessly_open_stream: if pointlessly_open_stream:
async with ctx.open_stream(): async with ctx.open_stream():
if error_parent: if error_parent:
@ -208,10 +210,15 @@ def test_simple_context(
# 'stop' msg to the far end which needs # 'stop' msg to the far end which needs
# to be ignored # to be ignored
pass pass
else: else:
if error_parent: if error_parent:
raise error_parent raise error_parent
# cancel AFTER we open a stream
# to avoid a cancel raised inside
# `.open_stream()`
await ctx.cancel()
finally: finally:
# after cancellation # after cancellation
@ -276,7 +283,7 @@ def test_caller_cancels(
assert ( assert (
tuple(err.canceller) tuple(err.canceller)
== ==
tractor.current_actor().uid current_actor().uid
) )
async def main(): async def main():
@ -430,9 +437,11 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
): ):
'caller context closes without using stream' 'caller context closes without using stream'
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
portal = await n.start_actor( root: Actor = current_actor()
portal = await an.start_actor(
'ctx_cancelled', 'ctx_cancelled',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -440,10 +449,10 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with portal.open_context( async with portal.open_context(
expect_cancelled, expect_cancelled,
) as (ctx, sent): ) as (ctx, sent):
await portal.run(assert_state, value=True)
assert sent is None assert sent is None
await portal.run(assert_state, value=True)
# call cancel explicitly # call cancel explicitly
if use_ctx_cancel_method: if use_ctx_cancel_method:
@ -454,8 +463,21 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async for msg in stream: async for msg in stream:
pass pass
except tractor.ContextCancelled: except tractor.ContextCancelled as ctxc:
raise # XXX: must be propagated to __aexit__ # XXX: the cause is US since we call
# `Context.cancel()` just above!
assert (
ctxc.canceller
==
current_actor().uid
==
root.uid
)
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
raise
else: else:
assert 0, "Should have context cancelled?" assert 0, "Should have context cancelled?"
@ -472,7 +494,13 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
await ctx.result() await ctx.result()
assert 0, "Callee should have blocked!?" assert 0, "Callee should have blocked!?"
except trio.TooSlowError: except trio.TooSlowError:
# NO-OP -> since already called above
await ctx.cancel() await ctx.cancel()
# local scope should have absorbed the cancellation
assert ctx.cancelled_caught
assert ctx._remote_error is ctx._local_error
try: try:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for msg in stream: async for msg in stream:
@ -551,19 +579,25 @@ async def cancel_self(
global _state global _state
_state = True _state = True
# since we call this the below `.open_stream()` should always
# error!
await ctx.cancel() await ctx.cancel()
# should inline raise immediately # should inline raise immediately
try: try:
async with ctx.open_stream(): async with ctx.open_stream():
pass pass
except tractor.ContextCancelled: # except tractor.ContextCancelled:
except RuntimeError:
# suppress for now so we can do checkpoint tests below # suppress for now so we can do checkpoint tests below
pass print('Got expected runtime error for stream-after-cancel')
else: else:
raise RuntimeError('Context didnt cancel itself?!') raise RuntimeError('Context didnt cancel itself?!')
# check a real ``trio.Cancelled`` is raised on a checkpoint # check that``trio.Cancelled`` is now raised on any further
# checkpoints since the self cancel above will have cancelled
# the `Context._scope.cancel_scope: trio.CancelScope`
try: try:
with trio.fail_after(0.1): with trio.fail_after(0.1):
await trio.sleep_forever() await trio.sleep_forever()
@ -574,6 +608,7 @@ async def cancel_self(
# should never get here # should never get here
assert 0 assert 0
raise RuntimeError('Context didnt cancel itself?!')
@tractor_test @tractor_test
async def test_callee_cancels_before_started(): async def test_callee_cancels_before_started():
@ -601,7 +636,7 @@ async def test_callee_cancels_before_started():
ce.type == trio.Cancelled ce.type == trio.Cancelled
# the traceback should be informative # the traceback should be informative
assert 'cancelled itself' in ce.msgdata['tb_str'] assert 'itself' in ce.msgdata['tb_str']
# teardown the actor # teardown the actor
await portal.cancel_actor() await portal.cancel_actor()
@ -773,7 +808,7 @@ async def echo_back_sequence(
print( print(
'EXITING CALLEEE:\n' 'EXITING CALLEEE:\n'
f'{ctx.cancel_called_remote}' f'{ctx.canceller}'
) )
return 'yo' return 'yo'
@ -871,7 +906,7 @@ def test_maybe_allow_overruns_stream(
if cancel_ctx: if cancel_ctx:
assert isinstance(res, ContextCancelled) assert isinstance(res, ContextCancelled)
assert tuple(res.canceller) == tractor.current_actor().uid assert tuple(res.canceller) == current_actor().uid
else: else:
print(f'RX ROOT SIDE RESULT {res}') print(f'RX ROOT SIDE RESULT {res}')
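
The stream-after-cancel rule from commit ecb525a2bc, pulled out of the `cancel_self()` test scaffolding above into a standalone sketch (hypothetical callee name; assumes the behaviour from this change set where `.open_stream()` after a self `.cancel()` raises `RuntimeError` instead of `ContextCancelled`):

```python
import tractor


@tractor.context
async def cancel_then_stream(ctx: tractor.Context) -> None:
    # hypothetical callee: request cancellation of our own context..
    await ctx.cancel()

    # ..after which opening a stream should error out inline
    try:
        async with ctx.open_stream():
            ...
    except RuntimeError:
        # expected per this change set (was `ContextCancelled` before)
        print('Got expected runtime error for stream-after-cancel')
    else:
        raise RuntimeError('Context did not cancel itself?!')
```

It would be invoked just like `cancel_self()` is in the suite above, i.e. via `portal.open_context(cancel_then_stream)`.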

View File

@ -225,7 +225,7 @@ def test_context_spawns_aio_task_that_errors(
await trio.sleep_forever() await trio.sleep_forever()
return await ctx.result() return await ctx.result()
if parent_cancels: if parent_cancels:
# bc the parent made the cancel request, # bc the parent made the cancel request,

View File

@ -15,6 +15,26 @@ from tractor import ( # typing
ContextCancelled, ContextCancelled,
) )
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg raised in all open ctxs
# with that peer.
# - [ ] PEER-FAILS-BY-CHILD-ERROR: peer spawned a sub-actor which
# (also) spawned a failing task which was unhandled and
# propagated up to the immediate parent - the peer to the actor
# that also spawned a remote task task in that same peer-parent.
# def test_self_cancel(): # def test_self_cancel():
# ''' # '''
@ -29,14 +49,30 @@ from tractor import ( # typing
@tractor.context @tractor.context
async def sleep_forever( async def sleep_forever(
ctx: Context, ctx: Context,
expect_ctxc: bool = False,
) -> None: ) -> None:
''' '''
Sync the context, open a stream then just sleep. Sync the context, open a stream then just sleep.
Allow checking for (context) cancellation locally.
''' '''
await ctx.started() try:
async with ctx.open_stream(): await ctx.started()
await trio.sleep_forever() async with ctx.open_stream():
await trio.sleep_forever()
except BaseException as berr:
# TODO: it'd sure be nice to be able to inject our own
# `ContextCancelled` here instead of of `trio.Cancelled`
# so that our runtime can expect it and this "user code"
# would be able to tell the diff between a generic trio
# cancel and a tractor runtime-IPC cancel.
if expect_ctxc:
assert isinstance(berr, trio.Cancelled)
raise
@tractor.context @tractor.context
@ -145,6 +181,7 @@ async def stream_ints(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
for i in itertools.count(): for i in itertools.count():
await stream.send(i) await stream.send(i)
await trio.sleep(0.01)
@tractor.context @tractor.context
@ -161,73 +198,81 @@ async def stream_from_peer(
peer_ctx.open_stream() as stream, peer_ctx.open_stream() as stream,
): ):
await ctx.started() await ctx.started()
# XXX TODO: big set of questions for this # XXX QUESTIONS & TODO: for further details around this
# in the longer run..
# https://github.com/goodboy/tractor/issues/368
# - should we raise `ContextCancelled` or `Cancelled` (rn # - should we raise `ContextCancelled` or `Cancelled` (rn
# it does that) here?! # it does latter) and should/could it be implemented
# - test the `ContextCancelled` OUTSIDE the # as a general injection override for `trio` such
# `.open_context()` call? # that ANY next checkpoint would raise the "cancel
try: # error type" of choice?
async for msg in stream: # - should the `ContextCancelled` bubble from
print(msg) # all `Context` and `MsgStream` apis wherein it
# prolly makes the most sense to make it
except trio.Cancelled: # a `trio.Cancelled` subtype?
assert not ctx.cancel_called # - what about IPC-transport specific errors, should
assert not ctx.cancelled_caught # they bubble from the async for and trigger
# other special cases?
assert not peer_ctx.cancel_called # NOTE: current ctl flow:
assert not peer_ctx.cancelled_caught # - stream raises `trio.EndOfChannel` and
# exits the loop
assert 'root' in ctx.cancel_called_remote # - `.open_context()` will raise the ctxcanc
# received from the sleeper.
raise # XXX MUST NEVER MASK IT!! async for msg in stream:
assert msg is not None
with trio.CancelScope(shield=True): print(msg)
await tractor.pause()
# pass
# pytest.fail(
raise RuntimeError(
'peer never triggered local `[Context]Cancelled`?!?'
)
# NOTE: cancellation of the (sleeper) peer should always # NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming # cause a `ContextCancelled` raise in this streaming
# actor. # actor.
except ContextCancelled as ctxerr: except ContextCancelled as ctxerr:
assert ctxerr.canceller == 'canceller' err = ctxerr
assert ctxerr._remote_error is ctxerr assert peer_ctx._remote_error is ctxerr
assert peer_ctx.canceller == ctxerr.canceller
# caller peer should not be the cancel requester
assert not ctx.cancel_called
# XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task
# was started in, exits.
assert not ctx.cancelled_caught
# we never requested cancellation
assert not peer_ctx.cancel_called
# the `.open_context()` exit definitely caught
# a cancellation in the internal `Context._scope` since
# likely the runtime called `_deliver_msg()` after
# receiving the remote error from the streaming task.
assert peer_ctx.cancelled_caught
# TODO / NOTE `.canceller` won't have been set yet
# here because that machinery is inside
# `.open_context().__aexit__()` BUT, if we had
# a way to know immediately (from the last
# checkpoint) that cancellation was due to
# a remote, we COULD assert this here..see,
# https://github.com/goodboy/tractor/issues/368
# root/parent actor task should NEVER HAVE cancelled us!
assert not ctx.canceller
assert 'canceller' in peer_ctx.canceller
# CASE 1: we were cancelled by our parent, the root actor.
# TODO: there are other cases depending on how the root
# actor and it's caller side task are written:
# - if the root does not req us to cancel then an
# IPC-transport related error should bubble from the async
# for loop and thus cause local cancellation both here
# and in the root (since in that case this task cancels the
# context with the root, not the other way around)
assert ctx.cancel_called_remote[0] == 'root'
raise raise
# TODO: IN THEORY we could have other cases depending on
# who cancels first, the root actor or the canceller peer?.
#
# 1- when the peer request is first then the `.canceller`
# field should obvi be set to the 'canceller' uid,
#
# 2-if the root DOES req cancel then we should see the same
# `trio.Cancelled` implicitly raised
# assert ctx.canceller[0] == 'root'
# assert peer_ctx.canceller[0] == 'sleeper'
# except BaseException as err: raise RuntimeError(
'peer never triggered local `ContextCancelled`?'
)
# raise
# cases:
# - some arbitrary remote peer cancels via Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg.
# - peer spawned a sub-actor which (also) spawned a failing task
# which was unhandled and propagated up to the immediate
# parent, the peer to the actor that also spawned a remote task
# task in that same peer-parent.
# - peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
@pytest.mark.parametrize( @pytest.mark.parametrize(
'error_during_ctxerr_handling', 'error_during_ctxerr_handling',
@ -251,8 +296,8 @@ def test_peer_canceller(
line and be less indented. line and be less indented.
.actor0> ()-> .actor1> .actor0> ()-> .actor1>
a inter-actor task context opened (by `async with `Portal.open_context()`) a inter-actor task context opened (by `async with
from actor0 *into* actor1. `Portal.open_context()`) from actor0 *into* actor1.
.actor0> ()<=> .actor1> .actor0> ()<=> .actor1>
a inter-actor task context opened (as above) a inter-actor task context opened (as above)
@ -287,11 +332,12 @@ def test_peer_canceller(
5. .canceller> ()-> .sleeper> 5. .canceller> ()-> .sleeper>
- calls `Portal.cancel_actor()` - calls `Portal.cancel_actor()`
''' '''
async def main(): async def main():
async with tractor.open_nursery() as an: async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
# debug_mode=True
) as an:
canceller: Portal = await an.start_actor( canceller: Portal = await an.start_actor(
'canceller', 'canceller',
enable_modules=[__name__], enable_modules=[__name__],
@ -305,10 +351,13 @@ def test_peer_canceller(
enable_modules=[__name__], enable_modules=[__name__],
) )
root = tractor.current_actor()
try: try:
async with ( async with (
sleeper.open_context( sleeper.open_context(
sleep_forever, sleep_forever,
expect_ctxc=True,
) as (sleeper_ctx, sent), ) as (sleeper_ctx, sent),
just_caller.open_context( just_caller.open_context(
@ -335,16 +384,15 @@ def test_peer_canceller(
'Context.result() did not raise ctx-cancelled?' 'Context.result() did not raise ctx-cancelled?'
) )
# TODO: not sure why this isn't catching # should always raise since this root task does
# but maybe we need an `ExceptionGroup` and # not request the sleeper cancellation ;)
# the whole except *errs: thinger in 3.11?
except ContextCancelled as ctxerr: except ContextCancelled as ctxerr:
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}') print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
# canceller and caller peers should not # canceller and caller peers should not
# have been remotely cancelled. # have been remotely cancelled.
assert canceller_ctx.cancel_called_remote is None assert canceller_ctx.canceller is None
assert caller_ctx.cancel_called_remote is None assert caller_ctx.canceller is None
assert ctxerr.canceller[0] == 'canceller' assert ctxerr.canceller[0] == 'canceller'
@ -355,16 +403,14 @@ def test_peer_canceller(
# block it should be. # block it should be.
assert not sleeper_ctx.cancelled_caught assert not sleeper_ctx.cancelled_caught
# TODO: a test which ensures this error is
# bubbled and caught (NOT MASKED) by the
# runtime!!!
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown') raise RuntimeError('Simulated error during teardown')
raise raise
# SHOULD NEVER GET HERE! # XXX SHOULD NEVER EVER GET HERE XXX
except BaseException: except BaseException as berr:
err = berr
pytest.fail('did not rx ctx-cancelled error?') pytest.fail('did not rx ctx-cancelled error?')
else: else:
pytest.fail('did not rx ctx-cancelled error?') pytest.fail('did not rx ctx-cancelled error?')
@ -375,6 +421,20 @@ def test_peer_canceller(
)as ctxerr: )as ctxerr:
_err = ctxerr _err = ctxerr
# NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs)
# - `.cancel_called` (bool of whether this side
# requested)
# - `.canceller` (uid of cancel-causing actor-task)
# - `._remote_error` (any `RemoteActorError`
# instance from other side of context)
# TODO: are we really planning to use this tho?
# - `._cancel_msg` (any msg that caused the
# cancel)
# CASE: error raised during handling of
# `ContextCancelled` inside `.open_context()`
# block
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
assert isinstance(ctxerr, RuntimeError) assert isinstance(ctxerr, RuntimeError)
@ -384,20 +444,42 @@ def test_peer_canceller(
for ctx in ctxs: for ctx in ctxs:
assert ctx.cancel_called assert ctx.cancel_called
# each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# this root actor task should have # this root actor task should have
# cancelled all opened contexts except # cancelled all opened contexts except the
# the sleeper which is cancelled by its # sleeper which is obvi by the "canceller"
# peer "canceller" # peer.
if ctx is not sleeper_ctx: re = ctx._remote_error
assert ctx._remote_error.canceller[0] == 'root' if (
ctx is sleeper_ctx
or ctx is caller_ctx
):
assert (
re.canceller
==
ctx.canceller
==
canceller.channel.uid
)
else:
assert (
re.canceller
==
ctx.canceller
==
root.uid
)
# CASE: standard teardown inside in `.open_context()` block
else: else:
assert ctxerr.canceller[0] == 'canceller' assert ctxerr.canceller == sleeper_ctx.canceller
assert (
ctxerr.canceller[0]
==
sleeper_ctx.canceller[0]
==
'canceller'
)
# the sleeper's remote error is the error bubbled # the sleeper's remote error is the error bubbled
# out of the context-stack above! # out of the context-stack above!
@ -405,18 +487,43 @@ def test_peer_canceller(
assert re is ctxerr assert re is ctxerr
for ctx in ctxs: for ctx in ctxs:
re: BaseException | None = ctx._remote_error
assert re
# root doesn't cancel sleeper since it's
# cancelled by its peer.
if ctx is sleeper_ctx: if ctx is sleeper_ctx:
assert not ctx.cancel_called assert not ctx.cancel_called
# since sleeper_ctx.result() IS called
# above we should have (silently)
# absorbed the corresponding
# `ContextCancelled` for it and thus
# the logic inside `.cancelled_caught`
# should trigger!
assert ctx.cancelled_caught assert ctx.cancelled_caught
elif ctx is caller_ctx:
# since its context was remotely
# cancelled, we never needed to
# call `Context.cancel()` bc it was
# done by the peer and also we never
assert ctx.cancel_called
# TODO: figure out the details of
# this..
# if you look the `._local_error` here
# is a multi of ctxc + 2 Cancelleds?
# assert not ctx.cancelled_caught
else: else:
assert ctx.cancel_called assert ctx.cancel_called
assert not ctx.cancelled_caught assert not ctx.cancelled_caught
# each context should have received # TODO: do we even need this flag?
# -> each context should have received
# a silently absorbed context cancellation # a silently absorbed context cancellation
# from its peer actor's task. # in its remote nursery scope.
assert ctx.chan.uid == ctx.cancel_called_remote # assert ctx.chan.uid == ctx.canceller
# NOTE: when an inter-peer cancellation # NOTE: when an inter-peer cancellation
# occurred, we DO NOT expect this # occurred, we DO NOT expect this
@ -434,9 +541,7 @@ def test_peer_canceller(
# including the case where ctx-cancel handling # including the case where ctx-cancel handling
# itself errors. # itself errors.
assert sleeper_ctx.cancelled_caught assert sleeper_ctx.cancelled_caught
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
# await tractor.pause()
raise # always to ensure teardown raise # always to ensure teardown
if error_during_ctxerr_handling: if error_during_ctxerr_handling:

View File

@ -211,10 +211,14 @@ async def find_actor(
# 'Gathered portals:\n' # 'Gathered portals:\n'
# f'{portals}' # f'{portals}'
# ) # )
if not portals: # NOTE: `gather_contexts()` will return a
# `tuple[None, None, ..., None]` if no contact
# can be made with any regstrar at any of the
# N provided addrs!
if not any(portals):
if raise_on_none: if raise_on_none:
raise RuntimeError( raise RuntimeError(
f'No {name} found registered @ {registry_addrs}' f'No actor "{name}" found registered @ {registry_addrs}'
) )
yield None yield None
return return
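
A quick illustration (illustrative values only) of why the check switches to `any()`: `gather_contexts()` yields one entry per provided registrar addr, so when no registrar can be contacted the result is a tuple of `None`s which is still truthy as a container.

```python
# shape returned when none of the N registrar addrs answered:
# one `None` per addr
portals = (None, None, None)

assert bool(portals) is True   # old `if not portals:` never fires
assert any(portals) is False   # new `if not any(portals):` correctly
                               # detects "no registrar contact"
```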

View File

@ -294,9 +294,11 @@ class Channel:
self._agen = self._aiter_recv() self._agen = self._aiter_recv()
self._exc: Optional[Exception] = None # set if far end actor errors self._exc: Optional[Exception] = None # set if far end actor errors
self._closed: bool = False self._closed: bool = False
# flag set on ``Portal.cancel_actor()`` indicating
# remote (peer) cancellation of the far end actor runtime. # flag set by ``Portal.cancel_actor()`` indicating remote
self._cancel_called: bool = False # set on ``Portal.cancel_actor()`` # (possibly peer) cancellation of the far end actor
# runtime.
self._cancel_called: bool = False
@classmethod @classmethod
def from_stream( def from_stream(

View File

@ -100,7 +100,7 @@ def parse_maddr(
multiaddr: str, multiaddr: str,
) -> dict[str, str | int | dict]: ) -> dict[str, str | int | dict]:
''' '''
Parse a libp2p style "multiaddress" into it's distinct protocol Parse a libp2p style "multiaddress" into its distinct protocol
segments where each segment is of the form: segments where each segment is of the form:
`../<protocol>/<param0>/<param1>/../<paramN>` `../<protocol>/<param0>/<param1>/../<paramN>`

View File

@ -48,6 +48,7 @@ from ._exceptions import (
unpack_error, unpack_error,
NoResult, NoResult,
ContextCancelled, ContextCancelled,
MessagingError,
) )
from ._context import Context from ._context import Context
from ._streaming import MsgStream from ._streaming import MsgStream
@ -71,11 +72,6 @@ def _unwrap_msg(
raise unpack_error(msg, channel) from None raise unpack_error(msg, channel) from None
# TODO: maybe move this to ._exceptions?
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
class Portal: class Portal:
''' '''
A 'portal' to a memory-domain-separated `Actor`. A 'portal' to a memory-domain-separated `Actor`.
@ -220,14 +216,18 @@ class Portal:
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
# XXX: sure would be nice to make this work with a proper shield # XXX: sure would be nice to make this work with
# a proper shield
with trio.move_on_after( with trio.move_on_after(
timeout timeout
or self.cancel_timeout or self.cancel_timeout
) as cs: ) as cs:
cs.shield = True cs.shield = True
await self.run_from_ns('self', 'cancel') await self.run_from_ns(
'self',
'cancel',
)
return True return True
if cs.cancelled_caught: if cs.cancelled_caught:
@ -462,10 +462,14 @@ class Portal:
try: try:
# the "first" value here is delivered by the callee's # the "first" value here is delivered by the callee's
# ``Context.started()`` call. # ``Context.started()`` call.
first = msg['started'] first: Any = msg['started']
ctx._started_called: bool = True ctx._started_called: bool = True
except KeyError: except KeyError:
# TODO: can we maybe factor this into the new raiser
# `_streaming._raise_from_no_yield_msg()` and make that
# helper more generic, say with a `_no_<blah>_msg()`?
if not (cid := msg.get('cid')): if not (cid := msg.get('cid')):
raise MessagingError( raise MessagingError(
'Received internal error at context?\n' 'Received internal error at context?\n'
@ -517,54 +521,102 @@ class Portal:
# started in the ctx nursery. # started in the ctx nursery.
ctx._scope.cancel() ctx._scope.cancel()
# XXX: (maybe) shield/mask context-cancellations that were # XXX NOTE XXX: maybe shield against
# initiated by any of the context's 2 tasks. There are # self-context-cancellation (which raises a local
# subsequently 2 operating cases for a "graceful cancel" # `ContextCancelled`) when requested (via
# of a `Context`: # `Context.cancel()`) by the same task (tree) which entered
# # THIS `.open_context()`.
# 1.*this* side's task called `Context.cancel()`, in
# which case we mask the `ContextCancelled` from bubbling
# to the opener (much like how `trio.Nursery` swallows
# any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
# #
# 2.*the other* side's (callee/spawned) task cancelled due # NOTE: There are 2 operating cases for a "graceful cancel"
# to a self or peer cancellation request in which case we # of a `Context`. In both cases any `ContextCancelled`
# DO let the error bubble to the opener. # raised in this scope-block came from a transport msg
# relayed from some remote-actor-task which our runtime set
# as a `Context._remote_error`
#
# the CASES:
#
# - if that context IS THE SAME ONE that called
# `Context.cancel()`, we want to absorb the error
# silently and let this `.open_context()` block to exit
# without raising.
#
# - if it is from some OTHER context (we did NOT call
# `.cancel()`), we want to re-RAISE IT whilst also
# setting our own ctx's "reason for cancel" to be that
# other context's cancellation condition; we set our
# `.canceller: tuple[str, str]` to be same value as
# caught here in a `ContextCancelled.canceller`.
#
# Again, there are 2 cases:
#
# 1-some other context opened in this `.open_context()`
# block cancelled due to a self or peer cancellation
# request in which case we DO let the error bubble to the
# opener.
#
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCanclled` from the "callee"
# task, in which case we mask the `ContextCancelled` from
# bubbling to this "caller" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
scope_err = ctxc scope_err = ctxc
# CASE 1: this context was never cancelled
# via a local task's call to `Context.cancel()`.
if not ctx._cancel_called:
raise
# CASE 2: context was cancelled by local task calling # CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should # `.cancel()`, we don't raise and the exit block should
# exit silently. # exit silently.
else: if (
ctx._cancel_called
and (
ctxc is ctx._remote_error
or
ctxc.canceller is self.canceller
)
):
log.debug( log.debug(
f'Context {ctx} cancelled gracefully with:\n' f'Context {ctx} cancelled gracefully with:\n'
f'{ctxc}' f'{ctxc}'
) )
# CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else!
else:
raise
# the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`,
# 2. any "callee"-side remote error, possibly also a cancellation
# request by some peer,
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
except ( except (
# - a standard error in the caller/yieldee # CASE 3: standard local error in this caller/yieldee
Exception, Exception,
# - a runtime teardown exception-group and/or # CASES 1 & 2: normally manifested as
# cancellation request from a caller task. # a `Context._scope_nursery` raised
BaseExceptionGroup, # exception-group of,
trio.Cancelled, # 1.-`trio.Cancelled`s, since
# `._scope.cancel()` will have been called and any
# `ContextCancelled` absorbed and thus NOT RAISED in
# any `Context._maybe_raise_remote_err()`,
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error raised in the "callee" side with
# a group only raised if there was any more then one
# task started here in the "caller" in the
# `yield`-ed to task.
BaseExceptionGroup, # since overrun handler tasks may have been spawned
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt, KeyboardInterrupt,
) as err: ) as err:
scope_err = err scope_err = err
# XXX: request cancel of this context on any error. # XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NOT called in # NOTE: `Context.cancel()` is conversely NEVER CALLED in
# the `ContextCancelled` "cancellation requested" case # the `ContextCancelled` "self cancellation absorbed" case
# above. # handled in the block above!
log.cancel( log.cancel(
'Context cancelled for task due to\n' 'Context cancelled for task due to\n'
f'{err}\n' f'{err}\n'
@ -583,7 +635,7 @@ class Portal:
raise # duh raise # duh
# no scope error case # no local scope error, the "clean exit with a result" case.
else: else:
if ctx.chan.connected(): if ctx.chan.connected():
log.info( log.info(
@ -597,15 +649,27 @@ class Portal:
# `Context._maybe_raise_remote_err()`) IFF # `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime # a `Context._remote_error` was set by the runtime
# via a call to # via a call to
# `Context._maybe_cancel_and_set_remote_error()` # `Context._maybe_cancel_and_set_remote_error()`.
# which IS SET any time the far end fails and # As per `Context._deliver_msg()`, that error IS
# causes "caller side" cancellation via # ALWAYS SET any time "callee" side fails and causes "caller
# a `ContextCancelled` here. # side" cancellation via a `ContextCancelled` here.
result = await ctx.result() # result = await ctx.result()
log.runtime( try:
f'Context {fn_name} returned value from callee:\n' result = await ctx.result()
f'`{result}`' log.runtime(
) f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
# determine things like `.cancelled_caught` for
# cases where there was remote cancellation but
# this task didn't know until final teardown
# / value collection.
scope_err = berr
raise
finally: finally:
# though it should be impossible for any tasks # though it should be impossible for any tasks
@ -655,12 +719,14 @@ class Portal:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose() await ctx._recv_chan.aclose()
# XXX: since we always (maybe) re-raise (and thus also # XXX: we always raise remote errors locally and
# mask runtime machinery related # generally speaking mask runtime-machinery related
# multi-`trio.Cancelled`s) any scope error which was # multi-`trio.Cancelled`s. As such, any `scope_error`
# the underlying cause of this context's exit, add # which was the underlying cause of this context's exit
# different log msgs for each of the (2) cases. # should be stored as the `Context._local_error` and
# used in determining `Context.cancelled_caught: bool`.
if scope_err is not None: if scope_err is not None:
ctx._local_error: BaseException = scope_err
etype: Type[BaseException] = type(scope_err) etype: Type[BaseException] = type(scope_err)
# CASE 2 # CASE 2
@ -690,7 +756,7 @@ class Portal:
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
# FINALLY, remove the context from runtime tracking and # FINALLY, remove the context from runtime tracking and
# exit Bo # exit!
self.actor._contexts.pop( self.actor._contexts.pop(
(self.channel.uid, ctx.cid), (self.channel.uid, ctx.cid),
None, None,

View File

@ -85,6 +85,10 @@ async def open_root_actor(
enable_modules: list | None = None, enable_modules: list | None = None,
rpc_module_paths: list | None = None, rpc_module_paths: list | None = None,
# NOTE: allow caller to ensure that only one registry exists
# and that this call creates it.
ensure_registry: bool = False,
) -> Actor: ) -> Actor:
''' '''
Runtime init entry point for ``tractor``. Runtime init entry point for ``tractor``.
@ -206,6 +210,12 @@ async def open_root_actor(
# REGISTRAR # REGISTRAR
if ponged_addrs: if ponged_addrs:
if ensure_registry:
raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: '
'registry socket(s) already bound'
)
# we were able to connect to an arbiter # we were able to connect to an arbiter
logger.info( logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}' f'Registry(s) seem(s) to exist @ {ponged_addrs}'
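
A usage sketch of the new flag (assuming `open_root_actor()` is entered as an async context manager, as elsewhere in `tractor`): with `ensure_registry=True` the call must itself bind the registrar socket(s) and, per the hunk above, raises `RuntimeError` if a registry already answers at the configured addrs.

```python
import trio
import tractor


async def main():
    # demand that THIS root actor creates the (sole) registry; if one
    # is already bound at the configured addrs a `RuntimeError` is
    # raised instead of silently connecting to the existing one.
    async with tractor.open_root_actor(
        ensure_registry=True,
    ):
        ...  # run your actor tree here


if __name__ == '__main__':
    trio.run(main)
```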

View File

@ -204,6 +204,21 @@ async def do_hard_kill(
# terminate_after: int = 99999, # terminate_after: int = 99999,
) -> None: ) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
Used in 2 main cases:
- "unknown remote runtime state": a hanging/stalled actor that
isn't responding after sending a (graceful) runtime cancel
request via an IPC msg.
- "cancelled during spawn": a process who's actor runtime was
cancelled before full startup completed (such that
cancel-request-handling machinery was never fully
initialized) and thus a "cancel request msg" is never going
to be handled.
'''
# NOTE: this timeout used to do nothing since we were shielding # NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much # the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as # never release until the process exits, now it acts as
@ -219,6 +234,9 @@ async def do_hard_kill(
# and wait for it to exit. If cancelled, kills the process and # and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the # waits for it to finish exiting before propagating the
# cancellation. # cancellation.
#
# This code was originally triggred by ``proc.__aexit__()``
# but now must be called manually.
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
if proc.stdin is not None: if proc.stdin is not None:
await proc.stdin.aclose() await proc.stdin.aclose()
@ -234,10 +252,14 @@ async def do_hard_kill(
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await proc.wait() await proc.wait()
# XXX NOTE XXX: zombie squad dispatch:
# (should ideally never, but) If we do get here it means
# graceful termination of a process failed and we need to
# resort to OS level signalling to interrupt and cancel the
# (presumably stalled or hung) actor. Since we never allow
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and
# into here.
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}") log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
proc.kill() proc.kill()
@ -252,10 +274,13 @@ async def soft_wait(
portal: Portal, portal: Portal,
) -> None: ) -> None:
# Wait for proc termination but **dont' yet** call '''
# ``trio.Process.__aexit__()`` (it tears down stdio Wait for proc termination but **dont' yet** teardown
# which will kill any waiting remote pdb trace). std-streams (since it will clobber any ongoing pdb REPL
# This is a "soft" (cancellable) join/reap. session). This is our "soft" (and thus itself cancellable)
join/reap on an actor-runtime-in-process.
'''
uid = portal.channel.uid uid = portal.channel.uid
try: try:
log.cancel(f'Soft waiting on actor:\n{uid}') log.cancel(f'Soft waiting on actor:\n{uid}')
@ -278,7 +303,13 @@ async def soft_wait(
await wait_func(proc) await wait_func(proc)
n.cancel_scope.cancel() n.cancel_scope.cancel()
# start a task to wait on the termination of the
# process by itself waiting on a (caller provided) wait
# function which should unblock when the target process
# has terminated.
n.start_soon(cancel_on_proc_deth) n.start_soon(cancel_on_proc_deth)
# send the actor-runtime a cancel request.
await portal.cancel_actor() await portal.cancel_actor()
if proc.poll() is None: # type: ignore if proc.poll() is None: # type: ignore