Compare commits

..

18 Commits

Author SHA1 Message Date
Tyler Goodlet fdf0c43bfa Type out the full-fledged streaming ex. 2025-03-14 13:40:19 -04:00
Tyler Goodlet f895c96600 Add masked super timeout line to `do_hard_kill()` for would-be runtime hackers 2025-03-14 13:40:19 -04:00
Tyler Goodlet ca1a1476bb Add a first serious inter-peer remote cancel suite
Tests that appropriate `Context` exit state, the relay of
a `ContextCancelled` error and its `.canceller: tuple[str, str]` value
are set when an inter-peer cancellation happens via an "out of band"
request method (in this case using `Portal.cancel_actor()`) and that
cancellation is propagated "horizontally" to other peers. Verify that
any such cancellation scenario which also experiences an "error during
`ContextCancelled` handling" DOES NOT result in that further error being
suppressed and that the user's exception bubbles out of the
`Context.open_context()` block(s) appropriately!

Likely more tests to come as well as some factoring of the teardown
state checks where possible.

Pertains to seriously testing the major work landing in #357
2025-03-14 13:40:19 -04:00
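
A tractor-free sketch of the "error during `ContextCancelled` handling must NOT be suppressed" property this suite exercises; the `ContextCancelled` class below is a stand-in, not the real `tractor` type:

class ContextCancelled(Exception):
    def __init__(self, msg: str, canceller: tuple[str, str]):
        super().__init__(msg)
        self.canceller = canceller

def handle_peer_cancel() -> None:
    try:
        raise ContextCancelled(
            'peer cancelled us',
            canceller=('canceller', '<uuid>'),
        )
    except ContextCancelled as ctxc:
        # exit-state style checks as in the suite..
        assert ctxc.canceller[0] == 'canceller'
        # ..then simulate a bug hit while handling the cancel; it must
        # bubble out and NOT be swallowed by the original ctx-cancel.
        raise RuntimeError('simulated error during teardown')

try:
    handle_peer_cancel()
except RuntimeError as err:
    print(f'user error bubbled un-suppressed: {err}')
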
Tyler Goodlet a7c36a9cbe Tidy/clarify another `._runtime` comment 2025-03-14 13:40:19 -04:00
Tyler Goodlet 22e4b324b1 Get mega-pedantic in `Portal.open_context()`
Specifically in the `.__aexit__()` phase to ensure remote,
runtime-internal, and locally raised error-during-cancelled-handling
exceptions are NEVER masked by a local `ContextCancelled` or any
exception group of `trio.Cancelled`s.

Also adds a ton of details to doc strings including extreme detail
surrounding the `ContextCancelled` raising cases and their processing
inside `.open_context()`'s exception handler blocks.

Details, details:
- internal rename `err`/`_err` stuff to just be `scope_err` since it's
  effectively the error bubbled up from the context's surrounding (and
  cross-actor) "scope".
- always shield `._recv_chan.aclose()` to avoid any `Cancelled` from
  masking the `scope_err` with a runtime related `trio.Cancelled`.
- explicitly catch the specific set of `scope_err: BaseException` that
  we can reasonably expect to handle instead of the catch-all parent
  type including exception groups, cancels and KBIs.
2025-03-14 13:40:18 -04:00
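
The "always shield the close" point can be illustrated outside of tractor with a trio-only sketch (hypothetical code, not the actual `.__aexit__()` body): with a cancel pending, an unshielded `.aclose()` checkpoint would raise `trio.Cancelled` and mask the original scope error.

import trio

async def main() -> None:
    send_chan, recv_chan = trio.open_memory_channel(1)
    with trio.CancelScope() as cs:
        try:
            cs.cancel()                       # a cancel is now pending
            raise RuntimeError('scope_err: user code crashed')
        finally:
            # unshielded, the `.aclose()` checkpoint would raise
            # `trio.Cancelled` here and mask the `RuntimeError` above.
            with trio.CancelScope(shield=True):
                await recv_chan.aclose()

try:
    trio.run(main)
except RuntimeError as err:
    print(f'scope error bubbled un-masked: {err}')
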
Tyler Goodlet 89ed8b67ff Drop `msg` kwarg from `Context.cancel()`
Well first off, turns out it's never used and generally speaking
doesn't seem to help much with "runtime hacking/debugging"; why would
we need to "fabricate" a msg when `.cancel()` is called to self-cancel?

Also (and since `._maybe_cancel_and_set_remote_error()` now takes an
`error: BaseException` as input and thus expects error-msg unpacking
prior to being called), we now manually set `Context._cancel_msg: dict`
just prior to any remote error assignment - so any case where we would
have fabbed a "cancel msg" near calling `.cancel()`, just do the manual
assign.

In this vein some other subtle changes:
- obviously don't set `._cancel_msg` in `.cancel()` since it's no longer
  an input.
- generally do walrus-style `error := unpack_error()` before applying
  and setting remote error-msg state.
- always raise any `._remote_error` in `.result()` instead of returning
  the exception instance and check before AND after the underlying mem
  chan read.
- add notes/todos around `raise self._remote_error from None` masking of
  (runtime) errors in `._maybe_raise_remote_err()` and use it inside
  `.result()` since we had the inverse duplicate logic there anyway..

Further, this adds and extends a ton of (internal) interface docs and
details comments around the `Context` API including many subtleties
pertaining to calling `._maybe_cancel_and_set_remote_error()`.
2025-03-14 13:37:55 -04:00
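
An illustrative-only sketch (toy types, not the runtime's real ones) of the walrus-style "unpack first, then record state" pattern: the raw msg is manually recorded as `_cancel_msg` right before the remote error is assigned, instead of being fabricated inside `.cancel()`.

class ToyContext:
    _cancel_msg: dict | None = None
    _remote_error: BaseException | None = None

    def _deliver_error_msg(self, msg: dict) -> None:
        # walrus-unpack the error (if any) before touching state
        if error := self._unpack(msg):
            self._cancel_msg = msg        # manual assign, not via `.cancel()`
            self._remote_error = error

    @staticmethod
    def _unpack(msg: dict) -> Exception | None:
        if (error_dict := msg.get('error')) is None:
            return None
        return RuntimeError(error_dict.get('tb_str', ''))

ctx = ToyContext()
ctx._deliver_error_msg({'cid': 'xyz', 'error': {'tb_str': 'Traceback ...'}})
assert ctx._cancel_msg is not None
assert isinstance(ctx._remote_error, RuntimeError)
print('cancel msg recorded alongside the unpacked remote error')
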
Tyler Goodlet 11bbf15817 `._exceptions`: typing and error unpacking updates
Bump type annotations to 3.10+ style throughout module as well as fill
out doc strings a bit. Inside `unpack_error()` pop any `error_dict: dict`
and,
- return `None` early if not found,
- versus pass directly as `**error_dict` to the error constructor
  instead of a double field read.
2025-03-14 13:36:16 -04:00
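
A minimal sketch of the early-return plus `**error_dict`-splat shape described above; the `RemoteActorError` here is a simplified stand-in, not tractor's class.

class RemoteActorError(Exception):
    def __init__(self, message: str, **msgdata):
        super().__init__(message)
        self.msgdata = msgdata

def unpack_error(msg: dict) -> RemoteActorError | None:
    if (error_dict := msg.get('error')) is None:
        return None  # no 'error' field, nothing to unpack
    # splat the decoded fields straight into the constructor instead
    # of doing a second `msg['error'][...]` read per field.
    return RemoteActorError(
        error_dict.get('tb_str', ''),
        **error_dict,
    )

print(unpack_error({'cid': 'abc'}))  # -> None
err = unpack_error({
    'error': {'tb_str': 'Traceback ...', 'type_str': 'ValueError'},
})
print(type(err).__name__, err.msgdata['type_str'])
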
Tyler Goodlet a18663213a Add comments around diff between `C/context` refs 2025-03-14 13:36:16 -04:00
Tyler Goodlet d4d09b6071 Factor non-yield stream msg processing into helper
Since both `MsgStream.receive()` and `.receive_nowait()` need the same
raising logic when a non-stream msg arrives (so that maybe an
appropriate IPC translated error can be raised) move the `KeyError`
handler code into a new `._streaming._raise_from_no_yield_msg()` func
and call it from both methods so the error-raising interface is
symmetrical across the two.
2025-03-14 13:36:16 -04:00
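
A toy version (names modelled on, but not identical to, the real `._streaming` internals) showing the single raising-helper being shared by both receive methods:

import trio

def _raise_from_no_yield_msg(msg: dict, src_err: KeyError) -> None:
    if msg.get('stop'):
        raise trio.EndOfChannel from src_err
    if msg.get('error'):
        raise RuntimeError(msg['error'].get('tb_str', '')) from src_err
    raise src_err  # unknown non-yield msg: re-raise the original

class ToyStream:
    def __init__(self, rx: trio.MemoryReceiveChannel):
        self._rx = rx

    def receive_nowait(self):
        msg = self._rx.receive_nowait()
        try:
            return msg['yield']
        except KeyError as kerr:
            _raise_from_no_yield_msg(msg, kerr)

    async def receive(self):
        msg = await self._rx.receive()
        try:
            return msg['yield']
        except KeyError as kerr:
            _raise_from_no_yield_msg(msg, kerr)

async def main():
    tx, rx = trio.open_memory_channel(2)
    await tx.send({'yield': 'hi'})
    await tx.send({'stop': True})
    stream = ToyStream(rx)
    print(await stream.receive())      # -> 'hi'
    try:
        await stream.receive()         # 'stop' msg -> EndOfChannel
    except trio.EndOfChannel:
        print('stream stopped, same error path as receive_nowait()')

trio.run(main)
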
Tyler Goodlet 6d10f0c516 Always raise remote (cancelled) error if set
Previously we weren't raising a remote error if the local scope was
cancelled during a call to `Context.result()` which is problematic if
the caller WAS NOT the requester for said remote cancellation; in that
case we still want a `ContextCancelled` raised with the `.canceller:
str` set to the cancelling actor uid.

Further fix a naming bug where the (seemingly older) `._remote_err` was
being set to such an error instead of `._remote_error` XD
2025-03-14 13:36:16 -04:00
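
A tractor-free sketch of the fixed `.result()` flow (illustrative names only): the recorded remote error is raised, not returned, and is checked both before and after the mem-chan read.

import trio

class ToyContext:
    def __init__(self, recv_chan: trio.MemoryReceiveChannel):
        self._recv_chan = recv_chan
        self._remote_error: BaseException | None = None
        self._result = None

    async def result(self):
        if re := self._remote_error:
            raise re                  # raise, don't return, the remote error
        async for msg in self._recv_chan:
            if 'error' in msg:
                self._remote_error = RuntimeError(msg['error']['tb_str'])
                break
            if 'return' in msg:
                self._result = msg['return']
                break
        if re := self._remote_error:
            raise re                  # re-check AFTER the mem-chan read
        return self._result

async def main():
    tx, rx = trio.open_memory_channel(1)
    ctx = ToyContext(rx)
    await tx.send({'error': {'tb_str': 'Traceback (remote) ...'}})
    try:
        await ctx.result()
    except RuntimeError as err:
        print(f'remote error raised locally: {err}')

trio.run(main)
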
Tyler Goodlet fa9b57bae0 Write more comprehensive `Portal.cancel_actor()` doc str 2025-03-14 13:36:16 -04:00
Tyler Goodlet 81776a6238 Drop pause line from ctx cancel handler block in test 2025-03-14 13:36:16 -04:00
Tyler Goodlet 144d1f4d94 Msg-ified `ContextCancelled`s sub-error type should always be just, its type.. 2025-03-14 13:36:16 -04:00
Tyler Goodlet 51fdf3524c Start inter-peer cancellation test mod
Move over relevant test from the "context semantics" test module which
was already verifying peer-caused-`ContextCancelled.canceller: tuple`
error info and propagation during an inter-peer cancellation scenario.

Also begin a more general set of inter-peer cancellation tests starting
with the simplest case: when a peer is cancelled the parent should
NOT get a "muted" `trio.Cancelled` but instead
a `tractor.ContextCancelled` with a `.canceller: tuple` which points to
the sibling actor which requested the peer cancel.
2025-03-14 13:36:16 -04:00
Tyler Goodlet cff69d07fe Mk `gather_contexts()` support `@acm`s yielding `None`
We were using an `all(<yielded values>)` condition which obviously won't
work if the batched managers yield any non-truthy value. So instead seed
the `unwrapped: dict` with `id(mngrs)` as a sentinel and only unblock once
all values have been filled in with something other than that sentinel.
2025-03-14 13:36:16 -04:00
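
The sentinel-seeding fix can be shown with plain dicts (simplified stand-ins for the real `@acm` machinery):

mngrs = ['mngr_a', 'mngr_b', 'mngr_c']   # stand-ins for the batched @acm instances
seed: int = id(mngrs)
unwrapped: dict[int, object] = {}.fromkeys((id(m) for m in mngrs), seed)

def all_entered() -> bool:
    # old check was `all(unwrapped.values())` which breaks on falsy yields
    return all(val != seed for val in unwrapped.values())

unwrapped[id(mngrs[0])] = None   # a perfectly legit falsy yielded value
unwrapped[id(mngrs[1])] = 0
print(all_entered())             # False: one slot still holds the seed
unwrapped[id(mngrs[2])] = None
print(all_entered())             # True: every slot was overwritten, falsy or not
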
Tyler Goodlet ee94d6d62c Teensie tidy up on actor doc string 2025-03-14 13:36:16 -04:00
Tyler Goodlet 89b84ed6c0 Make `NamespacePath` work on object refs
Detect if the input ref is a non-func (like an `object` instance) in
which case grab its type name using `type()`. Wrap all the name-getting
into a new `_mk_fqnp()` static meth: gets the "fully qualified namespace
path" and returns the path and name as a tuple; port other methods to use it.
Refine and update the docs B)
2025-03-14 13:36:16 -04:00
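
A simplified, standalone version of the object-ref detection described above (a sketch mirroring, not importing, the real helper):

from inspect import isfunction

def mk_fqnp(ref) -> tuple[str, str]:
    '''
    Build a ('module.path', 'name') pair for a func OR an object
    instance; instances have no `__name__` so use their type's name.
    '''
    if isinstance(ref, object) and not isfunction(ref):
        name: str = type(ref).__name__
    else:
        name: str = getattr(ref, '__name__')
    return ref.__module__, name

def some_func():
    ...

class Thing:
    ...

print(mk_fqnp(some_func))   # ('__main__', 'some_func')
print(mk_fqnp(Thing()))     # ('__main__', 'Thing')
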
Tyler Goodlet f33f689f34 .log: more correct handling for `get_logger(__name__)` usage 2025-03-14 13:36:16 -04:00
13 changed files with 1081 additions and 372 deletions

View File

@ -65,21 +65,28 @@ async def aggregate(seed):
print("AGGREGATOR COMPLETE!")
# this is the main actor and *arbiter*
async def main():
# a nursery which spawns "actors"
async with tractor.open_nursery(
arbiter_addr=('127.0.0.1', 1616)
) as nursery:
async def main() -> list[int]:
'''
This is the "root" actor's main task's entrypoint.
By default (and if not otherwise specified) that root process
also acts as a "registry actor" / "registrar" on the localhost
for the purposes of multi-actor "service discovery".
'''
# yes, a nursery which spawns `trio`-"actors" B)
nursery: tractor.ActorNursery
async with tractor.open_nursery() as nursery:
seed = int(1e3)
pre_start = time.time()
portal = await nursery.start_actor(
portal: tractor.Portal = await nursery.start_actor(
name='aggregator',
enable_modules=[__name__],
)
stream: tractor.MsgStream
async with portal.open_stream_from(
aggregate,
seed=seed,

View File

@ -1,8 +1,8 @@
'''
``async with ():`` inlined context-stream cancellation testing.
Verify the we raise errors when streams are opened prior to sync-opening
a ``tractor.Context`` beforehand.
Verify the we raise errors when streams are opened prior to
sync-opening a ``tractor.Context`` beforehand.
'''
from contextlib import asynccontextmanager as acm
@ -922,93 +922,3 @@ def test_maybe_allow_overruns_stream(
# if this hits the logic blocks from above are not
# exhaustive..
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
@tractor.context
async def sleep_forever(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
@acm
async def attach_to_sleep_forever():
'''
Cancel a context **before** any underlying error is raised in order
to trigger a local reception of a ``ContextCancelled`` which **should not**
be re-raised in the local surrounding ``Context`` *iff* the cancel was
requested by **this** side of the context.
'''
async with tractor.wait_for_actor('sleeper') as p2:
async with (
p2.open_context(sleep_forever) as (peer_ctx, first),
peer_ctx.open_stream(),
):
try:
yield
finally:
# XXX: previously this would trigger local
# ``ContextCancelled`` to be received and raised in the
# local context overriding any local error due to logic
# inside ``_invoke()`` which checked for an error set on
# ``Context._error`` and raised it in a cancellation
# scenario.
# ------
# The problem is you can have a remote cancellation that
# is part of a local error and we shouldn't raise
# ``ContextCancelled`` **iff** we **were not** the side
# of the context to initiate it, i.e.
# ``Context._cancel_called`` should **NOT** have been
# set. The special logic to handle this case is now
# inside ``Context._maybe_raise_from_remote_msg()`` XD
await peer_ctx.cancel()
@tractor.context
async def error_before_started(
ctx: tractor.Context,
) -> None:
'''
This simulates exactly an original bug discovered in:
https://github.com/pikers/piker/issues/244
'''
async with attach_to_sleep_forever():
# send an unserializable type which should raise a type error
# here and **NOT BE SWALLOWED** by the surrounding acm!!?!
await ctx.started(object())
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
'''
Verify that an error raised in a remote context which itself opens
another remote context, which it cancels, does not ovverride the
original error that caused the cancellation of the secondardy
context.
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'errorer',
enable_modules=[__name__],
)
await n.start_actor(
'sleeper',
enable_modules=[__name__],
)
async with (
portal.open_context(
error_before_started
) as (ctx, sent),
):
await trio.sleep_forever()
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.type == TypeError

View File

@ -0,0 +1,451 @@
'''
Codify the cancellation request semantics in terms
of one remote actor cancelling another.
'''
# from contextlib import asynccontextmanager as acm
import itertools
import pytest
import trio
import tractor
from tractor import ( # typing
Portal,
Context,
ContextCancelled,
)
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
# '''
# ...
@tractor.context
async def sleep_forever(
ctx: Context,
) -> None:
'''
Sync the context, open a stream then just sleep.
'''
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
@tractor.context
async def error_before_started(
ctx: Context,
) -> None:
'''
This simulates exactly an original bug discovered in:
https://github.com/pikers/piker/issues/244
Cancel a context **before** any underlying error is raised so
as to trigger a local reception of a ``ContextCancelled`` which
SHOULD NOT be re-raised in the local surrounding ``Context``
*iff* the cancel was requested by **this** (callee) side of
the context.
'''
async with tractor.wait_for_actor('sleeper') as p2:
async with (
p2.open_context(sleep_forever) as (peer_ctx, first),
peer_ctx.open_stream(),
):
# NOTE: this WAS inside an @acm body but i factored it
# out and just put it inline here since i don't think
# the mngr part really matters, though maybe it could?
try:
# XXX NOTE XXX: THIS sends an UNSERIALIZABLE TYPE which
# should raise a `TypeError` and **NOT BE SWALLOWED** by
# the surrounding try/finally (normally inside the
# body of some acm)..
await ctx.started(object())
# yield
finally:
# XXX: previously this would trigger local
# ``ContextCancelled`` to be received and raised in the
# local context overriding any local error due to logic
# inside ``_invoke()`` which checked for an error set on
# ``Context._error`` and raised it in a cancellation
# scenario.
# ------
# The problem is you can have a remote cancellation that
# is part of a local error and we shouldn't raise
# ``ContextCancelled`` **iff** we **were not** the side
# of the context to initiate it, i.e.
# ``Context._cancel_called`` should **NOT** have been
# set. The special logic to handle this case is now
# inside ``Context._maybe_raise_from_remote_msg()`` XD
await peer_ctx.cancel()
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
'''
Verify that an error raised in a remote context which itself
opens YET ANOTHER remote context, which it then cancels, does not
override the original error that caused the cancellation of the
secondary context.
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'errorer',
enable_modules=[__name__],
)
await n.start_actor(
'sleeper',
enable_modules=[__name__],
)
async with (
portal.open_context(
error_before_started
) as (ctx, sent),
):
await trio.sleep_forever()
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.type == TypeError
@tractor.context
async def sleep_a_bit_then_cancel_peer(
ctx: Context,
peer_name: str = 'sleeper',
cancel_after: float = .5,
) -> None:
'''
Connect to peer, sleep as per input delay, cancel the peer.
'''
peer: Portal
async with tractor.wait_for_actor(peer_name) as peer:
await ctx.started()
await trio.sleep(cancel_after)
await peer.cancel_actor()
@tractor.context
async def stream_ints(
ctx: Context,
):
await ctx.started()
async with ctx.open_stream() as stream:
for i in itertools.count():
await stream.send(i)
@tractor.context
async def stream_from_peer(
ctx: Context,
peer_name: str = 'sleeper',
) -> None:
peer: Portal
try:
async with (
tractor.wait_for_actor(peer_name) as peer,
peer.open_context(stream_ints) as (peer_ctx, first),
peer_ctx.open_stream() as stream,
):
await ctx.started()
# XXX TODO: big set of questions for this
# - should we raise `ContextCancelled` or `Cancelled` (rn
# it does that) here?!
# - test the `ContextCancelled` OUTSIDE the
# `.open_context()` call?
try:
async for msg in stream:
print(msg)
except trio.Cancelled:
assert not ctx.cancel_called
assert not ctx.cancelled_caught
assert not peer_ctx.cancel_called
assert not peer_ctx.cancelled_caught
assert 'root' in ctx.cancel_called_remote
raise # XXX MUST NEVER MASK IT!!
with trio.CancelScope(shield=True):
await tractor.pause()
# pass
# pytest.fail(
raise RuntimeError(
'peer never triggered local `[Context]Cancelled`?!?'
)
# NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming
# actor.
except ContextCancelled as ctxerr:
assert ctxerr.canceller == 'canceller'
assert ctxerr._remote_error is ctxerr
# CASE 1: we were cancelled by our parent, the root actor.
# TODO: there are other cases depending on how the root
# actor and it's caller side task are written:
# - if the root does not req us to cancel then an
# IPC-transport related error should bubble from the async
# for loop and thus cause local cancellation both here
# and in the root (since in that case this task cancels the
# context with the root, not the other way around)
assert ctx.cancel_called_remote[0] == 'root'
raise
# except BaseException as err:
# raise
# cases:
# - some arbitrary remote peer cancels via Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg.
# - peer spawned a sub-actor which (also) spawned a failing task
# which was unhandled and propagated up to the immediate
# parent, the peer to the actor that also spawned a remote task
# task in that same peer-parent.
# - peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
@pytest.mark.parametrize(
'error_during_ctxerr_handling',
[False, True],
)
def test_peer_canceller(
error_during_ctxerr_handling: bool,
):
'''
Verify that a cancellation triggered by an in-actor-tree peer
results in a cancelled errors with all other actors which have
opened contexts to that same actor.
legend:
name>
a "play button" that indicates a new runtime instance,
an individual actor with `name`.
.subname>
a subactor who's parent should be on some previous
line and be less indented.
.actor0> ()-> .actor1>
a inter-actor task context opened (by `async with `Portal.open_context()`)
from actor0 *into* actor1.
.actor0> ()<=> .actor1>
a inter-actor task context opened (as above)
from actor0 *into* actor1 which INCLUDES an additional
stream open using `async with Context.open_stream()`.
------ - ------
supervision view
------ - ------
root>
.sleeper> TODO: SOME SYNTAX SHOWING JUST SLEEPING
.just_caller> ()=> .sleeper>
.canceller> ()-> .sleeper>
TODO: how define calling `Portal.cancel_actor()`
In this case a `ContextCancelled` with `.errorer` set to the
requesting actor, in this case 'canceller', should be relayed
to all other actors who have also opened a (remote task)
context with that now cancelled actor.
------ - ------
task view
------ - ------
So there are 5 context open in total with 3 from the root to
its children and 2 from children to their peers:
1. root> ()-> .sleeper>
2. root> ()-> .streamer>
3. root> ()-> .canceller>
4. .streamer> ()<=> .sleep>
5. .canceller> ()-> .sleeper>
- calls `Portal.cancel_actor()`
'''
async def main():
async with tractor.open_nursery() as an:
canceller: Portal = await an.start_actor(
'canceller',
enable_modules=[__name__],
)
sleeper: Portal = await an.start_actor(
'sleeper',
enable_modules=[__name__],
)
just_caller: Portal = await an.start_actor(
'just_caller', # but i just met her?
enable_modules=[__name__],
)
try:
async with (
sleeper.open_context(
sleep_forever,
) as (sleeper_ctx, sent),
just_caller.open_context(
stream_from_peer,
) as (caller_ctx, sent),
canceller.open_context(
sleep_a_bit_then_cancel_peer,
) as (canceller_ctx, sent),
):
ctxs: list[Context] = [
sleeper_ctx,
caller_ctx,
canceller_ctx,
]
try:
print('PRE CONTEXT RESULT')
await sleeper_ctx.result()
# should never get here
pytest.fail(
'Context.result() did not raise ctx-cancelled?'
)
# TODO: not sure why this isn't catching
# but maybe we need an `ExceptionGroup` and
# the whole except *errs: thinger in 3.11?
except ContextCancelled as ctxerr:
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
# canceller and caller peers should not
# have been remotely cancelled.
assert canceller_ctx.cancel_called_remote is None
assert caller_ctx.cancel_called_remote is None
assert ctxerr.canceller[0] == 'canceller'
# XXX NOTE XXX: since THIS `ContextCancelled`
# HAS NOT YET bubbled up to the
# `sleeper.open_context().__aexit__()` this
# value is not yet set, however outside this
# block it should be.
assert not sleeper_ctx.cancelled_caught
# TODO: a test which ensures this error is
# bubbled and caught (NOT MASKED) by the
# runtime!!!
if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown')
raise
# SHOULD NEVER GET HERE!
except BaseException:
pytest.fail('did not rx ctx-cancelled error?')
else:
pytest.fail('did not rx ctx-cancelled error?')
except (
ContextCancelled,
RuntimeError,
)as ctxerr:
_err = ctxerr
if error_during_ctxerr_handling:
assert isinstance(ctxerr, RuntimeError)
# NOTE: this root actor task should have
# called `Context.cancel()` on the
# `.__aexit__()` to every opened ctx.
for ctx in ctxs:
assert ctx.cancel_called
# each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# this root actor task should have
# cancelled all opened contexts except
# the sleeper which is cancelled by its
# peer "canceller"
if ctx is not sleeper_ctx:
assert ctx._remote_error.canceller[0] == 'root'
else:
assert ctxerr.canceller[0] == 'canceller'
# the sleeper's remote error is the error bubbled
# out of the context-stack above!
re = sleeper_ctx._remote_error
assert re is ctxerr
for ctx in ctxs:
if ctx is sleeper_ctx:
assert not ctx.cancel_called
assert ctx.cancelled_caught
else:
assert ctx.cancel_called
assert not ctx.cancelled_caught
# each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# NOTE: when an inter-peer cancellation
# occurred, we DO NOT expect this
# root-actor-task to have requested a cancel of
# the context since cancellation was caused by
# the "canceller" peer and thus
# `Context.cancel()` SHOULD NOT have been
# called inside
# `Portal.open_context().__aexit__()`.
assert not sleeper_ctx.cancel_called
# XXX NOTE XXX: and see matching comment above but,
# this flag is set only AFTER the `.open_context()`
# has exited and should be set in both outcomes
# including the case where ctx-cancel handling
# itself errors.
assert sleeper_ctx.cancelled_caught
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
# await tractor.pause()
raise # always to ensure teardown
if error_during_ctxerr_handling:
with pytest.raises(RuntimeError) as excinfo:
trio.run(main)
else:
with pytest.raises(ContextCancelled) as excinfo:
trio.run(main)
assert excinfo.value.type == ContextCancelled
assert excinfo.value.canceller[0] == 'canceller'

View File

@ -23,8 +23,8 @@ from exceptiongroup import BaseExceptionGroup
from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._context import (
Context,
context,
Context, # the type
context, # a func-decorator
)
from ._streaming import (
MsgStream,

View File

@ -86,26 +86,51 @@ class Context:
'''
chan: Channel
cid: str
cid: str # "context id", more or less a unique linked-task-pair id
# these are the "feeder" channels for delivering
# message values to the local task from the runtime
# msg processing loop.
# the "feeder" channels for delivering message values to the
# local task from the runtime's msg processing loop.
_recv_chan: trio.MemoryReceiveChannel
_send_chan: trio.MemorySendChannel
# the "invocation type" of the far end task-entry-point
# function, normally matching a logic block inside
# `._runtime.invoke()`.
_remote_func_type: str | None = None
# only set on the caller side
_portal: Portal | None = None # type: ignore # noqa
# NOTE: (for now) only set (a portal) on the caller side since
# the callee doesn't generally need a ref to one and should
# normally need to explicitly ask for handle to its peer if
# more the the `Context` is needed?
_portal: Portal | None = None
# NOTE: each side of the context has its own cancel scope
# which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC.
_scope: trio.CancelScope | None = None
_result: Any | int = None
_remote_error: BaseException | None = None
# cancellation state
_cancel_called: bool = False
_cancelled_remote: tuple | None = None
_cancel_msg: str | None = None
_scope: trio.CancelScope | None = None
_cancel_called: bool = False # did WE cancel the far end?
_cancelled_remote: tuple[str, str] | None = None
# NOTE: we try to ensure assignment of a "cancel msg" since
# there's always going to be an "underlying reason" that any
# context was closed due to either a remote side error or
# a call to `.cancel()` which triggers `ContextCancelled`.
_cancel_msg: str | dict | None = None
# NOTE: this state var used by the runtime to determine if the
# `pdbp` REPL is allowed to engage on contexts terminated via
# a `ContextCancelled` due to a call to `.cancel()` triggering
# "graceful closure" on either side:
# - `._runtime._invoke()` will check this flag before engaging
# the crash handler REPL in such cases where the "callee"
# raises the cancellation,
# - `.devx._debug.lock_tty_for_child()` will set it to `False` if
# the global tty-lock has been configured to filter out some
# actors from being able to acquire the debugger lock.
_enter_debugger_on_cancel: bool = True
@property
@ -173,41 +198,76 @@ class Context:
async def _maybe_cancel_and_set_remote_error(
self,
error_msg: dict[str, Any],
error: BaseException,
) -> None:
'''
(Maybe) unpack and raise a msg error into the local scope
nursery for this context.
(Maybe) cancel this local scope due to a received remote
error (normally via an IPC msg) which the actor runtime
routes to this context.
Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task.
Acts as a form of "relay" for a remote error raised in the
corresponding remote task's `Context` wherein the next time
the local task exectutes a checkpoint, a `trio.Cancelled`
will be raised and depending on the type and source of the
original remote error, and whether or not the local task
called `.cancel()` itself prior, an equivalent
`ContextCancelled` or `RemoteActorError` wrapping the
remote error may be raised here by any of,
- `Portal.open_context()`
- `Portal.result()`
- `Context.open_stream()`
- `Context.result()`
when called/closed by actor local task(s).
NOTEs & TODOs:
- It is expected that the caller has previously unwrapped
the remote error using a call to `unpack_error()` and
provides that output exception value as the input
`error` argument here.
- If this is an error message from a context opened by
`Portal.open_context()` we want to interrupt any
ongoing local tasks operating within that `Context`'s
cancel-scope so as to be notified ASAP of the remote
error and engage any caller handling (eg. for
cross-process task supervision).
- In some cases we may want to raise the remote error
immediately since there is no guarantee the locally
operating task(s) will attempt to execute a checkpoint
any time soon; in such cases there are 2 possible
approaches depending on the current task's work and
wrapping "thread" type:
- `trio`-native-and-graceful: only ever wait for tasks
to exec a next `trio.lowlevel.checkpoint()` assuming
that any such task must do so to interact with the
actor runtime and IPC interfaces.
- (NOT IMPLEMENTED) system-level-aggressive: maybe we
could eventually interrupt sync code (invoked using
`trio.to_thread` or some other adapter layer) with
a signal (a custom unix one for example?
https://stackoverflow.com/a/5744185) depending on the
task's wrapping thread-type such that long running
sync code should never cause the delay of actor
supervision tasks such as cancellation and respawn
logic.
'''
# If this is an error message from a context opened by
# ``Portal.open_context()`` we want to interrupt any ongoing
# (child) tasks within that context to be notified of the remote
# error relayed here.
#
# The reason we may want to raise the remote error immediately
# is that there is no guarantee the associated local task(s)
# will attempt to read from any locally opened stream any time
# soon.
#
# NOTE: this only applies when
# ``Portal.open_context()`` has been called since it is assumed
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
error = unpack_error(
error_msg,
self.chan,
)
# XXX: currently this should only be used when
# `Portal.open_context()` has been opened since it's
# assumed that other portal APIs like,
# - `Portal.run()`,
# - `ActorNursery.run_in_actor()`
# do their own error checking at their own call points and
# result processing.
# XXX: set the remote side's error so that after we cancel
# whatever task is the opener of this context it can raise
# that error as the reason.
self._remote_error = error
self._remote_error: BaseException = error
# always record the remote actor's uid since its cancellation
# state is directly linked to ours (the local one).
@ -232,35 +292,25 @@ class Context:
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{error_msg["error"]["tb_str"]}'
f'{error}'
)
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set?
# YES! this is way better and simpler!
if (
self._scope
):
if self._scope:
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
# self._cancelled_remote = self.chan.uid
self._scope.cancel()
# NOTE: this usage actually works here B)
# from ._debug import breakpoint
# await breakpoint()
# XXX: this will break early callee results sending
# since when `.result()` is finally called, this
# chan will be closed..
# if self._recv_chan:
# await self._recv_chan.aclose()
# this REPL usage actually works here BD
# from .devx._debug import pause
# await pause()
async def cancel(
self,
msg: str | None = None,
timeout: float = 0.616,
# timeout: float = 1000,
) -> None:
'''
@ -270,15 +320,12 @@ class Context:
Timeout quickly in an attempt to sidestep 2-generals...
'''
side = 'caller' if self._portal else 'callee'
if msg:
assert side == 'callee', 'Only callee side can provide cancel msg'
side: str = 'caller' if self._portal else 'callee'
log.cancel(
f'Cancelling {side} side of context to {self.chan.uid}'
)
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
# await _debug.breakpoint()
# breakpoint()
self._cancel_called: bool = True
if side == 'caller':
if not self._portal:
@ -286,12 +333,13 @@ class Context:
"No portal found, this is likely a callee side context"
)
cid = self.cid
cid: str = self.cid
with trio.move_on_after(timeout) as cs:
cs.shield = True
log.cancel(
f"Cancelling stream {cid} to "
f"{self._portal.channel.uid}")
f'Cancelling stream {cid} to '
f'{self._portal.channel.uid}'
)
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
@ -310,17 +358,17 @@ class Context:
# if not self._portal.channel.connected():
if not self.chan.connected():
log.cancel(
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")
'May have failed to cancel remote task '
f'{cid} for {self._portal.channel.uid}'
)
else:
log.cancel(
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
'Timed out on cancel request of remote task '
f'{cid} for {self._portal.channel.uid}'
)
# callee side remote task
else:
self._cancel_msg = msg
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough?
@ -331,7 +379,6 @@ class Context:
@acm
async def open_stream(
self,
allow_overruns: bool | None = False,
msg_buffer_size: int | None = None,
@ -435,18 +482,19 @@ class Context:
self,
err: Exception,
) -> None:
'''
Maybe raise a remote error depending on who (which task from
which actor) requested a cancellation (if any).
'''
# NOTE: whenever the context's "opener" side (task) **is**
# the side which requested the cancellation (likekly via
# ``Context.cancel()``), we don't want to re-raise that
# cancellation signal locally (would be akin to
# a ``trio.Nursery`` nursery raising ``trio.Cancelled``
# whenever ``CancelScope.cancel()`` was called) and instead
# silently reap the expected cancellation "error"-msg.
# if 'pikerd' in err.msgdata['tb_str']:
# # from . import _debug
# # await _debug.breakpoint()
# breakpoint()
# whenever ``CancelScope.cancel()`` was called) and
# instead silently reap the expected cancellation
# "error"-msg.
if (
isinstance(err, ContextCancelled)
and (
@ -457,7 +505,18 @@ class Context:
):
return err
raise err # from None
# NOTE: currently we are masking underlying runtime errors
# which are often superfluous to user handler code. not
# sure if this is still needed / desired for all operation?
# TODO: maybe we can only NOT mask if:
# - [ ] debug mode is enabled or,
# - [ ] a certain log level is set?
# - [ ] consider using `.with_traceback()` to filter out
# runtime frames from the tb explicitly?
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True
raise err from None
async def result(self) -> Any | Exception:
'''
@ -485,16 +544,12 @@ class Context:
of the remote cancellation.
'''
__tracebackhide__: bool = True
assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan
# from . import _debug
# await _debug.breakpoint()
re = self._remote_error
if re:
self._maybe_raise_remote_err(re)
return re
if re := self._remote_error:
return self._maybe_raise_remote_err(re)
if (
self._result == id(self)
@ -505,9 +560,9 @@ class Context:
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
msg = await self._recv_chan.receive()
try:
self._result = msg['return']
msg = await self._recv_chan.receive()
self._result: Any = msg['return']
# NOTE: we don't need to do this right?
# XXX: only close the rx mem chan AFTER
@ -516,6 +571,26 @@ class Context:
# await self._recv_chan.aclose()
break
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus
# SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is the
# (likely) source cause of this local runtime
# task's cancellation.
if re := self._remote_error:
self._maybe_raise_remote_err(re)
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
raise
except KeyError: # as msgerr:
if 'yield' in msg:
@ -529,7 +604,8 @@ class Context:
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?")
"Received internal error at portal?"
)
err = unpack_error(
msg,
@ -537,9 +613,12 @@ class Context:
) # from msgerr
err = self._maybe_raise_remote_err(err)
self._remote_err = err
self._remote_error = err
return self._remote_error or self._result
if re := self._remote_error:
return self._maybe_raise_remote_err(re)
return self._result
async def started(
self,
@ -548,7 +627,7 @@ class Context:
) -> None:
'''
Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side.
has started and send ``value`` to the other side via IPC.
On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``.
@ -556,19 +635,17 @@ class Context:
'''
if self._portal:
raise RuntimeError(
f"Caller side context {self} can not call started!")
f'Caller side context {self} can not call started!'
)
elif self._started_called:
raise RuntimeError(
f"called 'started' twice on context with {self.chan.uid}")
f'called `.started()` twice on context with {self.chan.uid}'
)
await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True
# TODO: do we need a restart api?
# async def restart(self) -> None:
# pass
async def _drain_overflows(
self,
) -> None:
@ -623,10 +700,21 @@ class Context:
self,
msg: dict,
draining: bool = False,
# draining: bool = False,
) -> bool:
'''
Deliver an IPC msg received from a transport-channel to
this context's underlying mem chan for handling by
user operating tasks; deliver a bool indicating whether the
msg was immediately sent.
If `._allow_overruns == True` (maybe) append the msg to an
"overflow queue" and start a "drainer task" (inside the
`._scope_nursery: trio.Nursery`) which ensures that such
messages are eventually sent if possible.
'''
cid = self.cid
chan = self.chan
uid = chan.uid
@ -637,8 +725,12 @@ class Context:
)
error = msg.get('error')
if error:
await self._maybe_cancel_and_set_remote_error(msg)
if error := unpack_error(
msg,
self.chan,
):
self._cancel_msg = msg
await self._maybe_cancel_and_set_remote_error(error)
if (
self._in_overrun
@ -670,6 +762,7 @@ class Context:
# the sender; the main motivation is that using bp can block the
# msg handling loop which calls into this method!
except trio.WouldBlock:
# XXX: always push an error even if the local
# receiver is in overrun state.
# await self._maybe_cancel_and_set_remote_error(msg)

View File

@ -39,8 +39,11 @@ class ActorFailure(Exception):
class RemoteActorError(Exception):
'''
Remote actor exception bundled locally
'''
# TODO: local recontruction of remote exception deats
"Remote actor exception bundled locally"
def __init__(
self,
message: str,
@ -110,18 +113,24 @@ class AsyncioCancelled(Exception):
def pack_error(
exc: BaseException,
tb=None,
tb: str | None = None,
) -> dict[str, Any]:
"""Create an "error message" for tranmission over
a channel (aka the wire).
"""
) -> dict[str, dict]:
'''
Create an "error message" encoded for wire transport via an IPC
`Channel`; expected to be unpacked on the receiver side using
`unpack_error()` below.
'''
if tb:
tb_str = ''.join(traceback.format_tb(tb))
else:
tb_str = traceback.format_exc()
error_msg = {
error_msg: dict[
str,
str | tuple[str, str]
] = {
'tb_str': tb_str,
'type_str': type(exc).__name__,
'src_actor_uid': current_actor().uid,
@ -139,23 +148,33 @@ def unpack_error(
chan=None,
err_type=RemoteActorError
) -> Exception:
) -> None | Exception:
'''
Unpack an 'error' message from the wire
into a local ``RemoteActorError``.
into a local `RemoteActorError` (subtype).
NOTE: this routine DOES not RAISE the embedded remote error,
which is the responsibilitiy of the caller.
'''
__tracebackhide__ = True
error = msg['error']
__tracebackhide__: bool = True
tb_str = error.get('tb_str', '')
message = f"{chan.uid}\n" + tb_str
type_name = error['type_str']
error_dict: dict[str, dict] | None
if (
error_dict := msg.get('error')
) is None:
# no error field, nothing to unpack.
return None
# retrieve the remote error's msg encoded details
tb_str: str = error_dict.get('tb_str', '')
message: str = f'{chan.uid}\n' + tb_str
type_name: str = error_dict['type_str']
suberror_type: Type[BaseException] = Exception
if type_name == 'ContextCancelled':
err_type = ContextCancelled
suberror_type = RemoteActorError
suberror_type = err_type
else: # try to lookup a suitable local error type
for ns in [
@ -164,18 +183,19 @@ def unpack_error(
eg,
trio,
]:
try:
suberror_type = getattr(ns, type_name)
if suberror_type := getattr(
ns,
type_name,
False,
):
break
except AttributeError:
continue
exc = err_type(
message,
suberror_type=suberror_type,
# unpack other fields into error type init
**msg['error'],
**error_dict,
)
return exc

View File

@ -15,8 +15,12 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Memory boundary "Portals": an API for structured
concurrency linked tasks running in disparate memory domains.
Memory "portal" contruct.
"Memory portals" are both an API and set of IPC wrapping primitives
for managing structured concurrency "cancel-scope linked" tasks
running in disparate virtual memory domains - at least in different
OS processes, possibly on different (hardware) hosts.
'''
from __future__ import annotations
@ -66,20 +70,21 @@ def _unwrap_msg(
raise unpack_error(msg, channel) from None
# TODO: maybe move this to ._exceptions?
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
class Portal:
'''
A 'portal' to a(n) (remote) ``Actor``.
A 'portal' to a memory-domain-separated `Actor`.
A portal is "opened" (and eventually closed) by one side of an
inter-actor communication context. The side which opens the portal
is equivalent to a "caller" in function parlance and usually is
either the called actor's parent (in process tree hierarchy terms)
or a client interested in scheduling work to be done remotely in a
far process.
process which has a separate (virtual) memory domain.
The portal api allows the "caller" actor to invoke remote routines
and receive results through an underlying ``tractor.Channel`` as
@ -89,9 +94,9 @@ class Portal:
like having a "portal" between the seperate actor memory spaces.
'''
# the timeout for a remote cancel request sent to
# a(n) (peer) actor.
cancel_timeout = 0.5
# global timeout for remote cancel requests sent to
# connected (peer) actors.
cancel_timeout: float = 0.5
def __init__(self, channel: Channel) -> None:
self.channel = channel
@ -191,7 +196,15 @@ class Portal:
) -> bool:
'''
Cancel the actor on the other end of this portal.
Cancel the actor runtime (and thus process) on the far
end of this portal.
**NOTE** THIS CANCELS THE ENTIRE RUNTIME AND THE
SUBPROCESS, it DOES NOT just cancel the remote task. If you
want to have a handle to cancel a remote ``tri.Task`` look
at `.open_context()` and the definition of
`._context.Context.cancel()` which CAN be used for this
purpose.
'''
if not self.channel.connected():
@ -385,12 +398,32 @@ class Portal:
) -> AsyncGenerator[tuple[Context, Any], None]:
'''
Open an inter-actor task context.
Open an inter-actor "task context"; a remote task is
scheduled and cancel-scope-state-linked to a `trio.run()` across
memory boundaries in another actor's runtime.
This is a synchronous API which allows for deterministic
setup/teardown of a remote task. The yielded ``Context`` further
allows for opening bidirectional streams, explicit cancellation
and synchronized final result collection. See ``tractor.Context``.
This is an `@acm` API which allows for deterministic setup
and teardown of a remotely scheduled task in another remote
actor. Once opened, the 2 now "linked" tasks run completely
in parallel in each actor's runtime with their enclosing
`trio.CancelScope`s kept in a synced state wherein if
either side errors or cancels an equivalent error is
relayed to the other side via an SC-compat IPC protocol.
The yielded `tuple` is a pair delivering a `tractor.Context`
and any first value "sent" by the "callee" task via a call
to `Context.started(<value: Any>)`; this side of the
context does not unblock until the "callee" task calls
`.started()` in similar style to `trio.Nursery.start()`.
When the "callee" (side that is "called"/started by a call
to *this* method) returns, the caller side (this) unblocks
and any final value delivered from the other end can be
retrieved using the `Contex.result()` api.
The yielded ``Context`` instance further allows for opening
bidirectional streams, explicit cancellation and
structurred-concurrency-synchronized final result-msg
collection. See ``tractor.Context`` for more details.
'''
# conduct target func method structural checks
@ -423,47 +456,52 @@ class Portal:
)
assert ctx._remote_func_type == 'context'
msg = await ctx._recv_chan.receive()
msg: dict = await ctx._recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first = msg['started']
ctx._started_called = True
ctx._started_called: bool = True
except KeyError:
assert msg.get('cid'), ("Received internal error at context?")
if not (cid := msg.get('cid')):
raise MessagingError(
'Received internal error at context?\n'
'No call-id (cid) in startup msg?'
)
if msg.get('error'):
# raise kerr from unpack_error(msg, self.channel)
# NOTE: mask the key error with the remote one
raise unpack_error(msg, self.channel) from None
else:
raise MessagingError(
f'Context for {ctx.cid} was expecting a `started` message'
f' but received a non-error msg:\n{pformat(msg)}'
f'Context for {cid} was expecting a `started` message'
' but received a non-error msg:\n'
f'{pformat(msg)}'
)
_err: BaseException | None = None
ctx._portal: Portal = self
uid: tuple = self.channel.uid
cid: str = ctx.cid
etype: Type[BaseException] | None = None
# deliver context instance and .started() msg value in enter
# tuple.
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
scope_err: BaseException | None = None
try:
async with trio.open_nursery() as nurse:
ctx._scope_nursery = nurse
ctx._scope = nurse.cancel_scope
ctx._scope_nursery: trio.Nursery = nurse
ctx._scope: trio.CancelScope = nurse.cancel_scope
# deliver context instance and .started() msg value
# in enter tuple.
yield ctx, first
# when in allow_ovveruns mode there may be lingering
# overflow sender tasks remaining?
# when in allow_overruns mode there may be
# lingering overflow sender tasks remaining?
if nurse.child_tasks:
# ensure we are in overrun state with
# ``._allow_overruns=True`` bc otherwise
# XXX: ensure we are in overrun state
# with ``._allow_overruns=True`` bc otherwise
# there should be no tasks in this nursery!
if (
not ctx._allow_overruns
@ -471,47 +509,72 @@ class Portal:
):
raise RuntimeError(
'Context has sub-tasks but is '
'not in `allow_overruns=True` Mode!?'
'not in `allow_overruns=True` mode!?'
)
# ensure cancel of all overflow sender tasks
# started in the ctx nursery.
ctx._scope.cancel()
except ContextCancelled as err:
_err = err
# XXX: (maybe) shield/mask context-cancellations that were
# initiated by any of the context's 2 tasks. There are
# subsequently 2 operating cases for a "graceful cancel"
# of a `Context`:
#
# 1.*this* side's task called `Context.cancel()`, in
# which case we mask the `ContextCancelled` from bubbling
# to the opener (much like how `trio.Nursery` swallows
# any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
#
# 2.*the other* side's (callee/spawned) task cancelled due
# to a self or peer cancellation request in which case we
# DO let the error bubble to the opener.
except ContextCancelled as ctxc:
scope_err = ctxc
# swallow and mask cross-actor task context cancels that
# were initiated by *this* side's task.
# CASE 1: this context was never cancelled
# via a local task's call to `Context.cancel()`.
if not ctx._cancel_called:
# XXX: this should NEVER happen!
# from ._debug import breakpoint
# await breakpoint()
raise
# if the context was cancelled by client code
# then we don't need to raise since user code
# is expecting this and the block should exit.
# CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should
# exit silently.
else:
log.debug(f'Context {ctx} cancelled gracefully')
log.debug(
f'Context {ctx} cancelled gracefully with:\n'
f'{ctxc}'
)
except (
BaseException,
# - a standard error in the caller/yieldee
Exception,
# more specifically, we need to handle these but not
# sure it's worth being pedantic:
# Exception,
# trio.Cancelled,
# KeyboardInterrupt,
# - a runtime teardown exception-group and/or
# cancellation request from a caller task.
BaseExceptionGroup,
trio.Cancelled,
KeyboardInterrupt,
) as err:
etype = type(err)
scope_err = err
# cancel ourselves on any error.
# XXX: request cancel of this context on any error.
# NOTE: `Context.cancel()` is conversely NOT called in
# the `ContextCancelled` "cancellation requested" case
# above.
log.cancel(
'Context cancelled for task, sending cancel request..\n'
'Context cancelled for task due to\n'
f'{err}\n'
'Sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
try:
await ctx.cancel()
except trio.BrokenResourceError:
log.warning(
@ -520,8 +583,9 @@ class Portal:
f'actor:{uid}'
)
raise
raise # duh
# no scope error case
else:
if ctx.chan.connected():
log.info(
@ -529,10 +593,20 @@ class Portal:
f'task: {cid}\n'
f'actor: {uid}'
)
# XXX NOTE XXX: the below call to
# `Context.result()` will ALWAYS raise
# a `ContextCancelled` (via an embedded call to
# `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime
# via a call to
# `Context._maybe_cancel_and_set_remote_error()`
# which IS SET any time the far end fails and
# causes "caller side" cancellation via
# a `ContextCancelled` here.
result = await ctx.result()
log.runtime(
f'Context {fn_name} returned '
f'value from callee `{result}`'
f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
finally:
@ -540,22 +614,73 @@ class Portal:
# operating *in* this scope to have survived
# we tear down the runtime feeder chan last
# to avoid premature stream clobbers.
if ctx._recv_chan is not None:
# should we encapsulate this in the context api?
rxchan: trio.ReceiveChannel = ctx._recv_chan
if (
rxchan
# maybe TODO: yes i know the below check is
# touching `trio` memchan internals..BUT, there are
# only a couple ways to avoid a `trio.Cancelled`
# bubbling from the `.aclose()` call below:
#
# - catch and mask it via the cancel-scope-shielded call
# as we are rn (manual and frowned upon) OR,
# - specially handle the case where `scope_err` is
# one of {`BaseExceptionGroup`, `trio.Cancelled`}
# and then presume that the `.aclose()` call will
# raise a `trio.Cancelled` and just don't call it
# in those cases..
#
# that latter approach is more logic, LOC, and more
# convoluted so for now stick with the first
# psuedo-hack-workaround where we just try to avoid
# the shielded call as much as we can detect from
# the memchan's `._closed` state..
#
# XXX MOTIVATION XXX-> we generally want to raise
# any underlying actor-runtime/internals error that
# surfaces from a bug in tractor itself so it can
# be easily detected/fixed AND, we also want to
# minimize noisy runtime tracebacks (normally due
# to the cross-actor linked task scope machinery
# teardown) displayed to user-code and instead only
# displaying `ContextCancelled` traces where the
# cause of crash/exit IS due to something in
# user/app code on either end of the context.
and not rxchan._closed
):
# XXX NOTE XXX: and again as per above, we mask any
# `trio.Cancelled` raised here so as to NOT mask
# out any exception group or legit (remote) ctx
# error that sourced from the remote task or its
# runtime.
with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose()
if etype:
# XXX: since we always (maybe) re-raise (and thus also
# mask runtime machinery related
# multi-`trio.Cancelled`s) any scope error which was
# the underlying cause of this context's exit, add
# different log msgs for each of the (2) cases.
if scope_err is not None:
etype: Type[BaseException] = type(scope_err)
# CASE 2
if ctx._cancel_called:
log.cancel(
f'Context {fn_name} cancelled by caller with\n{etype}'
f'Context {fn_name} cancelled by caller with\n'
f'{etype}'
)
elif _err is not None:
# CASE 1
else:
log.cancel(
f'Context for task cancelled by callee with {etype}\n'
f'Context cancelled by callee with {etype}\n'
f'target: `{fn_name}`\n'
f'task:{cid}\n'
f'actor:{uid}'
)
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
@ -564,10 +689,9 @@ class Portal:
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
from ._debug import maybe_wait_for_debugger
await maybe_wait_for_debugger()
# remove the context from runtime tracking
# FINALLY, remove the context from runtime tracking and
# exit Bo
self.actor._contexts.pop(
(self.channel.uid, ctx.cid),
None,

View File

@ -347,14 +347,13 @@ async def _invoke(
and ctx._enter_debugger_on_cancel
)
):
# XXX: is there any case where we'll want to debug IPC
# disconnects as a default?
#
# I can't think of a reason that inspecting
# this type of failure will be useful for respawns or
# recovery logic - the only case is some kind of strange bug
# in our transport layer itself? Going to keep this
# open ended for now.
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
# => I can't think of a reason that inspecting this
# type of failure will be useful for respawns or
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
@ -448,17 +447,18 @@ class Actor:
(swappable) network protocols.
Each "actor" is ``trio.run()`` scheduled "runtime" composed of many
concurrent tasks in a single thread. The "runtime" tasks conduct
a slew of low(er) level functions to make it possible for message
passing between actors as well as the ability to create new actors
(aka new "runtimes" in new processes which are supervised via
a nursery construct). Each task which sends messages to a task in
a "peer" (not necessarily a parent-child, depth hierarchy)) is able
to do so via an "address", which maps IPC connections across memory
boundaries, and task request id which allows for per-actor
tasks to send and receive messages to specific peer-actor tasks with
which there is an ongoing RPC/IPC dialog.
Each "actor" is ``trio.run()`` scheduled "runtime" composed of
many concurrent tasks in a single thread. The "runtime" tasks
conduct a slew of low(er) level functions to make it possible
for message passing between actors as well as the ability to
create new actors (aka new "runtimes" in new processes which
are supervised via a nursery construct). Each task which sends
messages to a task in a "peer" (not necessarily a parent-child,
depth hierarchy) is able to do so via an "address", which maps
IPC connections across memory boundaries, and a task request id
which allows for per-actor tasks to send and receive messages
to specific peer-actor tasks with which there is an ongoing
RPC/IPC dialog.
'''
# ugh, we need to get rid of this and replace with a "registry" sys

View File

@ -199,6 +199,10 @@ async def do_hard_kill(
proc: trio.Process,
terminate_after: int = 3,
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
# terminate_after: int = 99999,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much

View File

@ -54,6 +54,60 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?
def _raise_from_no_yield_msg(
stream: MsgStream,
msg: dict,
src_err: KeyError,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10+ match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or stream._eoc:
log.debug(f"{stream} was stopped at remote end")
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel from src_err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error
# case is activated above.
raise src_err
# raise RuntimeError(
# 'Unknown non-yield stream msg?\n'
# f'{msg}'
# )
class MsgStream(trio.abc.Channel):
'''
@ -91,11 +145,20 @@ class MsgStream(trio.abc.Channel):
# delegate directly to underlying mem channel
def receive_nowait(self):
msg = self._rx_chan.receive_nowait()
try:
return msg['yield']
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
msg=msg,
src_err=kerr,
)
async def receive(self):
'''Async receive a single msg from the IPC transport, the next
in sequence for this stream.
'''
Receive a single msg from the IPC transport, the next in
sequence sent by the far end task (possibly in order as
determined by the underlying protocol).
'''
# see ``.aclose()`` for notes on the old behaviour prior to
@ -110,43 +173,12 @@ class MsgStream(trio.abc.Channel):
msg = await self._rx_chan.receive()
return msg['yield']
except KeyError as err:
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10 match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or self._eoc:
log.debug(f"{self} was stopped at remote end")
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
self._eoc = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await self.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel from err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, self._ctx.chan)
else:
raise
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
msg=msg,
src_err=kerr,
)
except (
trio.ClosedResourceError, # by self._rx_chan

View File

@ -193,15 +193,39 @@ def get_logger(
'''
log = rlog = logging.getLogger(_root_name)
if name and name != _proj_name:
if (
name
and name != _proj_name
):
# handling for modules that use ``get_logger(__name__)`` to
# avoid duplicate project-package token in msg output
rname, _, tail = name.partition('.')
if rname == _root_name:
name = tail
# NOTE: for handling for modules that use ``get_logger(__name__)``
# we make the following stylistic choice:
# - always avoid duplicate project-package token
# in msg output: i.e. tractor.tractor _ipc.py in header
# looks ridiculous XD
# - never show the leaf module name in the {name} part
# since in python the {filename} is always this same
# module-file.
sub_name: None | str = None
rname, _, sub_name = name.partition('.')
pkgpath, _, modfilename = sub_name.rpartition('.')
# NOTE: for tractor itself never include the last level
# module key in the name such that something like: eg.
# 'tractor.trionics._broadcast` only includes the first
# 2 tokens in the (coloured) name part.
if rname == 'tractor':
sub_name = pkgpath
if _root_name in sub_name:
duplicate, _, sub_name = sub_name.partition('.')
if not sub_name:
log = rlog
else:
log = rlog.getChild(sub_name)
log = rlog.getChild(name)
log.level = rlog.level
# add our actor-task aware adapter which will dynamically look up
@ -254,3 +278,7 @@ def get_console_log(
def get_loglevel() -> str:
return _default_loglevel
# global module logger for tractor itself
log = get_logger('tractor')

View File

@ -43,38 +43,62 @@ Built-in messaging patterns, types, APIs and helpers.
# - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
from __future__ import annotations
from inspect import isfunction
from pkgutil import resolve_name
class NamespacePath(str):
'''
A serializeable description of a (function) Python object location
described by the target's module path and namespace key meant as
a message-native "packet" to allows actors to point-and-load objects
by absolute reference.
A serializeable description of a (function) Python object
location described by the target's module path and namespace
key meant as a message-native "packet" to allows actors to
point-and-load objects by an absolute ``str`` (and thus
serializable) reference.
'''
_ref: object = None
_ref: object | type | None = None
def load_ref(self) -> object:
def load_ref(self) -> object | type:
if self._ref is None:
self._ref = resolve_name(self)
return self._ref
def to_tuple(
self,
@staticmethod
def _mk_fqnp(ref: type | object) -> tuple[str, str]:
'''
Generate a minial ``str`` pair which describes a python
object's namespace path and object/type name.
) -> tuple[str, str]:
ref = self.load_ref()
return ref.__module__, getattr(ref, '__name__', '')
In more precise terms something like:
- 'py.namespace.path:object_name',
- eg.'tractor.msg:NamespacePath' will be the ``str`` form
of THIS type XD
'''
if (
isinstance(ref, object)
and not isfunction(ref)
):
name: str = type(ref).__name__
else:
name: str = getattr(ref, '__name__')
# fully qualified namespace path, tuple.
fqnp: tuple[str, str] = (
ref.__module__,
name,
)
return fqnp
@classmethod
def from_ref(
cls,
ref,
ref: type | object,
) -> NamespacePath:
return cls(':'.join(
(ref.__module__,
getattr(ref, '__name__', ''))
))
fqnp: tuple[str, str] = cls._mk_fqnp(ref)
return cls(':'.join(fqnp))
def to_tuple(self) -> tuple[str, str]:
return self._mk_fqnp(self.load_ref())

View File

@ -70,6 +70,7 @@ async def _enter_and_wait(
unwrapped: dict[int, T],
all_entered: trio.Event,
parent_exit: trio.Event,
seed: int,
) -> None:
'''
@ -80,7 +81,10 @@ async def _enter_and_wait(
async with mngr as value:
unwrapped[id(mngr)] = value
if all(unwrapped.values()):
if all(
val != seed
for val in unwrapped.values()
):
all_entered.set()
await parent_exit.wait()
@ -91,7 +95,13 @@ async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[Optional[T], ...], None]:
) -> AsyncGenerator[
tuple[
T | None,
...
],
None,
]:
'''
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
@ -104,7 +114,11 @@ async def gather_contexts(
entered and exited, and cancellation just works.
'''
unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs)
seed: int = id(mngrs)
unwrapped: dict[int, T | None] = {}.fromkeys(
(id(mngr) for mngr in mngrs),
seed,
)
all_entered = trio.Event()
parent_exit = trio.Event()
@ -116,8 +130,9 @@ async def gather_contexts(
if not mngrs:
raise ValueError(
'input mngrs is empty?\n'
'Did try to use inline generator syntax?'
'`.trionics.gather_contexts()` input mngrs is empty?\n'
'Did try to use inline generator syntax?\n'
'Use a non-lazy iterator or sequence type intead!'
)
async with trio.open_nursery() as n:
@ -128,6 +143,7 @@ async def gather_contexts(
unwrapped,
all_entered,
parent_exit,
seed,
)
# deliver control once all managers have started up