Compare commits

...

21 Commits

Author SHA1 Message Date
Tyler Goodlet b22f7dcae0 Resolve remaining debug-request race causing hangs
More or less by pedantically separating and managing root and subactor
request syncing events to always be managed by the locking IPC context
task-funcs:
- for the root's "child"-side, `lock_tty_for_child()` directly creates
  and sets a new `Lock.req_handler_finished` inside a `finally:`
- for the sub's "parent"-side, `request_root_stdio_lock()` does the same
  with a new `DebugStatus.req_finished` event and separates it from
  the `.repl_release` event (which indicates a "c" or "q" from user and
  thus exit of the REPL session) as well as sets a new `.req_task:
  trio.Task` to explicitly distinguish from the app-user-task that
  enters the REPL vs. the paired bg task used to request the global
  root's stdio mutex alongside it.
- apply the `__pld_spec__` on "child"-side of the ctx using the new
  `Portal.open_context(pld_spec)` parameter support; drops use of any
  `ContextVar` malarky used prior for `PldRx` mgmt.
- removing `Lock.no_remote_has_tty` since it was a nebulous name and
  from the prior "everything is in a `Lock`" design..

------ - ------

More rigorous impl to handle various edge cases in `._pause()`:
- rejig `_enter_repl_sync()` to wrap the `debug_func == None` case
  inside maybe-internal-error handler blocks.
- better logic for recurrent vs. multi-task contention for REPL entry in
  subactors, by guarding using `DebugStatus.req_task` and by now waiting
  on the new `DebugStatus.req_finished` for the multi-task contention
  case.
- even better internal error handling and reporting for when this code
  is hacked on and possibly broken ;p

------ - ------

Updates to `.pause_from_sync()` support:
- add optional `actor`, `task` kwargs to `_set_trace()` to allow
  compat with the new explicit `debug_func` calling in `._pause()` and
  pass a `threading.Thread` for `task` in the `.to_thread()` usage case.
- add an `except` block that tries to show the frame on any internal
  error.

------ - ------

Relatedly includes a buncha cleanups/simplifications somewhat in
prep for some coming refinements (around `DebugStatus`):
- use all the new attrs mentioned above as needed in the SIGINT shielder.
- wait on `Lock.req_handler_finished` in `maybe_wait_for_debugger()`.
- dropping a ton of masked legacy code left in during the recent reworks.
- better comments, like on the use of `Context._scope` for shielding on
  the "child"-side to avoid the need to manage yet another cs.
- add/change-to lotsa `log.devx()` level emissions for those infos which
  are handy while hacking on the debugger but not ideal/necessary to be
  user visible.
- obvi add lotsa follow up todo notes!
2024-05-21 10:19:41 -04:00
Tyler Goodlet fde62c72be Show runtime nursery frames on internal errors
Much like other recent changes attempt to detect runtime-bug-causing
crashes and only show the runtime-endpoint frame when present.

Adds a `ActorNursery._scope_error: BaseException|None` attr to aid with
detection. Also toss in some todo notes for removing and replacing the
`.run_in_actor()` method API.
2024-05-20 17:04:30 -04:00
Tyler Goodlet 4ef77bb64f Set `_ctxvar_Context` for child-side RPC tasks
Just inside `._invoke()` after the `ctx: Context` is retrieved.

Also try our best to *not hide* internal frames when a non-user-code
crash happens, normally either due to a runtime RPC EP bug or
a transport failure.
2024-05-20 16:23:29 -04:00
Tyler Goodlet e78fdf2f69 Make `log.devx()` level below `.pdb()`
Kinda like a "runtime"-y level for `.pdb()` (which is more or less like
an `.info()` for our debugger subsys) which can be used to report
internals info for those hacking on `.devx` tools.

Also, inject only the *last* 6 digits of the `id(Task)` in
`pformat_task_uid()` output by default.
2024-05-20 16:13:57 -04:00
Tyler Goodlet 13bc3c308d Add error suppress flag to `current_ipc_ctx()` 2024-05-20 16:12:51 -04:00
Tyler Goodlet 60fc43e530 Shield channel closing in `_connect_chan()` 2024-05-20 16:11:59 -04:00
Tyler Goodlet 30afcd2b6b Adjust `Portal` usage of `Context.pld_rx`
Pass the new `ipc` arg and try to show api frames when an unexpected
internal error is detected.
2024-05-20 16:07:57 -04:00
Tyler Goodlet c80f020ebc Expose `tractor.current_ipc_ctx()` at pkg level 2024-05-20 15:47:01 -04:00
Tyler Goodlet 262a0e36c6 Allocate a `PldRx` per `Context`, new pld-spec API
Since the state mgmt becomes quite messy with multiple sub-tasks inside
an IPC ctx, AND bc generally speaking the payload-type-spec should map
1-to-1 with the `Context`, it doesn't make a lot of sense to be using
`ContextVar`s to modify the `Context.pld_rx: PldRx` instance.

Instead, always allocate a full instance inside `mk_context()` with the
default `.pld_rx: PldRx` set to use the `msg._ops._def_any_pldec: MsgDec`

In support, simplify the `.msg._ops` impl and APIs:
- drop `_ctxvar_PldRx`, `_def_pld_rx` and `current_pldrx()`.
- rename `PldRx._pldec` -> `._pld_dec`.
- rename the unused `PldRx.apply_to_ipc()` -> `.wraps_ipc()`.
- add a required `PldRx._ctx: Context` attr since it is needed
  internally in some meths and each pld-rx now maps to a specific ctx.
- modify all recv methods to accept a `ipc: Context|MsgStream` (instead
  of a `ctx` arg) since both have a ref to the same `._rx_chan` and there
  are only a couple spots (in `.dec_msg()`) where we need the `ctx`
  explicitly (which can now be easily accessed via a new `MsgStream.ctx`
  property, see below).
- always show the `.dec_msg()` frame in tbs if there's a reference error
  when calling `_raise_from_unexpected_msg()` in the fallthrough case.
- implement `limit_plds()` as light wrapper around getting the
  `current_ipc_ctx()` and mutating its `MsgDec` via
  `Context.pld_rx.limit_plds()`.
- add a `maybe_limit_plds()` which just provides an `@acm` equivalent of
  `limit_plds()` handy for composing in a `async with ():` style block
  (avoiding additional indent levels in the body of async funcs).

Obvi extend the `Context` and `MsgStream` interfaces as needed
to match the above:
- add a `Context.pld_rx` pub prop.
- new private refs to `Context._started_msg: Started` and
  a `._started_pld` (mostly for internal debugging / testing / logging)
  and set inside `.open_context()` immediately after the syncing phase.
- a `Context.has_outcome() -> bool:` predicate which can be used to more
  easily determine if the ctx errored or has a final result.
- pub props for `MsgStream.ctx: Context` and `.chan: Channel` providing
  full `ipc`-arg compat with the `PldRx` method signatures.
2024-05-20 15:46:28 -04:00
Tyler Goodlet d93135acd8 Include truncated `id(trio.Task)` for task info in log header 2024-05-15 09:36:22 -04:00
Tyler Goodlet b23780c102 Make `request_root_stdio_lock()` post-mortem-able
Finally got this working so that if/when an internal bug is introduced
to this request task-func, we can actually REPL-debug the lock request
task itself B)

As in, if the subactor's lock request task internally errors we,
- ensure the task always terminates (by calling `DebugStatus.release()`)
  and explicitly reports (via a `log.exception()`) the internal error.
- capture the error instance and set as a new `DebugStatus.req_err` and
  always check for it on final teardown - in which case we also,
 - ensure it's reraised from a new `DebugRequestError`.
 - unhide the stack frames for `_pause()`, `_enter_repl_sync()` so that
   the dev can upward inspect the `_pause()` call stack sanely.

Supporting internal impl changes,
- add `DebugStatus.cancel()` and `.req_err`.
- don't ever cancel the request task from
  `PdbREPL.set_[continue/quit]()` only when there's some internal error
  that would likely result in a hang and stale lock state with the root.
- only release the root's lock when the current ask is also the owner
  (avoids bad release errors).
- also show internal `._pause()`-related frames on any `repl_err`.

Other temp-dev-tweaks,
- make pld-dec change log msgs info level again while solving this
  final context-vars race stuff..
- drop the debug pld-dec instance match asserts for now since
  the problem is already caught (and now debug-able B) by an attr-error
  on the decoded-as-`dict` started msg, and instead add in
  a `log.exception()` trace to see which task is triggering the case
  where the debug `MsgDec` isn't set correctly vs. when we think it's
  being applied.
2024-05-14 21:01:20 -04:00
Tyler Goodlet 31de5f6648 Always release debug request from `._post_mortem()`
Since obviously the thread is likely expected to halt and raise after
the REPL session exits; this was a regression from the prior impl. The
main reason for this is that otherwise the request task will never
unblock if the user steps through the crashed task using 'next' since
the `.do_next()` handler doesn't by default release the request since in
the `.pause()` case this would end the session too early.

Other,
- toss in draft `Pdb.user_exception()`, though doesn't seem to ever
  trigger?
- only release `Lock._debug_lock` when already locked.
2024-05-14 11:39:04 -04:00
Tyler Goodlet 236083b6e4 Rename `.msg.types.Msg` -> `PayloadMsg` 2024-05-10 13:15:45 -04:00
Tyler Goodlet d2dee87b36 Modernize streaming example script
- add typing,
- apply multi-line call style,
- use 'cancel' log level,
- enable debug mode.
2024-05-09 16:51:51 -04:00
Tyler Goodlet 5cb0cc0f0b Update tests for `PldRx` and `Context` changes
Mostly adjustments for the new pld-receiver semantics/shim-layer which
results more often in the direct delivery of `RemoteActorError`s from
IPC API primitives (like `Portal.result()`) instead of being embedded in
an `ExceptionGroup` bundled from an embedded nursery.

Tossed usage of the `debug_mode: bool` fixture to a couple problematic
tests while i was working on them.

Also includes detailed assertion updates to the inter-peer cancellation
suite in terms of,
- `Context.canceller` state correctly matching the true src actor when
  expecting a ctxc.
- any rxed `ContextCancelled` should instance match the `Context._local/remote_error`
  as should the `.msgdata` and `._ipc_msg`.
2024-05-09 16:48:53 -04:00
Tyler Goodlet fc075e96c6 Hide some API frames, port to new `._debug` apis
- start tossing in `__tracebackhide__`s to various eps which don't need
  to show in tbs or in the pdb REPL.
- port final `._maybe_enter_pm()` to pass a `api_frame`.
- start comment-marking up some API eps with `@api_frame`
  in prep for actually using the new frame-stack tracing.
2024-05-09 16:04:34 -04:00
Tyler Goodlet d6ca4771ce Use `.recv_msg_w_pld()` for final `Portal.result()`
Woops, due to a `None` test against the `._final_result`, any actual
final `None` result would be received but not acked as such causing
a spawning test to hang. Fix it by instead receiving and assigning both
a `._final_result_msg: PayloadMsg` and `._final_result_pld`.

