Compare commits


No commits in common. "b22f7dcae042dae0a9d068021a76f2c818489d7d" and "343b7c971249b25480c6a59d8974856107e736b7" have entirely different histories.

25 changed files with 893 additions and 1318 deletions

View File

@ -1,11 +1,6 @@
import time
import trio
import tractor
from tractor import (
ActorNursery,
MsgStream,
Portal,
)
# these are the first 2 actors, streamer_1 and streamer_2
@ -17,18 +12,14 @@ async def stream_data(seed):
# this is the third actor; the aggregator
async def aggregate(seed):
'''
Ensure that the two streams we receive match but only stream
"""Ensure that the two streams we receive match but only stream
a single set of values to the parent.
'''
an: ActorNursery
async with tractor.open_nursery() as an:
portals: list[Portal] = []
"""
async with tractor.open_nursery() as nursery:
portals = []
for i in range(1, 3):
# fork/spawn call
portal = await an.start_actor(
# fork point
portal = await nursery.start_actor(
name=f'streamer_{i}',
enable_modules=[__name__],
)
@ -52,11 +43,7 @@ async def aggregate(seed):
async with trio.open_nursery() as n:
for portal in portals:
n.start_soon(
push_to_chan,
portal,
send_chan.clone(),
)
n.start_soon(push_to_chan, portal, send_chan.clone())
# close this local task's reference to send side
await send_chan.aclose()
@ -73,7 +60,7 @@ async def aggregate(seed):
print("FINISHED ITERATING in aggregator")
await an.cancel()
await nursery.cancel()
print("WAITING on `ActorNursery` to finish")
print("AGGREGATOR COMPLETE!")
@ -88,21 +75,18 @@ async def main() -> list[int]:
'''
# yes, a nursery which spawns `trio`-"actors" B)
an: ActorNursery
async with tractor.open_nursery(
loglevel='cancel',
debug_mode=True,
) as an:
nursery: tractor.ActorNursery
async with tractor.open_nursery() as nursery:
seed = int(1e3)
pre_start = time.time()
portal: Portal = await an.start_actor(
portal: tractor.Portal = await nursery.start_actor(
name='aggregator',
enable_modules=[__name__],
)
stream: MsgStream
stream: tractor.MsgStream
async with portal.open_stream_from(
aggregate,
seed=seed,
@ -111,12 +95,11 @@ async def main() -> list[int]:
start = time.time()
# the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally
result_stream: list[int] = []
result_stream = []
async for value in stream:
result_stream.append(value)
cancelled: bool = await portal.cancel_actor()
assert cancelled
await portal.cancel_actor()
print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")

View File

@ -55,7 +55,6 @@ xontrib-vox = "^0.0.1"
optional = false
[tool.poetry.group.dev.dependencies]
pytest = "^8.2.0"
pexpect = "^4.9.0"
# only for xonsh as sh..
xontrib-vox = "^0.0.1"

View File

@ -97,7 +97,6 @@ def test_ipc_channel_break_during_stream(
examples_dir() / 'advanced_faults'
/ 'ipc_failure_during_stream.py',
root=examples_dir(),
consider_namespace_packages=False,
)
# by def we expect KBI from user after a simulated "hang

View File

@ -89,30 +89,17 @@ def test_remote_error(reg_addr, args_err):
assert excinfo.value.boxed_type == errtype
else:
# the root task will also error on the `Portal.result()`
# call so we expect an error from there AND the child.
# |_ tho seems like on new `trio` this doesn't always
# happen?
with pytest.raises((
BaseExceptionGroup,
tractor.RemoteActorError,
)) as excinfo:
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed errors are `errtype`
err: BaseException = excinfo.value
if isinstance(err, BaseExceptionGroup):
suberrs: list[BaseException] = err.exceptions
else:
suberrs: list[BaseException] = [err]
for exc in suberrs:
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.boxed_type == errtype
def test_multierror(
reg_addr: tuple[str, int],
):
def test_multierror(reg_addr):
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more than one actor errors.
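
Both branches of `test_remote_error` above normalize to the same boxed-error assertion; a hedged, self-contained sketch of that pattern (the failing `boom()` endpoint is invented for illustration):

import pytest
import trio
import tractor


async def boom() -> None:
    raise ValueError('simulated remote failure')


async def main():
    async with tractor.open_nursery() as an:
        await an.run_in_actor(boom)


# depending on the `trio`/runtime version the remote error may
# arrive bare or wrapped in an exception group, so accept both
# (exactly as the test does)
with pytest.raises((
    BaseExceptionGroup,
    tractor.RemoteActorError,
)) as excinfo:
    trio.run(main)

err = excinfo.value
suberrs = err.exceptions if isinstance(err, BaseExceptionGroup) else [err]
for exc in suberrs:
    assert exc.boxed_type == ValueError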

View File

@ -444,7 +444,6 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
infect_asyncio=True,
fan_out=fan_out,
)
# should raise RAE directly
await portal.result()
trio.run(main)
@ -462,11 +461,12 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(RemoteActorError) as excinfo:
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed error type
excinfo.value.boxed_type == Exception
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.boxed_type == Exception
def test_trio_closes_early_and_channel_exits(reg_addr):
@ -477,7 +477,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr):
exit_early=True,
infect_asyncio=True,
)
# should raise RAE directly
# should trigger remote actor error
await portal.result()
# should be a quiet exit on a simple channel exit
@ -492,17 +492,15 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
aio_raise_err=True,
infect_asyncio=True,
)
# should trigger RAE directly, not an eg.
# should trigger remote actor error
await portal.result()
with pytest.raises(
# NOTE: bc we directly wait on `Portal.result()` instead
# of capturing it inside the `ActorNursery` machinery.
expected_exception=RemoteActorError,
) as excinfo:
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
excinfo.value.boxed_type == Exception
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.boxed_type == Exception
@tractor.context

View File

@ -55,10 +55,9 @@ from tractor._testing import (
@tractor.context
async def open_stream_then_sleep_forever(
async def sleep_forever(
ctx: Context,
expect_ctxc: bool = False,
) -> None:
'''
Sync the context, open a stream then just sleep.
@ -68,10 +67,6 @@ async def open_stream_then_sleep_forever(
'''
try:
await ctx.started()
# NOTE: the below means this child will send a `Stop`
# to it's parent-side task despite that side never
# opening a stream itself.
async with ctx.open_stream():
await trio.sleep_forever()
@ -105,7 +100,7 @@ async def error_before_started(
'''
async with tractor.wait_for_actor('sleeper') as p2:
async with (
p2.open_context(open_stream_then_sleep_forever) as (peer_ctx, first),
p2.open_context(sleep_forever) as (peer_ctx, first),
peer_ctx.open_stream(),
):
# NOTE: this WAS inside an @acm body but i factored it
@ -209,13 +204,9 @@ async def stream_ints(
@tractor.context
async def stream_from_peer(
ctx: Context,
debug_mode: bool,
peer_name: str = 'sleeper',
) -> None:
# sanity
assert tractor._state.debug_mode() == debug_mode
peer: Portal
try:
async with (
@ -249,54 +240,26 @@ async def stream_from_peer(
assert msg is not None
print(msg)
# NOTE: cancellation of the (sleeper) peer should always cause
# a `ContextCancelled` raise in this streaming actor.
except ContextCancelled as _ctxc:
ctxc = _ctxc
# NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming
# actor.
except ContextCancelled as ctxc:
ctxerr = ctxc
# print("TRYING TO ENTER PAUSSE!!!")
# await tractor.pause(shield=True)
re: ContextCancelled = peer_ctx._remote_error
assert peer_ctx._remote_error is ctxerr
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
# XXX YES XXX, remote error should be unpacked only once!
assert (
re
is
peer_ctx.maybe_error
is
ctxc
is
peer_ctx._local_error
)
# NOTE: these errors should all match!
# ------ - ------
# XXX [2024-05-03] XXX
# ------ - ------
# broke this due to a re-raise inside `.msg._ops.drain_to_final_msg()`
# where the `Error()` msg was directly raising the ctxc
# instead of just returning up to the caller inside
`Context.return()` which would result in a diff instance of
# the same remote error bubbling out above vs what was
# already unpacked and set inside `Context.
assert (
peer_ctx._remote_error.msgdata
==
ctxc.msgdata
)
# ^-XXX-^ notice the data is of course the exact same.. so
# the above larger assert makes sense to also always be true!
# XXX YES, bc exact same msg instances
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
# XXX YES XXX, bc should be exact same msg instances
assert peer_ctx._remote_error._ipc_msg is ctxc._ipc_msg
# XXX NO XXX, bc new one always created for property access
assert peer_ctx._remote_error.ipc_msg != ctxc.ipc_msg
# XXX NO, bc new one always created for property access
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg
# the peer ctx is the canceller even though its canceller
# is the "canceller" XD
assert peer_name in peer_ctx.canceller
assert "canceller" in ctxc.canceller
assert "canceller" in ctxerr.canceller
# caller peer should not be the cancel requester
assert not ctx.cancel_called
@ -320,13 +283,12 @@ async def stream_from_peer(
# TODO / NOTE `.canceller` won't have been set yet
# here because that machinery is inside
# `Portal.open_context().__aexit__()` BUT, if we had
# `.open_context().__aexit__()` BUT, if we had
# a way to know immediately (from the last
# checkpoint) that cancellation was due to
# a remote, we COULD assert this here..see,
# https://github.com/goodboy/tractor/issues/368
#
# await tractor.pause()
# assert 'canceller' in ctx.canceller
# root/parent actor task should NEVER HAVE cancelled us!
@ -430,13 +392,12 @@ def test_peer_canceller(
try:
async with (
sleeper.open_context(
open_stream_then_sleep_forever,
sleep_forever,
expect_ctxc=True,
) as (sleeper_ctx, sent),
just_caller.open_context(
stream_from_peer,
debug_mode=debug_mode,
) as (caller_ctx, sent),
canceller.open_context(
@ -462,11 +423,10 @@ def test_peer_canceller(
# should always raise since this root task does
# not request the sleeper cancellation ;)
except ContextCancelled as _ctxc:
ctxc = _ctxc
except ContextCancelled as ctxerr:
print(
'CAUGHT REMOTE CONTEXT CANCEL\n\n'
f'{ctxc}\n'
f'{ctxerr}\n'
)
# canceller and caller peers should not
@ -477,7 +437,7 @@ def test_peer_canceller(
# we were not the actor, our peer was
assert not sleeper_ctx.cancel_acked
assert ctxc.canceller[0] == 'canceller'
assert ctxerr.canceller[0] == 'canceller'
# XXX NOTE XXX: since THIS `ContextCancelled`
# HAS NOT YET bubbled up to the
@ -488,7 +448,7 @@ def test_peer_canceller(
# CASE_1: error-during-ctxc-handling,
if error_during_ctxerr_handling:
raise RuntimeError('Simulated RTE re-raise during ctxc handling')
raise RuntimeError('Simulated error during teardown')
# CASE_2: standard teardown inside in `.open_context()` block
raise
@ -553,9 +513,6 @@ def test_peer_canceller(
# should be cancelled by US.
#
if error_during_ctxerr_handling:
print(f'loc_err: {_loc_err}\n')
assert isinstance(loc_err, RuntimeError)
# since we do a rte reraise above, the
# `.open_context()` error handling should have
# raised a local rte, thus the internal
@ -564,6 +521,9 @@ def test_peer_canceller(
# a `trio.Cancelled` due to a local
# `._scope.cancel()` call.
assert not sleeper_ctx._scope.cancelled_caught
assert isinstance(loc_err, RuntimeError)
print(f'_loc_err: {_loc_err}\n')
# assert sleeper_ctx._local_error is _loc_err
# assert sleeper_ctx._local_error is _loc_err
assert not (
@ -600,13 +560,10 @@ def test_peer_canceller(
else: # the other 2 ctxs
assert (
isinstance(re, ContextCancelled)
and (
re.canceller
==
canceller.channel.uid
)
)
# since the sleeper errors while handling a
# peer-cancelled (by ctxc) scenario, we expect
@ -854,7 +811,8 @@ async def serve_subactors(
async with open_nursery() as an:
# sanity
assert tractor._state.debug_mode() == debug_mode
if debug_mode:
assert tractor._state.debug_mode()
await ctx.started(peer_name)
async with ctx.open_stream() as ipc:
@ -1133,6 +1091,7 @@ def test_peer_spawns_and_cancels_service_subactor(
'-> root checking `client_ctx.result()`,\n'
f'-> checking that sub-spawn {peer_name} is down\n'
)
# else:
try:
res = await client_ctx.result(hide_tb=False)
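
The invariant this suite keeps re-asserting is that a `ContextCancelled` caught on one side names the *requesting* actor in its `.canceller`, while a cancel requested by your own side is acked and absorbed on block exit. A minimal, hedged sketch of the self-cancel case only (endpoint and actor names invented; the multi-peer choreography above is not reproduced):

import trio
import tractor


@tractor.context
async def just_sleep(
    ctx: tractor.Context,
) -> None:
    await ctx.started()
    await trio.sleep_forever()


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'sleeper',
            enable_modules=[__name__],
        )
        async with portal.open_context(just_sleep) as (ctx, first):
            # a cancel requested by *this* side should be acked by
            # the child and masked on block exit, so no
            # `ContextCancelled` is expected to escape here
            await ctx.cancel()

        await portal.cancel_actor()


trio.run(main)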

View File

@ -2,9 +2,7 @@
Spawning basics
"""
from typing import (
Any,
)
from typing import Optional
import pytest
import trio
@ -27,11 +25,13 @@ async def spawn(
async with tractor.open_root_actor(
arbiter_addr=reg_addr,
):
actor = tractor.current_actor()
assert actor.is_arbiter == is_arbiter
data = data_to_pass_down
if actor.is_arbiter:
async with tractor.open_nursery() as nursery:
# forks here
@ -95,9 +95,7 @@ async def test_movie_theatre_convo(start_method):
await portal.cancel_actor()
async def cellar_door(
return_value: str|None,
):
async def cellar_door(return_value: Optional[str]):
return return_value
@ -107,18 +105,16 @@ async def cellar_door(
)
@tractor_test
async def test_most_beautiful_word(
start_method: str,
return_value: Any,
debug_mode: bool,
start_method,
return_value
):
'''
The main ``tractor`` routine.
'''
with trio.fail_after(1):
async with tractor.open_nursery(
debug_mode=debug_mode,
) as n:
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
cellar_door,
return_value=return_value,
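
A hedged sketch of the `run_in_actor()` round-trip this test drives (mirroring the names above; the concrete `return_value` is invented):

import trio
import tractor


async def cellar_door(return_value: str | None):
    return return_value


async def main():
    async with tractor.open_nursery() as n:
        portal = await n.run_in_actor(
            cellar_door,
            return_value='palindrome',
            name='some_linguist',
        )
        # the single final value is shipped back as the portal result
        assert await portal.result() == 'palindrome'


trio.run(main)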

View File

@ -42,7 +42,6 @@ from ._supervise import (
from ._state import (
current_actor as current_actor,
is_root_process as is_root_process,
current_ipc_ctx as current_ipc_ctx,
)
from ._exceptions import (
ContextCancelled as ContextCancelled,

View File

@ -41,7 +41,6 @@ from typing import (
Callable,
Mapping,
Type,
TypeAlias,
TYPE_CHECKING,
Union,
)
@ -95,7 +94,7 @@ if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
from ._ipc import MsgTransport
from .devx._frame_stack import (
from .devx._code import (
CallerInfo,
)
@ -156,41 +155,6 @@ class Context:
# payload receiver
_pld_rx: msgops.PldRx
@property
def pld_rx(self) -> msgops.PldRx:
'''
The current `tractor.Context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER its container
shuttle msg (eg. `Started`/`Yield`/`Return`) has been
delivered up from `tractor`'s transport layer but BEFORE the
data is yielded to `tractor` application code.
The "IPC-primitive API" is normally one of a `Context` (this)` or a `MsgStream`
or some higher level API using one of them.
For ex. `pld_data: PayloadT = MsgStream.receive()` implicitly
calls into the stream's parent `Context.pld_rx.recv_pld()` to
receive the latest `PayloadMsg.pld` value.
Modification of the current payload spec via `limit_plds()`
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by the
interchange backend.
- for `msgspec` see <PUTLINKHERE>.
Note that the `PldRx` itself is a per-`Context` instance that
normally only changes when some (sub-)task, on a given "side"
of the IPC ctx (either a "child"-side RPC or inside
a "parent"-side `Portal.open_context()` block), modifies it
using the `.msg._ops.limit_plds()` API.
'''
return self._pld_rx
# full "namespace-path" to target RPC function
_nsf: NamespacePath
@ -267,8 +231,6 @@ class Context:
# init and streaming state
_started_called: bool = False
_started_msg: MsgType|None = None
_started_pld: Any = None
_stream_opened: bool = False
_stream: MsgStream|None = None
@ -661,7 +623,7 @@ class Context:
log.runtime(
'Setting remote error for ctx\n\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}: {self._actor.uid}\n\n'
f'=> {self.side!r}\n\n'
f'{error}'
)
self._remote_error: BaseException = error
@ -716,7 +678,7 @@ class Context:
log.error(
f'Remote context error:\n\n'
# f'{pformat(self)}\n'
f'{error}'
f'{error}\n'
)
if self._canceller is None:
@ -762,10 +724,8 @@ class Context:
)
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb
# mk_pdb().set_trace()
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
if (
cs
and
@ -845,7 +805,6 @@ class Context:
# f'{ci.api_nsp}()\n'
# )
# TODO: use `.dev._frame_stack` scanning to find caller!
return 'Portal.open_context()'
async def cancel(
@ -1345,6 +1304,17 @@ class Context:
ctx=self,
hide_tb=hide_tb,
)
for msg in drained_msgs:
# TODO: mask this by default..
if isinstance(msg, Return):
# from .devx import pause
# await pause()
# raise InternalError(
log.warning(
'Final `return` msg should never be drained !?!?\n\n'
f'{msg}\n'
)
drained_status: str = (
'Ctx drained to final outcome msg\n\n'
@ -1465,10 +1435,6 @@ class Context:
self._result
)
@property
def has_outcome(self) -> bool:
return bool(self.maybe_error) or self._final_result_is_set()
# @property
def repr_outcome(
self,
@ -1671,6 +1637,8 @@ class Context:
)
if rt_started != started_msg:
# TODO: break these methods out from the struct subtype?
# TODO: make that one a mod func too..
diff = pretty_struct.Struct.__sub__(
rt_started,
@ -1706,8 +1674,6 @@ class Context:
) from verr
self._started_called = True
self._started_msg = started_msg
self._started_pld = value
async def _drain_overflows(
self,
@ -1995,7 +1961,6 @@ async def open_context_from_portal(
portal: Portal,
func: Callable,
pld_spec: TypeAlias|None = None,
allow_overruns: bool = False,
# TODO: if we set this the wrapping `@acm` body will
@ -2061,7 +2026,7 @@ async def open_context_from_portal(
# XXX NOTE XXX: currently we do NOT allow opening a context
# with "self" since the local feeder mem-chan processing
# is not built for it.
if (uid := portal.channel.uid) == portal.actor.uid:
if portal.channel.uid == portal.actor.uid:
raise RuntimeError(
'** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n'
@ -2089,21 +2054,6 @@ async def open_context_from_portal(
assert ctx._caller_info
_ctxvar_Context.set(ctx)
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
scope_err: BaseException|None = None
ctxc_from_callee: ContextCancelled|None = None
try:
async with (
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
spec=pld_spec,
) as maybe_msgdec,
):
if maybe_msgdec:
assert maybe_msgdec.pld_spec == pld_spec
# XXX NOTE since `._scope` is NOT set BEFORE we retrieve the
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
@ -2111,23 +2061,25 @@ async def open_context_from_portal(
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
ipc=ctx,
first: Any = await ctx._pld_rx.recv_pld(
ctx=ctx,
expect_msg=Started,
passthrough_non_pld_msgs=False,
)
# from .devx import pause
# await pause()
ctx._started_called: bool = True
ctx._started_msg: bool = started_msg
ctx._started_pld: bool = first
# NOTE: this in an implicit runtime nursery used to,
# - start overrun queuing tasks when as well as
# for cancellation of the scope opened by the user.
ctx._scope_nursery: trio.Nursery = tn
ctx._scope: trio.CancelScope = tn.cancel_scope
uid: tuple = portal.channel.uid
cid: str = ctx.cid
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
scope_err: BaseException|None = None
ctxc_from_callee: ContextCancelled|None = None
try:
async with trio.open_nursery() as nurse:
# NOTE: used to start overrun queuing tasks
ctx._scope_nursery: trio.Nursery = nurse
ctx._scope: trio.CancelScope = nurse.cancel_scope
# deliver context instance and .started() msg value
# in enter tuple.
@ -2174,13 +2126,13 @@ async def open_context_from_portal(
# when in allow_overruns mode there may be
# lingering overflow sender tasks remaining?
if tn.child_tasks:
if nurse.child_tasks:
# XXX: ensure we are in overrun state
# with ``._allow_overruns=True`` bc otherwise
# there should be no tasks in this nursery!
if (
not ctx._allow_overruns
or len(tn.child_tasks) > 1
or len(nurse.child_tasks) > 1
):
raise InternalError(
'Context has sub-tasks but is '
@ -2352,7 +2304,7 @@ async def open_context_from_portal(
):
log.warning(
'IPC connection for context is broken?\n'
f'task: {ctx.cid}\n'
f'task:{cid}\n'
f'actor:{uid}'
)
@ -2503,8 +2455,9 @@ async def open_context_from_portal(
and ctx.cancel_acked
):
log.cancel(
f'Context cancelled by {ctx.side!r}-side task\n'
'Context cancelled by {ctx.side!r}-side task\n'
f'|_{ctx._task}\n\n'
f'{repr(scope_err)}\n'
)
@ -2532,7 +2485,7 @@ async def open_context_from_portal(
f'cid: {ctx.cid}\n'
)
portal.actor._contexts.pop(
(uid, ctx.cid),
(uid, cid),
None,
)
@ -2560,12 +2513,11 @@ def mk_context(
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan caller-info if log level so high!
from .devx._frame_stack import find_caller_info
from .devx._code import find_caller_info
caller_info: CallerInfo|None = find_caller_info()
pld_rx = msgops.PldRx(
_pld_dec=msgops._def_any_pldec,
)
# TODO: when/how do we apply `.limit_plds()` from here?
pld_rx: msgops.PldRx = msgops.current_pldrx()
ctx = Context(
chan=chan,
@ -2579,16 +2531,13 @@ def mk_context(
_caller_info=caller_info,
**kwargs,
)
pld_rx._ctx = ctx
ctx._result = Unresolved
return ctx
# TODO: use the new type-parameters to annotate this in 3.13?
# -[ ] https://peps.python.org/pep-0718/#unknown-types
def context(
func: Callable,
) -> Callable:
def context(func: Callable) -> Callable:
'''
Mark an (async) function as an SC-supervised, inter-`Actor`,
child-`trio.Task`, IPC endpoint otherwise known more
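
Since most of this module exists to back the `@tractor.context` endpoint style, a hedged sketch of the decorated-endpoint plus `Portal.open_context()` flow (endpoint, actor name and payloads invented for illustration):

import trio
import tractor


@tractor.context
async def echo_ctx(
    ctx: tractor.Context,
    greeting: str,
) -> None:
    # sync the context, then echo anything streamed to us
    await ctx.started('ready')
    async with ctx.open_stream() as stream:
        async for item in stream:
            await stream.send(f'{greeting}: {item}')


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'echoer',
            enable_modules=[__name__],
        )
        async with (
            portal.open_context(
                echo_ctx,
                greeting='hi',
            ) as (ctx, first),
            ctx.open_stream() as stream,
        ):
            assert first == 'ready'
            await stream.send(1)
            assert await stream.receive() == 'hi: 1'

        await portal.cancel_actor()


trio.run(main)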

View File

@ -716,5 +716,4 @@ async def _connect_chan(
chan = Channel((host, port))
await chan.connect()
yield chan
with trio.CancelScope(shield=True):
await chan.aclose()

View File

@ -47,7 +47,6 @@ from ._ipc import Channel
from .log import get_logger
from .msg import (
# Error,
PayloadMsg,
NamespacePath,
Return,
)
@ -99,8 +98,7 @@ class Portal:
self.chan = channel
# during the portal's lifetime
self._final_result_pld: Any|None = None
self._final_result_msg: PayloadMsg|None = None
self._final_result: Any|None = None
# When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some
@ -134,7 +132,7 @@ class Portal:
'A pending main result has already been submitted'
)
self._expect_result_ctx: Context = await self.actor.start_remote_task(
self._expect_result_ctx = await self.actor.start_remote_task(
self.channel,
nsf=NamespacePath(f'{ns}:{func}'),
kwargs=kwargs,
@ -165,22 +163,13 @@ class Portal:
# expecting a "main" result
assert self._expect_result_ctx
if self._final_result_msg is None:
try:
(
self._final_result_msg,
self._final_result_pld,
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
ipc=self._expect_result_ctx,
if self._final_result is None:
self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
ctx=self._expect_result_ctx,
expect_msg=Return,
)
except BaseException as err:
# TODO: wrap this into `@api_frame` optionally with
# some kinda filtering mechanism like log levels?
__tracebackhide__: bool = False
raise err
return self._final_result_pld
return self._final_result
async def _cancel_streams(self):
# terminate all locally running async generator
@ -312,7 +301,7 @@ class Portal:
portal=self,
)
return await ctx._pld_rx.recv_pld(
ipc=ctx,
ctx=ctx,
expect_msg=Return,
)
@ -331,8 +320,6 @@ class Portal:
remote rpc task or a local async generator instance.
'''
__runtimeframe__: int = 1 # noqa
if isinstance(func, str):
warnings.warn(
"`Portal.run(namespace: str, funcname: str)` is now"
@ -366,7 +353,7 @@ class Portal:
portal=self,
)
return await ctx._pld_rx.recv_pld(
ipc=ctx,
ctx=ctx,
expect_msg=Return,
)
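
A hedged sketch of the `Portal.run()` single-result path touched above (the `add()` endpoint is invented; only the function-reference calling form is shown since the string form is deprecated per the warning in this module):

import trio
import tractor


async def add(a: int, b: int) -> int:
    return a + b


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'adder',
            enable_modules=[__name__],
        )
        # schedule the remote call and wait on its single `Return`
        assert await portal.run(add, a=1, b=2) == 3
        await portal.cancel_actor()


trio.run(main)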

View File

@ -18,7 +18,7 @@
Root actor runtime ignition(s).
'''
from contextlib import asynccontextmanager as acm
from contextlib import asynccontextmanager
from functools import partial
import importlib
import logging
@ -60,7 +60,7 @@ _default_lo_addrs: list[tuple[str, int]] = [(
logger = log.get_logger('tractor')
@acm
@asynccontextmanager
async def open_root_actor(
*,
@ -96,7 +96,6 @@ async def open_root_actor(
Runtime init entry point for ``tractor``.
'''
__tracebackhide__ = True
# TODO: stick this in a `@cm` defined in `devx._debug`?
#
# Override the global debugger hook to make it play nice with
@ -359,11 +358,7 @@ async def open_root_actor(
BaseExceptionGroup,
) as err:
import inspect
entered: bool = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
)
entered: bool = await _debug._maybe_enter_pm(err)
if (
not entered
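
A hedged sketch of explicit runtime bring-up via `open_root_actor()` (normally done implicitly by `open_nursery()`; the `loglevel` value is just an example):

import trio
import tractor


async def main():
    async with tractor.open_root_actor(
        loglevel='cancel',
    ):
        actor = tractor.current_actor()
        print(f'root actor up: {actor.uid}')
        # ... open nurseries, spawn subactors, etc.


trio.run(main)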

View File

@ -70,6 +70,7 @@ from .msg import (
from tractor.msg.types import (
CancelAck,
Error,
Msg,
MsgType,
Return,
Start,
@ -249,17 +250,10 @@ async def _errors_relayed_via_ipc(
] = trio.TASK_STATUS_IGNORED,
) -> None:
# NOTE: we normally always hide this frame in call-stack tracebacks
# if the crash originated from an RPC task (since normally the
# user is only going to care about their own code not this
# internal runtime frame) and we DID NOT
# fail due to an IPC transport error!
__tracebackhide__: bool = hide_tb
# TODO: a debug nursery when in debug mode!
# async with maybe_open_debugger_nursery() as debug_tn:
# => see matching comment in side `._debug._pause()`
rpc_err: BaseException|None = None
try:
yield # run RPC invoke body
@ -270,7 +264,16 @@ async def _errors_relayed_via_ipc(
BaseExceptionGroup,
KeyboardInterrupt,
) as err:
rpc_err = err
# NOTE: always hide this frame from debug REPL call stack
# if the crash originated from an RPC task and we DID NOT
# fail due to an IPC transport error!
if (
is_rpc
and
chan.connected()
):
__tracebackhide__: bool = hide_tb
# TODO: maybe we'll want different "levels" of debugging
# eventually such as ('app', 'supervisory', 'runtime') ?
@ -315,19 +318,11 @@ async def _errors_relayed_via_ipc(
api_frame=inspect.currentframe(),
)
if not entered_debug:
# if we prolly should have entered the REPL but
# didn't, maybe there was an internal error in
# the above code and we do want to show this
# frame!
if _state.debug_mode():
__tracebackhide__: bool = False
log.exception(
'RPC task crashed\n'
f'|_{ctx}'
)
# ALWAYS try to ship RPC errors back to parent/caller task
if is_rpc:
@ -360,20 +355,6 @@ async def _errors_relayed_via_ipc(
# `Actor._service_n`, we add "handles" to each such that
# they can be individually cancelled.
finally:
# if the error is not from user code and is instead a runtime
# RPC or transport failure we do prolly want to
# show this frame
if (
rpc_err
and (
not is_rpc
or
not chan.connected()
)
):
__tracebackhide__: bool = False
try:
ctx: Context
func: Callable
@ -463,10 +444,9 @@ async def _invoke(
# open the stream with this option.
# allow_overruns=True,
)
context_ep_func: bool = False
context: bool = False
# set the current IPC ctx var for this RPC task
_state._ctxvar_Context.set(ctx)
assert not _state._ctxvar_Context.get()
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
@ -495,7 +475,7 @@ async def _invoke(
# handle decorated ``@tractor.context`` async function
elif getattr(func, '_tractor_context_function', False):
kwargs['ctx'] = ctx
context_ep_func = True
context = True
# errors raised inside this block are propagated back to caller
async with _errors_relayed_via_ipc(
@ -521,7 +501,7 @@ async def _invoke(
raise
# TODO: impl all these cases in terms of the `Context` one!
if not context_ep_func:
if not context:
await _invoke_non_context(
actor,
cancel_scope,
@ -591,6 +571,7 @@ async def _invoke(
async with trio.open_nursery() as tn:
ctx._scope_nursery = tn
ctx._scope = tn.cancel_scope
_state._ctxvar_Context.set(ctx)
task_status.started(ctx)
# TODO: should would be nice to have our
@ -850,7 +831,7 @@ async def process_messages(
(as utilized inside `Portal.cancel_actor()` ).
'''
assert actor._service_n # runtime state sanity
assert actor._service_n # state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
@ -863,7 +844,7 @@ async def process_messages(
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
nursery_cancelled_before_task: bool = False
msg: MsgType|None = None
msg: Msg|None = None
try:
# NOTE: this internal scope allows for keeping this
# message loop running despite the current task having

View File

@ -644,7 +644,7 @@ class Actor:
peers_str: str = ''
for uid, chans in self._peers.items():
peers_str += (
f'uid: {uid}\n'
f'|_ uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
@ -678,12 +678,10 @@ class Actor:
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if (
(ctx_in_debug := pdb_lock.ctx_in_debug)
and
(pdb_user_uid := ctx_in_debug.chan.uid)
and
local_nursery
pdb_user_uid
and local_nursery
):
entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
@ -1171,17 +1169,13 @@ class Actor:
# kill any debugger request task to avoid deadlock
# with the root actor in this tree
debug_req = _debug.DebugStatus
lock_req_ctx: Context = debug_req.req_ctx
if lock_req_ctx is not None:
dbcs = _debug.DebugStatus.req_cs
if dbcs is not None:
msg += (
'-> Cancelling active debugger request..\n'
f'|_{_debug.Lock.repr()}\n\n'
f'|_{lock_req_ctx}\n\n'
f'|_{_debug.Lock.pformat()}'
)
# lock_req_ctx._scope.cancel()
# TODO: wrap this in a method-API..
debug_req.req_cs.cancel()
dbcs.cancel()
# self-cancel **all** ongoing RPC tasks
await self.cancel_rpc_tasks(
@ -1381,17 +1375,15 @@ class Actor:
"IPC channel's "
)
rent_chan_repr: str = (
f' |_{parent_chan}\n\n'
f'|_{parent_chan}'
if parent_chan
else ''
)
log.cancel(
f'Cancelling {descr} RPC tasks\n\n'
f'<= canceller: {req_uid}\n'
f'{rent_chan_repr}'
f'=> cancellee: {self.uid}\n'
f' |_{self}.cancel_rpc_tasks()\n'
f' |_tasks: {len(tasks)}\n'
f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n'
f' {rent_chan_repr}\n'
# f'{self}\n'
# f'{tasks_str}'
)
for (
@ -1421,7 +1413,7 @@ class Actor:
if tasks:
log.cancel(
'Waiting for remaining rpc tasks to complete\n'
f'|_{tasks_str}'
f'|_{tasks}'
)
await self._ongoing_rpc_tasks.wait()
@ -1474,10 +1466,7 @@ class Actor:
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)
def get_chans(
self,
uid: tuple[str, str],
) -> list[Channel]:
def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
'''
Return all IPC channels to the actor with provided `uid`.
@ -1637,9 +1626,7 @@ async def async_main(
# transport address bind errors - normally it's
# something silly like the wrong socket-address
# passed via a config or CLI Bo
entered_debug = await _debug._maybe_enter_pm(
oserr,
)
entered_debug = await _debug._maybe_enter_pm(oserr)
if entered_debug:
log.runtime('Exited debug REPL..')
raise

View File

@ -142,9 +142,7 @@ async def exhaust_portal(
'''
__tracebackhide__ = True
try:
log.debug(
f'Waiting on final result from {actor.uid}'
)
log.debug(f"Waiting on final result from {actor.uid}")
# XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api
@ -197,10 +195,7 @@ async def cancel_on_completion(
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# an exception group and we still send out a cancel request
result: Any|Exception = await exhaust_portal(
portal,
actor,
)
result: Any|Exception = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid]: Exception = result
log.cancel(
@ -508,6 +503,14 @@ async def trio_proc(
)
)
# await chan.send({
# '_parent_main_data': subactor._parent_main_data,
# 'enable_modules': subactor.enable_modules,
# 'reg_addrs': subactor.reg_addrs,
# 'bind_addrs': bind_addrs,
# '_runtime_vars': _runtime_vars,
# })
# track subactor in current nursery
curr_actor: Actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
@ -551,8 +554,8 @@ async def trio_proc(
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if cancelled_during_spawn:
# Try again to avoid TTY clobbering.

View File

@ -124,15 +124,9 @@ _ctxvar_Context: ContextVar[Context] = ContextVar(
)
def current_ipc_ctx(
error_on_not_set: bool = False,
) -> Context|None:
def current_ipc_ctx() -> Context:
ctx: Context = _ctxvar_Context.get()
if (
not ctx
and error_on_not_set
):
if not ctx:
from ._exceptions import InternalError
raise InternalError(
'No IPC context has been allocated for this task yet?\n'
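
A hedged sketch of `current_ipc_ctx()` from inside an endpoint task, assuming the top-level re-export shown in the `__init__.py` hunk earlier (endpoint and actor names invented):

import trio
import tractor


@tractor.context
async def ep(ctx: tractor.Context) -> None:
    # the runtime stashes the ctx in a context-var per RPC task, so
    # it is discoverable without threading it through call sigs
    assert tractor.current_ipc_ctx() is ctx
    await ctx.started()


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'ctx_var_demo',
            enable_modules=[__name__],
        )
        async with portal.open_context(ep) as (ctx, _):
            pass
        await portal.cancel_actor()


trio.run(main)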

View File

@ -52,7 +52,6 @@ from tractor.msg import (
if TYPE_CHECKING:
from ._context import Context
from ._ipc import Channel
log = get_logger(__name__)
@ -66,10 +65,10 @@ log = get_logger(__name__)
class MsgStream(trio.abc.Channel):
'''
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC `Channel`.
values over an inter-actor IPC ``Channel``.
This is the type returned to a local task which entered either
`Portal.open_stream_from()` or `Context.open_stream()`.
``Portal.open_stream_from()`` or ``Context.open_stream()``.
Termination rules:
@ -96,22 +95,6 @@ class MsgStream(trio.abc.Channel):
self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False
@property
def ctx(self) -> Context:
'''
This stream's IPC `Context` ref.
'''
return self._ctx
@property
def chan(self) -> Channel:
'''
Ref to the containing `Context`'s transport `Channel`.
'''
return self._ctx.chan
# TODO: could we make this a direct method bind to `PldRx`?
# -> receive_nowait = PldRx.recv_pld
# |_ means latter would have to accept `MsgStream`-as-`self`?
@ -126,7 +109,7 @@ class MsgStream(trio.abc.Channel):
):
ctx: Context = self._ctx
return ctx._pld_rx.recv_pld_nowait(
ipc=self,
ctx=ctx,
expect_msg=expect_msg,
)
@ -165,7 +148,7 @@ class MsgStream(trio.abc.Channel):
try:
ctx: Context = self._ctx
return await ctx._pld_rx.recv_pld(ipc=self)
return await ctx._pld_rx.recv_pld(ctx=ctx)
# XXX: the stream terminates on either of:
# - via `self._rx_chan.receive()` raising after manual closure

View File

@ -84,7 +84,6 @@ class ActorNursery:
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], BaseException],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
@ -106,7 +105,6 @@ class ActorNursery:
self._at_least_one_child_in_debug: bool = False
self.errors = errors
self.exited = trio.Event()
self._scope_error: BaseException|None = None
# NOTE: when no explicit call is made to
# `.open_root_actor()` by application code,
@ -119,9 +117,7 @@ class ActorNursery:
async def start_actor(
self,
name: str,
*,
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str]|None = None,
enable_modules: list[str]|None = None,
@ -129,7 +125,6 @@ class ActorNursery:
nursery: trio.Nursery|None = None,
debug_mode: bool|None = None,
infect_asyncio: bool = False,
) -> Portal:
'''
Start a (daemon) actor: a process that has no designated
@ -194,13 +189,6 @@ class ActorNursery:
)
)
# TODO: DEPRECATE THIS:
# -[ ] impl instead as a hilevel wrapper on
# top of a `@context` style invocation.
# |_ dynamic @context decoration on child side
# |_ implicit `Portal.open_context() as (ctx, first):`
# and `return first` on parent side.
# -[ ] use @api_frame on the wrapper
async def run_in_actor(
self,
@ -233,7 +221,7 @@ class ActorNursery:
# use the explicit function name if not provided
name = fn.__name__
portal: Portal = await self.start_actor(
portal = await self.start_actor(
name,
enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
@ -262,7 +250,6 @@ class ActorNursery:
)
return portal
# @api_frame
async def cancel(
self,
hard_kill: bool = False,
@ -360,11 +347,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
) -> typing.AsyncGenerator[ActorNursery, None]:
# normally don't need to show user by default
__tracebackhide__: bool = True
outer_err: BaseException|None = None
inner_err: BaseException|None = None
# TODO: yay or nay?
__tracebackhide__ = True
# the collection of errors retrieved from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {}
@ -374,7 +358,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# `ActorNursery.start_actor()`).
# ``ActorNursery.start_actor()``).
# errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery() as da_nursery:
@ -409,8 +393,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
)
an._join_procs.set()
except BaseException as _inner_err:
inner_err = _inner_err
except BaseException as inner_err:
errors[actor.uid] = inner_err
# If we error in the root but the debugger is
@ -488,10 +471,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
Exception,
BaseExceptionGroup,
trio.Cancelled
) as _outer_err:
outer_err = _outer_err
an._scope_error = outer_err or inner_err
) as err:
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
@ -506,7 +487,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
if an._children:
log.cancel(
'Actor-nursery cancelling due error type:\n'
f'{outer_err}\n'
f'{err}\n'
)
with trio.CancelScope(shield=True):
await an.cancel()
@ -533,19 +514,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
else:
raise list(errors.values())[0]
# show frame on any (likely) internal error
if (
not an.cancelled
and an._scope_error
):
__tracebackhide__: bool = False
# da_nursery scope end - nursery checkpoint
# final exit
@acm
# @api_frame
async def open_nursery(
**kwargs,
@ -565,7 +538,6 @@ async def open_nursery(
which cancellation scopes correspond to each spawned subactor set.
'''
__tracebackhide__: bool = True
implicit_runtime: bool = False
actor: Actor = current_actor(err_on_no_runtime=False)
an: ActorNursery|None = None
@ -616,14 +588,6 @@ async def open_nursery(
an.exited.set()
finally:
# show frame on any internal runtime-scope error
if (
an
and not an.cancelled
and an._scope_error
):
__tracebackhide__: bool = False
msg: str = (
'Actor-nursery exited\n'
f'|_{an}\n'

View File

@ -20,8 +20,11 @@ as it pertains to improving the grok-ability of our runtime!
'''
from __future__ import annotations
from functools import partial
import inspect
# import msgspec
# from pprint import pformat
import textwrap
import traceback
from types import (
FrameType,
FunctionType,
@ -29,8 +32,9 @@ from types import (
# CodeType,
)
from typing import (
Any,
# Any,
Callable,
# TYPE_CHECKING,
Type,
)
@ -38,7 +42,6 @@ from tractor.msg import (
pretty_struct,
NamespacePath,
)
import wrapt
# TODO: yeah, i don't love this and we should prolly just
@ -80,31 +83,6 @@ def get_class_from_frame(fr: FrameType) -> (
return None
def get_ns_and_func_from_frame(
frame: FrameType,
) -> Callable:
'''
Return the corresponding function object reference from
a `FrameType`, along with its parent namespace `dict`.
'''
ns: dict[str, Any]
# for a method, go up a frame and lookup the name in locals()
if '.' in (qualname := frame.f_code.co_qualname):
cls_name, _, func_name = qualname.partition('.')
ns = frame.f_back.f_locals[cls_name].__dict__
else:
func_name: str = frame.f_code.co_name
ns = frame.f_globals
return (
ns,
ns[func_name],
)
def func_ref_from_frame(
frame: FrameType,
) -> Callable:
@ -120,63 +98,34 @@ def func_ref_from_frame(
)
# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct):
# https://docs.python.org/dev/reference/datamodel.html#frame-objects
# https://docs.python.org/dev/library/inspect.html#the-interpreter-stack
_api_frame: FrameType
rt_fi: inspect.FrameInfo
call_frame: FrameType
@property
def api_frame(self) -> FrameType:
try:
self._api_frame.clear()
except RuntimeError:
# log.warning(
print(
f'Frame {self._api_frame} for {self.api_func} is still active!'
)
return self._api_frame
_api_func: Callable
@property
def api_func(self) -> Callable:
return self._api_func
_caller_frames_up: int|None = 1
_caller_frame: FrameType|None = None # cached after first stack scan
def api_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.rt_fi.frame)
@property
def api_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func
func: FunctionType = self.api_func_ref
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
@property
def caller_frame(self) -> FrameType:
# if not already cached, scan up stack explicitly by
# configured count.
if not self._caller_frame:
if self._caller_frames_up:
for _ in range(self._caller_frames_up):
caller_frame: FrameType|None = self.api_frame.f_back
if not caller_frame:
raise ValueError(
f'No frame exists {self._caller_frames_up} up from\n'
f'{self.api_frame} @ {self.api_nsp}\n'
)
self._caller_frame = caller_frame
return self._caller_frame
def caller_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.call_frame)
@property
def caller_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func
func: FunctionType = self.caller_func_ref
if func:
return NamespacePath.from_ref(func)
@ -223,66 +172,108 @@ def find_caller_info(
call_frame = call_frame.f_back
return CallerInfo(
_api_frame=rt_frame,
_api_func=func_ref_from_frame(rt_frame),
_caller_frames_up=go_up_iframes,
rt_fi=fi,
call_frame=call_frame,
)
return None
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
# TODO: -[x] move all this into new `.devx._code`!
# -[ ] consider rename to _callstack?
# -[ ] prolly create a `@runtime_api` dec?
# |_ @api_frame seems better?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
def api_frame(
wrapped: Callable|None = None,
*,
caller_frames_up: int = 1,
) -> str:
'''
Create a "boxed" looking traceback string.
) -> Callable:
Useful for emphasizing traceback text content as being an
embedded attribute of some other object (like
a `RemoteActorError` or other boxing remote error shuttle
container).
# handle the decorator called WITHOUT () case,
# i.e. just @api_frame, NOT @api_frame(extra=<blah>)
if wrapped is None:
return partial(
api_frame,
caller_frames_up=caller_frames_up,
)
Any other parent/container "fields" can be passed in the
`fields_str` input along with other prefix/indent settings.
@wrapt.decorator
async def wrapper(
wrapped: Callable,
instance: object,
args: tuple,
kwargs: dict,
'''
if (
fields_str
and
field_prefix
):
# maybe cache the API frame for this call
global _frame2callerinfo_cache
this_frame: FrameType = inspect.currentframe()
api_frame: FrameType = this_frame.f_back
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
if not _frame2callerinfo_cache.get(api_frame):
_frame2callerinfo_cache[api_frame] = CallerInfo(
_api_frame=api_frame,
_api_func=wrapped,
_caller_frames_up=caller_frames_up,
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
)
return wrapped(*args, **kwargs)
tb_box: str = (
# annotate the function as an "api function", meaning it is
# a function for which the function above it in the call stack should be
# non-`tractor` code aka "user code".
#
# in the global frame cache for easy lookup from a given
# func-instance
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
wrapped.__api_func__: bool = True
return wrapper(wrapped)
# orig
# f' |\n'
# f' ------ - ------\n\n'
# f'{tb_str}\n'
# f' ------ - ------\n'
# f' _|\n'
f'|\n'
f' ------ - ------\n\n'
# f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# (len(field_prefix))
# ? ^-TODO-^ ? if you wanted another indent level
)
if tb_box_indent > 0:
tb_box: str = textwrap.indent(
tb_box,
prefix=tb_box_indent * ' ',
)
return (
fields
+
tb_box
)
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,
) -> str:
'''
Capture and return the traceback text content from
`stack_limit` call frames up.
'''
tb_str: str = (
'\n'.join(
traceback.format_stack(limit=stack_limit)
)
)
if box_tb:
tb_str: str = pformat_boxed_tb(
tb_str=tb_str,
field_prefix=' ',
indent='',
)
return tb_str
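
A hedged sketch of the traceback "boxing" helper shown above; note the module path differs between the two sides of this compare (`.devx._code` vs `.devx._frame_stack`), so the import below is a guess:

import traceback
# NOTE: path assumed from the hunks above; adjust to whichever
# module actually hosts `pformat_boxed_tb()` in your checkout
from tractor.devx._frame_stack import pformat_boxed_tb


try:
    1 / 0
except ZeroDivisionError:
    tb_str = traceback.format_exc()

print(
    pformat_boxed_tb(
        tb_str=tb_str,
        fields_str='src_uid: (example, 1234)\n',
    )
)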

File diff suppressed because it is too large

View File

@ -57,8 +57,8 @@ LEVELS: dict[str, int] = {
'TRANSPORT': 5,
'RUNTIME': 15,
'CANCEL': 16,
'DEVX': 400,
'PDB': 500,
'DEVX': 600,
}
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
@ -137,7 +137,7 @@ class StackLevelAdapter(LoggerAdapter):
"Developer experience" sub-sys statuses.
'''
return self.log(400, msg)
return self.log(600, msg)
def log(
self,
@ -202,29 +202,8 @@ class StackLevelAdapter(LoggerAdapter):
)
# TODO IDEA:
# -[ ] do per task-name and actor-name color coding
# -[ ] unique color per task-id and actor-uuid
def pformat_task_uid(
id_part: str = 'tail'
):
'''
Return a `str`-ified unique id for a `trio.Task` via a combo of its
`.name: str` and `id()` truncated output.
'''
task: trio.Task = trio.lowlevel.current_task()
tid: str = str(id(task))
if id_part == 'tail':
tid_part: str = tid[-6:]
else:
tid_part: str = tid[:6]
return f'{task.name}[{tid_part}]'
_conc_name_getters = {
'task': pformat_task_uid,
'task': lambda: trio.lowlevel.current_task().name,
'actor': lambda: current_actor(),
'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6],
@ -232,10 +211,7 @@ _conc_name_getters = {
class ActorContextInfo(Mapping):
'''
Dynamic lookup for local actor and task names.
'''
"Dyanmic lookup for local actor and task names"
_context_keys = (
'task',
'actor',
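
A hedged sketch of the custom log levels this hunk re-numbers (400 vs 600 for 'DEVX'); the adapter methods used are the ones defined in this module:

from tractor.log import get_logger

log = get_logger('my_app')

# stdlib-style calls still work through the adapter
log.info('plain info')

# runtime-oriented levels added by `StackLevelAdapter`
log.runtime('runtime-layer detail')     # level 15
log.cancel('cancellation-path detail')  # level 16
log.devx('devx detail')                 # 400 or 600 depending on the side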

View File

@ -44,7 +44,7 @@ from ._codec import (
# )
from .types import (
PayloadMsg as PayloadMsg,
Msg as Msg,
Aid as Aid,
SpawnSpec as SpawnSpec,

View File

@ -432,7 +432,7 @@ class MsgCodec(Struct):
# ) -> Any|Struct:
# msg: PayloadMsg = codec.dec.decode(msg)
# msg: Msg = codec.dec.decode(msg)
# payload_tag: str = msg.header.payload_tag
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
# return payload_dec.decode(msg.pld)

View File

@ -22,9 +22,10 @@ operational helpers for processing transaction flows.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# asynccontextmanager as acm,
contextmanager as cm,
)
from contextvars import ContextVar
from typing import (
Any,
Type,
@ -49,7 +50,6 @@ from tractor._exceptions import (
_mk_msg_type_err,
pack_from_raise,
)
from tractor._state import current_ipc_ctx
from ._codec import (
mk_dec,
MsgDec,
@ -75,7 +75,7 @@ if TYPE_CHECKING:
log = get_logger(__name__)
_def_any_pldec: MsgDec[Any] = mk_dec()
_def_any_pldec: MsgDec = mk_dec()
class PldRx(Struct):
@ -104,19 +104,15 @@ class PldRx(Struct):
'''
# TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel
_pld_dec: MsgDec
_ctx: Context|None = None
_pldec: MsgDec
_ipc: Context|MsgStream|None = None
@property
def pld_dec(self) -> MsgDec:
return self._pld_dec
return self._pldec
# TODO: a better name?
# -[ ] when would this be used as it avoids needing to pass the
# ipc prim to every method
@cm
def wraps_ipc(
def apply_to_ipc(
self,
ipc_prim: Context|MsgStream,
@ -144,50 +140,49 @@ class PldRx(Struct):
exit.
'''
orig_dec: MsgDec = self._pld_dec
orig_dec: MsgDec = self._pldec
limit_dec: MsgDec = mk_dec(spec=spec)
try:
self._pld_dec = limit_dec
self._pldec = limit_dec
yield limit_dec
finally:
self._pld_dec = orig_dec
self._pldec = orig_dec
@property
def dec(self) -> msgpack.Decoder:
return self._pld_dec.dec
return self._pldec.dec
def recv_pld_nowait(
self,
# TODO: make this `MsgStream` compat as well, see above^
# ipc_prim: Context|MsgStream,
ipc: Context|MsgStream,
ctx: Context,
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = False,
**dec_msg_kwargs,
) -> Any|Raw:
__tracebackhide__: bool = hide_tb
__tracebackhide__: bool = True
msg: MsgType = (
ipc_msg
or
# sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait()
ctx._rx_chan.receive_nowait()
)
return self.dec_msg(
msg,
ipc=ipc,
ctx=ctx,
expect_msg=expect_msg,
hide_tb=hide_tb,
**dec_msg_kwargs,
)
async def recv_pld(
self,
ipc: Context|MsgStream,
ctx: Context,
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True,
@ -205,11 +200,11 @@ class PldRx(Struct):
or
# async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive()
await ctx._rx_chan.receive()
)
return self.dec_msg(
msg=msg,
ipc=ipc,
ctx=ctx,
expect_msg=expect_msg,
**dec_msg_kwargs,
)
@ -217,7 +212,7 @@ class PldRx(Struct):
def dec_msg(
self,
msg: MsgType,
ipc: Context|MsgStream,
ctx: Context,
expect_msg: Type[MsgType]|None,
raise_error: bool = True,
@ -230,9 +225,6 @@ class PldRx(Struct):
'''
__tracebackhide__: bool = hide_tb
_src_err = None
src_err: BaseException|None = None
match msg:
# payload-data shuttle msg; deliver the `.pld` value
# directly to IPC (primitive) client-consumer code.
@ -242,7 +234,7 @@ class PldRx(Struct):
|Return(pld=pld) # termination phase
):
try:
pld: PayloadT = self._pld_dec.decode(pld)
pld: PayloadT = self._pldec.decode(pld)
log.runtime(
'Decoded msg payload\n\n'
f'{msg}\n\n'
@ -251,30 +243,25 @@ class PldRx(Struct):
)
return pld
# XXX pld-value type failure
except ValidationError as valerr:
# pack msgterr into error-msg for
# reraise below; ensure remote-actor-err
# info is displayed nicely?
# XXX pld-type failure
except ValidationError as src_err:
msgterr: MsgTypeError = _mk_msg_type_err(
msg=msg,
codec=self.pld_dec,
src_validation_error=valerr,
src_validation_error=src_err,
is_invalid_payload=True,
)
msg: Error = pack_from_raise(
local_err=msgterr,
cid=msg.cid,
src_uid=ipc.chan.uid,
src_uid=ctx.chan.uid,
)
src_err = valerr
# XXX some other decoder specific failure?
# except TypeError as src_error:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise src_error
# ^-TODO-^ can remove?
# a runtime-internal RPC endpoint response.
# always passthrough since (internal) runtime
@ -312,7 +299,6 @@ class PldRx(Struct):
return src_err
case Stop(cid=cid):
ctx: Context = getattr(ipc, 'ctx', ipc)
message: str = (
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
f'{ctx.peer_side!r} peer ?\n'
@ -355,21 +341,14 @@ class PldRx(Struct):
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
#
# fallthrough and raise from `src_err`
try:
_raise_from_unexpected_msg(
ctx=getattr(ipc, 'ctx', ipc),
ctx=ctx,
msg=msg,
src_err=src_err,
log=log,
expect_msg=expect_msg,
hide_tb=hide_tb,
)
except UnboundLocalError:
# XXX if there's an internal lookup error in the above
# code (prolly on `src_err`) we want to show this frame
# in the tb!
__tracebackhide__: bool = False
raise
async def recv_msg_w_pld(
self,
@ -399,13 +378,52 @@ class PldRx(Struct):
# msg instance?
pld: PayloadT = self.dec_msg(
msg,
ipc=ipc,
ctx=ipc,
expect_msg=expect_msg,
**kwargs,
)
return msg, pld
# Always maintain a task-context-global `PldRx`
_def_pld_rx: PldRx = PldRx(
_pldec=_def_any_pldec,
)
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
'pld_rx',
default=_def_pld_rx,
)
def current_pldrx() -> PldRx:
'''
Return the current `trio.Task.context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER its container
shuttle msg (eg. `Started`/`Yield`/`Return`) has been delivered
up from `tractor`'s transport layer but BEFORE the data is
yielded to application code, normally via an IPC primitive API
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
Modification of the current payload spec via `limit_plds()`
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by
the interchange backend.
- for `msgspec` see <PUTLINKHERE>.
NOTE that the `PldRx` itself is a per-`Context` global sub-system
that normally does not change other than the applied pld-spec
for the current `trio.Task`.
'''
# ctx: context = current_ipc_ctx()
# return ctx._pld_rx
return _ctxvar_PldRx.get()
@cm
def limit_plds(
spec: Union[Type[Struct]],
@ -421,55 +439,29 @@ def limit_plds(
'''
__tracebackhide__: bool = True
try:
curr_ctx: Context = current_ipc_ctx()
rx: PldRx = curr_ctx._pld_rx
orig_pldec: MsgDec = rx.pld_dec
# sanity on orig settings
orig_pldrx: PldRx = current_pldrx()
orig_pldec: MsgDec = orig_pldrx.pld_dec
with rx.limit_plds(
with orig_pldrx.limit_plds(
spec=spec,
**kwargs,
) as pldec:
log.runtime(
log.info(
'Applying payload-decoder\n\n'
f'{pldec}\n'
)
yield pldec
finally:
log.runtime(
log.info(
'Reverted to previous payload-decoder\n\n'
f'{orig_pldec}\n'
)
# sanity on orig settings
assert rx.pld_dec is orig_pldec
@acm
async def maybe_limit_plds(
ctx: Context,
spec: Union[Type[Struct]]|None = None,
**kwargs,
) -> MsgDec|None:
'''
Async compat maybe-payload type limiter.
Mostly for use inside other internal `@acm`s such that a separate
indent block isn't needed when an async one is already being
used.
'''
if spec is None:
yield None
return
# sanity on scoping
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
yield msgdec
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
assert (
(pldrx := current_pldrx()) is orig_pldrx
and
pldrx.pld_dec is orig_pldec
)
async def drain_to_final_msg(
@ -551,12 +543,21 @@ async def drain_to_final_msg(
match msg:
# final result arrived!
case Return():
case Return(
# cid=cid,
# pld=res,
):
# ctx._result: Any = res
ctx._result: Any = pld
log.runtime(
'Context delivered final draining msg:\n'
f'{pretty_struct.pformat(msg)}'
)
ctx._result: Any = pld
# XXX: only close the rx mem chan AFTER
# a final result is retreived.
# if ctx._rx_chan:
# await ctx._rx_chan.aclose()
# TODO: ^ we don't need it right?
result_msg = msg
break
@ -663,6 +664,24 @@ async def drain_to_final_msg(
result_msg = msg
break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
# right! since `._deliver_msg()` should
# always have detected an {'error': ..}
# msg and already called this right!?!
# elif error := unpack_error(
# msg=msg,
# chan=ctx._portal.channel,
# hide_tb=False,
# ):
# log.critical('SHOULD NEVER GET HERE!?')
# assert msg is ctx._cancel_msg
# assert error.msgdata == ctx._remote_error.msgdata
# assert error.ipc_msg == ctx._remote_error.ipc_msg
# from .devx._debug import pause
# await pause()
# ctx._maybe_cancel_and_set_remote_error(error)
# ctx._maybe_raise_remote_err(error)
else:
# bubble the original src key error
raise
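
A hedged sketch of the payload-spec limiting the docstrings above describe; both sides expose a `limit_plds()` context-manager in this module, though exactly what it re-binds (the ctx's `PldRx` vs a task-global one) differs between the two commits, so treat this as illustrative only:

from msgspec import Struct
import tractor
from tractor.msg._ops import limit_plds  # path per this module


class Point(Struct):
    x: int
    y: int


@tractor.context
async def typed_ep(ctx: tractor.Context) -> None:
    await ctx.started()
    # narrow which `.pld` payloads this task will decode for the
    # duration of the block; non-matching payloads should surface
    # as a `MsgTypeError` via `_mk_msg_type_err()` above
    with limit_plds(Point):
        async with ctx.open_stream() as stream:
            async for pt in stream:
                assert isinstance(pt, Point)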

View File

@ -56,7 +56,8 @@ log = get_logger('tractor.msgspec')
PayloadT = TypeVar('PayloadT')
class PayloadMsg(
# TODO: PayloadMsg
class Msg(
Struct,
Generic[PayloadT],
@ -109,10 +110,6 @@ class PayloadMsg(
pld: Raw
# TODO: complete rename
Msg = PayloadMsg
class Aid(
Struct,
tag=True,
@ -302,7 +299,7 @@ class StartAck(
class Started(
PayloadMsg,
Msg,
Generic[PayloadT],
):
'''
@ -316,12 +313,12 @@ class Started(
# TODO: instead of using our existing `Start`
# for this (as we did with the original `{'cmd': ..}` style)
# class Cancel:
# class Cancel(Msg):
# cid: str
class Yield(
PayloadMsg,
Msg,
Generic[PayloadT],
):
'''
@ -348,7 +345,7 @@ class Stop(
# TODO: is `Result` or `Out[come]` a better name?
class Return(
PayloadMsg,
Msg,
Generic[PayloadT],
):
'''
@ -360,7 +357,7 @@ class Return(
class CancelAck(
PayloadMsg,
Msg,
Generic[PayloadT],
):
'''
@ -466,14 +463,14 @@ def from_dict_msg(
# TODO: should be make a msg version of `ContextCancelled?`
# and/or with a scope field or a full `ActorCancelled`?
# class Cancelled(MsgType):
# class Cancelled(Msg):
# cid: str
# TODO what about overruns?
# class Overrun(MsgType):
# class Overrun(Msg):
# cid: str
_runtime_msgs: list[Struct] = [
_runtime_msgs: list[Msg] = [
# identity handshake on first IPC `Channel` contact.
Aid,
@ -499,9 +496,9 @@ _runtime_msgs: list[Struct] = [
]
# the no-outcome-yet IAC (inter-actor-communication) sub-set which
# can be `PayloadMsg.pld` payload field type-limited by application code
# can be `Msg.pld` payload field type-limited by application code
# using `apply_codec()` and `limit_msg_spec()`.
_payload_msgs: list[PayloadMsg] = [
_payload_msgs: list[Msg] = [
# first <value> from `Context.started(<value>)`
Started,
@ -544,8 +541,8 @@ def mk_msg_spec(
] = 'indexed_generics',
) -> tuple[
Union[MsgType],
list[MsgType],
Union[Type[Msg]],
list[Type[Msg]],
]:
'''
Create a payload-(data-)type-parameterized IPC message specification.
@ -557,7 +554,7 @@ def mk_msg_spec(
determined by the input `payload_type_union: Union[Type]`.
'''
submsg_types: list[MsgType] = Msg.__subclasses__()
submsg_types: list[Type[Msg]] = Msg.__subclasses__()
bases: tuple = (
# XXX NOTE XXX the below generic-parameterization seems to
# be THE ONLY way to get this to work correctly in terms