Compare commits

...

21 Commits

Author SHA1 Message Date
Tyler Goodlet b22f7dcae0 Resolve remaining debug-request race causing hangs
More or less by pedantically separating and managing root and subactor
request syncing events to always be managed by the locking IPC context
task-funcs:
- for the root's "child"-side, `lock_tty_for_child()` directly creates
  and sets a new `Lock.req_handler_finished` inside a `finally:`
- for the sub's "parent"-side, `request_root_stdio_lock()` does the same
  with a new `DebugStatus.req_finished` event and separates it from
  the `.repl_release` event (which indicates a "c" or "q" from user and
  thus exit of the REPL session) as well as sets a new `.req_task:
  trio.Task` to explicitly distinguish from the app-user-task that
  enters the REPL vs. the paired bg task used to request the global
  root's stdio mutex alongside it.
- apply the `__pld_spec__` on "child"-side of the ctx using the new
  `Portal.open_context(pld_spec)` parameter support; drops use of any
  `ContextVar` malarky used prior for `PldRx` mgmt.
- removing `Lock.no_remote_has_tty` since it was a nebulous name and
  from the prior "everything is in a `Lock`" design..

------ - ------

More rigorous impl to handle various edge cases in `._pause()`:
- rejig `_enter_repl_sync()` to wrap the `debug_func == None` case
  inside maybe-internal-error handler blocks.
- better logic for recurrent vs. multi-task contention for REPL entry in
  subactors, by guarding using `DebugStatus.req_task` and by now waiting
  on the new `DebugStatus.req_finished` for the multi-task contention
  case.
- even better internal error handling and reporting for when this code
  is hacked on and possibly broken ;p

------ - ------

Updates to `.pause_from_sync()` support:
- add optional `actor`, `task` kwargs to `_set_trace()` to allow
  compat with the new explicit `debug_func` calling in `._pause()` and
  pass a `threading.Thread` for `task` in the `.to_thread()` usage case.
- add an `except` block that tries to show the frame on any internal
  error.

------ - ------

Relatedly includes a buncha cleanups/simplifications somewhat in
prep for some coming refinements (around `DebugStatus`):
- use all the new attrs mentioned above as needed in the SIGINT shielder.
- wait on `Lock.req_handler_finished` in `maybe_wait_for_debugger()`.
- dropping a ton of masked legacy code left in during the recent reworks.
- better comments, like on the use of `Context._scope` for shielding on
  the "child"-side to avoid the need to manage yet another cs.
- add/change-to lotsa `log.devx()` level emissions for those infos which
  are handy while hacking on the debugger but not ideal/necessary to be
  user visible.
- obvi add lotsa follow up todo notes!
2024-05-21 10:19:41 -04:00
Tyler Goodlet fde62c72be Show runtime nursery frames on internal errors
Much like other recent changes attempt to detect runtime-bug-causing
crashes and only show the runtime-endpoint frame when present.

Adds a `ActorNursery._scope_error: BaseException|None` attr to aid with
detection. Also toss in some todo notes for removing and replacing the
`.run_in_actor()` method API.
2024-05-20 17:04:30 -04:00
Tyler Goodlet 4ef77bb64f Set `_ctxvar_Context` for child-side RPC tasks
Just inside `._invoke()` after the `ctx: Context` is retrieved.

Also try our best to *not hide* internal frames when a non-user-code
crash happens, normally either due to a runtime RPC EP bug or
a transport failure.
2024-05-20 16:23:29 -04:00
Tyler Goodlet e78fdf2f69 Make `log.devx()` level below `.pdb()`
Kinda like a "runtime"-y level for `.pdb()` (which is more or less like
an `.info()` for our debugger subsys) which can be used to report
internals info for those hacking on `.devx` tools.

Also, inject only the *last* 6 digits of the `id(Task)` in
`pformat_task_uid()` output by default.
2024-05-20 16:13:57 -04:00
Tyler Goodlet 13bc3c308d Add error suppress flag to `current_ipc_ctx()` 2024-05-20 16:12:51 -04:00
Tyler Goodlet 60fc43e530 Shield channel closing in `_connect_chan()` 2024-05-20 16:11:59 -04:00
Tyler Goodlet 30afcd2b6b Adjust `Portal` usage of `Context.pld_rx`
Pass the new `ipc` arg and try to show api frames when an unexpected
internal error is detected.
2024-05-20 16:07:57 -04:00
Tyler Goodlet c80f020ebc Expose `tractor.current_ipc_ctx()` at pkg level 2024-05-20 15:47:01 -04:00
Tyler Goodlet 262a0e36c6 Allocate a `PldRx` per `Context`, new pld-spec API
Since the state mgmt becomes quite messy with multiple sub-tasks inside
an IPC ctx, AND bc generally speaking the payload-type-spec should map
1-to-1 with the `Context`, it doesn't make a lot of sense to be using
`ContextVar`s to modify the `Context.pld_rx: PldRx` instance.

Instead, always allocate a full instance inside `mk_context()` with the
default `.pld_rx: PldRx` set to use the `msg._ops._def_any_pldec: MsgDec`

In support, simplify the `.msg._ops` impl and APIs:
- drop `_ctxvar_PldRx`, `_def_pld_rx` and `current_pldrx()`.
- rename `PldRx._pldec` -> `._pld_dec`.
- rename the unused `PldRx.apply_to_ipc()` -> `.wraps_ipc()`.
- add a required `PldRx._ctx: Context` attr since it is needed
  internally in some meths and each pld-rx now maps to a specific ctx.
- modify all recv methods to accept a `ipc: Context|MsgStream` (instead
  of a `ctx` arg) since both have a ref to the same `._rx_chan` and there
  are only a couple spots (in `.dec_msg()`) where we need the `ctx`
  explicitly (which can now be easily accessed via a new `MsgStream.ctx`
  property, see below).
- always show the `.dec_msg()` frame in tbs if there's a reference error
  when calling `_raise_from_unexpected_msg()` in the fallthrough case.
- implement `limit_plds()` as light wrapper around getting the
  `current_ipc_ctx()` and mutating its `MsgDec` via
  `Context.pld_rx.limit_plds()`.
- add a `maybe_limit_plds()` which just provides an `@acm` equivalent of
  `limit_plds()` handy for composing in a `async with ():` style block
  (avoiding additional indent levels in the body of async funcs).

Obvi extend the `Context` and `MsgStream` interfaces as needed
to match the above:
- add a `Context.pld_rx` pub prop.
- new private refs to `Context._started_msg: Started` and
  a `._started_pld` (mostly for internal debugging / testing / logging)
  and set inside `.open_context()` immediately after the syncing phase.
- a `Context.has_outcome() -> bool:` predicate which can be used to more
  easily determine if the ctx errored or has a final result.
- pub props for `MsgStream.ctx: Context` and `.chan: Channel` providing
  full `ipc`-arg compat with the `PldRx` method signatures.
2024-05-20 15:46:28 -04:00
Tyler Goodlet d93135acd8 Include truncated `id(trio.Task)` for task info in log header 2024-05-15 09:36:22 -04:00
Tyler Goodlet b23780c102 Make `request_root_stdio_lock()` post-mortem-able
Finally got this working so that if/when an internal bug is introduced
to this request task-func, we can actually REPL-debug the lock request
task itself B)

As in, if the subactor's lock request task internally errors we,
- ensure the task always terminates (by calling `DebugStatus.release()`)
  and explicitly reports (via a `log.exception()`) the internal error.
- capture the error instance and set as a new `DebugStatus.req_err` and
  always check for it on final teardown - in which case we also,
 - ensure it's reraised from a new `DebugRequestError`.
 - unhide the stack frames for `_pause()`, `_enter_repl_sync()` so that
   the dev can upward inspect the `_pause()` call stack sanely.

Supporting internal impl changes,
- add `DebugStatus.cancel()` and `.req_err`.
- don't ever cancel the request task from
  `PdbREPL.set_[continue/quit]()` only when there's some internal error
  that would likely result in a hang and stale lock state with the root.
- only release the root's lock when the current ask is also the owner
  (avoids bad release errors).
- also show internal `._pause()`-related frames on any `repl_err`.

Other temp-dev-tweaks,
- make pld-dec change log msgs info level again while solving this
  final context-vars race stuff..
- drop the debug pld-dec instance match asserts for now since
  the problem is already caught (and now debug-able B) by an attr-error
  on the decoded-as-`dict` started msg, and instead add in
  a `log.exception()` trace to see which task is triggering the case
  where the debug `MsgDec` isn't set correctly vs. when we think it's
  being applied.
2024-05-14 21:01:20 -04:00
Tyler Goodlet 31de5f6648 Always release debug request from `._post_mortem()`
Since obviously the thread is likely expected to halt and raise after
the REPL session exits; this was a regression from the prior impl. The
main reason for this is that otherwise the request task will never
unblock if the user steps through the crashed task using 'next' since
the `.do_next()` handler doesn't by default release the request since in
the `.pause()` case this would end the session too early.