NB: as mentioned in many recent comments surrounding this API layer,
really this whole `Portal`-has-final-result interface/semantics should
be entirely removed as should the `ActorNursery.run_in_actor()` API(s).
Instead it should all be replaced by a wrapping "high level" API
(`tractor.hilevel` ?) which combines a task nursery, `Portal.open_context()`
and underlying `Context` APIs + an `outcome.Outcome` to accomplish the
same "run a single task in a spawned actor and return it's result"; aka
a "one-shot-task-actor".
2024-05-09 09:47:13 -04:00
Tyler Goodlet c5a0cfc639 Rename `.msg.types.Msg` -> `PayloadMsg` 2024-05-08 15:07:34 -04:00
Tyler Goodlet f85314ecab Adjust `._runtime` to report `DebugStatus.req_ctx`
- inside the `Actor.cancel()`'s maybe-wait-on-debugger delay,
  report the full debug request status and it's affiliated lock request
  IPC ctx.
- use the new `.req_ctx.chan.uid` to do the local nursery lookup during
  channel teardown handling.
- another couple log fmt tweaks.
2024-05-08 15:06:50 -04:00
Tyler Goodlet c929bc15c9 Add `pexpect` to dev deps for testing 2024-05-08 14:53:10 -04:00
Tyler Goodlet 6690968236 Rework and first draft of `.devx._frame_stack.py`
Proto-ing a little suite of call-stack-frame annotation-for-scanning
sub-systems for the purposes of both,
- the `.devx._debug`er and its
  traceback and frame introspection needs when entering the REPL,
- detailed trace-style logging such that we can explicitly report
  on "which and where" `tractor`'s APIs are used in the "app" code.

Deats:
- change mod name obvi from `._code` and adjust client mod imports.
- using `wrapt` (for perf) implement a `@api_frame` annot decorator
  which both stashes per-call-stack-frame instances of `CallerInfo` in
  a table and marks the function such that API endpoints can be easily
  found via runtime stack scanning despite any internal impl changes.
- add a global `_frame2callerinfo_cache: dict[FrameType, CallerInfo]`
  table for providing the per func-frame info caching.
- Re-implement `CallerInfo` to require less (types of) inputs:
  |_ `_api_func: Callable`, a ref to the (singleton) func def.
  |_ `_api_frame: FrameType` taken from the `@api_frame` marked `tractor`-API
     func's runtime call-stack, from which we can determine the
     app code's `.caller_frame`.
  |_`_caller_frames_up: int|None` allowing the specific `@api_frame` to
    determine "how many frames up" the application / calling code is.
  And, a better set of derived attrs:
  |_`caller_frame: FrameType` which finds and caches the API-eps calling
    frame.
  |_`caller_frame: FrameType` which finds and caches the API-eps calling
- add a new attempt at "getting a method ref from its runtime frame"
  with `get_ns_and_func_from_frame()` using a heuristic that the
  `CodeType.co_qualname: str` should have a "." in it for methods.
  - main issue is still that the func-ref lookup will require searching
    for the method's instance type by name, and that name isn't
    guaranteed to be defined in any particular ns..
   |_rn we try to read it from the `FrameType.f_locals` but that is
     going to obvi fail any time the method is called in a module where
     it's type is not also defined/imported.
  - returns both the ns and the func ref FYI.
2024-05-08 14:51:56 -04:00
25 changed files with 1318 additions and 893 deletions

View File

