Compare commits

..

No commits in common. "3f159235374b960d120c55a1be844cd1434ca48b" and "131674eabd76a5b73d45259c9af4bd3d03832133" have entirely different histories.

9 changed files with 173 additions and 426 deletions

View File

@ -13,11 +13,6 @@ from typing import Optional
import pytest
import trio
import tractor
from tractor import (
Actor,
Context,
current_actor,
)
from tractor._exceptions import (
StreamOverrun,
ContextCancelled,
@ -198,6 +193,9 @@ def test_simple_context(
else:
assert await ctx.result() == 'yo'
if not error_parent:
await ctx.cancel()
if pointlessly_open_stream:
async with ctx.open_stream():
if error_parent:
@ -210,15 +208,10 @@ def test_simple_context(
# 'stop' msg to the far end which needs
# to be ignored
pass
else:
if error_parent:
raise error_parent
# cancel AFTER we open a stream
# to avoid a cancel raised inside
# `.open_stream()`
await ctx.cancel()
finally:
# after cancellation
@ -283,7 +276,7 @@ def test_caller_cancels(
assert (
tuple(err.canceller)
==
current_actor().uid
tractor.current_actor().uid
)
async def main():
@ -437,11 +430,9 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
):
'caller context closes without using stream'
async with tractor.open_nursery() as an:
async with tractor.open_nursery() as n:
root: Actor = current_actor()
portal = await an.start_actor(
portal = await n.start_actor(
'ctx_cancelled',
enable_modules=[__name__],
)
@ -449,10 +440,10 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with portal.open_context(
expect_cancelled,
) as (ctx, sent):
assert sent is None
await portal.run(assert_state, value=True)
assert sent is None
# call cancel explicitly
if use_ctx_cancel_method:
@ -463,21 +454,8 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async for msg in stream:
pass
except tractor.ContextCancelled as ctxc:
# XXX: the cause is US since we call
# `Context.cancel()` just above!
assert (
ctxc.canceller
==
current_actor().uid
==
root.uid
)
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
raise
except tractor.ContextCancelled:
raise # XXX: must be propagated to __aexit__
else:
assert 0, "Should have context cancelled?"
@ -494,13 +472,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
await ctx.result()
assert 0, "Callee should have blocked!?"
except trio.TooSlowError:
# NO-OP -> since already called above
await ctx.cancel()
# local scope should have absorbed the cancellation
assert ctx.cancelled_caught
assert ctx._remote_error is ctx._local_error
try:
async with ctx.open_stream() as stream:
async for msg in stream:
@ -579,25 +551,19 @@ async def cancel_self(
global _state
_state = True
# since we call this the below `.open_stream()` should always
# error!
await ctx.cancel()
# should inline raise immediately
try:
async with ctx.open_stream():
pass
# except tractor.ContextCancelled:
except RuntimeError:
except tractor.ContextCancelled:
# suppress for now so we can do checkpoint tests below
print('Got expected runtime error for stream-after-cancel')
pass
else:
raise RuntimeError('Context didnt cancel itself?!')
# check that``trio.Cancelled`` is now raised on any further
# checkpoints since the self cancel above will have cancelled
# the `Context._scope.cancel_scope: trio.CancelScope`
# check a real ``trio.Cancelled`` is raised on a checkpoint
try:
with trio.fail_after(0.1):
await trio.sleep_forever()
@ -608,7 +574,6 @@ async def cancel_self(
# should never get here
assert 0
raise RuntimeError('Context didnt cancel itself?!')
@tractor_test
async def test_callee_cancels_before_started():
@ -636,7 +601,7 @@ async def test_callee_cancels_before_started():
ce.type == trio.Cancelled
# the traceback should be informative
assert 'itself' in ce.msgdata['tb_str']
assert 'cancelled itself' in ce.msgdata['tb_str']
# teardown the actor
await portal.cancel_actor()
@ -808,7 +773,7 @@ async def echo_back_sequence(
print(
'EXITING CALLEEE:\n'
f'{ctx.canceller}'
f'{ctx.cancel_called_remote}'
)
return 'yo'
@ -906,7 +871,7 @@ def test_maybe_allow_overruns_stream(
if cancel_ctx:
assert isinstance(res, ContextCancelled)
assert tuple(res.canceller) == current_actor().uid
assert tuple(res.canceller) == tractor.current_actor().uid
else:
print(f'RX ROOT SIDE RESULT {res}')

View File

@ -225,7 +225,7 @@ def test_context_spawns_aio_task_that_errors(
await trio.sleep_forever()
return await ctx.result()
return await ctx.result()
if parent_cancels:
# bc the parent made the cancel request,

View File

@ -15,26 +15,6 @@ from tractor import ( # typing
ContextCancelled,
)
# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg raised in all open ctxs
# with that peer.
# - [ ] PEER-FAILS-BY-CHILD-ERROR: peer spawned a sub-actor which
# (also) spawned a failing task which was unhandled and
# propagated up to the immediate parent - the peer to the actor
# that also spawned a remote task task in that same peer-parent.
# def test_self_cancel():
# '''
@ -49,30 +29,14 @@ from tractor import ( # typing
@tractor.context
async def sleep_forever(
ctx: Context,
expect_ctxc: bool = False,
) -> None:
'''
Sync the context, open a stream then just sleep.
Allow checking for (context) cancellation locally.
'''
try:
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
except BaseException as berr:
# TODO: it'd sure be nice to be able to inject our own
# `ContextCancelled` here instead of of `trio.Cancelled`
# so that our runtime can expect it and this "user code"
# would be able to tell the diff between a generic trio
# cancel and a tractor runtime-IPC cancel.
if expect_ctxc:
assert isinstance(berr, trio.Cancelled)
raise
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
@tractor.context
@ -181,7 +145,6 @@ async def stream_ints(
async with ctx.open_stream() as stream:
for i in itertools.count():
await stream.send(i)
await trio.sleep(0.01)
@tractor.context
@ -198,81 +161,73 @@ async def stream_from_peer(
peer_ctx.open_stream() as stream,
):
await ctx.started()
# XXX QUESTIONS & TODO: for further details around this
# in the longer run..
# https://github.com/goodboy/tractor/issues/368
# XXX TODO: big set of questions for this
# - should we raise `ContextCancelled` or `Cancelled` (rn
# it does latter) and should/could it be implemented
# as a general injection override for `trio` such
# that ANY next checkpoint would raise the "cancel
# error type" of choice?
# - should the `ContextCancelled` bubble from
# all `Context` and `MsgStream` apis wherein it
# prolly makes the most sense to make it
# a `trio.Cancelled` subtype?
# - what about IPC-transport specific errors, should
# they bubble from the async for and trigger
# other special cases?
# NOTE: current ctl flow:
# - stream raises `trio.EndOfChannel` and
# exits the loop
# - `.open_context()` will raise the ctxcanc
# received from the sleeper.
async for msg in stream:
assert msg is not None
print(msg)
# it does that) here?!
# - test the `ContextCancelled` OUTSIDE the
# `.open_context()` call?
try:
async for msg in stream:
print(msg)
except trio.Cancelled:
assert not ctx.cancel_called
assert not ctx.cancelled_caught
assert not peer_ctx.cancel_called
assert not peer_ctx.cancelled_caught
assert 'root' in ctx.cancel_called_remote
raise # XXX MUST NEVER MASK IT!!
with trio.CancelScope(shield=True):
await tractor.pause()
# pass
# pytest.fail(
raise RuntimeError(
'peer never triggered local `[Context]Cancelled`?!?'
)
# NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming
# actor.
except ContextCancelled as ctxerr:
err = ctxerr
assert peer_ctx._remote_error is ctxerr
assert peer_ctx.canceller == ctxerr.canceller
# caller peer should not be the cancel requester
assert not ctx.cancel_called
# XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task
# was started in, exits.
assert not ctx.cancelled_caught
# we never requested cancellation
assert not peer_ctx.cancel_called
# the `.open_context()` exit definitely caught
# a cancellation in the internal `Context._scope` since
# likely the runtime called `_deliver_msg()` after
# receiving the remote error from the streaming task.
assert peer_ctx.cancelled_caught
# TODO / NOTE `.canceller` won't have been set yet
# here because that machinery is inside
# `.open_context().__aexit__()` BUT, if we had
# a way to know immediately (from the last
# checkpoint) that cancellation was due to
# a remote, we COULD assert this here..see,
# https://github.com/goodboy/tractor/issues/368
# root/parent actor task should NEVER HAVE cancelled us!
assert not ctx.canceller
assert 'canceller' in peer_ctx.canceller
assert ctxerr.canceller == 'canceller'
assert ctxerr._remote_error is ctxerr
# CASE 1: we were cancelled by our parent, the root actor.
# TODO: there are other cases depending on how the root
# actor and it's caller side task are written:
# - if the root does not req us to cancel then an
# IPC-transport related error should bubble from the async
# for loop and thus cause local cancellation both here
# and in the root (since in that case this task cancels the
# context with the root, not the other way around)
assert ctx.cancel_called_remote[0] == 'root'
raise
# TODO: IN THEORY we could have other cases depending on
# who cancels first, the root actor or the canceller peer?.
#
# 1- when the peer request is first then the `.canceller`
# field should obvi be set to the 'canceller' uid,
#
# 2-if the root DOES req cancel then we should see the same
# `trio.Cancelled` implicitly raised
# assert ctx.canceller[0] == 'root'
# assert peer_ctx.canceller[0] == 'sleeper'
raise RuntimeError(
'peer never triggered local `ContextCancelled`?'
)
# except BaseException as err:
# raise
# cases:
# - some arbitrary remote peer cancels via Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg.
# - peer spawned a sub-actor which (also) spawned a failing task
# which was unhandled and propagated up to the immediate
# parent, the peer to the actor that also spawned a remote task
# task in that same peer-parent.
# - peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
@pytest.mark.parametrize(
'error_during_ctxerr_handling',
@ -296,8 +251,8 @@ def test_peer_canceller(
line and be less indented.
.actor0> ()-> .actor1>
a inter-actor task context opened (by `async with
`Portal.open_context()`) from actor0 *into* actor1.
a inter-actor task context opened (by `async with `Portal.open_context()`)
from actor0 *into* actor1.
.actor0> ()<=> .actor1>
a inter-actor task context opened (as above)
@ -332,12 +287,11 @@ def test_peer_canceller(
5. .canceller> ()-> .sleeper>
- calls `Portal.cancel_actor()`
'''
async def main():
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
# debug_mode=True
) as an:
async with tractor.open_nursery() as an:
canceller: Portal = await an.start_actor(
'canceller',
enable_modules=[__name__],
@ -351,13 +305,10 @@ def test_peer_canceller(
enable_modules=[__name__],
)
root = tractor.current_actor()
try:
async with (
sleeper.open_context(
sleep_forever,
expect_ctxc=True,
) as (sleeper_ctx, sent),
just_caller.open_context(
@ -384,15 +335,16 @@ def test_peer_canceller(
'Context.result() did not raise ctx-cancelled?'
)
# should always raise since this root task does
# not request the sleeper cancellation ;)
# TODO: not sure why this isn't catching
# but maybe we need an `ExceptionGroup` and
# the whole except *errs: thinger in 3.11?
except ContextCancelled as ctxerr:
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
# canceller and caller peers should not
# have been remotely cancelled.
assert canceller_ctx.canceller is None
assert caller_ctx.canceller is None
assert canceller_ctx.cancel_called_remote is None
assert caller_ctx.cancel_called_remote is None
assert ctxerr.canceller[0] == 'canceller'
@ -403,14 +355,16 @@ def test_peer_canceller(
# block it should be.
assert not sleeper_ctx.cancelled_caught
# TODO: a test which ensures this error is
# bubbled and caught (NOT MASKED) by the
# runtime!!!
if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown')
raise
# XXX SHOULD NEVER EVER GET HERE XXX
except BaseException as berr:
err = berr
# SHOULD NEVER GET HERE!
except BaseException:
pytest.fail('did not rx ctx-cancelled error?')
else:
pytest.fail('did not rx ctx-cancelled error?')
@ -421,20 +375,6 @@ def test_peer_canceller(
)as ctxerr:
_err = ctxerr
# NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs)
# - `.cancel_called` (bool of whether this side
# requested)
# - `.canceller` (uid of cancel-causing actor-task)
# - `._remote_error` (any `RemoteActorError`
# instance from other side of context)
# TODO: are we really planning to use this tho?
# - `._cancel_msg` (any msg that caused the
# cancel)
# CASE: error raised during handling of
# `ContextCancelled` inside `.open_context()`
# block
if error_during_ctxerr_handling:
assert isinstance(ctxerr, RuntimeError)
@ -444,42 +384,20 @@ def test_peer_canceller(
for ctx in ctxs:
assert ctx.cancel_called
# each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# this root actor task should have
# cancelled all opened contexts except the
# sleeper which is obvi by the "canceller"
# peer.
re = ctx._remote_error
if (
ctx is sleeper_ctx
or ctx is caller_ctx
):
assert (
re.canceller
==
ctx.canceller
==
canceller.channel.uid
)
# cancelled all opened contexts except
# the sleeper which is cancelled by its
# peer "canceller"
if ctx is not sleeper_ctx:
assert ctx._remote_error.canceller[0] == 'root'
else:
assert (
re.canceller
==
ctx.canceller
==
root.uid
)
# CASE: standard teardown inside in `.open_context()` block
else:
assert ctxerr.canceller == sleeper_ctx.canceller
assert (
ctxerr.canceller[0]
==
sleeper_ctx.canceller[0]
==
'canceller'
)
assert ctxerr.canceller[0] == 'canceller'
# the sleeper's remote error is the error bubbled
# out of the context-stack above!
@ -487,43 +405,18 @@ def test_peer_canceller(
assert re is ctxerr
for ctx in ctxs:
re: BaseException | None = ctx._remote_error
assert re
# root doesn't cancel sleeper since it's
# cancelled by its peer.
if ctx is sleeper_ctx:
assert not ctx.cancel_called
# since sleeper_ctx.result() IS called
# above we should have (silently)
# absorbed the corresponding
# `ContextCancelled` for it and thus
# the logic inside `.cancelled_caught`
# should trigger!
assert ctx.cancelled_caught
elif ctx is caller_ctx:
# since its context was remotely
# cancelled, we never needed to
# call `Context.cancel()` bc it was
# done by the peer and also we never
assert ctx.cancel_called
# TODO: figure out the details of
# this..
# if you look the `._local_error` here
# is a multi of ctxc + 2 Cancelleds?
# assert not ctx.cancelled_caught
else:
assert ctx.cancel_called
assert not ctx.cancelled_caught
# TODO: do we even need this flag?
# -> each context should have received
# each context should have received
# a silently absorbed context cancellation
# in its remote nursery scope.
# assert ctx.chan.uid == ctx.canceller
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# NOTE: when an inter-peer cancellation
# occurred, we DO NOT expect this
@ -541,7 +434,9 @@ def test_peer_canceller(
# including the case where ctx-cancel handling
# itself errors.
assert sleeper_ctx.cancelled_caught
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
# await tractor.pause()
raise # always to ensure teardown
if error_during_ctxerr_handling:

View File

@ -211,14 +211,10 @@ async def find_actor(
# 'Gathered portals:\n'
# f'{portals}'
# )
# NOTE: `gather_contexts()` will return a
# `tuple[None, None, ..., None]` if no contact
# can be made with any regstrar at any of the
# N provided addrs!
if not any(portals):
if not portals:
if raise_on_none:
raise RuntimeError(
f'No actor "{name}" found registered @ {registry_addrs}'
f'No {name} found registered @ {registry_addrs}'
)
yield None
return

View File

@ -294,11 +294,9 @@ class Channel:
self._agen = self._aiter_recv()
self._exc: Optional[Exception] = None # set if far end actor errors
self._closed: bool = False
# flag set by ``Portal.cancel_actor()`` indicating remote
# (possibly peer) cancellation of the far end actor
# runtime.
self._cancel_called: bool = False
# flag set on ``Portal.cancel_actor()`` indicating
# remote (peer) cancellation of the far end actor runtime.
self._cancel_called: bool = False # set on ``Portal.cancel_actor()``
@classmethod
def from_stream(

View File

@ -100,7 +100,7 @@ def parse_maddr(
multiaddr: str,
) -> dict[str, str | int | dict]:
'''
Parse a libp2p style "multiaddress" into its distinct protocol
Parse a libp2p style "multiaddress" into it's distinct protocol
segments where each segment is of the form:
`../<protocol>/<param0>/<param1>/../<paramN>`

View File

@ -48,7 +48,6 @@ from ._exceptions import (
unpack_error,
NoResult,
ContextCancelled,
MessagingError,
)
from ._context import Context
from ._streaming import MsgStream
@ -72,6 +71,11 @@ def _unwrap_msg(
raise unpack_error(msg, channel) from None
# TODO: maybe move this to ._exceptions?
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
class Portal:
'''
A 'portal' to a memory-domain-separated `Actor`.
@ -216,18 +220,14 @@ class Portal:
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with
# a proper shield
# XXX: sure would be nice to make this work with a proper shield
with trio.move_on_after(
timeout
or self.cancel_timeout
) as cs:
cs.shield = True
await self.run_from_ns(
'self',
'cancel',
)
await self.run_from_ns('self', 'cancel')
return True
if cs.cancelled_caught:
@ -462,14 +462,10 @@ class Portal:
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first: Any = msg['started']
first = msg['started']
ctx._started_called: bool = True
except KeyError:
# TODO: can we maybe factor this into the new raiser
# `_streaming._raise_from_no_yield_msg()` and make that
# helper more generic, say with a `_no_<blah>_msg()`?
if not (cid := msg.get('cid')):
raise MessagingError(
'Received internal error at context?\n'
@ -521,102 +517,54 @@ class Portal:
# started in the ctx nursery.
ctx._scope.cancel()
# XXX NOTE XXX: maybe shield against
# self-context-cancellation (which raises a local
# `ContextCancelled`) when requested (via
# `Context.cancel()`) by the same task (tree) which entered
# THIS `.open_context()`.
# XXX: (maybe) shield/mask context-cancellations that were
# initiated by any of the context's 2 tasks. There are
# subsequently 2 operating cases for a "graceful cancel"
# of a `Context`:
#
# NOTE: There are 2 operating cases for a "graceful cancel"
# of a `Context`. In both cases any `ContextCancelled`
# raised in this scope-block came from a transport msg
# relayed from some remote-actor-task which our runtime set
# as a `Context._remote_error`
#
# the CASES:
#
# - if that context IS THE SAME ONE that called
# `Context.cancel()`, we want to absorb the error
# silently and let this `.open_context()` block to exit
# without raising.
#
# - if it is from some OTHER context (we did NOT call
# `.cancel()`), we want to re-RAISE IT whilst also
# setting our own ctx's "reason for cancel" to be that
# other context's cancellation condition; we set our
# `.canceller: tuple[str, str]` to be same value as
# caught here in a `ContextCancelled.canceller`.
#
# Again, there are 2 cases:
#
# 1-some other context opened in this `.open_context()`
# block cancelled due to a self or peer cancellation
# request in which case we DO let the error bubble to the
# opener.
#
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCanclled` from the "callee"
# task, in which case we mask the `ContextCancelled` from
# bubbling to this "caller" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to
# 1.*this* side's task called `Context.cancel()`, in
# which case we mask the `ContextCancelled` from bubbling
# to the opener (much like how `trio.Nursery` swallows
# any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
#
# 2.*the other* side's (callee/spawned) task cancelled due
# to a self or peer cancellation request in which case we
# DO let the error bubble to the opener.
except ContextCancelled as ctxc:
scope_err = ctxc
# CASE 1: this context was never cancelled
# via a local task's call to `Context.cancel()`.
if not ctx._cancel_called:
raise
# CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should
# exit silently.
if (
ctx._cancel_called
and (
ctxc is ctx._remote_error
or
ctxc.canceller is self.canceller
)
):
else:
log.debug(
f'Context {ctx} cancelled gracefully with:\n'
f'{ctxc}'
)
# CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else!
else:
raise
# the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`,
# 2. any "callee"-side remote error, possibly also a cancellation
# request by some peer,
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
except (
# CASE 3: standard local error in this caller/yieldee
# - a standard error in the caller/yieldee
Exception,
# CASES 1 & 2: normally manifested as
# a `Context._scope_nursery` raised
# exception-group of,
# 1.-`trio.Cancelled`s, since
# `._scope.cancel()` will have been called and any
# `ContextCancelled` absorbed and thus NOT RAISED in
# any `Context._maybe_raise_remote_err()`,
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error raised in the "callee" side with
# a group only raised if there was any more then one
# task started here in the "caller" in the
# `yield`-ed to task.
BaseExceptionGroup, # since overrun handler tasks may have been spawned
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
# - a runtime teardown exception-group and/or
# cancellation request from a caller task.
BaseExceptionGroup,
trio.Cancelled,
KeyboardInterrupt,
) as err:
scope_err = err
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
# the `ContextCancelled` "self cancellation absorbed" case
# handled in the block above!
# XXX: request cancel of this context on any error.
# NOTE: `Context.cancel()` is conversely NOT called in
# the `ContextCancelled` "cancellation requested" case
# above.
log.cancel(
'Context cancelled for task due to\n'
f'{err}\n'
@ -635,7 +583,7 @@ class Portal:
raise # duh
# no local scope error, the "clean exit with a result" case.
# no scope error case
else:
if ctx.chan.connected():
log.info(
@ -649,27 +597,15 @@ class Portal:
# `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime
# via a call to
# `Context._maybe_cancel_and_set_remote_error()`.
# As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
# result = await ctx.result()
try:
result = await ctx.result()
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
# determine things like `.cancelled_caught` for
# cases where there was remote cancellation but
# this task didn't know until final teardown
# / value collection.
scope_err = berr
raise
# `Context._maybe_cancel_and_set_remote_error()`
# which IS SET any time the far end fails and
# causes "caller side" cancellation via
# a `ContextCancelled` here.
result = await ctx.result()
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
finally:
# though it should be impossible for any tasks
@ -719,14 +655,12 @@ class Portal:
with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose()
# XXX: we always raise remote errors locally and
# generally speaking mask runtime-machinery related
# multi-`trio.Cancelled`s. As such, any `scope_error`
# which was the underlying cause of this context's exit
# should be stored as the `Context._local_error` and
# used in determining `Context.cancelled_caught: bool`.
# XXX: since we always (maybe) re-raise (and thus also
# mask runtime machinery related
# multi-`trio.Cancelled`s) any scope error which was
# the underlying cause of this context's exit, add
# different log msgs for each of the (2) cases.
if scope_err is not None:
ctx._local_error: BaseException = scope_err
etype: Type[BaseException] = type(scope_err)
# CASE 2
@ -756,7 +690,7 @@ class Portal:
await maybe_wait_for_debugger()
# FINALLY, remove the context from runtime tracking and
# exit!
# exit Bo
self.actor._contexts.pop(
(self.channel.uid, ctx.cid),
None,

View File

@ -85,10 +85,6 @@ async def open_root_actor(
enable_modules: list | None = None,
rpc_module_paths: list | None = None,
# NOTE: allow caller to ensure that only one registry exists
# and that this call creates it.
ensure_registry: bool = False,
) -> Actor:
'''
Runtime init entry point for ``tractor``.
@ -210,12 +206,6 @@ async def open_root_actor(
# REGISTRAR
if ponged_addrs:
if ensure_registry:
raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: '
'registry socket(s) already bound'
)
# we were able to connect to an arbiter
logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}'

View File

@ -204,21 +204,6 @@ async def do_hard_kill(
# terminate_after: int = 99999,
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.
Used in 2 main cases:
- "unknown remote runtime state": a hanging/stalled actor that
isn't responding after sending a (graceful) runtime cancel
request via an IPC msg.
- "cancelled during spawn": a process who's actor runtime was
cancelled before full startup completed (such that
cancel-request-handling machinery was never fully
initialized) and thus a "cancel request msg" is never going
to be handled.
'''
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
@ -234,9 +219,6 @@ async def do_hard_kill(
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
#
# This code was originally triggred by ``proc.__aexit__()``
# but now must be called manually.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
@ -252,14 +234,10 @@ async def do_hard_kill(
with trio.CancelScope(shield=True):
await proc.wait()
# XXX NOTE XXX: zombie squad dispatch:
# (should ideally never, but) If we do get here it means
# graceful termination of a process failed and we need to
# resort to OS level signalling to interrupt and cancel the
# (presumably stalled or hung) actor. Since we never allow
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and
# into here.
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
proc.kill()
@ -274,13 +252,10 @@ async def soft_wait(
portal: Portal,
) -> None:
'''
Wait for proc termination but **dont' yet** teardown
std-streams (since it will clobber any ongoing pdb REPL
session). This is our "soft" (and thus itself cancellable)
join/reap on an actor-runtime-in-process.
'''
# Wait for proc termination but **dont' yet** call
# ``trio.Process.__aexit__()`` (it tears down stdio
# which will kill any waiting remote pdb trace).
# This is a "soft" (cancellable) join/reap.
uid = portal.channel.uid
try:
log.cancel(f'Soft waiting on actor:\n{uid}')
@ -303,13 +278,7 @@ async def soft_wait(
await wait_func(proc)
n.cancel_scope.cancel()
# start a task to wait on the termination of the
# process by itself waiting on a (caller provided) wait
# function which should unblock when the target process
# has terminated.
n.start_soon(cancel_on_proc_deth)
# send the actor-runtime a cancel request.
await portal.cancel_actor()
if proc.poll() is None: # type: ignore