Other,
- toss in draft `Pdb.user_exception()`, though doesn't seem to ever
  trigger?
- only release `Lock._debug_lock` when already locked.
2024-05-14 11:39:04 -04:00
Tyler Goodlet 236083b6e4 Rename `.msg.types.Msg` -> `PayloadMsg` 2024-05-10 13:15:45 -04:00
Tyler Goodlet d2dee87b36 Modernize streaming example script
- add typing,
- apply multi-line call style,
- use 'cancel' log level,
- enable debug mode.
2024-05-09 16:51:51 -04:00
Tyler Goodlet 5cb0cc0f0b Update tests for `PldRx` and `Context` changes
Mostly adjustments for the new pld-receiver semantics/shim-layer which
results more often in the direct delivery of `RemoteActorError`s from
IPC API primitives (like `Portal.result()`) instead of being embedded in
an `ExceptionGroup` bundled from an embedded nursery.

Tossed usage of the `debug_mode: bool` fixture to a couple problematic
tests while i was working on them.

Also includes detailed assertion updates to the inter-peer cancellation
suite in terms of,
- `Context.canceller` state correctly matching the true src actor when
  expecting a ctxc.
- any rxed `ContextCancelled` should instance match the `Context._local/remote_error`
  as should the `.msgdata` and `._ipc_msg`.
2024-05-09 16:48:53 -04:00
Tyler Goodlet fc075e96c6 Hide some API frames, port to new `._debug` apis
- start tossing in `__tracebackhide__`s to various eps which don't need
  to show in tbs or in the pdb REPL.
- port final `._maybe_enter_pm()` to pass a `api_frame`.
- start comment-marking up some API eps with `@api_frame`
  in prep for actually using the new frame-stack tracing.
2024-05-09 16:04:34 -04:00
Tyler Goodlet d6ca4771ce Use `.recv_msg_w_pld()` for final `Portal.result()`
Woops, due to a `None` test against the `._final_result`, any actual
final `None` result would be received but not acked as such causing
a spawning test to hang. Fix it by instead receiving and assigning both
a `._final_result_msg: PayloadMsg` and `._final_result_pld`.

NB: as mentioned in many recent comments surrounding this API layer,
really this whole `Portal`-has-final-result interface/semantics should
be entirely removed as should the `ActorNursery.run_in_actor()` API(s).
Instead it should all be replaced by a wrapping "high level" API
(`tractor.hilevel` ?) which combines a task nursery, `Portal.open_context()`
and underlying `Context` APIs + an `outcome.Outcome` to accomplish the
same "run a single task in a spawned actor and return it's result"; aka
a "one-shot-task-actor".
2024-05-09 09:47:13 -04:00
Tyler Goodlet c5a0cfc639 Rename `.msg.types.Msg` -> `PayloadMsg` 2024-05-08 15:07:34 -04:00
Tyler Goodlet f85314ecab Adjust `._runtime` to report `DebugStatus.req_ctx`
- inside the `Actor.cancel()`'s maybe-wait-on-debugger delay,
  report the full debug request status and it's affiliated lock request
  IPC ctx.
- use the new `.req_ctx.chan.uid` to do the local nursery lookup during
  channel teardown handling.
- another couple log fmt tweaks.
2024-05-08 15:06:50 -04:00
Tyler Goodlet c929bc15c9 Add `pexpect` to dev deps for testing 2024-05-08 14:53:10 -04:00
Tyler Goodlet 6690968236 Rework and first draft of `.devx._frame_stack.py`
Proto-ing a little suite of call-stack-frame annotation-for-scanning
sub-systems for the purposes of both,
- the `.devx._debug`er and its
  traceback and frame introspection needs when entering the REPL,
- detailed trace-style logging such that we can explicitly report
  on "which and where" `tractor`'s APIs are used in the "app" code.

Deats:
- change mod name obvi from `._code` and adjust client mod imports.
- using `wrapt` (for perf) implement a `@api_frame` annot decorator
  which both stashes per-call-stack-frame instances of `CallerInfo` in
  a table and marks the function such that API endpoints can be easily
  found via runtime stack scanning despite any internal impl changes.
- add a global `_frame2callerinfo_cache: dict[FrameType, CallerInfo]`
  table for providing the per func-frame info caching.
- Re-implement `CallerInfo` to require less (types of) inputs:
  |_ `_api_func: Callable`, a ref to the (singleton) func def.
  |_ `_api_frame: FrameType` taken from the `@api_frame` marked `tractor`-API
     func's runtime call-stack, from which we can determine the
     app code's `.caller_frame`.
  |_`_caller_frames_up: int|None` allowing the specific `@api_frame` to
    determine "how many frames up" the application / calling code is.
  And, a better set of derived attrs:
  |_`caller_frame: FrameType` which finds and caches the API-eps calling
    frame.
  |_`caller_frame: FrameType` which finds and caches the API-eps calling
- add a new attempt at "getting a method ref from its runtime frame"
  with `get_ns_and_func_from_frame()` using a heuristic that the
  `CodeType.co_qualname: str` should have a "." in it for methods.
  - main issue is still that the func-ref lookup will require searching
    for the method's instance type by name, and that name isn't
    guaranteed to be defined in any particular ns..
   |_rn we try to read it from the `FrameType.f_locals` but that is
     going to obvi fail any time the method is called in a module where
     it's type is not also defined/imported.
  - returns both the ns and the func ref FYI.
2024-05-08 14:51:56 -04:00
25 changed files with 1318 additions and 893 deletions

View File

@ -1,6 +1,11 @@
import time
import trio
import tractor
from tractor import (
ActorNursery,
MsgStream,
Portal,
)
# this is the first 2 actors, streamer_1 and streamer_2
@ -12,14 +17,18 @@ async def stream_data(seed):
# this is the third actor; the aggregator
async def aggregate(seed):
"""Ensure that the two streams we receive match but only stream
'''
Ensure that the two streams we receive match but only stream
a single set of values to the parent.
"""
async with tractor.open_nursery() as nursery:
portals = []
'''
an: ActorNursery
async with tractor.open_nursery() as an:
portals: list[Portal] = []
for i in range(1, 3):
# fork point
portal = await nursery.start_actor(
# fork/spawn call
portal = await an.start_actor(
name=f'streamer_{i}',
enable_modules=[__name__],
)
@ -43,7 +52,11 @@ async def aggregate(seed):
async with trio.open_nursery() as n:
for portal in portals:
n.start_soon(push_to_chan, portal, send_chan.clone())
n.start_soon(
push_to_chan,
portal,
send_chan.clone(),
)
# close this local task's reference to send side
await send_chan.aclose()
@ -60,7 +73,7 @@ async def aggregate(seed):
print("FINISHED ITERATING in aggregator")
await nursery.cancel()
await an.cancel()
print("WAITING on `ActorNursery` to finish")
print("AGGREGATOR COMPLETE!")
@ -75,18 +88,21 @@ async def main() -> list[int]:
'''
# yes, a nursery which spawns `trio`-"actors" B)
nursery: tractor.ActorNursery
async with tractor.open_nursery() as nursery:
an: ActorNursery
async with tractor.open_nursery(
loglevel='cancel',
debug_mode=True,
) as an:
seed = int(1e3)
pre_start = time.time()
portal: tractor.Portal = await nursery.start_actor(
portal: Portal = await an.start_actor(
name='aggregator',
enable_modules=[__name__],
)
stream: tractor.MsgStream
stream: MsgStream
async with portal.open_stream_from(
aggregate,
seed=seed,
@ -95,11 +111,12 @@ async def main() -> list[int]:
start = time.time()
# the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally
result_stream = []
result_stream: list[int] = []
async for value in stream:
result_stream.append(value)
await portal.cancel_actor()
cancelled: bool = await portal.cancel_actor()
assert cancelled
print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")

View File

@ -55,6 +55,7 @@ xontrib-vox = "^0.0.1"
optional = false
[tool.poetry.group.dev.dependencies]
pytest = "^8.2.0"
pexpect = "^4.9.0"
# only for xonsh as sh..
xontrib-vox = "^0.0.1"

View File

@ -97,6 +97,7 @@ def test_ipc_channel_break_during_stream(
examples_dir() / 'advanced_faults'
/ 'ipc_failure_during_stream.py',
root=examples_dir(),
consider_namespace_packages=False,
)
# by def we expect KBI from user after a simulated "hang

View File

@ -89,17 +89,30 @@ def test_remote_error(reg_addr, args_err):
assert excinfo.value.boxed_type == errtype
else:
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
# the root task will also error on the `Portal.result()`
# call so we expect an error from there AND the child.
# |_ tho seems like on new `trio` this doesn't always
# happen?
with pytest.raises((
BaseExceptionGroup,
tractor.RemoteActorError,
)) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
# ensure boxed errors are `errtype`
err: BaseException = excinfo.value
if isinstance(err, BaseExceptionGroup):
suberrs: list[BaseException] = err.exceptions
else:
suberrs: list[BaseException] = [err]
for exc in suberrs:
assert exc.boxed_type == errtype
def test_multierror(reg_addr):
def test_multierror(
reg_addr: tuple[str, int],
):
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors.

View File

@ -444,6 +444,7 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
infect_asyncio=True,
fan_out=fan_out,
)
# should raise RAE diectly
await portal.result()
trio.run(main)
@ -461,12 +462,11 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.boxed_type == Exception
# ensure boxed error type
excinfo.value.boxed_type == Exception
def test_trio_closes_early_and_channel_exits(reg_addr):
@ -477,7 +477,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr):
exit_early=True,
infect_asyncio=True,
)
# should trigger remote actor error
# should raise RAE diectly
await portal.result()
# should be a quiet exit on a simple channel exit
@ -492,15 +492,17 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
aio_raise_err=True,
infect_asyncio=True,
)
# should trigger remote actor error
# should trigger RAE directly, not an eg.
await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo:
with pytest.raises(
# NOTE: bc we directly wait on `Portal.result()` instead
# of capturing it inside the `ActorNursery` machinery.
expected_exception=RemoteActorError,
) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.boxed_type == Exception
excinfo.value.boxed_type == Exception
@tractor.context