@ -1,6 +1,11 @@
import time import time
import trio import trio
import tractor import tractor
from tractor import (
ActorNursery,
MsgStream,
Portal,
)
# this is the first 2 actors, streamer_1 and streamer_2 # this is the first 2 actors, streamer_1 and streamer_2
@ -12,14 +17,18 @@ async def stream_data(seed):
# this is the third actor; the aggregator # this is the third actor; the aggregator
async def aggregate(seed): async def aggregate(seed):
"""Ensure that the two streams we receive match but only stream '''
Ensure that the two streams we receive match but only stream
a single set of values to the parent. a single set of values to the parent.
"""
async with tractor.open_nursery() as nursery: '''
portals = [] an: ActorNursery
async with tractor.open_nursery() as an:
portals: list[Portal] = []
for i in range(1, 3): for i in range(1, 3):
# fork point
portal = await nursery.start_actor( # fork/spawn call
portal = await an.start_actor(
name=f'streamer_{i}', name=f'streamer_{i}',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -43,7 +52,11 @@ async def aggregate(seed):
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for portal in portals: for portal in portals:
n.start_soon(push_to_chan, portal, send_chan.clone()) n.start_soon(
push_to_chan,
portal,
send_chan.clone(),
)
# close this local task's reference to send side # close this local task's reference to send side
await send_chan.aclose() await send_chan.aclose()
@ -60,7 +73,7 @@ async def aggregate(seed):
print("FINISHED ITERATING in aggregator") print("FINISHED ITERATING in aggregator")
await nursery.cancel() await an.cancel()
print("WAITING on `ActorNursery` to finish") print("WAITING on `ActorNursery` to finish")
print("AGGREGATOR COMPLETE!") print("AGGREGATOR COMPLETE!")
@ -75,18 +88,21 @@ async def main() -> list[int]:
''' '''
# yes, a nursery which spawns `trio`-"actors" B) # yes, a nursery which spawns `trio`-"actors" B)
nursery: tractor.ActorNursery an: ActorNursery
async with tractor.open_nursery() as nursery: async with tractor.open_nursery(
loglevel='cancel',
debug_mode=True,
) as an:
seed = int(1e3) seed = int(1e3)
pre_start = time.time() pre_start = time.time()
portal: tractor.Portal = await nursery.start_actor( portal: Portal = await an.start_actor(
name='aggregator', name='aggregator',
enable_modules=[__name__], enable_modules=[__name__],
) )
stream: tractor.MsgStream stream: MsgStream
async with portal.open_stream_from( async with portal.open_stream_from(
aggregate, aggregate,
seed=seed, seed=seed,
@ -95,11 +111,12 @@ async def main() -> list[int]:
start = time.time() start = time.time()
# the portal call returns exactly what you'd expect # the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally # as if the remote "aggregate" function was called locally
result_stream = [] result_stream: list[int] = []
async for value in stream: async for value in stream:
result_stream.append(value) result_stream.append(value)
await portal.cancel_actor() cancelled: bool = await portal.cancel_actor()
assert cancelled
print(f"STREAM TIME = {time.time() - start}") print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")

View File

@ -55,6 +55,7 @@ xontrib-vox = "^0.0.1"
optional = false optional = false
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
pytest = "^8.2.0" pytest = "^8.2.0"
pexpect = "^4.9.0"
# only for xonsh as sh.. # only for xonsh as sh..
xontrib-vox = "^0.0.1" xontrib-vox = "^0.0.1"

View File

@ -97,6 +97,7 @@ def test_ipc_channel_break_during_stream(
examples_dir() / 'advanced_faults' examples_dir() / 'advanced_faults'
/ 'ipc_failure_during_stream.py', / 'ipc_failure_during_stream.py',
root=examples_dir(), root=examples_dir(),
consider_namespace_packages=False,
) )
# by def we expect KBI from user after a simulated "hang # by def we expect KBI from user after a simulated "hang

View File

@ -89,17 +89,30 @@ def test_remote_error(reg_addr, args_err):
assert excinfo.value.boxed_type == errtype assert excinfo.value.boxed_type == errtype
else: else:
# the root task will also error on the `.result()` call # the root task will also error on the `Portal.result()`
# so we expect an error from there AND the child. # call so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo: # |_ tho seems like on new `trio` this doesn't always
# happen?
with pytest.raises((
BaseExceptionGroup,
tractor.RemoteActorError,
)) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed errors # ensure boxed errors are `errtype`
for exc in excinfo.value.exceptions: err: BaseException = excinfo.value
if isinstance(err, BaseExceptionGroup):
suberrs: list[BaseException] = err.exceptions
else:
suberrs: list[BaseException] = [err]
for exc in suberrs:
assert exc.boxed_type == errtype assert exc.boxed_type == errtype
def test_multierror(reg_addr): def test_multierror(
reg_addr: tuple[str, int],
):
''' '''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors. more then one actor errors.

View File

@ -444,6 +444,7 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
infect_asyncio=True, infect_asyncio=True,
fan_out=fan_out, fan_out=fan_out,
) )
# should raise RAE diectly
await portal.result() await portal.result()
trio.run(main) trio.run(main)
@ -461,12 +462,11 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
# should trigger remote actor error # should trigger remote actor error
await portal.result() await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo: with pytest.raises(RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed errors # ensure boxed error type
for exc in excinfo.value.exceptions: excinfo.value.boxed_type == Exception
assert exc.boxed_type == Exception
def test_trio_closes_early_and_channel_exits(reg_addr): def test_trio_closes_early_and_channel_exits(reg_addr):
@ -477,7 +477,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr):
exit_early=True, exit_early=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should trigger remote actor error # should raise RAE diectly
await portal.result() await portal.result()
# should be a quiet exit on a simple channel exit # should be a quiet exit on a simple channel exit
@ -492,15 +492,17 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
aio_raise_err=True, aio_raise_err=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should trigger remote actor error # should trigger RAE directly, not an eg.
await portal.result() await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo: with pytest.raises(
# NOTE: bc we directly wait on `Portal.result()` instead
# of capturing it inside the `ActorNursery` machinery.
expected_exception=RemoteActorError,
) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed errors excinfo.value.boxed_type == Exception
for exc in excinfo.value.exceptions:
assert exc.boxed_type == Exception
@tractor.context @tractor.context

View File

@ -55,9 +55,10 @@ from tractor._testing import (
@tractor.context @tractor.context
async def sleep_forever( async def open_stream_then_sleep_forever(
ctx: Context, ctx: Context,
expect_ctxc: bool = False, expect_ctxc: bool = False,
) -> None: ) -> None:
''' '''
Sync the context, open a stream then just sleep. Sync the context, open a stream then just sleep.
@ -67,6 +68,10 @@ async def sleep_forever(
''' '''
try: try:
await ctx.started() await ctx.started()
# NOTE: the below means this child will send a `Stop`
# to it's parent-side task despite that side never
# opening a stream itself.
async with ctx.open_stream(): async with ctx.open_stream():
await trio.sleep_forever() await trio.sleep_forever()
@ -100,7 +105,7 @@ async def error_before_started(
''' '''
async with tractor.wait_for_actor('sleeper') as p2: async with tractor.wait_for_actor('sleeper') as p2:
async with ( async with (
p2.open_context(sleep_forever) as (peer_ctx, first), p2.open_context(open_stream_then_sleep_forever) as (peer_ctx, first),
peer_ctx.open_stream(), peer_ctx.open_stream(),
): ):
# NOTE: this WAS inside an @acm body but i factored it # NOTE: this WAS inside an @acm body but i factored it
@ -204,9 +209,13 @@ async def stream_ints(
@tractor.context @tractor.context
async def stream_from_peer( async def stream_from_peer(
ctx: Context, ctx: Context,
debug_mode: bool,
peer_name: str = 'sleeper', peer_name: str = 'sleeper',
) -> None: ) -> None:
# sanity
assert tractor._state.debug_mode() == debug_mode
peer: Portal peer: Portal
try: try:
async with ( async with (
@ -240,26 +249,54 @@ async def stream_from_peer(
assert msg is not None assert msg is not None
print(msg) print(msg)
# NOTE: cancellation of the (sleeper) peer should always # NOTE: cancellation of the (sleeper) peer should always cause
# cause a `ContextCancelled` raise in this streaming # a `ContextCancelled` raise in this streaming actor.
# actor. except ContextCancelled as _ctxc:
except ContextCancelled as ctxc: ctxc = _ctxc
ctxerr = ctxc
assert peer_ctx._remote_error is ctxerr # print("TRYING TO ENTER PAUSSE!!!")
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata # await tractor.pause(shield=True)
re: ContextCancelled = peer_ctx._remote_error
# XXX YES, bc exact same msg instances # XXX YES XXX, remote error should be unpacked only once!
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg assert (
re
is
peer_ctx.maybe_error
is
ctxc
is
peer_ctx._local_error
)
# NOTE: these errors should all match!
# ------ - ------
# XXX [2024-05-03] XXX
# ------ - ------
# broke this due to a re-raise inside `.msg._ops.drain_to_final_msg()`
# where the `Error()` msg was directly raising the ctxc
# instead of just returning up to the caller inside
# `Context.return()` which would results in a diff instance of
# the same remote error bubbling out above vs what was
# already unpacked and set inside `Context.
assert (
peer_ctx._remote_error.msgdata
==
ctxc.msgdata
)
# ^-XXX-^ notice the data is of course the exact same.. so
# the above larger assert makes sense to also always be true!
# XXX NO, bc new one always created for property accesss # XXX YES XXX, bc should be exact same msg instances
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg assert peer_ctx._remote_error._ipc_msg is ctxc._ipc_msg
# XXX NO XXX, bc new one always created for property accesss
assert peer_ctx._remote_error.ipc_msg != ctxc.ipc_msg
# the peer ctx is the canceller even though it's canceller # the peer ctx is the canceller even though it's canceller
# is the "canceller" XD # is the "canceller" XD
assert peer_name in peer_ctx.canceller assert peer_name in peer_ctx.canceller
assert "canceller" in ctxerr.canceller assert "canceller" in ctxc.canceller
# caller peer should not be the cancel requester # caller peer should not be the cancel requester
assert not ctx.cancel_called assert not ctx.cancel_called
@ -283,12 +320,13 @@ async def stream_from_peer(
# TODO / NOTE `.canceller` won't have been set yet # TODO / NOTE `.canceller` won't have been set yet
# here because that machinery is inside # here because that machinery is inside
# `.open_context().__aexit__()` BUT, if we had # `Portal.open_context().__aexit__()` BUT, if we had
# a way to know immediately (from the last # a way to know immediately (from the last
# checkpoint) that cancellation was due to # checkpoint) that cancellation was due to
# a remote, we COULD assert this here..see, # a remote, we COULD assert this here..see,
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
# #
# await tractor.pause()
# assert 'canceller' in ctx.canceller # assert 'canceller' in ctx.canceller
# root/parent actor task should NEVER HAVE cancelled us! # root/parent actor task should NEVER HAVE cancelled us!
@ -392,12 +430,13 @@ def test_peer_canceller(
try: try:
async with ( async with (
sleeper.open_context( sleeper.open_context(
sleep_forever, open_stream_then_sleep_forever,
expect_ctxc=True, expect_ctxc=True,
) as (sleeper_ctx, sent), ) as (sleeper_ctx, sent),
just_caller.open_context( just_caller.open_context(
stream_from_peer, stream_from_peer,
debug_mode=debug_mode,
) as (caller_ctx, sent), ) as (caller_ctx, sent),
canceller.open_context( canceller.open_context(
@ -423,10 +462,11 @@ def test_peer_canceller(
# should always raise since this root task does # should always raise since this root task does
# not request the sleeper cancellation ;) # not request the sleeper cancellation ;)
except ContextCancelled as ctxerr: except ContextCancelled as _ctxc:
ctxc = _ctxc
print( print(
'CAUGHT REMOTE CONTEXT CANCEL\n\n' 'CAUGHT REMOTE CONTEXT CANCEL\n\n'
f'{ctxerr}\n' f'{ctxc}\n'
) )
# canceller and caller peers should not # canceller and caller peers should not
@ -437,7 +477,7 @@ def test_peer_canceller(
# we were not the actor, our peer was # we were not the actor, our peer was
assert not sleeper_ctx.cancel_acked assert not sleeper_ctx.cancel_acked
assert ctxerr.canceller[0] == 'canceller' assert ctxc.canceller[0] == 'canceller'
# XXX NOTE XXX: since THIS `ContextCancelled` # XXX NOTE XXX: since THIS `ContextCancelled`
# HAS NOT YET bubbled up to the # HAS NOT YET bubbled up to the
@ -448,7 +488,7 @@ def test_peer_canceller(
# CASE_1: error-during-ctxc-handling, # CASE_1: error-during-ctxc-handling,
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown') raise RuntimeError('Simulated RTE re-raise during ctxc handling')
# CASE_2: standard teardown inside in `.open_context()` block # CASE_2: standard teardown inside in `.open_context()` block
raise raise
@ -513,6 +553,9 @@ def test_peer_canceller(
# should be cancelled by US. # should be cancelled by US.
# #
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
print(f'loc_err: {_loc_err}\n')
assert isinstance(loc_err, RuntimeError)
# since we do a rte reraise above, the # since we do a rte reraise above, the
# `.open_context()` error handling should have # `.open_context()` error handling should have
# raised a local rte, thus the internal # raised a local rte, thus the internal
@ -521,9 +564,6 @@ def test_peer_canceller(
# a `trio.Cancelled` due to a local # a `trio.Cancelled` due to a local
# `._scope.cancel()` call. # `._scope.cancel()` call.
assert not sleeper_ctx._scope.cancelled_caught assert not sleeper_ctx._scope.cancelled_caught
assert isinstance(loc_err, RuntimeError)
print(f'_loc_err: {_loc_err}\n')
# assert sleeper_ctx._local_error is _loc_err # assert sleeper_ctx._local_error is _loc_err
# assert sleeper_ctx._local_error is _loc_err # assert sleeper_ctx._local_error is _loc_err
assert not ( assert not (
@ -560,10 +600,13 @@ def test_peer_canceller(
else: # the other 2 ctxs else: # the other 2 ctxs
assert ( assert (
isinstance(re, ContextCancelled)
and (
re.canceller re.canceller
== ==
canceller.channel.uid canceller.channel.uid
) )
)
# since the sleeper errors while handling a # since the sleeper errors while handling a
# peer-cancelled (by ctxc) scenario, we expect # peer-cancelled (by ctxc) scenario, we expect
@ -811,8 +854,7 @@ async def serve_subactors(
async with open_nursery() as an: async with open_nursery() as an:
# sanity # sanity
if debug_mode: assert tractor._state.debug_mode() == debug_mode
assert tractor._state.debug_mode()
await ctx.started(peer_name) await ctx.started(peer_name)
async with ctx.open_stream() as ipc: async with ctx.open_stream() as ipc:
@ -1091,7 +1133,6 @@ def test_peer_spawns_and_cancels_service_subactor(
'-> root checking `client_ctx.result()`,\n' '-> root checking `client_ctx.result()`,\n'
f'-> checking that sub-spawn {peer_name} is down\n' f'-> checking that sub-spawn {peer_name} is down\n'
) )
# else:
try: try:
res = await client_ctx.result(hide_tb=False) res = await client_ctx.result(hide_tb=False)

View File

@ -2,7 +2,9 @@
Spawning basics Spawning basics
""" """
from typing import Optional from typing import (
Any,
)
import pytest import pytest
import trio import trio
@ -25,13 +27,11 @@ async def spawn(
async with tractor.open_root_actor( async with tractor.open_root_actor(
arbiter_addr=reg_addr, arbiter_addr=reg_addr,
): ):
actor = tractor.current_actor() actor = tractor.current_actor()
assert actor.is_arbiter == is_arbiter assert actor.is_arbiter == is_arbiter
data = data_to_pass_down data = data_to_pass_down
if actor.is_arbiter: if actor.is_arbiter:
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
# forks here # forks here
@ -95,7 +95,9 @@ async def test_movie_theatre_convo(start_method):
await portal.cancel_actor() await portal.cancel_actor()
async def cellar_door(return_value: Optional[str]): async def cellar_door(
return_value: str|None,
):
return return_value return return_value
@ -105,16 +107,18 @@ async def cellar_door(return_value: Optional[str]):
) )
@tractor_test @tractor_test
async def test_most_beautiful_word( async def test_most_beautiful_word(
start_method, start_method: str,
return_value return_value: Any,
debug_mode: bool,
): ):
''' '''
The main ``tractor`` routine. The main ``tractor`` routine.
''' '''
with trio.fail_after(1): with trio.fail_after(1):
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=debug_mode,
) as n:
portal = await n.run_in_actor( portal = await n.run_in_actor(
cellar_door, cellar_door,
return_value=return_value, return_value=return_value,

View File

@ -42,6 +42,7 @@ from ._supervise import (
from ._state import ( from ._state import (
current_actor as current_actor, current_actor as current_actor,
is_root_process as is_root_process, is_root_process as is_root_process,
current_ipc_ctx as current_ipc_ctx,
) )
from ._exceptions import ( from ._exceptions import (
ContextCancelled as ContextCancelled, ContextCancelled as ContextCancelled,

View File

@ -41,6 +41,7 @@ from typing import (
Callable, Callable,
Mapping, Mapping,
Type, Type,
TypeAlias,
TYPE_CHECKING, TYPE_CHECKING,
Union, Union,
) )
@ -94,7 +95,7 @@ if TYPE_CHECKING:
from ._portal import Portal from ._portal import Portal
from ._runtime import Actor from ._runtime import Actor
from ._ipc import MsgTransport from ._ipc import MsgTransport
from .devx._code import ( from .devx._frame_stack import (
CallerInfo, CallerInfo,
) )
@ -155,6 +156,41 @@ class Context:
# payload receiver # payload receiver
_pld_rx: msgops.PldRx _pld_rx: msgops.PldRx
@property
def pld_rx(self) -> msgops.PldRx:
'''
The current `tractor.Context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER its container
shuttlle msg (eg. `Started`/`Yield`/`Return) has been
delivered up from `tractor`'s transport layer but BEFORE the
data is yielded to `tractor` application code.
The "IPC-primitive API" is normally one of a `Context` (this)` or a `MsgStream`
or some higher level API using one of them.
For ex. `pld_data: PayloadT = MsgStream.receive()` implicitly
calls into the stream's parent `Context.pld_rx.recv_pld().` to
receive the latest `PayloadMsg.pld` value.
Modification of the current payload spec via `limit_plds()`
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by the
interchange backend.
- for `msgspec` see <PUTLINKHERE>.
Note that the `PldRx` itself is a per-`Context` instance that
normally only changes when some (sub-)task, on a given "side"
of the IPC ctx (either a "child"-side RPC or inside
a "parent"-side `Portal.open_context()` block), modifies it
using the `.msg._ops.limit_plds()` API.
'''
return self._pld_rx
# full "namespace-path" to target RPC function # full "namespace-path" to target RPC function
_nsf: NamespacePath _nsf: NamespacePath
@ -231,6 +267,8 @@ class Context:
# init and streaming state # init and streaming state
_started_called: bool = False _started_called: bool = False
_started_msg: MsgType|None = None
_started_pld: Any = None
_stream_opened: bool = False _stream_opened: bool = False
_stream: MsgStream|None = None _stream: MsgStream|None = None
@ -623,7 +661,7 @@ class Context:
log.runtime( log.runtime(
'Setting remote error for ctx\n\n' 'Setting remote error for ctx\n\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n' f'<= {self.peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}\n\n' f'=> {self.side!r}: {self._actor.uid}\n\n'
f'{error}' f'{error}'
) )
self._remote_error: BaseException = error self._remote_error: BaseException = error
@ -678,7 +716,7 @@ class Context:
log.error( log.error(
f'Remote context error:\n\n' f'Remote context error:\n\n'
# f'{pformat(self)}\n' # f'{pformat(self)}\n'
f'{error}\n' f'{error}'
) )
if self._canceller is None: if self._canceller is None:
@ -724,8 +762,10 @@ class Context:
) )
else: else:
message: str = 'NOT cancelling `Context._scope` !\n\n' message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb
# mk_pdb().set_trace()
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?' fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
if ( if (
cs cs
and and
@ -805,6 +845,7 @@ class Context:
# f'{ci.api_nsp}()\n' # f'{ci.api_nsp}()\n'
# ) # )
# TODO: use `.dev._frame_stack` scanning to find caller!
return 'Portal.open_context()' return 'Portal.open_context()'
async def cancel( async def cancel(
@ -1304,17 +1345,6 @@ class Context:
ctx=self, ctx=self,
hide_tb=hide_tb, hide_tb=hide_tb,
) )
for msg in drained_msgs:
# TODO: mask this by default..
if isinstance(msg, Return):
# from .devx import pause
# await pause()
# raise InternalError(
log.warning(
'Final `return` msg should never be drained !?!?\n\n'
f'{msg}\n'
)
drained_status: str = ( drained_status: str = (
'Ctx drained to final outcome msg\n\n' 'Ctx drained to final outcome msg\n\n'
@ -1435,6 +1465,10 @@ class Context:
self._result self._result
) )
@property
def has_outcome(self) -> bool:
return bool(self.maybe_error) or self._final_result_is_set()
# @property # @property
def repr_outcome( def repr_outcome(
self, self,
@ -1637,8 +1671,6 @@ class Context:
) )
if rt_started != started_msg: if rt_started != started_msg:
# TODO: break these methods out from the struct subtype?
# TODO: make that one a mod func too.. # TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__( diff = pretty_struct.Struct.__sub__(
rt_started, rt_started,
@ -1674,6 +1706,8 @@ class Context:
) from verr ) from verr
self._started_called = True self._started_called = True
self._started_msg = started_msg
self._started_pld = value
async def _drain_overflows( async def _drain_overflows(
self, self,
@ -1961,6 +1995,7 @@ async def open_context_from_portal(
portal: Portal, portal: Portal,
func: Callable, func: Callable,
pld_spec: TypeAlias|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
# TODO: if we set this the wrapping `@acm` body will # TODO: if we set this the wrapping `@acm` body will
@ -2026,7 +2061,7 @@ async def open_context_from_portal(
# XXX NOTE XXX: currenly we do NOT allow opening a contex # XXX NOTE XXX: currenly we do NOT allow opening a contex
# with "self" since the local feeder mem-chan processing # with "self" since the local feeder mem-chan processing
# is not built for it. # is not built for it.
if portal.channel.uid == portal.actor.uid: if (uid := portal.channel.uid) == portal.actor.uid:
raise RuntimeError( raise RuntimeError(
'** !! Invalid Operation !! **\n' '** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n' 'Can not open an IPC ctx with the local actor!\n'
@ -2054,6 +2089,21 @@ async def open_context_from_portal(
assert ctx._caller_info assert ctx._caller_info
_ctxvar_Context.set(ctx) _ctxvar_Context.set(ctx)
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
scope_err: BaseException|None = None
ctxc_from_callee: ContextCancelled|None = None
try:
async with (
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
spec=pld_spec,
) as maybe_msgdec,
):
if maybe_msgdec:
assert maybe_msgdec.pld_spec == pld_spec
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the # XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered # `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will # in `._maybe_cancel_and_set_remote_error()` will
@ -2061,25 +2111,23 @@ async def open_context_from_portal(
# -> it's expected that if there is an error in this phase of # -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg` # the dialog, the `Error` msg should be raised from the `msg`
# handling block below. # handling block below.
first: Any = await ctx._pld_rx.recv_pld( started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
ctx=ctx, ipc=ctx,
expect_msg=Started, expect_msg=Started,
passthrough_non_pld_msgs=False,
) )
# from .devx import pause
# await pause()
ctx._started_called: bool = True ctx._started_called: bool = True
ctx._started_msg: bool = started_msg
ctx._started_pld: bool = first
uid: tuple = portal.channel.uid # NOTE: this in an implicit runtime nursery used to,
cid: str = ctx.cid # - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
# placeholder for any exception raised in the runtime ctx._scope_nursery: trio.Nursery = tn
# or by user tasks which cause this context's closure. ctx._scope: trio.CancelScope = tn.cancel_scope
scope_err: BaseException|None = None
ctxc_from_callee: ContextCancelled|None = None
try:
async with trio.open_nursery() as nurse:
# NOTE: used to start overrun queuing tasks
ctx._scope_nursery: trio.Nursery = nurse
ctx._scope: trio.CancelScope = nurse.cancel_scope
# deliver context instance and .started() msg value # deliver context instance and .started() msg value
# in enter tuple. # in enter tuple.
@ -2126,13 +2174,13 @@ async def open_context_from_portal(
# when in allow_overruns mode there may be # when in allow_overruns mode there may be
# lingering overflow sender tasks remaining? # lingering overflow sender tasks remaining?
if nurse.child_tasks: if tn.child_tasks:
# XXX: ensure we are in overrun state # XXX: ensure we are in overrun state
# with ``._allow_overruns=True`` bc otherwise # with ``._allow_overruns=True`` bc otherwise
# there should be no tasks in this nursery! # there should be no tasks in this nursery!
if ( if (
not ctx._allow_overruns not ctx._allow_overruns
or len(nurse.child_tasks) > 1 or len(tn.child_tasks) > 1
): ):
raise InternalError( raise InternalError(
'Context has sub-tasks but is ' 'Context has sub-tasks but is '
@ -2304,8 +2352,8 @@ async def open_context_from_portal(
): ):
log.warning( log.warning(
'IPC connection for context is broken?\n' 'IPC connection for context is broken?\n'
f'task:{cid}\n' f'task: {ctx.cid}\n'
f'actor:{uid}' f'actor: {uid}'
) )
raise # duh raise # duh
@ -2455,9 +2503,8 @@ async def open_context_from_portal(
and ctx.cancel_acked and ctx.cancel_acked
): ):
log.cancel( log.cancel(
'Context cancelled by {ctx.side!r}-side task\n' f'Context cancelled by {ctx.side!r}-side task\n'
f'|_{ctx._task}\n\n' f'|_{ctx._task}\n\n'
f'{repr(scope_err)}\n' f'{repr(scope_err)}\n'
) )
@ -2485,7 +2532,7 @@ async def open_context_from_portal(
f'cid: {ctx.cid}\n' f'cid: {ctx.cid}\n'
) )
portal.actor._contexts.pop( portal.actor._contexts.pop(
(uid, cid), (uid, ctx.cid),
None, None,
) )
@ -2513,11 +2560,12 @@ def mk_context(
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan caller-info if log level so high! # TODO: only scan caller-info if log level so high!
from .devx._code import find_caller_info from .devx._frame_stack import find_caller_info
caller_info: CallerInfo|None = find_caller_info() caller_info: CallerInfo|None = find_caller_info()
# TODO: when/how do we apply `.limit_plds()` from here? pld_rx = msgops.PldRx(
pld_rx: msgops.PldRx = msgops.current_pldrx() _pld_dec=msgops._def_any_pldec,
)
ctx = Context( ctx = Context(
chan=chan, chan=chan,
@ -2531,13 +2579,16 @@ def mk_context(
_caller_info=caller_info, _caller_info=caller_info,
**kwargs, **kwargs,
) )
pld_rx._ctx = ctx
ctx._result = Unresolved ctx._result = Unresolved
return ctx return ctx
# TODO: use the new type-parameters to annotate this in 3.13? # TODO: use the new type-parameters to annotate this in 3.13?
# -[ ] https://peps.python.org/pep-0718/#unknown-types # -[ ] https://peps.python.org/pep-0718/#unknown-types
def context(func: Callable) -> Callable: def context(
func: Callable,
) -> Callable:
''' '''
Mark an (async) function as an SC-supervised, inter-`Actor`, Mark an (async) function as an SC-supervised, inter-`Actor`,
child-`trio.Task`, IPC endpoint otherwise known more child-`trio.Task`, IPC endpoint otherwise known more

View File

@ -716,4 +716,5 @@ async def _connect_chan(
chan = Channel((host, port)) chan = Channel((host, port))
await chan.connect() await chan.connect()
yield chan yield chan
with trio.CancelScope(shield=True):
await chan.aclose() await chan.aclose()

View File

@ -47,6 +47,7 @@ from ._ipc import Channel
from .log import get_logger from .log import get_logger
from .msg import ( from .msg import (
# Error, # Error,
PayloadMsg,
NamespacePath, NamespacePath,
Return, Return,
) )
@ -98,7 +99,8 @@ class Portal:
self.chan = channel self.chan = channel
# during the portal's lifetime # during the portal's lifetime
self._final_result: Any|None = None self._final_result_pld: Any|None = None
self._final_result_msg: PayloadMsg|None = None
# When set to a ``Context`` (when _submit_for_result is called) # When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some # it is expected that ``result()`` will be awaited at some
@ -132,7 +134,7 @@ class Portal:
'A pending main result has already been submitted' 'A pending main result has already been submitted'
) )
self._expect_result_ctx = await self.actor.start_remote_task( self._expect_result_ctx: Context = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=NamespacePath(f'{ns}:{func}'), nsf=NamespacePath(f'{ns}:{func}'),
kwargs=kwargs, kwargs=kwargs,
@ -163,13 +165,22 @@ class Portal:
# expecting a "main" result # expecting a "main" result
assert self._expect_result_ctx assert self._expect_result_ctx
if self._final_result is None: if self._final_result_msg is None:
self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld( try:
ctx=self._expect_result_ctx, (
self._final_result_msg,
self._final_result_pld,
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
ipc=self._expect_result_ctx,
expect_msg=Return, expect_msg=Return,
) )
except BaseException as err:
# TODO: wrap this into `@api_frame` optionally with
# some kinda filtering mechanism like log levels?
__tracebackhide__: bool = False
raise err
return self._final_result return self._final_result_pld
async def _cancel_streams(self): async def _cancel_streams(self):
# terminate all locally running async generator # terminate all locally running async generator
@ -301,7 +312,7 @@ class Portal:
portal=self, portal=self,
) )
return await ctx._pld_rx.recv_pld( return await ctx._pld_rx.recv_pld(
ctx=ctx, ipc=ctx,
expect_msg=Return, expect_msg=Return,
) )
@ -320,6 +331,8 @@ class Portal:
remote rpc task or a local async generator instance. remote rpc task or a local async generator instance.
''' '''
__runtimeframe__: int = 1 # noqa
if isinstance(func, str): if isinstance(func, str):
warnings.warn( warnings.warn(
"`Portal.run(namespace: str, funcname: str)` is now" "`Portal.run(namespace: str, funcname: str)` is now"
@ -353,7 +366,7 @@ class Portal:
portal=self, portal=self,
) )
return await ctx._pld_rx.recv_pld( return await ctx._pld_rx.recv_pld(
ctx=ctx, ipc=ctx,
expect_msg=Return, expect_msg=Return,
) )

View File

@ -18,7 +18,7 @@
Root actor runtime ignition(s). Root actor runtime ignition(s).
''' '''
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import importlib import importlib
import logging import logging
@ -60,7 +60,7 @@ _default_lo_addrs: list[tuple[str, int]] = [(
logger = log.get_logger('tractor') logger = log.get_logger('tractor')
@asynccontextmanager @acm
async def open_root_actor( async def open_root_actor(
*, *,
@ -96,6 +96,7 @@ async def open_root_actor(
Runtime init entry point for ``tractor``. Runtime init entry point for ``tractor``.
''' '''
__tracebackhide__ = True
# TODO: stick this in a `@cm` defined in `devx._debug`? # TODO: stick this in a `@cm` defined in `devx._debug`?
# #
# Override the global debugger hook to make it play nice with # Override the global debugger hook to make it play nice with
@ -358,7 +359,11 @@ async def open_root_actor(
BaseExceptionGroup, BaseExceptionGroup,
) as err: ) as err:
entered: bool = await _debug._maybe_enter_pm(err) import inspect
entered: bool = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
)
if ( if (
not entered not entered

View File

@ -70,7 +70,6 @@ from .msg import (
from tractor.msg.types import ( from tractor.msg.types import (
CancelAck, CancelAck,
Error, Error,
Msg,
MsgType, MsgType,
Return, Return,
Start, Start,
@ -250,10 +249,17 @@ async def _errors_relayed_via_ipc(
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# NOTE: we normally always hide this frame in call-stack tracebacks
# if the crash originated from an RPC task (since normally the
# user is only going to care about their own code not this
# internal runtime frame) and we DID NOT
# fail due to an IPC transport error!
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# TODO: a debug nursery when in debug mode! # TODO: a debug nursery when in debug mode!
# async with maybe_open_debugger_nursery() as debug_tn: # async with maybe_open_debugger_nursery() as debug_tn:
# => see matching comment in side `._debug._pause()` # => see matching comment in side `._debug._pause()`
rpc_err: BaseException|None = None
try: try:
yield # run RPC invoke body yield # run RPC invoke body
@ -264,16 +270,7 @@ async def _errors_relayed_via_ipc(
BaseExceptionGroup, BaseExceptionGroup,
KeyboardInterrupt, KeyboardInterrupt,
) as err: ) as err:
rpc_err = err
# NOTE: always hide this frame from debug REPL call stack
# if the crash originated from an RPC task and we DID NOT
# fail due to an IPC transport error!
if (
is_rpc
and
chan.connected()
):
__tracebackhide__: bool = hide_tb
# TODO: maybe we'll want different "levels" of debugging # TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ? # eventualy such as ('app', 'supervisory', 'runtime') ?
@ -318,11 +315,19 @@ async def _errors_relayed_via_ipc(
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
) )
if not entered_debug: if not entered_debug:
# if we prolly should have entered the REPL but
# didn't, maybe there was an internal error in
# the above code and we do want to show this
# frame!
if _state.debug_mode():
__tracebackhide__: bool = False
log.exception( log.exception(
'RPC task crashed\n' 'RPC task crashed\n'
f'|_{ctx}' f'|_{ctx}'
) )
# ALWAYS try to ship RPC errors back to parent/caller task # ALWAYS try to ship RPC errors back to parent/caller task
if is_rpc: if is_rpc:
@ -355,6 +360,20 @@ async def _errors_relayed_via_ipc(
# `Actor._service_n`, we add "handles" to each such that # `Actor._service_n`, we add "handles" to each such that
# they can be individually ccancelled. # they can be individually ccancelled.
finally: finally:
# if the error is not from user code and instead a failure
# of a runtime RPC or transport failure we do prolly want to
# show this frame
if (
rpc_err
and (
not is_rpc
or
not chan.connected()
)
):
__tracebackhide__: bool = False
try: try:
ctx: Context ctx: Context
func: Callable func: Callable
@ -444,9 +463,10 @@ async def _invoke(
# open the stream with this option. # open the stream with this option.
# allow_overruns=True, # allow_overruns=True,
) )
context: bool = False context_ep_func: bool = False
assert not _state._ctxvar_Context.get() # set the current IPC ctx var for this RPC task
_state._ctxvar_Context.set(ctx)
# TODO: deprecate this style.. # TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False): if getattr(func, '_tractor_stream_function', False):
@ -475,7 +495,7 @@ async def _invoke(
# handle decorated ``@tractor.context`` async function # handle decorated ``@tractor.context`` async function
elif getattr(func, '_tractor_context_function', False): elif getattr(func, '_tractor_context_function', False):
kwargs['ctx'] = ctx kwargs['ctx'] = ctx
context = True context_ep_func = True
# errors raised inside this block are propgated back to caller # errors raised inside this block are propgated back to caller
async with _errors_relayed_via_ipc( async with _errors_relayed_via_ipc(
@ -501,7 +521,7 @@ async def _invoke(
raise raise
# TODO: impl all these cases in terms of the `Context` one! # TODO: impl all these cases in terms of the `Context` one!
if not context: if not context_ep_func:
await _invoke_non_context( await _invoke_non_context(
actor, actor,
cancel_scope, cancel_scope,
@ -571,7 +591,6 @@ async def _invoke(
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
ctx._scope_nursery = tn ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope ctx._scope = tn.cancel_scope
_state._ctxvar_Context.set(ctx)
task_status.started(ctx) task_status.started(ctx)
# TODO: should would be nice to have our # TODO: should would be nice to have our
@ -831,7 +850,7 @@ async def process_messages(
(as utilized inside `Portal.cancel_actor()` ). (as utilized inside `Portal.cancel_actor()` ).
''' '''
assert actor._service_n # state sanity assert actor._service_n # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we # TODO: once `trio` get's an "obvious way" for req/resp we
# should use it? # should use it?
@ -844,7 +863,7 @@ async def process_messages(
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175 # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659 # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
nursery_cancelled_before_task: bool = False nursery_cancelled_before_task: bool = False
msg: Msg|None = None msg: MsgType|None = None
try: try:
# NOTE: this internal scope allows for keeping this # NOTE: this internal scope allows for keeping this
# message loop running despite the current task having # message loop running despite the current task having

View File

@ -644,7 +644,7 @@ class Actor:
peers_str: str = '' peers_str: str = ''
for uid, chans in self._peers.items(): for uid, chans in self._peers.items():
peers_str += ( peers_str += (
f'|_ uid: {uid}\n' f'uid: {uid}\n'
) )
for i, chan in enumerate(chans): for i, chan in enumerate(chans):
peers_str += ( peers_str += (
@ -678,10 +678,12 @@ class Actor:
# XXX => YES IT DOES, when i was testing ctl-c # XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to # from broken debug TTY locking due to
# msg-spec races on application using RunVar... # msg-spec races on application using RunVar...
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if ( if (
pdb_user_uid (ctx_in_debug := pdb_lock.ctx_in_debug)
and local_nursery and
(pdb_user_uid := ctx_in_debug.chan.uid)
and
local_nursery
): ):
entry: tuple|None = local_nursery._children.get( entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid) tuple(pdb_user_uid)
@ -1169,13 +1171,17 @@ class Actor:
# kill any debugger request task to avoid deadlock # kill any debugger request task to avoid deadlock
# with the root actor in this tree # with the root actor in this tree
dbcs = _debug.DebugStatus.req_cs debug_req = _debug.DebugStatus
if dbcs is not None: lock_req_ctx: Context = debug_req.req_ctx
if lock_req_ctx is not None:
msg += ( msg += (
'-> Cancelling active debugger request..\n' '-> Cancelling active debugger request..\n'
f'|_{_debug.Lock.pformat()}' f'|_{_debug.Lock.repr()}\n\n'
f'|_{lock_req_ctx}\n\n'
) )
dbcs.cancel() # lock_req_ctx._scope.cancel()
# TODO: wrap this in a method-API..
debug_req.req_cs.cancel()
# self-cancel **all** ongoing RPC tasks # self-cancel **all** ongoing RPC tasks
await self.cancel_rpc_tasks( await self.cancel_rpc_tasks(
@ -1375,15 +1381,17 @@ class Actor:
"IPC channel's " "IPC channel's "
) )
rent_chan_repr: str = ( rent_chan_repr: str = (
f'|_{parent_chan}' f' |_{parent_chan}\n\n'
if parent_chan if parent_chan
else '' else ''
) )
log.cancel( log.cancel(
f'Cancelling {descr} {len(tasks)} rpc tasks\n\n' f'Cancelling {descr} RPC tasks\n\n'
f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n' f'<= canceller: {req_uid}\n'
f' {rent_chan_repr}\n' f'{rent_chan_repr}'
# f'{self}\n' f'=> cancellee: {self.uid}\n'
f' |_{self}.cancel_rpc_tasks()\n'
f' |_tasks: {len(tasks)}\n'
# f'{tasks_str}' # f'{tasks_str}'
) )
for ( for (
@ -1413,7 +1421,7 @@ class Actor:
if tasks: if tasks:
log.cancel( log.cancel(
'Waiting for remaining rpc tasks to complete\n' 'Waiting for remaining rpc tasks to complete\n'
f'|_{tasks}' f'|_{tasks_str}'
) )
await self._ongoing_rpc_tasks.wait() await self._ongoing_rpc_tasks.wait()
@ -1466,7 +1474,10 @@ class Actor:
assert self._parent_chan, "No parent channel for this actor?" assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan) return Portal(self._parent_chan)
def get_chans(self, uid: tuple[str, str]) -> list[Channel]: def get_chans(
self,
uid: tuple[str, str],
) -> list[Channel]:
''' '''
Return all IPC channels to the actor with provided `uid`. Return all IPC channels to the actor with provided `uid`.
@ -1626,7 +1637,9 @@ async def async_main(
# tranport address bind errors - normally it's # tranport address bind errors - normally it's
# something silly like the wrong socket-address # something silly like the wrong socket-address
# passed via a config or CLI Bo # passed via a config or CLI Bo
entered_debug = await _debug._maybe_enter_pm(oserr) entered_debug = await _debug._maybe_enter_pm(
oserr,
)
if entered_debug: if entered_debug:
log.runtime('Exited debug REPL..') log.runtime('Exited debug REPL..')
raise raise

View File

@ -142,7 +142,9 @@ async def exhaust_portal(
''' '''
__tracebackhide__ = True __tracebackhide__ = True
try: try:
log.debug(f"Waiting on final result from {actor.uid}") log.debug(
f'Waiting on final result from {actor.uid}'
)
# XXX: streams should never be reaped here since they should # XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api # always be established and shutdown using a context manager api
@ -195,7 +197,10 @@ async def cancel_on_completion(
# if this call errors we store the exception for later # if this call errors we store the exception for later
# in ``errors`` which will be reraised inside # in ``errors`` which will be reraised inside
# an exception group and we still send out a cancel request # an exception group and we still send out a cancel request
result: Any|Exception = await exhaust_portal(portal, actor) result: Any|Exception = await exhaust_portal(
portal,
actor,
)
if isinstance(result, Exception): if isinstance(result, Exception):
errors[actor.uid]: Exception = result errors[actor.uid]: Exception = result
log.cancel( log.cancel(
@ -503,14 +508,6 @@ async def trio_proc(
) )
) )
# await chan.send({
# '_parent_main_data': subactor._parent_main_data,
# 'enable_modules': subactor.enable_modules,
# 'reg_addrs': subactor.reg_addrs,
# 'bind_addrs': bind_addrs,
# '_runtime_vars': _runtime_vars,
# })
# track subactor in current nursery # track subactor in current nursery
curr_actor: Actor = current_actor() curr_actor: Actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
@ -554,8 +551,8 @@ async def trio_proc(
# killing the process too early. # killing the process too early.
if proc: if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}') log.cancel(f'Hard reap sequence starting for {subactor.uid}')
with trio.CancelScope(shield=True):
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
if cancelled_during_spawn: if cancelled_during_spawn:
# Try again to avoid TTY clobbering. # Try again to avoid TTY clobbering.

View File

@ -124,9 +124,15 @@ _ctxvar_Context: ContextVar[Context] = ContextVar(
) )
def current_ipc_ctx() -> Context: def current_ipc_ctx(
error_on_not_set: bool = False,
) -> Context|None:
ctx: Context = _ctxvar_Context.get() ctx: Context = _ctxvar_Context.get()
if not ctx:
if (
not ctx
and error_on_not_set
):
from ._exceptions import InternalError from ._exceptions import InternalError
raise InternalError( raise InternalError(
'No IPC context has been allocated for this task yet?\n' 'No IPC context has been allocated for this task yet?\n'

View File

@ -52,6 +52,7 @@ from tractor.msg import (
if TYPE_CHECKING: if TYPE_CHECKING:
from ._context import Context from ._context import Context
from ._ipc import Channel
log = get_logger(__name__) log = get_logger(__name__)
@ -65,10 +66,10 @@ log = get_logger(__name__)
class MsgStream(trio.abc.Channel): class MsgStream(trio.abc.Channel):
''' '''
A bidirectional message stream for receiving logically sequenced A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC ``Channel``. values over an inter-actor IPC `Channel`.
This is the type returned to a local task which entered either This is the type returned to a local task which entered either
``Portal.open_stream_from()`` or ``Context.open_stream()``. `Portal.open_stream_from()` or `Context.open_stream()`.
Termination rules: Termination rules:
@ -95,6 +96,22 @@ class MsgStream(trio.abc.Channel):
self._eoc: bool|trio.EndOfChannel = False self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False self._closed: bool|trio.ClosedResourceError = False
@property
def ctx(self) -> Context:
'''
This stream's IPC `Context` ref.
'''
return self._ctx
@property
def chan(self) -> Channel:
'''
Ref to the containing `Context`'s transport `Channel`.
'''
return self._ctx.chan
# TODO: could we make this a direct method bind to `PldRx`? # TODO: could we make this a direct method bind to `PldRx`?
# -> receive_nowait = PldRx.recv_pld # -> receive_nowait = PldRx.recv_pld
# |_ means latter would have to accept `MsgStream`-as-`self`? # |_ means latter would have to accept `MsgStream`-as-`self`?
@ -109,7 +126,7 @@ class MsgStream(trio.abc.Channel):
): ):
ctx: Context = self._ctx ctx: Context = self._ctx
return ctx._pld_rx.recv_pld_nowait( return ctx._pld_rx.recv_pld_nowait(
ctx=ctx, ipc=self,
expect_msg=expect_msg, expect_msg=expect_msg,
) )
@ -148,7 +165,7 @@ class MsgStream(trio.abc.Channel):
try: try:
ctx: Context = self._ctx ctx: Context = self._ctx
return await ctx._pld_rx.recv_pld(ctx=ctx) return await ctx._pld_rx.recv_pld(ipc=self)
# XXX: the stream terminates on either of: # XXX: the stream terminates on either of:
# - via `self._rx_chan.receive()` raising after manual closure # - via `self._rx_chan.receive()` raising after manual closure

View File

@ -84,6 +84,7 @@ class ActorNursery:
ria_nursery: trio.Nursery, ria_nursery: trio.Nursery,
da_nursery: trio.Nursery, da_nursery: trio.Nursery,
errors: dict[tuple[str, str], BaseException], errors: dict[tuple[str, str], BaseException],
) -> None: ) -> None:
# self.supervisor = supervisor # TODO # self.supervisor = supervisor # TODO
self._actor: Actor = actor self._actor: Actor = actor
@ -105,6 +106,7 @@ class ActorNursery:
self._at_least_one_child_in_debug: bool = False self._at_least_one_child_in_debug: bool = False
self.errors = errors self.errors = errors
self.exited = trio.Event() self.exited = trio.Event()
self._scope_error: BaseException|None = None
# NOTE: when no explicit call is made to # NOTE: when no explicit call is made to
# `.open_root_actor()` by application code, # `.open_root_actor()` by application code,
@ -117,7 +119,9 @@ class ActorNursery:
async def start_actor( async def start_actor(
self, self,
name: str, name: str,
*, *,
bind_addrs: list[tuple[str, int]] = [_default_bind_addr], bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str]|None = None, rpc_module_paths: list[str]|None = None,
enable_modules: list[str]|None = None, enable_modules: list[str]|None = None,
@ -125,6 +129,7 @@ class ActorNursery:
nursery: trio.Nursery|None = None, nursery: trio.Nursery|None = None,
debug_mode: bool|None = None, debug_mode: bool|None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
) -> Portal: ) -> Portal:
''' '''
Start a (daemon) actor: an process that has no designated Start a (daemon) actor: an process that has no designated
@ -189,6 +194,13 @@ class ActorNursery:
) )
) )
# TODO: DEPRECATE THIS:
# -[ ] impl instead as a hilevel wrapper on
# top of a `@context` style invocation.
# |_ dynamic @context decoration on child side
# |_ implicit `Portal.open_context() as (ctx, first):`
# and `return first` on parent side.
# -[ ] use @api_frame on the wrapper
async def run_in_actor( async def run_in_actor(
self, self,
@ -221,7 +233,7 @@ class ActorNursery:
# use the explicit function name if not provided # use the explicit function name if not provided
name = fn.__name__ name = fn.__name__
portal = await self.start_actor( portal: Portal = await self.start_actor(
name, name,
enable_modules=[mod_path] + ( enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or [] enable_modules or rpc_module_paths or []
@ -250,6 +262,7 @@ class ActorNursery:
) )
return portal return portal
# @api_frame
async def cancel( async def cancel(
self, self,
hard_kill: bool = False, hard_kill: bool = False,
@ -347,8 +360,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
) -> typing.AsyncGenerator[ActorNursery, None]: ) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay? # normally don't need to show user by default
__tracebackhide__ = True __tracebackhide__: bool = True
outer_err: BaseException|None = None
inner_err: BaseException|None = None
# the collection of errors retreived from spawned sub-actors # the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {} errors: dict[tuple[str, str], BaseException] = {}
@ -358,7 +374,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# handling errors that are generated by the inner nursery in # handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for # a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using # actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``). # `ActorNursery.start_actor()`).
# errors from this daemon actor nursery bubble up to caller # errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery() as da_nursery: async with trio.open_nursery() as da_nursery:
@ -393,7 +409,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
) )
an._join_procs.set() an._join_procs.set()
except BaseException as inner_err: except BaseException as _inner_err:
inner_err = _inner_err
errors[actor.uid] = inner_err errors[actor.uid] = inner_err
# If we error in the root but the debugger is # If we error in the root but the debugger is
@ -471,8 +488,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
Exception, Exception,
BaseExceptionGroup, BaseExceptionGroup,
trio.Cancelled trio.Cancelled
) as _outer_err:
outer_err = _outer_err
) as err: an._scope_error = outer_err or inner_err
# XXX: yet another guard before allowing the cancel # XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug. # sequence in case a (single) child is in debug.
@ -487,7 +506,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
if an._children: if an._children:
log.cancel( log.cancel(
'Actor-nursery cancelling due error type:\n' 'Actor-nursery cancelling due error type:\n'
f'{err}\n' f'{outer_err}\n'
) )
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await an.cancel() await an.cancel()
@ -514,11 +533,19 @@ async def _open_and_supervise_one_cancels_all_nursery(
else: else:
raise list(errors.values())[0] raise list(errors.values())[0]
# show frame on any (likely) internal error
if (
not an.cancelled
and an._scope_error
):
__tracebackhide__: bool = False
# da_nursery scope end - nursery checkpoint # da_nursery scope end - nursery checkpoint
# final exit # final exit
@acm @acm
# @api_frame
async def open_nursery( async def open_nursery(
**kwargs, **kwargs,
@ -538,6 +565,7 @@ async def open_nursery(
which cancellation scopes correspond to each spawned subactor set. which cancellation scopes correspond to each spawned subactor set.
''' '''
__tracebackhide__: bool = True
implicit_runtime: bool = False implicit_runtime: bool = False
actor: Actor = current_actor(err_on_no_runtime=False) actor: Actor = current_actor(err_on_no_runtime=False)
an: ActorNursery|None = None an: ActorNursery|None = None
@ -588,6 +616,14 @@ async def open_nursery(
an.exited.set() an.exited.set()
finally: finally:
# show frame on any internal runtime-scope error
if (
an
and not an.cancelled
and an._scope_error
):
__tracebackhide__: bool = False
msg: str = ( msg: str = (
'Actor-nursery exited\n' 'Actor-nursery exited\n'
f'|_{an}\n' f'|_{an}\n'

File diff suppressed because it is too large Load Diff

View File

@ -20,11 +20,8 @@ as it pertains to improving the grok-ability of our runtime!
''' '''
from __future__ import annotations from __future__ import annotations
from functools import partial
import inspect import inspect
# import msgspec
# from pprint import pformat
import textwrap
import traceback
from types import ( from types import (
FrameType, FrameType,
FunctionType, FunctionType,
@ -32,9 +29,8 @@ from types import (
# CodeType, # CodeType,
) )
from typing import ( from typing import (
# Any, Any,
Callable, Callable,
# TYPE_CHECKING,
Type, Type,
) )
@ -42,6 +38,7 @@ from tractor.msg import (
pretty_struct, pretty_struct,
NamespacePath, NamespacePath,
) )
import wrapt
# TODO: yeah, i don't love this and we should prolly just # TODO: yeah, i don't love this and we should prolly just
@ -83,6 +80,31 @@ def get_class_from_frame(fr: FrameType) -> (
return None return None
def get_ns_and_func_from_frame(
frame: FrameType,
) -> Callable:
'''
Return the corresponding function object reference from
a `FrameType`, and return it and it's parent namespace `dict`.
'''
ns: dict[str, Any]
# for a method, go up a frame and lookup the name in locals()
if '.' in (qualname := frame.f_code.co_qualname):
cls_name, _, func_name = qualname.partition('.')
ns = frame.f_back.f_locals[cls_name].__dict__
else:
func_name: str = frame.f_code.co_name
ns = frame.f_globals
return (
ns,
ns[func_name],
)
def func_ref_from_frame( def func_ref_from_frame(
frame: FrameType, frame: FrameType,
) -> Callable: ) -> Callable:
@ -98,34 +120,63 @@ def func_ref_from_frame(
) )
# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct): class CallerInfo(pretty_struct.Struct):
rt_fi: inspect.FrameInfo # https://docs.python.org/dev/reference/datamodel.html#frame-objects
call_frame: FrameType # https://docs.python.org/dev/library/inspect.html#the-interpreter-stack
_api_frame: FrameType
@property @property
def api_func_ref(self) -> Callable|None: def api_frame(self) -> FrameType:
return func_ref_from_frame(self.rt_fi.frame) try:
self._api_frame.clear()
except RuntimeError:
# log.warning(
print(
f'Frame {self._api_frame} for {self.api_func} is still active!'
)
return self._api_frame
_api_func: Callable
@property
def api_func(self) -> Callable:
return self._api_func
_caller_frames_up: int|None = 1
_caller_frame: FrameType|None = None # cached after first stack scan
@property @property
def api_nsp(self) -> NamespacePath|None: def api_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func_ref func: FunctionType = self.api_func
if func: if func:
return NamespacePath.from_ref(func) return NamespacePath.from_ref(func)
return '<unknown>' return '<unknown>'
@property @property
def caller_func_ref(self) -> Callable|None: def caller_frame(self) -> FrameType:
return func_ref_from_frame(self.call_frame)
# if not already cached, scan up stack explicitly by
# configured count.
if not self._caller_frame:
if self._caller_frames_up:
for _ in range(self._caller_frames_up):
caller_frame: FrameType|None = self.api_frame.f_back
if not caller_frame:
raise ValueError(
'No frame exists {self._caller_frames_up} up from\n'
f'{self.api_frame} @ {self.api_nsp}\n'
)
self._caller_frame = caller_frame
return self._caller_frame
@property @property
def caller_nsp(self) -> NamespacePath|None: def caller_nsp(self) -> NamespacePath|None:
func: FunctionType = self.caller_func_ref func: FunctionType = self.api_func
if func: if func:
return NamespacePath.from_ref(func) return NamespacePath.from_ref(func)
@ -172,108 +223,66 @@ def find_caller_info(
call_frame = call_frame.f_back call_frame = call_frame.f_back
return CallerInfo( return CallerInfo(
rt_fi=fi, _api_frame=rt_frame,
call_frame=call_frame, _api_func=func_ref_from_frame(rt_frame),
_caller_frames_up=go_up_iframes,
) )
return None return None
def pformat_boxed_tb( _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str: # TODO: -[x] move all this into new `.devx._code`!
''' # -[ ] consider rename to _callstack?
Create a "boxed" looking traceback string. # -[ ] prolly create a `@runtime_api` dec?
# |_ @api_frame seems better?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
def api_frame(
wrapped: Callable|None = None,
*,
caller_frames_up: int = 1,
Useful for emphasizing traceback text content as being an ) -> Callable:
embedded attribute of some other object (like
a `RemoteActorError` or other boxing remote error shuttle
container).
Any other parent/container "fields" can be passed in the # handle the decorator called WITHOUT () case,
`fields_str` input along with other prefix/indent settings. # i.e. just @api_frame, NOT @api_frame(extra=<blah>)
if wrapped is None:
return partial(
api_frame,
caller_frames_up=caller_frames_up,
)
''' @wrapt.decorator
if ( async def wrapper(
fields_str wrapped: Callable,
and instance: object,
field_prefix args: tuple,
kwargs: dict,
): ):
fields: str = textwrap.indent( # maybe cache the API frame for this call
fields_str, global _frame2callerinfo_cache
prefix=field_prefix, this_frame: FrameType = inspect.currentframe()
) api_frame: FrameType = this_frame.f_back
else:
fields = fields_str or ''
tb_body = tb_str if not _frame2callerinfo_cache.get(api_frame):
if tb_body_indent: _frame2callerinfo_cache[api_frame] = CallerInfo(
tb_body: str = textwrap.indent( _api_frame=api_frame,
tb_str, _api_func=wrapped,
prefix=tb_body_indent * ' ', _caller_frames_up=caller_frames_up,
) )
tb_box: str = ( return wrapped(*args, **kwargs)
# orig # annotate the function as a "api function", meaning it is
# f' |\n' # a function for which the function above it in the call stack should be
# f' ------ - ------\n\n' # non-`tractor` code aka "user code".
# f'{tb_str}\n' #
# f' ------ - ------\n' # in the global frame cache for easy lookup from a given
# f' _|\n' # func-instance
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
f'|\n' wrapped.__api_func__: bool = True
f' ------ - ------\n\n' return wrapper(wrapped)
# f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,
) -> str:
'''
Capture and return the traceback text content from
`stack_limit` call frames up.
'''
tb_str: str = (
'\n'.join(
traceback.format_stack(limit=stack_limit)
)
)
if box_tb:
tb_str: str = pformat_boxed_tb(
tb_str=tb_str,
field_prefix=' ',
indent='',
)
return tb_str

View File

@ -57,8 +57,8 @@ LEVELS: dict[str, int] = {
'TRANSPORT': 5, 'TRANSPORT': 5,
'RUNTIME': 15, 'RUNTIME': 15,
'CANCEL': 16, 'CANCEL': 16,
'DEVX': 400,
'PDB': 500, 'PDB': 500,
'DEVX': 600,
} }
# _custom_levels: set[str] = { # _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys() # lvlname.lower for lvlname in LEVELS.keys()
@ -137,7 +137,7 @@ class StackLevelAdapter(LoggerAdapter):
"Developer experience" sub-sys statuses. "Developer experience" sub-sys statuses.
''' '''
return self.log(600, msg) return self.log(400, msg)
def log( def log(
self, self,
@ -202,8 +202,29 @@ class StackLevelAdapter(LoggerAdapter):
) )
# TODO IDEA:
# -[ ] do per task-name and actor-name color coding
# -[ ] unique color per task-id and actor-uuid
def pformat_task_uid(
id_part: str = 'tail'
):
'''
Return `str`-ified unique for a `trio.Task` via a combo of its
`.name: str` and `id()` truncated output.
'''
task: trio.Task = trio.lowlevel.current_task()
tid: str = str(id(task))
if id_part == 'tail':
tid_part: str = tid[-6:]
else:
tid_part: str = tid[:6]
return f'{task.name}[{tid_part}]'
_conc_name_getters = { _conc_name_getters = {
'task': lambda: trio.lowlevel.current_task().name, 'task': pformat_task_uid,
'actor': lambda: current_actor(), 'actor': lambda: current_actor(),
'actor_name': lambda: current_actor().name, 'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6], 'actor_uid': lambda: current_actor().uid[1][:6],
@ -211,7 +232,10 @@ _conc_name_getters = {
class ActorContextInfo(Mapping): class ActorContextInfo(Mapping):
"Dyanmic lookup for local actor and task names" '''
Dyanmic lookup for local actor and task names.
'''
_context_keys = ( _context_keys = (
'task', 'task',
'actor', 'actor',

View File

@ -44,7 +44,7 @@ from ._codec import (
# ) # )
from .types import ( from .types import (
Msg as Msg, PayloadMsg as PayloadMsg,
Aid as Aid, Aid as Aid,
SpawnSpec as SpawnSpec, SpawnSpec as SpawnSpec,

View File

@ -432,7 +432,7 @@ class MsgCodec(Struct):
# ) -> Any|Struct: # ) -> Any|Struct:
# msg: Msg = codec.dec.decode(msg) # msg: PayloadMsg = codec.dec.decode(msg)
# payload_tag: str = msg.header.payload_tag # payload_tag: str = msg.header.payload_tag
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag] # payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
# return payload_dec.decode(msg.pld) # return payload_dec.decode(msg.pld)

View File

@ -22,10 +22,9 @@ operational helpers for processing transaction flows.
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import ( from contextlib import (
# asynccontextmanager as acm, asynccontextmanager as acm,
contextmanager as cm, contextmanager as cm,
) )
from contextvars import ContextVar
from typing import ( from typing import (
Any, Any,
Type, Type,
@ -50,6 +49,7 @@ from tractor._exceptions import (
_mk_msg_type_err, _mk_msg_type_err,
pack_from_raise, pack_from_raise,
) )
from tractor._state import current_ipc_ctx
from ._codec import ( from ._codec import (
mk_dec, mk_dec,
MsgDec, MsgDec,
@ -75,7 +75,7 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
_def_any_pldec: MsgDec = mk_dec() _def_any_pldec: MsgDec[Any] = mk_dec()
class PldRx(Struct): class PldRx(Struct):
@ -104,15 +104,19 @@ class PldRx(Struct):
''' '''
# TODO: better to bind it here? # TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel # _rx_mc: trio.MemoryReceiveChannel
_pldec: MsgDec _pld_dec: MsgDec
_ctx: Context|None = None
_ipc: Context|MsgStream|None = None _ipc: Context|MsgStream|None = None
@property @property
def pld_dec(self) -> MsgDec: def pld_dec(self) -> MsgDec:
return self._pldec return self._pld_dec
# TODO: a better name?
# -[ ] when would this be used as it avoids needingn to pass the
# ipc prim to every method
@cm @cm
def apply_to_ipc( def wraps_ipc(
self, self,
ipc_prim: Context|MsgStream, ipc_prim: Context|MsgStream,
@ -140,49 +144,50 @@ class PldRx(Struct):
exit. exit.
''' '''
orig_dec: MsgDec = self._pldec orig_dec: MsgDec = self._pld_dec
limit_dec: MsgDec = mk_dec(spec=spec) limit_dec: MsgDec = mk_dec(spec=spec)
try: try:
self._pldec = limit_dec self._pld_dec = limit_dec
yield limit_dec yield limit_dec
finally: finally:
self._pldec = orig_dec self._pld_dec = orig_dec
@property @property
def dec(self) -> msgpack.Decoder: def dec(self) -> msgpack.Decoder:
return self._pldec.dec return self._pld_dec.dec
def recv_pld_nowait( def recv_pld_nowait(
self, self,
# TODO: make this `MsgStream` compat as well, see above^ # TODO: make this `MsgStream` compat as well, see above^
# ipc_prim: Context|MsgStream, # ipc_prim: Context|MsgStream,
ctx: Context, ipc: Context|MsgStream,
ipc_msg: MsgType|None = None, ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None, expect_msg: Type[MsgType]|None = None,
hide_tb: bool = False,
**dec_msg_kwargs, **dec_msg_kwargs,
) -> Any|Raw: ) -> Any|Raw:
__tracebackhide__: bool = True __tracebackhide__: bool = hide_tb
msg: MsgType = ( msg: MsgType = (
ipc_msg ipc_msg
or or
# sync-rx msg from underlying IPC feeder (mem-)chan # sync-rx msg from underlying IPC feeder (mem-)chan
ctx._rx_chan.receive_nowait() ipc._rx_chan.receive_nowait()
) )
return self.dec_msg( return self.dec_msg(
msg, msg,
ctx=ctx, ipc=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
hide_tb=hide_tb,
**dec_msg_kwargs, **dec_msg_kwargs,
) )
async def recv_pld( async def recv_pld(
self, self,
ctx: Context, ipc: Context|MsgStream,
ipc_msg: MsgType|None = None, ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None, expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True, hide_tb: bool = True,
@ -200,11 +205,11 @@ class PldRx(Struct):
or or
# async-rx msg from underlying IPC feeder (mem-)chan # async-rx msg from underlying IPC feeder (mem-)chan
await ctx._rx_chan.receive() await ipc._rx_chan.receive()
) )
return self.dec_msg( return self.dec_msg(
msg=msg, msg=msg,
ctx=ctx, ipc=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
**dec_msg_kwargs, **dec_msg_kwargs,
) )
@ -212,7 +217,7 @@ class PldRx(Struct):
def dec_msg( def dec_msg(
self, self,
msg: MsgType, msg: MsgType,
ctx: Context, ipc: Context|MsgStream,
expect_msg: Type[MsgType]|None, expect_msg: Type[MsgType]|None,
raise_error: bool = True, raise_error: bool = True,
@ -225,6 +230,9 @@ class PldRx(Struct):
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
_src_err = None
src_err: BaseException|None = None
match msg: match msg:
# payload-data shuttle msg; deliver the `.pld` value # payload-data shuttle msg; deliver the `.pld` value
# directly to IPC (primitive) client-consumer code. # directly to IPC (primitive) client-consumer code.
@ -234,7 +242,7 @@ class PldRx(Struct):
|Return(pld=pld) # termination phase |Return(pld=pld) # termination phase
): ):
try: try:
pld: PayloadT = self._pldec.decode(pld) pld: PayloadT = self._pld_dec.decode(pld)
log.runtime( log.runtime(
'Decoded msg payload\n\n' 'Decoded msg payload\n\n'
f'{msg}\n\n' f'{msg}\n\n'
@ -243,25 +251,30 @@ class PldRx(Struct):
) )
return pld return pld
# XXX pld-type failure # XXX pld-value type failure
except ValidationError as src_err: except ValidationError as valerr:
# pack mgterr into error-msg for
# reraise below; ensure remote-actor-err
# info is displayed nicely?
msgterr: MsgTypeError = _mk_msg_type_err( msgterr: MsgTypeError = _mk_msg_type_err(
msg=msg, msg=msg,
codec=self.pld_dec, codec=self.pld_dec,
src_validation_error=src_err, src_validation_error=valerr,
is_invalid_payload=True, is_invalid_payload=True,
) )
msg: Error = pack_from_raise( msg: Error = pack_from_raise(
local_err=msgterr, local_err=msgterr,
cid=msg.cid, cid=msg.cid,
src_uid=ctx.chan.uid, src_uid=ipc.chan.uid,
) )
src_err = valerr
# XXX some other decoder specific failure? # XXX some other decoder specific failure?
# except TypeError as src_error: # except TypeError as src_error:
# from .devx import mk_pdb # from .devx import mk_pdb
# mk_pdb().set_trace() # mk_pdb().set_trace()
# raise src_error # raise src_error
# ^-TODO-^ can remove?
# a runtime-internal RPC endpoint response. # a runtime-internal RPC endpoint response.
# always passthrough since (internal) runtime # always passthrough since (internal) runtime
@ -299,6 +312,7 @@ class PldRx(Struct):
return src_err return src_err
case Stop(cid=cid): case Stop(cid=cid):
ctx: Context = getattr(ipc, 'ctx', ipc)
message: str = ( message: str = (
f'{ctx.side!r}-side of ctx received stream-`Stop` from ' f'{ctx.side!r}-side of ctx received stream-`Stop` from '
f'{ctx.peer_side!r} peer ?\n' f'{ctx.peer_side!r} peer ?\n'
@ -341,14 +355,21 @@ class PldRx(Struct):
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note # |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
# #
# fallthrough and raise from `src_err` # fallthrough and raise from `src_err`
try:
_raise_from_unexpected_msg( _raise_from_unexpected_msg(
ctx=ctx, ctx=getattr(ipc, 'ctx', ipc),
msg=msg, msg=msg,
src_err=src_err, src_err=src_err,
log=log, log=log,
expect_msg=expect_msg, expect_msg=expect_msg,
hide_tb=hide_tb, hide_tb=hide_tb,
) )
except UnboundLocalError:
# XXX if there's an internal lookup error in the above
# code (prolly on `src_err`) we want to show this frame
# in the tb!
__tracebackhide__: bool = False
raise
async def recv_msg_w_pld( async def recv_msg_w_pld(
self, self,
@ -378,52 +399,13 @@ class PldRx(Struct):
# msg instance? # msg instance?
pld: PayloadT = self.dec_msg( pld: PayloadT = self.dec_msg(
msg, msg,
ctx=ipc, ipc=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
**kwargs, **kwargs,
) )
return msg, pld return msg, pld
# Always maintain a task-context-global `PldRx`
_def_pld_rx: PldRx = PldRx(
_pldec=_def_any_pldec,
)
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
'pld_rx',
default=_def_pld_rx,
)
def current_pldrx() -> PldRx:
'''
Return the current `trio.Task.context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER it's container
shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered
up from `tractor`'s transport layer but BEFORE the data is
yielded to application code, normally via an IPC primitive API
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
Modification of the current payload spec via `limit_plds()`
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by
the interchange backend.
- for `msgspec` see <PUTLINKHERE>.
NOTE that the `PldRx` itself is a per-`Context` global sub-system
that normally does not change other then the applied pld-spec
for the current `trio.Task`.
'''
# ctx: context = current_ipc_ctx()
# return ctx._pld_rx
return _ctxvar_PldRx.get()
@cm @cm
def limit_plds( def limit_plds(
spec: Union[Type[Struct]], spec: Union[Type[Struct]],
@ -439,29 +421,55 @@ def limit_plds(
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = True
try: try:
# sanity on orig settings curr_ctx: Context = current_ipc_ctx()
orig_pldrx: PldRx = current_pldrx() rx: PldRx = curr_ctx._pld_rx
orig_pldec: MsgDec = orig_pldrx.pld_dec orig_pldec: MsgDec = rx.pld_dec
with orig_pldrx.limit_plds( with rx.limit_plds(
spec=spec, spec=spec,
**kwargs, **kwargs,
) as pldec: ) as pldec:
log.info( log.runtime(
'Applying payload-decoder\n\n' 'Applying payload-decoder\n\n'
f'{pldec}\n' f'{pldec}\n'
) )
yield pldec yield pldec
finally: finally:
log.info( log.runtime(
'Reverted to previous payload-decoder\n\n' 'Reverted to previous payload-decoder\n\n'
f'{orig_pldec}\n' f'{orig_pldec}\n'
) )
assert ( # sanity on orig settings
(pldrx := current_pldrx()) is orig_pldrx assert rx.pld_dec is orig_pldec
and
pldrx.pld_dec is orig_pldec
) @acm
async def maybe_limit_plds(
ctx: Context,
spec: Union[Type[Struct]]|None = None,
**kwargs,
) -> MsgDec|None:
'''
Async compat maybe-payload type limiter.
Mostly for use inside other internal `@acm`s such that a separate
indent block isn't needed when an async one is already being
used.
'''
if spec is None:
yield None
return
# sanity on scoping
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
yield msgdec
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
async def drain_to_final_msg( async def drain_to_final_msg(
@ -543,21 +551,12 @@ async def drain_to_final_msg(
match msg: match msg:
# final result arrived! # final result arrived!
case Return( case Return():
# cid=cid,
# pld=res,
):
# ctx._result: Any = res
ctx._result: Any = pld
log.runtime( log.runtime(
'Context delivered final draining msg:\n' 'Context delivered final draining msg:\n'
f'{pretty_struct.pformat(msg)}' f'{pretty_struct.pformat(msg)}'
) )
# XXX: only close the rx mem chan AFTER ctx._result: Any = pld
# a final result is retreived.
# if ctx._rx_chan:
# await ctx._rx_chan.aclose()
# TODO: ^ we don't need it right?
result_msg = msg result_msg = msg
break break
@ -664,24 +663,6 @@ async def drain_to_final_msg(
result_msg = msg result_msg = msg
break # OOOOOF, yeah obvi we need this.. break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
# right! since `._deliver_msg()` should
# always have detected an {'error': ..}
# msg and already called this right!?!
# elif error := unpack_error(
# msg=msg,
# chan=ctx._portal.channel,
# hide_tb=False,
# ):
# log.critical('SHOULD NEVER GET HERE!?')
# assert msg is ctx._cancel_msg
# assert error.msgdata == ctx._remote_error.msgdata
# assert error.ipc_msg == ctx._remote_error.ipc_msg
# from .devx._debug import pause
# await pause()
# ctx._maybe_cancel_and_set_remote_error(error)
# ctx._maybe_raise_remote_err(error)
else: else:
# bubble the original src key error # bubble the original src key error
raise raise

View File

@ -56,8 +56,7 @@ log = get_logger('tractor.msgspec')
PayloadT = TypeVar('PayloadT') PayloadT = TypeVar('PayloadT')
# TODO: PayloadMsg class PayloadMsg(
class Msg(
Struct, Struct,
Generic[PayloadT], Generic[PayloadT],
@ -110,6 +109,10 @@ class Msg(
pld: Raw pld: Raw
# TODO: complete rename
Msg = PayloadMsg
class Aid( class Aid(
Struct, Struct,
tag=True, tag=True,
@ -299,7 +302,7 @@ class StartAck(
class Started( class Started(
Msg, PayloadMsg,
Generic[PayloadT], Generic[PayloadT],
): ):
''' '''
@ -313,12 +316,12 @@ class Started(
# TODO: instead of using our existing `Start` # TODO: instead of using our existing `Start`
# for this (as we did with the original `{'cmd': ..}` style) # for this (as we did with the original `{'cmd': ..}` style)
# class Cancel(Msg): # class Cancel:
# cid: str # cid: str
class Yield( class Yield(
Msg, PayloadMsg,
Generic[PayloadT], Generic[PayloadT],
): ):
''' '''
@ -345,7 +348,7 @@ class Stop(
# TODO: is `Result` or `Out[come]` a better name? # TODO: is `Result` or `Out[come]` a better name?
class Return( class Return(
Msg, PayloadMsg,
Generic[PayloadT], Generic[PayloadT],
): ):
''' '''
@ -357,7 +360,7 @@ class Return(
class CancelAck( class CancelAck(
Msg, PayloadMsg,
Generic[PayloadT], Generic[PayloadT],
): ):
''' '''
@ -463,14 +466,14 @@ def from_dict_msg(
# TODO: should be make a msg version of `ContextCancelled?` # TODO: should be make a msg version of `ContextCancelled?`
# and/or with a scope field or a full `ActorCancelled`? # and/or with a scope field or a full `ActorCancelled`?
# class Cancelled(Msg): # class Cancelled(MsgType):
# cid: str # cid: str
# TODO what about overruns? # TODO what about overruns?
# class Overrun(Msg): # class Overrun(MsgType):
# cid: str # cid: str
_runtime_msgs: list[Msg] = [ _runtime_msgs: list[Struct] = [
# identity handshake on first IPC `Channel` contact. # identity handshake on first IPC `Channel` contact.
Aid, Aid,
@ -496,9 +499,9 @@ _runtime_msgs: list[Msg] = [
] ]
# the no-outcome-yet IAC (inter-actor-communication) sub-set which # the no-outcome-yet IAC (inter-actor-communication) sub-set which
# can be `Msg.pld` payload field type-limited by application code # can be `PayloadMsg.pld` payload field type-limited by application code
# using `apply_codec()` and `limit_msg_spec()`. # using `apply_codec()` and `limit_msg_spec()`.
_payload_msgs: list[Msg] = [ _payload_msgs: list[PayloadMsg] = [
# first <value> from `Context.started(<value>)` # first <value> from `Context.started(<value>)`
Started, Started,
@ -541,8 +544,8 @@ def mk_msg_spec(
] = 'indexed_generics', ] = 'indexed_generics',
) -> tuple[ ) -> tuple[
Union[Type[Msg]], Union[MsgType],
list[Type[Msg]], list[MsgType],
]: ]:
''' '''
Create a payload-(data-)type-parameterized IPC message specification. Create a payload-(data-)type-parameterized IPC message specification.
@ -554,7 +557,7 @@ def mk_msg_spec(
determined by the input `payload_type_union: Union[Type]`. determined by the input `payload_type_union: Union[Type]`.
''' '''
submsg_types: list[Type[Msg]] = Msg.__subclasses__() submsg_types: list[MsgType] = Msg.__subclasses__()
bases: tuple = ( bases: tuple = (
# XXX NOTE XXX the below generic-parameterization seems to # XXX NOTE XXX the below generic-parameterization seems to
# be THE ONLY way to get this to work correctly in terms # be THE ONLY way to get this to work correctly in terms