View File

@ -55,9 +55,10 @@ from tractor._testing import (
@tractor.context
async def sleep_forever(
async def open_stream_then_sleep_forever(
ctx: Context,
expect_ctxc: bool = False,
) -> None:
'''
Sync the context, open a stream then just sleep.
@ -67,6 +68,10 @@ async def sleep_forever(
'''
try:
await ctx.started()
# NOTE: the below means this child will send a `Stop`
# to it's parent-side task despite that side never
# opening a stream itself.
async with ctx.open_stream():
await trio.sleep_forever()
@ -100,7 +105,7 @@ async def error_before_started(
'''
async with tractor.wait_for_actor('sleeper') as p2:
async with (
p2.open_context(sleep_forever) as (peer_ctx, first),
p2.open_context(open_stream_then_sleep_forever) as (peer_ctx, first),
peer_ctx.open_stream(),
):
# NOTE: this WAS inside an @acm body but i factored it
@ -204,9 +209,13 @@ async def stream_ints(
@tractor.context
async def stream_from_peer(
ctx: Context,
debug_mode: bool,
peer_name: str = 'sleeper',
) -> None:
# sanity
assert tractor._state.debug_mode() == debug_mode
peer: Portal
try:
async with (
@ -240,26 +249,54 @@ async def stream_from_peer(
assert msg is not None
print(msg)
# NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming
# actor.
except ContextCancelled as ctxc:
ctxerr = ctxc
# NOTE: cancellation of the (sleeper) peer should always cause
# a `ContextCancelled` raise in this streaming actor.
except ContextCancelled as _ctxc:
ctxc = _ctxc
assert peer_ctx._remote_error is ctxerr
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
# print("TRYING TO ENTER PAUSSE!!!")
# await tractor.pause(shield=True)
re: ContextCancelled = peer_ctx._remote_error
# XXX YES, bc exact same msg instances
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
# XXX YES XXX, remote error should be unpacked only once!
assert (
re
is
peer_ctx.maybe_error
is
ctxc
is
peer_ctx._local_error
)
# NOTE: these errors should all match!
# ------ - ------
# XXX [2024-05-03] XXX
# ------ - ------
# broke this due to a re-raise inside `.msg._ops.drain_to_final_msg()`
# where the `Error()` msg was directly raising the ctxc
# instead of just returning up to the caller inside
# `Context.return()` which would results in a diff instance of
# the same remote error bubbling out above vs what was
# already unpacked and set inside `Context.
assert (
peer_ctx._remote_error.msgdata
==
ctxc.msgdata
)
# ^-XXX-^ notice the data is of course the exact same.. so
# the above larger assert makes sense to also always be true!
# XXX NO, bc new one always created for property accesss
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg
# XXX YES XXX, bc should be exact same msg instances
assert peer_ctx._remote_error._ipc_msg is ctxc._ipc_msg
# XXX NO XXX, bc new one always created for property accesss
assert peer_ctx._remote_error.ipc_msg != ctxc.ipc_msg
# the peer ctx is the canceller even though it's canceller
# is the "canceller" XD
assert peer_name in peer_ctx.canceller
assert "canceller" in ctxerr.canceller
assert "canceller" in ctxc.canceller
# caller peer should not be the cancel requester
assert not ctx.cancel_called
@ -283,12 +320,13 @@ async def stream_from_peer(
# TODO / NOTE `.canceller` won't have been set yet
# here because that machinery is inside
# `.open_context().__aexit__()` BUT, if we had
# `Portal.open_context().__aexit__()` BUT, if we had
# a way to know immediately (from the last
# checkpoint) that cancellation was due to
# a remote, we COULD assert this here..see,
# https://github.com/goodboy/tractor/issues/368
#
# await tractor.pause()
# assert 'canceller' in ctx.canceller
# root/parent actor task should NEVER HAVE cancelled us!
@ -392,12 +430,13 @@ def test_peer_canceller(
try:
async with (
sleeper.open_context(
sleep_forever,
open_stream_then_sleep_forever,
expect_ctxc=True,
) as (sleeper_ctx, sent),
just_caller.open_context(
stream_from_peer,
debug_mode=debug_mode,
) as (caller_ctx, sent),
canceller.open_context(
@ -423,10 +462,11 @@ def test_peer_canceller(
# should always raise since this root task does
# not request the sleeper cancellation ;)
except ContextCancelled as ctxerr:
except ContextCancelled as _ctxc:
ctxc = _ctxc
print(
'CAUGHT REMOTE CONTEXT CANCEL\n\n'
f'{ctxerr}\n'
f'{ctxc}\n'
)
# canceller and caller peers should not
@ -437,7 +477,7 @@ def test_peer_canceller(
# we were not the actor, our peer was
assert not sleeper_ctx.cancel_acked
assert ctxerr.canceller[0] == 'canceller'
assert ctxc.canceller[0] == 'canceller'
# XXX NOTE XXX: since THIS `ContextCancelled`
# HAS NOT YET bubbled up to the
@ -448,7 +488,7 @@ def test_peer_canceller(
# CASE_1: error-during-ctxc-handling,
if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown')
raise RuntimeError('Simulated RTE re-raise during ctxc handling')
# CASE_2: standard teardown inside in `.open_context()` block
raise
@ -513,6 +553,9 @@ def test_peer_canceller(
# should be cancelled by US.
#
if error_during_ctxerr_handling:
print(f'loc_err: {_loc_err}\n')
assert isinstance(loc_err, RuntimeError)
# since we do a rte reraise above, the
# `.open_context()` error handling should have
# raised a local rte, thus the internal
@ -521,9 +564,6 @@ def test_peer_canceller(
# a `trio.Cancelled` due to a local
# `._scope.cancel()` call.
assert not sleeper_ctx._scope.cancelled_caught
assert isinstance(loc_err, RuntimeError)
print(f'_loc_err: {_loc_err}\n')
# assert sleeper_ctx._local_error is _loc_err
# assert sleeper_ctx._local_error is _loc_err
assert not (
@ -560,9 +600,12 @@ def test_peer_canceller(
else: # the other 2 ctxs
assert (
re.canceller
==
canceller.channel.uid
isinstance(re, ContextCancelled)
and (
re.canceller
==
canceller.channel.uid
)
)
# since the sleeper errors while handling a
@ -811,8 +854,7 @@ async def serve_subactors(
async with open_nursery() as an:
# sanity
if debug_mode:
assert tractor._state.debug_mode()
assert tractor._state.debug_mode() == debug_mode
await ctx.started(peer_name)
async with ctx.open_stream() as ipc:
@ -1091,7 +1133,6 @@ def test_peer_spawns_and_cancels_service_subactor(
'-> root checking `client_ctx.result()`,\n'
f'-> checking that sub-spawn {peer_name} is down\n'
)
# else:
try:
res = await client_ctx.result(hide_tb=False)

View File

@ -2,7 +2,9 @@
Spawning basics
"""
from typing import Optional
from typing import (
Any,
)
import pytest
import trio
@ -25,13 +27,11 @@ async def spawn(
async with tractor.open_root_actor(
arbiter_addr=reg_addr,
):
actor = tractor.current_actor()
assert actor.is_arbiter == is_arbiter
data = data_to_pass_down
if actor.is_arbiter:
async with tractor.open_nursery() as nursery:
# forks here
@ -95,7 +95,9 @@ async def test_movie_theatre_convo(start_method):
await portal.cancel_actor()
async def cellar_door(return_value: Optional[str]):
async def cellar_door(
return_value: str|None,
):
return return_value
@ -105,16 +107,18 @@ async def cellar_door(return_value: Optional[str]):
)
@tractor_test
async def test_most_beautiful_word(
start_method,
return_value
start_method: str,
return_value: Any,
debug_mode: bool,
):
'''
The main ``tractor`` routine.
'''
with trio.fail_after(1):
async with tractor.open_nursery() as n:
async with tractor.open_nursery(
debug_mode=debug_mode,
) as n:
portal = await n.run_in_actor(
cellar_door,
return_value=return_value,

View File

@ -42,6 +42,7 @@ from ._supervise import (
from ._state import (
current_actor as current_actor,
is_root_process as is_root_process,
current_ipc_ctx as current_ipc_ctx,
)
from ._exceptions import (
ContextCancelled as ContextCancelled,

View File

@ -41,6 +41,7 @@ from typing import (
Callable,
Mapping,
Type,
TypeAlias,
TYPE_CHECKING,
Union,
)
@ -94,7 +95,7 @@ if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
from ._ipc import MsgTransport
from .devx._code import (
from .devx._frame_stack import (
CallerInfo,
)
@ -155,6 +156,41 @@ class Context:
# payload receiver
_pld_rx: msgops.PldRx
@property
def pld_rx(self) -> msgops.PldRx:
'''
The current `tractor.Context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER its container
shuttlle msg (eg. `Started`/`Yield`/`Return) has been
delivered up from `tractor`'s transport layer but BEFORE the
data is yielded to `tractor` application code.
The "IPC-primitive API" is normally one of a `Context` (this)` or a `MsgStream`
or some higher level API using one of them.
For ex. `pld_data: PayloadT = MsgStream.receive()` implicitly
calls into the stream's parent `Context.pld_rx.recv_pld().` to
receive the latest `PayloadMsg.pld` value.
Modification of the current payload spec via `limit_plds()`
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by the
interchange backend.
- for `msgspec` see <PUTLINKHERE>.
Note that the `PldRx` itself is a per-`Context` instance that
normally only changes when some (sub-)task, on a given "side"
of the IPC ctx (either a "child"-side RPC or inside
a "parent"-side `Portal.open_context()` block), modifies it
using the `.msg._ops.limit_plds()` API.
'''
return self._pld_rx
# full "namespace-path" to target RPC function
_nsf: NamespacePath
@ -231,6 +267,8 @@ class Context:
# init and streaming state
_started_called: bool = False
_started_msg: MsgType|None = None
_started_pld: Any = None
_stream_opened: bool = False
_stream: MsgStream|None = None
@ -623,7 +661,7 @@ class Context:
log.runtime(
'Setting remote error for ctx\n\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}\n\n'
f'=> {self.side!r}: {self._actor.uid}\n\n'
f'{error}'
)
self._remote_error: BaseException = error
@ -678,7 +716,7 @@ class Context:
log.error(
f'Remote context error:\n\n'
# f'{pformat(self)}\n'
f'{error}\n'
f'{error}'
)
if self._canceller is None:
@ -724,8 +762,10 @@ class Context:
)
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb
# mk_pdb().set_trace()
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
if (
cs
and
@ -805,6 +845,7 @@ class Context:
# f'{ci.api_nsp}()\n'
# )
# TODO: use `.dev._frame_stack` scanning to find caller!
return 'Portal.open_context()'
async def cancel(
@ -1304,17 +1345,6 @@ class Context:
ctx=self,
hide_tb=hide_tb,
)
for msg in drained_msgs:
# TODO: mask this by default..
if isinstance(msg, Return):
# from .devx import pause
# await pause()
# raise InternalError(
log.warning(
'Final `return` msg should never be drained !?!?\n\n'
f'{msg}\n'
)
drained_status: str = (
'Ctx drained to final outcome msg\n\n'
@ -1435,6 +1465,10 @@ class Context:
self._result
)
@property
def has_outcome(self) -> bool:
return bool(self.maybe_error) or self._final_result_is_set()
# @property
def repr_outcome(
self,
@ -1637,8 +1671,6 @@ class Context:
)
if rt_started != started_msg:
# TODO: break these methods out from the struct subtype?
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
rt_started,
@ -1674,6 +1706,8 @@ class Context:
) from verr
self._started_called = True
self._started_msg = started_msg
self._started_pld = value
async def _drain_overflows(
self,
@ -1961,6 +1995,7 @@ async def open_context_from_portal(
portal: Portal,
func: Callable,
pld_spec: TypeAlias|None = None,
allow_overruns: bool = False,
# TODO: if we set this the wrapping `@acm` body will
@ -2026,7 +2061,7 @@ async def open_context_from_portal(
# XXX NOTE XXX: currenly we do NOT allow opening a contex
# with "self" since the local feeder mem-chan processing
# is not built for it.
if portal.channel.uid == portal.actor.uid:
if (uid := portal.channel.uid) == portal.actor.uid:
raise RuntimeError(
'** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n'
@ -2054,32 +2089,45 @@ async def open_context_from_portal(
assert ctx._caller_info
_ctxvar_Context.set(ctx)
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
# NOT actually cancel the below line!
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
first: Any = await ctx._pld_rx.recv_pld(
ctx=ctx,
expect_msg=Started,
)
ctx._started_called: bool = True
uid: tuple = portal.channel.uid
cid: str = ctx.cid
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
scope_err: BaseException|None = None
ctxc_from_callee: ContextCancelled|None = None
try:
async with trio.open_nursery() as nurse:
async with (
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
spec=pld_spec,
) as maybe_msgdec,
):
if maybe_msgdec:
assert maybe_msgdec.pld_spec == pld_spec
# NOTE: used to start overrun queuing tasks
ctx._scope_nursery: trio.Nursery = nurse
ctx._scope: trio.CancelScope = nurse.cancel_scope
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
# NOT actually cancel the below line!
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx,
expect_msg=Started,
passthrough_non_pld_msgs=False,
)
# from .devx import pause
# await pause()
ctx._started_called: bool = True
ctx._started_msg: bool = started_msg
ctx._started_pld: bool = first
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
# deliver context instance and .started() msg value
# in enter tuple.
@ -2126,13 +2174,13 @@ async def open_context_from_portal(
# when in allow_overruns mode there may be
# lingering overflow sender tasks remaining?
if nurse.child_tasks:
if tn.child_tasks:
# XXX: ensure we are in overrun state
# with ``._allow_overruns=True`` bc otherwise
# there should be no tasks in this nursery!
if (
not ctx._allow_overruns
or len(nurse.child_tasks) > 1
or len(tn.child_tasks) > 1
):
raise InternalError(
'Context has sub-tasks but is '
@ -2304,8 +2352,8 @@ async def open_context_from_portal(
):
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
f'actor:{uid}'
f'task: {ctx.cid}\n'
f'actor: {uid}'
)
raise # duh
@ -2455,9 +2503,8 @@ async def open_context_from_portal(
and ctx.cancel_acked
):
log.cancel(
'Context cancelled by {ctx.side!r}-side task\n'
f'Context cancelled by {ctx.side!r}-side task\n'
f'|_{ctx._task}\n\n'
f'{repr(scope_err)}\n'
)
@ -2485,7 +2532,7 @@ async def open_context_from_portal(
f'cid: {ctx.cid}\n'
)
portal.actor._contexts.pop(
(uid, cid),
(uid, ctx.cid),
None,
)
@ -2513,11 +2560,12 @@ def mk_context(
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan caller-info if log level so high!
from .devx._code import find_caller_info
from .devx._frame_stack import find_caller_info
caller_info: CallerInfo|None = find_caller_info()
# TODO: when/how do we apply `.limit_plds()` from here?
pld_rx: msgops.PldRx = msgops.current_pldrx()
pld_rx = msgops.PldRx(
_pld_dec=msgops._def_any_pldec,
)
ctx = Context(
chan=chan,
@ -2531,13 +2579,16 @@ def mk_context(
_caller_info=caller_info,
**kwargs,
)
pld_rx._ctx = ctx
ctx._result = Unresolved
return ctx
# TODO: use the new type-parameters to annotate this in 3.13?
# -[ ] https://peps.python.org/pep-0718/#unknown-types
def context(func: Callable) -> Callable:
def context(
func: Callable,
) -> Callable:
'''
Mark an (async) function as an SC-supervised, inter-`Actor`,
child-`trio.Task`, IPC endpoint otherwise known more

View File

@ -716,4 +716,5 @@ async def _connect_chan(
chan = Channel((host, port))
await chan.connect()
yield chan
await chan.aclose()
with trio.CancelScope(shield=True):
await chan.aclose()

View File

@ -47,6 +47,7 @@ from ._ipc import Channel
from .log import get_logger
from .msg import (
# Error,
PayloadMsg,
NamespacePath,
Return,
)
@ -98,7 +99,8 @@ class Portal:
self.chan = channel
# during the portal's lifetime
self._final_result: Any|None = None
self._final_result_pld: Any|None = None
self._final_result_msg: PayloadMsg|None = None
# When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some
@ -132,7 +134,7 @@ class Portal:
'A pending main result has already been submitted'
)
self._expect_result_ctx = await self.actor.start_remote_task(
self._expect_result_ctx: Context = await self.actor.start_remote_task(
self.channel,
nsf=NamespacePath(f'{ns}:{func}'),
kwargs=kwargs,
@ -163,13 +165,22 @@ class Portal:
# expecting a "main" result
assert self._expect_result_ctx
if self._final_result is None:
self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
ctx=self._expect_result_ctx,
expect_msg=Return,
)
if self._final_result_msg is None:
try:
(
self._final_result_msg,
self._final_result_pld,
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
ipc=self._expect_result_ctx,
expect_msg=Return,
)
except BaseException as err:
# TODO: wrap this into `@api_frame` optionally with
# some kinda filtering mechanism like log levels?
__tracebackhide__: bool = False
raise err
return self._final_result
return self._final_result_pld
async def _cancel_streams(self):
# terminate all locally running async generator
@ -301,7 +312,7 @@ class Portal:
portal=self,
)
return await ctx._pld_rx.recv_pld(
ctx=ctx,
ipc=ctx,
expect_msg=Return,
)
@ -320,6 +331,8 @@ class Portal:
remote rpc task or a local async generator instance.
'''
__runtimeframe__: int = 1 # noqa
if isinstance(func, str):
warnings.warn(
"`Portal.run(namespace: str, funcname: str)` is now"
@ -353,7 +366,7 @@ class Portal:
portal=self,
)
return await ctx._pld_rx.recv_pld(
ctx=ctx,
ipc=ctx,
expect_msg=Return,
)

View File

@ -18,7 +18,7 @@
Root actor runtime ignition(s).
'''
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager as acm
from functools import partial
import importlib
import logging
@ -60,7 +60,7 @@ _default_lo_addrs: list[tuple[str, int]] = [(
logger = log.get_logger('tractor')
@asynccontextmanager
@acm
async def open_root_actor(
*,
@ -96,6 +96,7 @@ async def open_root_actor(
Runtime init entry point for ``tractor``.
'''
__tracebackhide__ = True
# TODO: stick this in a `@cm` defined in `devx._debug`?
#
# Override the global debugger hook to make it play nice with
@ -358,7 +359,11 @@ async def open_root_actor(
BaseExceptionGroup,
) as err:
entered: bool = await _debug._maybe_enter_pm(err)
import inspect
entered: bool = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
)
if (
not entered

View File

@ -70,7 +70,6 @@ from .msg import (
from tractor.msg.types import (
CancelAck,
Error,
Msg,
MsgType,
Return,
Start,
@ -250,10 +249,17 @@ async def _errors_relayed_via_ipc(
] = trio.TASK_STATUS_IGNORED,
) -> None:
# NOTE: we normally always hide this frame in call-stack tracebacks
# if the crash originated from an RPC task (since normally the
# user is only going to care about their own code not this
# internal runtime frame) and we DID NOT
# fail due to an IPC transport error!
__tracebackhide__: bool = hide_tb
# TODO: a debug nursery when in debug mode!
# async with maybe_open_debugger_nursery() as debug_tn:
# => see matching comment in side `._debug._pause()`
rpc_err: BaseException|None = None
try:
yield # run RPC invoke body
@ -264,16 +270,7 @@ async def _errors_relayed_via_ipc(
BaseExceptionGroup,
KeyboardInterrupt,
) as err:
# NOTE: always hide this frame from debug REPL call stack
# if the crash originated from an RPC task and we DID NOT
# fail due to an IPC transport error!
if (
is_rpc
and
chan.connected()
):
__tracebackhide__: bool = hide_tb
rpc_err = err
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
@ -318,11 +315,19 @@ async def _errors_relayed_via_ipc(
api_frame=inspect.currentframe(),
)
if not entered_debug:
# if we prolly should have entered the REPL but
# didn't, maybe there was an internal error in
# the above code and we do want to show this
# frame!
if _state.debug_mode():
__tracebackhide__: bool = False
log.exception(
'RPC task crashed\n'
f'|_{ctx}'
)
# ALWAYS try to ship RPC errors back to parent/caller task
if is_rpc:
@ -355,6 +360,20 @@ async def _errors_relayed_via_ipc(
# `Actor._service_n`, we add "handles" to each such that
# they can be individually ccancelled.
finally:
# if the error is not from user code and instead a failure
# of a runtime RPC or transport failure we do prolly want to
# show this frame
if (
rpc_err
and (
not is_rpc
or
not chan.connected()
)
):
__tracebackhide__: bool = False
try:
ctx: Context
func: Callable
@ -444,9 +463,10 @@ async def _invoke(
# open the stream with this option.
# allow_overruns=True,
)
context: bool = False
context_ep_func: bool = False
assert not _state._ctxvar_Context.get()
# set the current IPC ctx var for this RPC task
_state._ctxvar_Context.set(ctx)
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
@ -475,7 +495,7 @@ async def _invoke(
# handle decorated ``@tractor.context`` async function
elif getattr(func, '_tractor_context_function', False):
kwargs['ctx'] = ctx
context = True
context_ep_func = True
# errors raised inside this block are propgated back to caller
async with _errors_relayed_via_ipc(
@ -501,7 +521,7 @@ async def _invoke(
raise
# TODO: impl all these cases in terms of the `Context` one!
if not context:
if not context_ep_func:
await _invoke_non_context(
actor,
cancel_scope,
@ -571,7 +591,6 @@ async def _invoke(
async with trio.open_nursery() as tn:
ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope
_state._ctxvar_Context.set(ctx)
task_status.started(ctx)
# TODO: should would be nice to have our
@ -831,7 +850,7 @@ async def process_messages(
(as utilized inside `Portal.cancel_actor()` ).
'''
assert actor._service_n # state sanity
assert actor._service_n # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
@ -844,7 +863,7 @@ async def process_messages(
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
nursery_cancelled_before_task: bool = False
msg: Msg|None = None
msg: MsgType|None = None
try:
# NOTE: this internal scope allows for keeping this
# message loop running despite the current task having

View File

@ -644,7 +644,7 @@ class Actor:
peers_str: str = ''
for uid, chans in self._peers.items():
peers_str += (
f'|_ uid: {uid}\n'
f'uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
@ -678,10 +678,12 @@ class Actor:
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if (
pdb_user_uid
and local_nursery
(ctx_in_debug := pdb_lock.ctx_in_debug)
and
(pdb_user_uid := ctx_in_debug.chan.uid)
and
local_nursery
):
entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
@ -1169,13 +1171,17 @@ class Actor:
# kill any debugger request task to avoid deadlock
# with the root actor in this tree
dbcs = _debug.DebugStatus.req_cs
if dbcs is not None:
debug_req = _debug.DebugStatus
lock_req_ctx: Context = debug_req.req_ctx
if lock_req_ctx is not None:
msg += (
'-> Cancelling active debugger request..\n'
f'|_{_debug.Lock.pformat()}'
f'|_{_debug.Lock.repr()}\n\n'
f'|_{lock_req_ctx}\n\n'
)
dbcs.cancel()
# lock_req_ctx._scope.cancel()
# TODO: wrap this in a method-API..
debug_req.req_cs.cancel()
# self-cancel **all** ongoing RPC tasks
await self.cancel_rpc_tasks(
@ -1375,15 +1381,17 @@ class Actor:
"IPC channel's "
)
rent_chan_repr: str = (
f'|_{parent_chan}'
f' |_{parent_chan}\n\n'
if parent_chan
else ''
)
log.cancel(
f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n'
f' {rent_chan_repr}\n'
# f'{self}\n'
f'Cancelling {descr} RPC tasks\n\n'
f'<= canceller: {req_uid}\n'
f'{rent_chan_repr}'
f'=> cancellee: {self.uid}\n'
f' |_{self}.cancel_rpc_tasks()\n'
f' |_tasks: {len(tasks)}\n'
# f'{tasks_str}'
)
for (
@ -1413,7 +1421,7 @@ class Actor:
if tasks:
log.cancel(
'Waiting for remaining rpc tasks to complete\n'
f'|_{tasks}'
f'|_{tasks_str}'
)
await self._ongoing_rpc_tasks.wait()
@ -1466,7 +1474,10 @@ class Actor:
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)
def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
def get_chans(
self,
uid: tuple[str, str],
) -> list[Channel]:
'''
Return all IPC channels to the actor with provided `uid`.
@ -1626,7 +1637,9 @@ async def async_main(
# tranport address bind errors - normally it's
# something silly like the wrong socket-address
# passed via a config or CLI Bo
entered_debug = await _debug._maybe_enter_pm(oserr)
entered_debug = await _debug._maybe_enter_pm(
oserr,
)
if entered_debug:
log.runtime('Exited debug REPL..')
raise

View File

@ -142,7 +142,9 @@ async def exhaust_portal(
'''
__tracebackhide__ = True
try:
log.debug(f"Waiting on final result from {actor.uid}")
log.debug(
f'Waiting on final result from {actor.uid}'
)
# XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api
@ -195,7 +197,10 @@ async def cancel_on_completion(
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# an exception group and we still send out a cancel request
result: Any|Exception = await exhaust_portal(portal, actor)
result: Any|Exception = await exhaust_portal(
portal,
actor,
)
if isinstance(result, Exception):
errors[actor.uid]: Exception = result
log.cancel(
@ -503,14 +508,6 @@ async def trio_proc(
)
)
# await chan.send({
# '_parent_main_data': subactor._parent_main_data,
# 'enable_modules': subactor.enable_modules,
# 'reg_addrs': subactor.reg_addrs,
# 'bind_addrs': bind_addrs,
# '_runtime_vars': _runtime_vars,
# })
# track subactor in current nursery
curr_actor: Actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
@ -554,8 +551,8 @@ async def trio_proc(
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
with trio.CancelScope(shield=True):
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if cancelled_during_spawn:
# Try again to avoid TTY clobbering.

View File

@ -124,9 +124,15 @@ _ctxvar_Context: ContextVar[Context] = ContextVar(
)
def current_ipc_ctx() -> Context:
def current_ipc_ctx(
error_on_not_set: bool = False,
) -> Context|None:
ctx: Context = _ctxvar_Context.get()
if not ctx:
if (
not ctx
and error_on_not_set
):
from ._exceptions import InternalError
raise InternalError(
'No IPC context has been allocated for this task yet?\n'

View File

@ -52,6 +52,7 @@ from tractor.msg import (
if TYPE_CHECKING:
from ._context import Context
from ._ipc import Channel
log = get_logger(__name__)
@ -65,10 +66,10 @@ log = get_logger(__name__)
class MsgStream(trio.abc.Channel):
'''
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC ``Channel``.
values over an inter-actor IPC `Channel`.
This is the type returned to a local task which entered either
``Portal.open_stream_from()`` or ``Context.open_stream()``.
`Portal.open_stream_from()` or `Context.open_stream()`.
Termination rules:
@ -95,6 +96,22 @@ class MsgStream(trio.abc.Channel):
self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False
@property
def ctx(self) -> Context:
'''
This stream's IPC `Context` ref.
'''
return self._ctx
@property
def chan(self) -> Channel:
'''
Ref to the containing `Context`'s transport `Channel`.
'''
return self._ctx.chan
# TODO: could we make this a direct method bind to `PldRx`?
# -> receive_nowait = PldRx.recv_pld
# |_ means latter would have to accept `MsgStream`-as-`self`?
@ -109,7 +126,7 @@ class MsgStream(trio.abc.Channel):
):
ctx: Context = self._ctx
return ctx._pld_rx.recv_pld_nowait(
ctx=ctx,
ipc=self,
expect_msg=expect_msg,
)
@ -148,7 +165,7 @@ class MsgStream(trio.abc.Channel):
try:
ctx: Context = self._ctx
return await ctx._pld_rx.recv_pld(ctx=ctx)
return await ctx._pld_rx.recv_pld(ipc=self)
# XXX: the stream terminates on either of:
# - via `self._rx_chan.receive()` raising after manual closure

View File

@ -84,6 +84,7 @@ class ActorNursery:
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], BaseException],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
@ -105,6 +106,7 @@ class ActorNursery:
self._at_least_one_child_in_debug: bool = False
self.errors = errors
self.exited = trio.Event()
self._scope_error: BaseException|None = None
# NOTE: when no explicit call is made to
# `.open_root_actor()` by application code,
@ -117,7 +119,9 @@ class ActorNursery:
async def start_actor(
self,
name: str,
*,
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str]|None = None,
enable_modules: list[str]|None = None,
@ -125,6 +129,7 @@ class ActorNursery:
nursery: trio.Nursery|None = None,
debug_mode: bool|None = None,
infect_asyncio: bool = False,
) -> Portal:
'''
Start a (daemon) actor: an process that has no designated
@ -189,6 +194,13 @@ class ActorNursery:
)
)
# TODO: DEPRECATE THIS:
# -[ ] impl instead as a hilevel wrapper on
# top of a `@context` style invocation.
# |_ dynamic @context decoration on child side
# |_ implicit `Portal.open_context() as (ctx, first):`
# and `return first` on parent side.
# -[ ] use @api_frame on the wrapper
async def run_in_actor(
self,
@ -221,7 +233,7 @@ class ActorNursery:
# use the explicit function name if not provided
name = fn.__name__
portal = await self.start_actor(
portal: Portal = await self.start_actor(
name,
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
@ -250,6 +262,7 @@ class ActorNursery:
)
return portal
# @api_frame
async def cancel(
self,
hard_kill: bool = False,
@ -347,8 +360,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay?
__tracebackhide__ = True
# normally don't need to show user by default
__tracebackhide__: bool = True
outer_err: BaseException|None = None
inner_err: BaseException|None = None
# the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {}
@ -358,7 +374,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
# `ActorNursery.start_actor()`).
# errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery() as da_nursery:
@ -393,7 +409,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
)
an._join_procs.set()
except BaseException as inner_err:
except BaseException as _inner_err:
inner_err = _inner_err
errors[actor.uid] = inner_err
# If we error in the root but the debugger is
@ -471,8 +488,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
Exception,
BaseExceptionGroup,
trio.Cancelled
) as _outer_err:
outer_err = _outer_err
) as err:
an._scope_error = outer_err or inner_err
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
@ -487,7 +506,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
if an._children:
log.cancel(
'Actor-nursery cancelling due error type:\n'
f'{err}\n'
f'{outer_err}\n'
)
with trio.CancelScope(shield=True):
await an.cancel()
@ -514,11 +533,19 @@ async def _open_and_supervise_one_cancels_all_nursery(
else:
raise list(errors.values())[0]
# show frame on any (likely) internal error
if (
not an.cancelled
and an._scope_error
):
__tracebackhide__: bool = False
# da_nursery scope end - nursery checkpoint
# final exit
@acm
# @api_frame
async def open_nursery(
**kwargs,
@ -538,6 +565,7 @@ async def open_nursery(
which cancellation scopes correspond to each spawned subactor set.
'''
__tracebackhide__: bool = True
implicit_runtime: bool = False
actor: Actor = current_actor(err_on_no_runtime=False)
an: ActorNursery|None = None
@ -588,6 +616,14 @@ async def open_nursery(
an.exited.set()
finally:
# show frame on any internal runtime-scope error
if (
an
and not an.cancelled
and an._scope_error
):
__tracebackhide__: bool = False
msg: str = (
'Actor-nursery exited\n'
f'|_{an}\n'

File diff suppressed because it is too large Load Diff

View File

@ -20,11 +20,8 @@ as it pertains to improving the grok-ability of our runtime!
'''
from __future__ import annotations
from functools import partial
import inspect
# import msgspec
# from pprint import pformat
import textwrap
import traceback
from types import (
FrameType,
FunctionType,
@ -32,9 +29,8 @@ from types import (
# CodeType,
)
from typing import (
# Any,
Any,
Callable,
# TYPE_CHECKING,
Type,
)
@ -42,6 +38,7 @@ from tractor.msg import (
pretty_struct,
NamespacePath,
)
import wrapt
# TODO: yeah, i don't love this and we should prolly just
@ -83,6 +80,31 @@ def get_class_from_frame(fr: FrameType) -> (
return None
def get_ns_and_func_from_frame(
frame: FrameType,
) -> Callable:
'''
Return the corresponding function object reference from
a `FrameType`, and return it and it's parent namespace `dict`.
'''
ns: dict[str, Any]
# for a method, go up a frame and lookup the name in locals()
if '.' in (qualname := frame.f_code.co_qualname):
cls_name, _, func_name = qualname.partition('.')
ns = frame.f_back.f_locals[cls_name].__dict__
else:
func_name: str = frame.f_code.co_name
ns = frame.f_globals
return (
ns,
ns[func_name],
)
def func_ref_from_frame(
frame: FrameType,
) -> Callable:
@ -98,34 +120,63 @@ def func_ref_from_frame(
)
# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct):
rt_fi: inspect.FrameInfo
call_frame: FrameType
# https://docs.python.org/dev/reference/datamodel.html#frame-objects
# https://docs.python.org/dev/library/inspect.html#the-interpreter-stack
_api_frame: FrameType
@property
def api_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.rt_fi.frame)
def api_frame(self) -> FrameType:
try:
self._api_frame.clear()
except RuntimeError:
# log.warning(
print(
f'Frame {self._api_frame} for {self.api_func} is still active!'
)
return self._api_frame
_api_func: Callable
@property
def api_func(self) -> Callable:
return self._api_func
_caller_frames_up: int|None = 1
_caller_frame: FrameType|None = None # cached after first stack scan
@property
def api_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func_ref
func: FunctionType = self.api_func
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
@property
def caller_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.call_frame)
def caller_frame(self) -> FrameType:
# if not already cached, scan up stack explicitly by
# configured count.
if not self._caller_frame:
if self._caller_frames_up:
for _ in range(self._caller_frames_up):
caller_frame: FrameType|None = self.api_frame.f_back
if not caller_frame:
raise ValueError(
'No frame exists {self._caller_frames_up} up from\n'
f'{self.api_frame} @ {self.api_nsp}\n'
)
self._caller_frame = caller_frame
return self._caller_frame
@property
def caller_nsp(self) -> NamespacePath|None:
func: FunctionType = self.caller_func_ref
func: FunctionType = self.api_func
if func:
return NamespacePath.from_ref(func)
@ -172,108 +223,66 @@ def find_caller_info(
call_frame = call_frame.f_back
return CallerInfo(
rt_fi=fi,
call_frame=call_frame,
_api_frame=rt_frame,
_api_func=func_ref_from_frame(rt_frame),
_caller_frames_up=go_up_iframes,
)
return None
def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
) -> str:
'''
Create a "boxed" looking traceback string.
# TODO: -[x] move all this into new `.devx._code`!
# -[ ] consider rename to _callstack?
# -[ ] prolly create a `@runtime_api` dec?
# |_ @api_frame seems better?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
def api_frame(
wrapped: Callable|None = None,
*,
caller_frames_up: int = 1,
Useful for emphasizing traceback text content as being an
embedded attribute of some other object (like
a `RemoteActorError` or other boxing remote error shuttle
container).
) -> Callable:
Any other parent/container "fields" can be passed in the
`fields_str` input along with other prefix/indent settings.
# handle the decorator called WITHOUT () case,
# i.e. just @api_frame, NOT @api_frame(extra=<blah>)
if wrapped is None:
return partial(
api_frame,
caller_frames_up=caller_frames_up,
)
'''
if (
fields_str
and
field_prefix
@wrapt.decorator
async def wrapper(
wrapped: Callable,
instance: object,
args: tuple,
kwargs: dict,
):
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
# maybe cache the API frame for this call
global _frame2callerinfo_cache
this_frame: FrameType = inspect.currentframe()
api_frame: FrameType = this_frame.f_back
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
if not _frame2callerinfo_cache.get(api_frame):
_frame2callerinfo_cache[api_frame] = CallerInfo(
_api_frame=api_frame,
_api_func=wrapped,
_caller_frames_up=caller_frames_up,
)
tb_box: str = (
return wrapped(*args, **kwargs)
# orig
# f' |\n'
# f' ------ - ------\n\n'
# f'{tb_str}\n'
# f' ------ - ------\n'
# f' _|\n'
f'|\n'
f' ------ - ------\n\n'
# f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,
) -> str:
'''
Capture and return the traceback text content from
`stack_limit` call frames up.
'''
tb_str: str = (
'\n'.join(
traceback.format_stack(limit=stack_limit)
)
)
if box_tb:
tb_str: str = pformat_boxed_tb(
tb_str=tb_str,
field_prefix=' ',
indent='',
)
return tb_str
# annotate the function as a "api function", meaning it is
# a function for which the function above it in the call stack should be
# non-`tractor` code aka "user code".
#
# in the global frame cache for easy lookup from a given
# func-instance
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
wrapped.__api_func__: bool = True
return wrapper(wrapped)

View File

@ -57,8 +57,8 @@ LEVELS: dict[str, int] = {
'TRANSPORT': 5,
'RUNTIME': 15,
'CANCEL': 16,
'DEVX': 400,
'PDB': 500,
'DEVX': 600,
}
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
@ -137,7 +137,7 @@ class StackLevelAdapter(LoggerAdapter):
"Developer experience" sub-sys statuses.
'''
return self.log(600, msg)
return self.log(400, msg)
def log(
self,
@ -202,8 +202,29 @@ class StackLevelAdapter(LoggerAdapter):
)
# TODO IDEA:
# -[ ] do per task-name and actor-name color coding
# -[ ] unique color per task-id and actor-uuid
def pformat_task_uid(
id_part: str = 'tail'
):
'''
Return `str`-ified unique for a `trio.Task` via a combo of its
`.name: str` and `id()` truncated output.
'''
task: trio.Task = trio.lowlevel.current_task()
tid: str = str(id(task))
if id_part == 'tail':
tid_part: str = tid[-6:]
else:
tid_part: str = tid[:6]
return f'{task.name}[{tid_part}]'
_conc_name_getters = {
'task': lambda: trio.lowlevel.current_task().name,
'task': pformat_task_uid,
'actor': lambda: current_actor(),
'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6],
@ -211,7 +232,10 @@ _conc_name_getters = {
class ActorContextInfo(Mapping):
"Dyanmic lookup for local actor and task names"
'''
Dyanmic lookup for local actor and task names.
'''
_context_keys = (
'task',
'actor',

View File

@ -44,7 +44,7 @@ from ._codec import (
# )
from .types import (
Msg as Msg,
PayloadMsg as PayloadMsg,
Aid as Aid,
SpawnSpec as SpawnSpec,

View File

@ -432,7 +432,7 @@ class MsgCodec(Struct):
# ) -> Any|Struct:
# msg: Msg = codec.dec.decode(msg)
# msg: PayloadMsg = codec.dec.decode(msg)
# payload_tag: str = msg.header.payload_tag
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
# return payload_dec.decode(msg.pld)

View File

@ -22,10 +22,9 @@ operational helpers for processing transaction flows.
'''
from __future__ import annotations
from contextlib import (
# asynccontextmanager as acm,
asynccontextmanager as acm,
contextmanager as cm,
)
from contextvars import ContextVar
from typing import (
Any,
Type,
@ -50,6 +49,7 @@ from tractor._exceptions import (
_mk_msg_type_err,
pack_from_raise,
)
from tractor._state import current_ipc_ctx
from ._codec import (
mk_dec,
MsgDec,
@ -75,7 +75,7 @@ if TYPE_CHECKING:
log = get_logger(__name__)
_def_any_pldec: MsgDec = mk_dec()
_def_any_pldec: MsgDec[Any] = mk_dec()
class PldRx(Struct):
@ -104,15 +104,19 @@ class PldRx(Struct):
'''
# TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel
_pldec: MsgDec
_pld_dec: MsgDec
_ctx: Context|None = None
_ipc: Context|MsgStream|None = None
@property
def pld_dec(self) -> MsgDec:
return self._pldec
return self._pld_dec
# TODO: a better name?
# -[ ] when would this be used as it avoids needingn to pass the
# ipc prim to every method
@cm
def apply_to_ipc(
def wraps_ipc(
self,
ipc_prim: Context|MsgStream,
@ -140,49 +144,50 @@ class PldRx(Struct):
exit.
'''
orig_dec: MsgDec = self._pldec
orig_dec: MsgDec = self._pld_dec
limit_dec: MsgDec = mk_dec(spec=spec)
try:
self._pldec = limit_dec
self._pld_dec = limit_dec
yield limit_dec
finally:
self._pldec = orig_dec
self._pld_dec = orig_dec
@property
def dec(self) -> msgpack.Decoder:
return self._pldec.dec
return self._pld_dec.dec
def recv_pld_nowait(
self,
# TODO: make this `MsgStream` compat as well, see above^
# ipc_prim: Context|MsgStream,
ctx: Context,
ipc: Context|MsgStream,
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = False,
**dec_msg_kwargs,
) -> Any|Raw:
__tracebackhide__: bool = True
__tracebackhide__: bool = hide_tb
msg: MsgType = (
ipc_msg
or
# sync-rx msg from underlying IPC feeder (mem-)chan
ctx._rx_chan.receive_nowait()
ipc._rx_chan.receive_nowait()
)
return self.dec_msg(
msg,
ctx=ctx,
ipc=ipc,
expect_msg=expect_msg,
hide_tb=hide_tb,
**dec_msg_kwargs,
)
async def recv_pld(
self,
ctx: Context,
ipc: Context|MsgStream,
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True,
@ -200,11 +205,11 @@ class PldRx(Struct):
or
# async-rx msg from underlying IPC feeder (mem-)chan
await ctx._rx_chan.receive()
await ipc._rx_chan.receive()
)
return self.dec_msg(
msg=msg,
ctx=ctx,
ipc=ipc,
expect_msg=expect_msg,
**dec_msg_kwargs,
)
@ -212,7 +217,7 @@ class PldRx(Struct):
def dec_msg(
self,
msg: MsgType,
ctx: Context,
ipc: Context|MsgStream,
expect_msg: Type[MsgType]|None,
raise_error: bool = True,
@ -225,6 +230,9 @@ class PldRx(Struct):
'''
__tracebackhide__: bool = hide_tb
_src_err = None
src_err: BaseException|None = None
match msg:
# payload-data shuttle msg; deliver the `.pld` value
# directly to IPC (primitive) client-consumer code.
@ -234,7 +242,7 @@ class PldRx(Struct):
|Return(pld=pld) # termination phase
):
try:
pld: PayloadT = self._pldec.decode(pld)
pld: PayloadT = self._pld_dec.decode(pld)
log.runtime(
'Decoded msg payload\n\n'
f'{msg}\n\n'
@ -243,25 +251,30 @@ class PldRx(Struct):
)
return pld
# XXX pld-type failure
except ValidationError as src_err:
# XXX pld-value type failure
except ValidationError as valerr:
# pack mgterr into error-msg for
# reraise below; ensure remote-actor-err
# info is displayed nicely?
msgterr: MsgTypeError = _mk_msg_type_err(
msg=msg,
codec=self.pld_dec,
src_validation_error=src_err,
src_validation_error=valerr,
is_invalid_payload=True,
)
msg: Error = pack_from_raise(
local_err=msgterr,
cid=msg.cid,
src_uid=ctx.chan.uid,
src_uid=ipc.chan.uid,
)
src_err = valerr
# XXX some other decoder specific failure?
# except TypeError as src_error:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise src_error
# ^-TODO-^ can remove?
# a runtime-internal RPC endpoint response.
# always passthrough since (internal) runtime
@ -299,6 +312,7 @@ class PldRx(Struct):
return src_err
case Stop(cid=cid):
ctx: Context = getattr(ipc, 'ctx', ipc)
message: str = (
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
f'{ctx.peer_side!r} peer ?\n'
@ -341,14 +355,21 @@ class PldRx(Struct):
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
#
# fallthrough and raise from `src_err`
_raise_from_unexpected_msg(
ctx=ctx,
msg=msg,
src_err=src_err,
log=log,
expect_msg=expect_msg,
hide_tb=hide_tb,
)
try:
_raise_from_unexpected_msg(
ctx=getattr(ipc, 'ctx', ipc),
msg=msg,
src_err=src_err,
log=log,
expect_msg=expect_msg,
hide_tb=hide_tb,
)
except UnboundLocalError:
# XXX if there's an internal lookup error in the above
# code (prolly on `src_err`) we want to show this frame
# in the tb!
__tracebackhide__: bool = False
raise
async def recv_msg_w_pld(
self,
@ -378,52 +399,13 @@ class PldRx(Struct):
# msg instance?
pld: PayloadT = self.dec_msg(
msg,
ctx=ipc,
ipc=ipc,
expect_msg=expect_msg,
**kwargs,
)
return msg, pld
# Always maintain a task-context-global `PldRx`
_def_pld_rx: PldRx = PldRx(
_pldec=_def_any_pldec,
)
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
'pld_rx',
default=_def_pld_rx,
)
def current_pldrx() -> PldRx:
'''
Return the current `trio.Task.context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER it's container
shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered
up from `tractor`'s transport layer but BEFORE the data is
yielded to application code, normally via an IPC primitive API
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
Modification of the current payload spec via `limit_plds()`
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by
the interchange backend.
- for `msgspec` see <PUTLINKHERE>.
NOTE that the `PldRx` itself is a per-`Context` global sub-system
that normally does not change other then the applied pld-spec
for the current `trio.Task`.
'''
# ctx: context = current_ipc_ctx()
# return ctx._pld_rx
return _ctxvar_PldRx.get()
@cm
def limit_plds(
spec: Union[Type[Struct]],
@ -439,29 +421,55 @@ def limit_plds(
'''
__tracebackhide__: bool = True
try:
# sanity on orig settings
orig_pldrx: PldRx = current_pldrx()
orig_pldec: MsgDec = orig_pldrx.pld_dec
curr_ctx: Context = current_ipc_ctx()
rx: PldRx = curr_ctx._pld_rx
orig_pldec: MsgDec = rx.pld_dec
with orig_pldrx.limit_plds(
with rx.limit_plds(
spec=spec,
**kwargs,
) as pldec:
log.info(
log.runtime(
'Applying payload-decoder\n\n'
f'{pldec}\n'
)
yield pldec
finally:
log.info(
log.runtime(
'Reverted to previous payload-decoder\n\n'
f'{orig_pldec}\n'
)
assert (
(pldrx := current_pldrx()) is orig_pldrx
and
pldrx.pld_dec is orig_pldec
)
# sanity on orig settings
assert rx.pld_dec is orig_pldec
@acm
async def maybe_limit_plds(
ctx: Context,
spec: Union[Type[Struct]]|None = None,
**kwargs,
) -> MsgDec|None:
'''
Async compat maybe-payload type limiter.
Mostly for use inside other internal `@acm`s such that a separate
indent block isn't needed when an async one is already being
used.
'''
if spec is None:
yield None
return
# sanity on scoping
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
yield msgdec
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
async def drain_to_final_msg(
@ -543,21 +551,12 @@ async def drain_to_final_msg(
match msg:
# final result arrived!
case Return(
# cid=cid,
# pld=res,
):
# ctx._result: Any = res
ctx._result: Any = pld
case Return():
log.runtime(
'Context delivered final draining msg:\n'
f'{pretty_struct.pformat(msg)}'
)
# XXX: only close the rx mem chan AFTER
# a final result is retreived.
# if ctx._rx_chan:
# await ctx._rx_chan.aclose()
# TODO: ^ we don't need it right?
ctx._result: Any = pld
result_msg = msg
break
@ -664,24 +663,6 @@ async def drain_to_final_msg(
result_msg = msg
break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
# right! since `._deliver_msg()` should
# always have detected an {'error': ..}
# msg and already called this right!?!
# elif error := unpack_error(
# msg=msg,
# chan=ctx._portal.channel,
# hide_tb=False,
# ):
# log.critical('SHOULD NEVER GET HERE!?')
# assert msg is ctx._cancel_msg
# assert error.msgdata == ctx._remote_error.msgdata
# assert error.ipc_msg == ctx._remote_error.ipc_msg
# from .devx._debug import pause
# await pause()
# ctx._maybe_cancel_and_set_remote_error(error)
# ctx._maybe_raise_remote_err(error)
else:
# bubble the original src key error
raise

View File

@ -56,8 +56,7 @@ log = get_logger('tractor.msgspec')
PayloadT = TypeVar('PayloadT')
# TODO: PayloadMsg
class Msg(
class PayloadMsg(
Struct,
Generic[PayloadT],
@ -110,6 +109,10 @@ class Msg(
pld: Raw
# TODO: complete rename
Msg = PayloadMsg
class Aid(
Struct,
tag=True,
@ -299,7 +302,7 @@ class StartAck(
class Started(
Msg,
PayloadMsg,
Generic[PayloadT],
):
'''
@ -313,12 +316,12 @@ class Started(
# TODO: instead of using our existing `Start`
# for this (as we did with the original `{'cmd': ..}` style)
# class Cancel(Msg):
# class Cancel:
# cid: str
class Yield(
Msg,
PayloadMsg,
Generic[PayloadT],
):
'''
@ -345,7 +348,7 @@ class Stop(
# TODO: is `Result` or `Out[come]` a better name?
class Return(
Msg,
PayloadMsg,
Generic[PayloadT],
):
'''
@ -357,7 +360,7 @@ class Return(
class CancelAck(
Msg,
PayloadMsg,
Generic[PayloadT],
):
'''
@ -463,14 +466,14 @@ def from_dict_msg(
# TODO: should be make a msg version of `ContextCancelled?`
# and/or with a scope field or a full `ActorCancelled`?
# class Cancelled(Msg):
# class Cancelled(MsgType):
# cid: str
# TODO what about overruns?
# class Overrun(Msg):
# class Overrun(MsgType):
# cid: str
_runtime_msgs: list[Msg] = [
_runtime_msgs: list[Struct] = [
# identity handshake on first IPC `Channel` contact.
Aid,
@ -496,9 +499,9 @@ _runtime_msgs: list[Msg] = [
]
# the no-outcome-yet IAC (inter-actor-communication) sub-set which
# can be `Msg.pld` payload field type-limited by application code
# can be `PayloadMsg.pld` payload field type-limited by application code
# using `apply_codec()` and `limit_msg_spec()`.
_payload_msgs: list[Msg] = [
_payload_msgs: list[PayloadMsg] = [
# first <value> from `Context.started(<value>)`
Started,
@ -541,8 +544,8 @@ def mk_msg_spec(
] = 'indexed_generics',
) -> tuple[
Union[Type[Msg]],
list[Type[Msg]],
Union[MsgType],
list[MsgType],
]:
'''
Create a payload-(data-)type-parameterized IPC message specification.
@ -554,7 +557,7 @@ def mk_msg_spec(
determined by the input `payload_type_union: Union[Type]`.
'''
submsg_types: list[Type[Msg]] = Msg.__subclasses__()
submsg_types: list[MsgType] = Msg.__subclasses__()
bases: tuple = (
# XXX NOTE XXX the below generic-parameterization seems to
# be THE ONLY way to get this to work correctly in terms