Compare commits


No commits in common. "b22f7dcae042dae0a9d068021a76f2c818489d7d" and "343b7c971249b25480c6a59d8974856107e736b7" have entirely different histories.

25 changed files with 893 additions and 1318 deletions

View File

@@ -1,11 +1,6 @@
 import time
 import trio
 import tractor
-from tractor import (
-    ActorNursery,
-    MsgStream,
-    Portal,
-)

 # this is the first 2 actors, streamer_1 and streamer_2
@@ -17,18 +12,14 @@ async def stream_data(seed):
 # this is the third actor; the aggregator
 async def aggregate(seed):
-    '''
-    Ensure that the two streams we receive match but only stream
+    """Ensure that the two streams we receive match but only stream
     a single set of values to the parent.
-
-    '''
-    an: ActorNursery
-    async with tractor.open_nursery() as an:
-        portals: list[Portal] = []
+    """
+    async with tractor.open_nursery() as nursery:
+        portals = []
         for i in range(1, 3):

-            # fork/spawn call
-            portal = await an.start_actor(
+            # fork point
+            portal = await nursery.start_actor(
                 name=f'streamer_{i}',
                 enable_modules=[__name__],
             )
@@ -52,11 +43,7 @@ async def aggregate(seed):
         async with trio.open_nursery() as n:

             for portal in portals:
-                n.start_soon(
-                    push_to_chan,
-                    portal,
-                    send_chan.clone(),
-                )
+                n.start_soon(push_to_chan, portal, send_chan.clone())

             # close this local task's reference to send side
             await send_chan.aclose()
@@ -73,7 +60,7 @@ async def aggregate(seed):
             print("FINISHED ITERATING in aggregator")

-        await an.cancel()
+        await nursery.cancel()
         print("WAITING on `ActorNursery` to finish")
     print("AGGREGATOR COMPLETE!")
@@ -88,21 +75,18 @@ async def main() -> list[int]:
     '''
     # yes, a nursery which spawns `trio`-"actors" B)
-    an: ActorNursery
-    async with tractor.open_nursery(
-        loglevel='cancel',
-        debug_mode=True,
-    ) as an:
+    nursery: tractor.ActorNursery
+    async with tractor.open_nursery() as nursery:

         seed = int(1e3)
         pre_start = time.time()

-        portal: Portal = await an.start_actor(
+        portal: tractor.Portal = await nursery.start_actor(
             name='aggregator',
             enable_modules=[__name__],
         )

-        stream: MsgStream
+        stream: tractor.MsgStream
         async with portal.open_stream_from(
             aggregate,
             seed=seed,
@@ -111,12 +95,11 @@ async def main() -> list[int]:
             start = time.time()
             # the portal call returns exactly what you'd expect
             # as if the remote "aggregate" function was called locally
-            result_stream: list[int] = []
+            result_stream = []
             async for value in stream:
                 result_stream.append(value)

-        cancelled: bool = await portal.cancel_actor()
-        assert cancelled
+        await portal.cancel_actor()

         print(f"STREAM TIME = {time.time() - start}")
         print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")

View File

@@ -55,7 +55,6 @@ xontrib-vox = "^0.0.1"
 optional = false
 [tool.poetry.group.dev.dependencies]
 pytest = "^8.2.0"
-pexpect = "^4.9.0"

 # only for xonsh as sh..
 xontrib-vox = "^0.0.1"

View File

@@ -97,7 +97,6 @@ def test_ipc_channel_break_during_stream(
        examples_dir() / 'advanced_faults'
        / 'ipc_failure_during_stream.py',
        root=examples_dir(),
-       consider_namespace_packages=False,
    )

    # by def we expect KBI from user after a simulated "hang

View File

@@ -89,30 +89,17 @@ def test_remote_error(reg_addr, args_err):
         assert excinfo.value.boxed_type == errtype

     else:
-        # the root task will also error on the `Portal.result()`
-        # call so we expect an error from there AND the child.
-        # |_ tho seems like on new `trio` this doesn't always
-        # happen?
-        with pytest.raises((
-            BaseExceptionGroup,
-            tractor.RemoteActorError,
-        )) as excinfo:
+        # the root task will also error on the `.result()` call
+        # so we expect an error from there AND the child.
+        with pytest.raises(BaseExceptionGroup) as excinfo:
             trio.run(main)

-        # ensure boxed errors are `errtype`
-        err: BaseException = excinfo.value
-        if isinstance(err, BaseExceptionGroup):
-            suberrs: list[BaseException] = err.exceptions
-        else:
-            suberrs: list[BaseException] = [err]
-
-        for exc in suberrs:
+        # ensure boxed errors
+        for exc in excinfo.value.exceptions:
             assert exc.boxed_type == errtype


-def test_multierror(
-    reg_addr: tuple[str, int],
-):
+def test_multierror(reg_addr):
     '''
     Verify we raise a ``BaseExceptionGroup`` out of a nursery where
     more then one actor errors.
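
For reference, the right-hand side standardizes on catching `BaseExceptionGroup` whole and unpacking `.exceptions`, which matches how a `trio` nursery re-raises multiple child failures on Python 3.11+. A minimal self-contained sketch of that capture-and-unpack pattern, assuming only `pytest` plus the 3.11+ builtin group type (the `boom` helper is illustrative only):

    import pytest


    def boom():
        # two failures wrapped in a group, similar to what a `trio`
        # nursery raises when more than one child task errors
        raise BaseExceptionGroup(
            'nursery errors',
            [ValueError('child 1'), ValueError('child 2')],
        )


    def test_group_unpack():
        with pytest.raises(BaseExceptionGroup) as excinfo:
            boom()

        # iterate the boxed sub-exceptions, as the tests above do
        for exc in excinfo.value.exceptions:
            assert isinstance(exc, ValueError)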

View File

@@ -444,7 +444,6 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
            infect_asyncio=True,
            fan_out=fan_out,
        )
-       # should raise RAE diectly
        await portal.result()

    trio.run(main)
@@ -462,11 +461,12 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
            # should trigger remote actor error
            await portal.result()

-   with pytest.raises(RemoteActorError) as excinfo:
+   with pytest.raises(BaseExceptionGroup) as excinfo:
        trio.run(main)

-   # ensure boxed error type
-   excinfo.value.boxed_type == Exception
+   # ensure boxed errors
+   for exc in excinfo.value.exceptions:
+       assert exc.boxed_type == Exception


 def test_trio_closes_early_and_channel_exits(reg_addr):
@@ -477,7 +477,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr):
            exit_early=True,
            infect_asyncio=True,
        )
-       # should raise RAE diectly
+       # should trigger remote actor error
        await portal.result()

    # should be a quiet exit on a simple channel exit
@@ -492,17 +492,15 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
            aio_raise_err=True,
            infect_asyncio=True,
        )
-       # should trigger RAE directly, not an eg.
+       # should trigger remote actor error
        await portal.result()

-   with pytest.raises(
-       # NOTE: bc we directly wait on `Portal.result()` instead
-       # of capturing it inside the `ActorNursery` machinery.
-       expected_exception=RemoteActorError,
-   ) as excinfo:
+   with pytest.raises(BaseExceptionGroup) as excinfo:
        trio.run(main)

-   excinfo.value.boxed_type == Exception
+   # ensure boxed errors
+   for exc in excinfo.value.exceptions:
+       assert exc.boxed_type == Exception


 @tractor.context

View File

@@ -55,10 +55,9 @@ from tractor._testing import (
 @tractor.context
-async def open_stream_then_sleep_forever(
+async def sleep_forever(
     ctx: Context,
     expect_ctxc: bool = False,
-
 ) -> None:
     '''
     Sync the context, open a stream then just sleep.
@@ -68,10 +67,6 @@ async def open_stream_then_sleep_forever(
     '''
     try:
         await ctx.started()
-
-        # NOTE: the below means this child will send a `Stop`
-        # to it's parent-side task despite that side never
-        # opening a stream itself.
         async with ctx.open_stream():
             await trio.sleep_forever()
@@ -105,7 +100,7 @@ async def error_before_started(
     '''
     async with tractor.wait_for_actor('sleeper') as p2:
         async with (
-            p2.open_context(open_stream_then_sleep_forever) as (peer_ctx, first),
+            p2.open_context(sleep_forever) as (peer_ctx, first),
             peer_ctx.open_stream(),
         ):
             # NOTE: this WAS inside an @acm body but i factored it
@@ -209,13 +204,9 @@ async def stream_ints(
 @tractor.context
 async def stream_from_peer(
     ctx: Context,
-    debug_mode: bool,
     peer_name: str = 'sleeper',
-
 ) -> None:

-    # sanity
-    assert tractor._state.debug_mode() == debug_mode
-
     peer: Portal
     try:
         async with (
@@ -249,54 +240,26 @@ async def stream_from_peer(
                 assert msg is not None
                 print(msg)

-    # NOTE: cancellation of the (sleeper) peer should always cause
-    # a `ContextCancelled` raise in this streaming actor.
-    except ContextCancelled as _ctxc:
-        ctxc = _ctxc
-
-        # print("TRYING TO ENTER PAUSSE!!!")
-        # await tractor.pause(shield=True)
-        re: ContextCancelled = peer_ctx._remote_error
-
-        # XXX YES XXX, remote error should be unpacked only once!
-        assert (
-            re
-            is
-            peer_ctx.maybe_error
-            is
-            ctxc
-            is
-            peer_ctx._local_error
-        )
-        # NOTE: these errors should all match!
-        #   ------ - ------
-        # XXX [2024-05-03] XXX
-        #   ------ - ------
-        # broke this due to a re-raise inside `.msg._ops.drain_to_final_msg()`
-        # where the `Error()` msg was directly raising the ctxc
-        # instead of just returning up to the caller inside
-        # `Context.return()` which would results in a diff instance of
-        # the same remote error bubbling out above vs what was
-        # already unpacked and set inside `Context.
-        assert (
-            peer_ctx._remote_error.msgdata
-            ==
-            ctxc.msgdata
-        )
-        # ^-XXX-^ notice the data is of course the exact same.. so
-        # the above larger assert makes sense to also always be true!
-
-        # XXX YES XXX, bc should be exact same msg instances
-        assert peer_ctx._remote_error._ipc_msg is ctxc._ipc_msg
-
-        # XXX NO XXX, bc new one always created for property accesss
-        assert peer_ctx._remote_error.ipc_msg != ctxc.ipc_msg
+    # NOTE: cancellation of the (sleeper) peer should always
+    # cause a `ContextCancelled` raise in this streaming
+    # actor.
+    except ContextCancelled as ctxc:
+        ctxerr = ctxc
+
+        assert peer_ctx._remote_error is ctxerr
+        assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
+
+        # XXX YES, bc exact same msg instances
+        assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
+
+        # XXX NO, bc new one always created for property accesss
+        assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg

         # the peer ctx is the canceller even though it's canceller
         # is the "canceller" XD
         assert peer_name in peer_ctx.canceller
-        assert "canceller" in ctxc.canceller
+        assert "canceller" in ctxerr.canceller

         # caller peer should not be the cancel requester
         assert not ctx.cancel_called
@@ -320,13 +283,12 @@ async def stream_from_peer(
             # TODO / NOTE `.canceller` won't have been set yet
             # here because that machinery is inside
-            # `Portal.open_context().__aexit__()` BUT, if we had
+            # `.open_context().__aexit__()` BUT, if we had
             # a way to know immediately (from the last
             # checkpoint) that cancellation was due to
             # a remote, we COULD assert this here..see,
             # https://github.com/goodboy/tractor/issues/368
             #
-            # await tractor.pause()
             # assert 'canceller' in ctx.canceller

     # root/parent actor task should NEVER HAVE cancelled us!
@@ -430,13 +392,12 @@ def test_peer_canceller(
             try:
                 async with (
                     sleeper.open_context(
-                        open_stream_then_sleep_forever,
+                        sleep_forever,
                         expect_ctxc=True,
                     ) as (sleeper_ctx, sent),

                     just_caller.open_context(
                         stream_from_peer,
-                        debug_mode=debug_mode,
                     ) as (caller_ctx, sent),

                     canceller.open_context(
@@ -462,11 +423,10 @@ def test_peer_canceller(
                 # should always raise since this root task does
                 # not request the sleeper cancellation ;)
-                except ContextCancelled as _ctxc:
-                    ctxc = _ctxc
+                except ContextCancelled as ctxerr:
                     print(
                         'CAUGHT REMOTE CONTEXT CANCEL\n\n'
-                        f'{ctxc}\n'
+                        f'{ctxerr}\n'
                     )

                     # canceller and caller peers should not
@@ -477,7 +437,7 @@ def test_peer_canceller(
                     # we were not the actor, our peer was
                     assert not sleeper_ctx.cancel_acked

-                    assert ctxc.canceller[0] == 'canceller'
+                    assert ctxerr.canceller[0] == 'canceller'

                     # XXX NOTE XXX: since THIS `ContextCancelled`
                     # HAS NOT YET bubbled up to the
@@ -488,7 +448,7 @@ def test_peer_canceller(
                     # CASE_1: error-during-ctxc-handling,
                     if error_during_ctxerr_handling:
-                        raise RuntimeError('Simulated RTE re-raise during ctxc handling')
+                        raise RuntimeError('Simulated error during teardown')

                     # CASE_2: standard teardown inside in `.open_context()` block
                     raise
@@ -553,9 +513,6 @@ def test_peer_canceller(
             # should be cancelled by US.
             #
             if error_during_ctxerr_handling:
-                print(f'loc_err: {_loc_err}\n')
-                assert isinstance(loc_err, RuntimeError)
-
                 # since we do a rte reraise above, the
                 # `.open_context()` error handling should have
                 # raised a local rte, thus the internal
@@ -564,6 +521,9 @@ def test_peer_canceller(
                 # a `trio.Cancelled` due to a local
                 # `._scope.cancel()` call.
                 assert not sleeper_ctx._scope.cancelled_caught
+                assert isinstance(loc_err, RuntimeError)
+                print(f'_loc_err: {_loc_err}\n')
+
                 # assert sleeper_ctx._local_error is _loc_err
                 # assert sleeper_ctx._local_error is _loc_err
                 assert not (
@@ -600,12 +560,9 @@ def test_peer_canceller(
                     else:  # the other 2 ctxs
                         assert (
-                            isinstance(re, ContextCancelled)
-                            and (
-                                re.canceller
-                                ==
-                                canceller.channel.uid
-                            )
+                            re.canceller
+                            ==
+                            canceller.channel.uid
                         )

                 # since the sleeper errors while handling a
@@ -854,7 +811,8 @@ async def serve_subactors(
     async with open_nursery() as an:

         # sanity
-        assert tractor._state.debug_mode() == debug_mode
+        if debug_mode:
+            assert tractor._state.debug_mode()

         await ctx.started(peer_name)
         async with ctx.open_stream() as ipc:
@@ -1133,6 +1091,7 @@ def test_peer_spawns_and_cancels_service_subactor(
                     '-> root checking `client_ctx.result()`,\n'
                     f'-> checking that sub-spawn {peer_name} is down\n'
                 )
+                # else:

                 try:
                     res = await client_ctx.result(hide_tb=False)

View File

@@ -2,9 +2,7 @@
 Spawning basics

 """
-from typing import (
-    Any,
-)
+from typing import Optional

 import pytest
 import trio
@@ -27,11 +25,13 @@ async def spawn(
     async with tractor.open_root_actor(
         arbiter_addr=reg_addr,
     ):
+
         actor = tractor.current_actor()
         assert actor.is_arbiter == is_arbiter
         data = data_to_pass_down
+
         if actor.is_arbiter:
             async with tractor.open_nursery() as nursery:

                 # forks here
@@ -95,9 +95,7 @@ async def test_movie_theatre_convo(start_method):
     await portal.cancel_actor()


-async def cellar_door(
-    return_value: str|None,
-):
+async def cellar_door(return_value: Optional[str]):
     return return_value
@@ -107,18 +105,16 @@ async def cellar_door(
 )
 @tractor_test
 async def test_most_beautiful_word(
-    start_method: str,
-    return_value: Any,
-    debug_mode: bool,
+    start_method,
+    return_value
 ):
     '''
     The main ``tractor`` routine.

     '''
     with trio.fail_after(1):
-        async with tractor.open_nursery(
-            debug_mode=debug_mode,
-        ) as n:
+        async with tractor.open_nursery() as n:
             portal = await n.run_in_actor(
                 cellar_door,
                 return_value=return_value,

View File

@@ -42,7 +42,6 @@ from ._supervise import (
 from ._state import (
     current_actor as current_actor,
     is_root_process as is_root_process,
-    current_ipc_ctx as current_ipc_ctx,
 )
 from ._exceptions import (
     ContextCancelled as ContextCancelled,

View File

@@ -41,7 +41,6 @@ from typing import (
     Callable,
     Mapping,
     Type,
-    TypeAlias,
     TYPE_CHECKING,
     Union,
 )
@@ -95,7 +94,7 @@ if TYPE_CHECKING:
     from ._portal import Portal
     from ._runtime import Actor
     from ._ipc import MsgTransport
-    from .devx._frame_stack import (
+    from .devx._code import (
         CallerInfo,
     )
@@ -156,41 +155,6 @@ class Context:
     # payload receiver
     _pld_rx: msgops.PldRx

-    @property
-    def pld_rx(self) -> msgops.PldRx:
-        '''
-        The current `tractor.Context`'s msg-payload-receiver.
-
-        A payload receiver is the IPC-msg processing sub-sys which
-        filters inter-actor-task communicated payload data, i.e. the
-        `PayloadMsg.pld: PayloadT` field value, AFTER its container
-        shuttlle msg (eg. `Started`/`Yield`/`Return) has been
-        delivered up from `tractor`'s transport layer but BEFORE the
-        data is yielded to `tractor` application code.
-
-        The "IPC-primitive API" is normally one of a `Context` (this)` or a `MsgStream`
-        or some higher level API using one of them.
-
-        For ex. `pld_data: PayloadT = MsgStream.receive()` implicitly
-        calls into the stream's parent `Context.pld_rx.recv_pld().` to
-        receive the latest `PayloadMsg.pld` value.
-
-        Modification of the current payload spec via `limit_plds()`
-        allows a `tractor` application to contextually filter IPC
-        payload content with a type specification as supported by the
-        interchange backend.
-
-        - for `msgspec` see <PUTLINKHERE>.
-
-        Note that the `PldRx` itself is a per-`Context` instance that
-        normally only changes when some (sub-)task, on a given "side"
-        of the IPC ctx (either a "child"-side RPC or inside
-        a "parent"-side `Portal.open_context()` block), modifies it
-        using the `.msg._ops.limit_plds()` API.
-
-        '''
-        return self._pld_rx
-
     # full "namespace-path" to target RPC function
     _nsf: NamespacePath
@@ -267,8 +231,6 @@ class Context:
     # init and streaming state
     _started_called: bool = False
-    _started_msg: MsgType|None = None
-    _started_pld: Any = None
     _stream_opened: bool = False
     _stream: MsgStream|None = None
@@ -661,7 +623,7 @@ class Context:
         log.runtime(
             'Setting remote error for ctx\n\n'
             f'<= {self.peer_side!r}: {self.chan.uid}\n'
-            f'=> {self.side!r}: {self._actor.uid}\n\n'
+            f'=> {self.side!r}\n\n'
             f'{error}'
         )
         self._remote_error: BaseException = error
@@ -716,7 +678,7 @@ class Context:
             log.error(
                 f'Remote context error:\n\n'
                 # f'{pformat(self)}\n'
-                f'{error}'
+                f'{error}\n'
             )

         if self._canceller is None:
@@ -762,10 +724,8 @@ class Context:
             )
         else:
             message: str = 'NOT cancelling `Context._scope` !\n\n'
-            # from .devx import mk_pdb
-            # mk_pdb().set_trace()

-        fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
+        fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
         if (
             cs
             and
@@ -845,7 +805,6 @@ class Context:
         #     f'{ci.api_nsp}()\n'
         # )

-        # TODO: use `.dev._frame_stack` scanning to find caller!
         return 'Portal.open_context()'

     async def cancel(
@@ -1345,6 +1304,17 @@ class Context:
             ctx=self,
             hide_tb=hide_tb,
         )
+        for msg in drained_msgs:
+
+            # TODO: mask this by default..
+            if isinstance(msg, Return):
+                # from .devx import pause
+                # await pause()
+                # raise InternalError(
+                log.warning(
+                    'Final `return` msg should never be drained !?!?\n\n'
+                    f'{msg}\n'
+                )

         drained_status: str = (
             'Ctx drained to final outcome msg\n\n'
@@ -1465,10 +1435,6 @@ class Context:
             self._result
         )

-    @property
-    def has_outcome(self) -> bool:
-        return bool(self.maybe_error) or self._final_result_is_set()
-
     # @property
     def repr_outcome(
         self,
@@ -1671,6 +1637,8 @@ class Context:
             )
             if rt_started != started_msg:
+
+                # TODO: break these methods out from the struct subtype?

                 # TODO: make that one a mod func too..
                 diff = pretty_struct.Struct.__sub__(
                     rt_started,
@@ -1706,8 +1674,6 @@ class Context:
             ) from verr

         self._started_called = True
-        self._started_msg = started_msg
-        self._started_pld = value

     async def _drain_overflows(
         self,
@@ -1995,7 +1961,6 @@ async def open_context_from_portal(
     portal: Portal,
     func: Callable,

-    pld_spec: TypeAlias|None = None,
     allow_overruns: bool = False,

     # TODO: if we set this the wrapping `@acm` body will
@@ -2061,7 +2026,7 @@ async def open_context_from_portal(
     # XXX NOTE XXX: currenly we do NOT allow opening a contex
     # with "self" since the local feeder mem-chan processing
     # is not built for it.
-    if (uid := portal.channel.uid) == portal.actor.uid:
+    if portal.channel.uid == portal.actor.uid:
         raise RuntimeError(
             '** !! Invalid Operation !! **\n'
             'Can not open an IPC ctx with the local actor!\n'
@@ -2089,45 +2054,32 @@ async def open_context_from_portal(
     assert ctx._caller_info
     _ctxvar_Context.set(ctx)

+    # XXX NOTE since `._scope` is NOT set BEFORE we retreive the
+    # `Started`-msg any cancellation triggered
+    # in `._maybe_cancel_and_set_remote_error()` will
+    # NOT actually cancel the below line!
+    # -> it's expected that if there is an error in this phase of
+    # the dialog, the `Error` msg should be raised from the `msg`
+    # handling block below.
+    first: Any = await ctx._pld_rx.recv_pld(
+        ctx=ctx,
+        expect_msg=Started,
+    )
+    ctx._started_called: bool = True
+    uid: tuple = portal.channel.uid
+    cid: str = ctx.cid
+
     # placeholder for any exception raised in the runtime
     # or by user tasks which cause this context's closure.
     scope_err: BaseException|None = None
     ctxc_from_callee: ContextCancelled|None = None
     try:
-        async with (
-            trio.open_nursery() as tn,
-            msgops.maybe_limit_plds(
-                ctx=ctx,
-                spec=pld_spec,
-            ) as maybe_msgdec,
-        ):
-            if maybe_msgdec:
-                assert maybe_msgdec.pld_spec == pld_spec
+        async with trio.open_nursery() as nurse:

-            # XXX NOTE since `._scope` is NOT set BEFORE we retreive the
-            # `Started`-msg any cancellation triggered
-            # in `._maybe_cancel_and_set_remote_error()` will
-            # NOT actually cancel the below line!
-            # -> it's expected that if there is an error in this phase of
-            # the dialog, the `Error` msg should be raised from the `msg`
-            # handling block below.
-            started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
-                ipc=ctx,
-                expect_msg=Started,
-                passthrough_non_pld_msgs=False,
-            )
-            # from .devx import pause
-            # await pause()
-            ctx._started_called: bool = True
-            ctx._started_msg: bool = started_msg
-            ctx._started_pld: bool = first
-
-            # NOTE: this in an implicit runtime nursery used to,
-            # - start overrun queuing tasks when as well as
-            # for cancellation of the scope opened by the user.
-            ctx._scope_nursery: trio.Nursery = tn
-            ctx._scope: trio.CancelScope = tn.cancel_scope
+            # NOTE: used to start overrun queuing tasks
+            ctx._scope_nursery: trio.Nursery = nurse
+            ctx._scope: trio.CancelScope = nurse.cancel_scope

             # deliver context instance and .started() msg value
             # in enter tuple.
@@ -2174,13 +2126,13 @@ async def open_context_from_portal(

             # when in allow_overruns mode there may be
             # lingering overflow sender tasks remaining?
-            if tn.child_tasks:
+            if nurse.child_tasks:
                 # XXX: ensure we are in overrun state
                 # with ``._allow_overruns=True`` bc otherwise
                 # there should be no tasks in this nursery!
                 if (
                     not ctx._allow_overruns
-                    or len(tn.child_tasks) > 1
+                    or len(nurse.child_tasks) > 1
                 ):
                     raise InternalError(
                         'Context has sub-tasks but is '
@@ -2352,8 +2304,8 @@ async def open_context_from_portal(
             ):
                 log.warning(
                     'IPC connection for context is broken?\n'
-                    f'task: {ctx.cid}\n'
-                    f'actor: {uid}'
+                    f'task:{cid}\n'
+                    f'actor:{uid}'
                 )

                 raise  # duh
@@ -2503,8 +2455,9 @@ async def open_context_from_portal(
             and ctx.cancel_acked
         ):
             log.cancel(
-                f'Context cancelled by {ctx.side!r}-side task\n'
+                'Context cancelled by {ctx.side!r}-side task\n'
                 f'|_{ctx._task}\n\n'
+
                 f'{repr(scope_err)}\n'
             )
@@ -2532,7 +2485,7 @@ async def open_context_from_portal(
             f'cid: {ctx.cid}\n'
         )
         portal.actor._contexts.pop(
-            (uid, ctx.cid),
+            (uid, cid),
             None,
         )
@@ -2560,12 +2513,11 @@ def mk_context(
     send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)

     # TODO: only scan caller-info if log level so high!
-    from .devx._frame_stack import find_caller_info
+    from .devx._code import find_caller_info
     caller_info: CallerInfo|None = find_caller_info()

-    pld_rx = msgops.PldRx(
-        _pld_dec=msgops._def_any_pldec,
-    )
+    # TODO: when/how do we apply `.limit_plds()` from here?
+    pld_rx: msgops.PldRx = msgops.current_pldrx()

     ctx = Context(
         chan=chan,
@@ -2579,16 +2531,13 @@ def mk_context(
         _caller_info=caller_info,
         **kwargs,
     )
-    pld_rx._ctx = ctx
     ctx._result = Unresolved
     return ctx


 # TODO: use the new type-parameters to annotate this in 3.13?
 # -[ ] https://peps.python.org/pep-0718/#unknown-types
-def context(
-    func: Callable,
-) -> Callable:
+def context(func: Callable) -> Callable:
     '''
     Mark an (async) function as an SC-supervised, inter-`Actor`,
     child-`trio.Task`, IPC endpoint otherwise known more

View File

@@ -716,5 +716,4 @@ async def _connect_chan(
     chan = Channel((host, port))
     await chan.connect()
     yield chan
-    with trio.CancelScope(shield=True):
-        await chan.aclose()
+    await chan.aclose()
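
The left-hand side wraps the close in a shielded cancel scope, the standard `trio` idiom for cleanup that must survive cancellation; without the shield, the `await chan.aclose()` would itself be interrupted by any pending `trio.Cancelled`. A minimal sketch of the idiom in plain `trio` (the sleeps below are stand-ins for real I/O, not `tractor` code):

    import trio


    async def use_resource():
        try:
            await trio.sleep_forever()
        finally:
            # without the shield, this await would re-raise the pending
            # cancellation before the cleanup could run to completion
            with trio.CancelScope(shield=True):
                await trio.sleep(0.1)  # stand-in for `await chan.aclose()`


    async def main():
        with trio.move_on_after(0.01):
            await use_resource()


    if __name__ == '__main__':
        trio.run(main)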

View File

@@ -47,7 +47,6 @@ from ._ipc import Channel
 from .log import get_logger
 from .msg import (
     # Error,
-    PayloadMsg,
     NamespacePath,
     Return,
 )
@@ -99,8 +98,7 @@ class Portal:
         self.chan = channel
         # during the portal's lifetime
-        self._final_result_pld: Any|None = None
-        self._final_result_msg: PayloadMsg|None = None
+        self._final_result: Any|None = None

         # When set to a ``Context`` (when _submit_for_result is called)
         # it is expected that ``result()`` will be awaited at some
@@ -134,7 +132,7 @@ class Portal:
                 'A pending main result has already been submitted'
             )

-        self._expect_result_ctx: Context = await self.actor.start_remote_task(
+        self._expect_result_ctx = await self.actor.start_remote_task(
             self.channel,
             nsf=NamespacePath(f'{ns}:{func}'),
             kwargs=kwargs,
@@ -165,22 +163,13 @@ class Portal:
         # expecting a "main" result
         assert self._expect_result_ctx

-        if self._final_result_msg is None:
-            try:
-                (
-                    self._final_result_msg,
-                    self._final_result_pld,
-                ) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
-                    ipc=self._expect_result_ctx,
-                    expect_msg=Return,
-                )
-            except BaseException as err:
-                # TODO: wrap this into `@api_frame` optionally with
-                # some kinda filtering mechanism like log levels?
-                __tracebackhide__: bool = False
-                raise err
+        if self._final_result is None:
+            self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
+                ctx=self._expect_result_ctx,
+                expect_msg=Return,
+            )

-        return self._final_result_pld
+        return self._final_result

     async def _cancel_streams(self):
         # terminate all locally running async generator
@@ -312,7 +301,7 @@ class Portal:
             portal=self,
         )
         return await ctx._pld_rx.recv_pld(
-            ipc=ctx,
+            ctx=ctx,
             expect_msg=Return,
         )
@@ -331,8 +320,6 @@ class Portal:
         remote rpc task or a local async generator instance.

         '''
-        __runtimeframe__: int = 1  # noqa
-
         if isinstance(func, str):
             warnings.warn(
                 "`Portal.run(namespace: str, funcname: str)` is now"
@@ -366,7 +353,7 @@ class Portal:
             portal=self,
         )
         return await ctx._pld_rx.recv_pld(
-            ipc=ctx,
+            ctx=ctx,
             expect_msg=Return,
         )

View File

@@ -18,7 +18,7 @@
 Root actor runtime ignition(s).

 '''
-from contextlib import asynccontextmanager as acm
+from contextlib import asynccontextmanager
 from functools import partial
 import importlib
 import logging
@@ -60,7 +60,7 @@ _default_lo_addrs: list[tuple[str, int]] = [(
 logger = log.get_logger('tractor')


-@acm
+@asynccontextmanager
 async def open_root_actor(

     *,
@@ -96,7 +96,6 @@ async def open_root_actor(
     Runtime init entry point for ``tractor``.

     '''
-    __tracebackhide__ = True
     # TODO: stick this in a `@cm` defined in `devx._debug`?
     #
     # Override the global debugger hook to make it play nice with
@@ -359,11 +358,7 @@ async def open_root_actor(
             BaseExceptionGroup,
         ) as err:

-            import inspect
-            entered: bool = await _debug._maybe_enter_pm(
-                err,
-                api_frame=inspect.currentframe(),
-            )
+            entered: bool = await _debug._maybe_enter_pm(err)

             if (
                 not entered

View File

@@ -70,6 +70,7 @@ from .msg import (
 from tractor.msg.types import (
     CancelAck,
     Error,
+    Msg,
     MsgType,
     Return,
     Start,
@@ -249,17 +250,10 @@ async def _errors_relayed_via_ipc(
     ] = trio.TASK_STATUS_IGNORED,

 ) -> None:
-    # NOTE: we normally always hide this frame in call-stack tracebacks
-    # if the crash originated from an RPC task (since normally the
-    # user is only going to care about their own code not this
-    # internal runtime frame) and we DID NOT
-    # fail due to an IPC transport error!
     __tracebackhide__: bool = hide_tb

     # TODO: a debug nursery when in debug mode!
     # async with maybe_open_debugger_nursery() as debug_tn:
     # => see matching comment in side `._debug._pause()`
-    rpc_err: BaseException|None = None
     try:
         yield  # run RPC invoke body
@@ -270,7 +264,16 @@ async def _errors_relayed_via_ipc(
         BaseExceptionGroup,
         KeyboardInterrupt,
     ) as err:
-        rpc_err = err
+
+        # NOTE: always hide this frame from debug REPL call stack
+        # if the crash originated from an RPC task and we DID NOT
+        # fail due to an IPC transport error!
+        if (
+            is_rpc
+            and
+            chan.connected()
+        ):
+            __tracebackhide__: bool = hide_tb

         # TODO: maybe we'll want different "levels" of debugging
         # eventualy such as ('app', 'supervisory', 'runtime') ?
@@ -315,19 +318,11 @@ async def _errors_relayed_via_ipc(
                     api_frame=inspect.currentframe(),
                 )
                 if not entered_debug:
-                    # if we prolly should have entered the REPL but
-                    # didn't, maybe there was an internal error in
-                    # the above code and we do want to show this
-                    # frame!
-                    if _state.debug_mode():
-                        __tracebackhide__: bool = False
-
                     log.exception(
                         'RPC task crashed\n'
                         f'|_{ctx}'
                     )

         # ALWAYS try to ship RPC errors back to parent/caller task
         if is_rpc:
@@ -360,20 +355,6 @@ async def _errors_relayed_via_ipc(
     # `Actor._service_n`, we add "handles" to each such that
     # they can be individually ccancelled.
     finally:
-
-        # if the error is not from user code and instead a failure
-        # of a runtime RPC or transport failure we do prolly want to
-        # show this frame
-        if (
-            rpc_err
-            and (
-                not is_rpc
-                or
-                not chan.connected()
-            )
-        ):
-            __tracebackhide__: bool = False
-
         try:
             ctx: Context
             func: Callable
@@ -463,10 +444,9 @@ async def _invoke(
         # open the stream with this option.
         # allow_overruns=True,
     )
-    context_ep_func: bool = False
+    context: bool = False

-    # set the current IPC ctx var for this RPC task
-    _state._ctxvar_Context.set(ctx)
+    assert not _state._ctxvar_Context.get()

     # TODO: deprecate this style..
     if getattr(func, '_tractor_stream_function', False):
@@ -495,7 +475,7 @@ async def _invoke(
     # handle decorated ``@tractor.context`` async function
     elif getattr(func, '_tractor_context_function', False):
         kwargs['ctx'] = ctx
-        context_ep_func = True
+        context = True

     # errors raised inside this block are propgated back to caller
     async with _errors_relayed_via_ipc(
@@ -521,7 +501,7 @@ async def _invoke(
             raise

     # TODO: impl all these cases in terms of the `Context` one!
-    if not context_ep_func:
+    if not context:
         await _invoke_non_context(
             actor,
             cancel_scope,
@@ -591,6 +571,7 @@ async def _invoke(
         async with trio.open_nursery() as tn:
             ctx._scope_nursery = tn
             ctx._scope = tn.cancel_scope
+            _state._ctxvar_Context.set(ctx)
             task_status.started(ctx)

             # TODO: should would be nice to have our
@@ -850,7 +831,7 @@ async def process_messages(
     (as utilized inside `Portal.cancel_actor()` ).

     '''
-    assert actor._service_n  # runtime state sanity
+    assert actor._service_n  # state sanity

     # TODO: once `trio` get's an "obvious way" for req/resp we
     # should use it?
@@ -863,7 +844,7 @@ async def process_messages(
     # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
     # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
     nursery_cancelled_before_task: bool = False
-    msg: MsgType|None = None
+    msg: Msg|None = None
     try:
         # NOTE: this internal scope allows for keeping this
         # message loop running despite the current task having

View File

@@ -644,7 +644,7 @@ class Actor:
         peers_str: str = ''
         for uid, chans in self._peers.items():
             peers_str += (
-                f'uid: {uid}\n'
+                f'|_ uid: {uid}\n'
             )
             for i, chan in enumerate(chans):
                 peers_str += (
@@ -678,12 +678,10 @@ class Actor:
         # XXX => YES IT DOES, when i was testing ctl-c
         # from broken debug TTY locking due to
         # msg-spec races on application using RunVar...
+        pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
         if (
-            (ctx_in_debug := pdb_lock.ctx_in_debug)
-            and
-            (pdb_user_uid := ctx_in_debug.chan.uid)
-            and
-            local_nursery
+            pdb_user_uid
+            and local_nursery
         ):
             entry: tuple|None = local_nursery._children.get(
                 tuple(pdb_user_uid)
@@ -1171,17 +1169,13 @@ class Actor:

             # kill any debugger request task to avoid deadlock
             # with the root actor in this tree
-            debug_req = _debug.DebugStatus
-            lock_req_ctx: Context = debug_req.req_ctx
-            if lock_req_ctx is not None:
+            dbcs = _debug.DebugStatus.req_cs
+            if dbcs is not None:
                 msg += (
                     '-> Cancelling active debugger request..\n'
-                    f'|_{_debug.Lock.repr()}\n\n'
-                    f'|_{lock_req_ctx}\n\n'
+                    f'|_{_debug.Lock.pformat()}'
                 )
-                # lock_req_ctx._scope.cancel()
-                # TODO: wrap this in a method-API..
-                debug_req.req_cs.cancel()
+                dbcs.cancel()

             # self-cancel **all** ongoing RPC tasks
             await self.cancel_rpc_tasks(
@@ -1381,17 +1375,15 @@ class Actor:
                 "IPC channel's "
             )
             rent_chan_repr: str = (
-                f' |_{parent_chan}\n\n'
+                f'|_{parent_chan}'
                 if parent_chan
                 else ''
             )
             log.cancel(
-                f'Cancelling {descr} RPC tasks\n\n'
-                f'<= canceller: {req_uid}\n'
-                f'{rent_chan_repr}'
-                f'=> cancellee: {self.uid}\n'
-                f'  |_{self}.cancel_rpc_tasks()\n'
-                f'  |_tasks: {len(tasks)}\n'
+                f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
+                f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n'
+                f'  {rent_chan_repr}\n'
+                # f'{self}\n'
                 # f'{tasks_str}'
             )
             for (
@@ -1421,7 +1413,7 @@ class Actor:
         if tasks:
             log.cancel(
                 'Waiting for remaining rpc tasks to complete\n'
-                f'|_{tasks_str}'
+                f'|_{tasks}'
             )
             await self._ongoing_rpc_tasks.wait()
@@ -1474,10 +1466,7 @@ class Actor:
         assert self._parent_chan, "No parent channel for this actor?"
         return Portal(self._parent_chan)

-    def get_chans(
-        self,
-        uid: tuple[str, str],
-    ) -> list[Channel]:
+    def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
         '''
         Return all IPC channels to the actor with provided `uid`.
@@ -1637,9 +1626,7 @@ async def async_main(
             # tranport address bind errors - normally it's
             # something silly like the wrong socket-address
             # passed via a config or CLI Bo
-            entered_debug = await _debug._maybe_enter_pm(
-                oserr,
-            )
+            entered_debug = await _debug._maybe_enter_pm(oserr)
             if entered_debug:
                 log.runtime('Exited debug REPL..')
             raise

View File

@@ -142,9 +142,7 @@ async def exhaust_portal(

     '''
     __tracebackhide__ = True
     try:
-        log.debug(
-            f'Waiting on final result from {actor.uid}'
-        )
+        log.debug(f"Waiting on final result from {actor.uid}")

         # XXX: streams should never be reaped here since they should
         # always be established and shutdown using a context manager api
@@ -197,10 +195,7 @@ async def cancel_on_completion(
         # if this call errors we store the exception for later
         # in ``errors`` which will be reraised inside
         # an exception group and we still send out a cancel request
-        result: Any|Exception = await exhaust_portal(
-            portal,
-            actor,
-        )
+        result: Any|Exception = await exhaust_portal(portal, actor)
         if isinstance(result, Exception):
             errors[actor.uid]: Exception = result
             log.cancel(
@@ -508,6 +503,14 @@ async def trio_proc(
             )
         )

+        # await chan.send({
+        #     '_parent_main_data': subactor._parent_main_data,
+        #     'enable_modules': subactor.enable_modules,
+        #     'reg_addrs': subactor.reg_addrs,
+        #     'bind_addrs': bind_addrs,
+        #     '_runtime_vars': _runtime_vars,
+        # })
+
         # track subactor in current nursery
         curr_actor: Actor = current_actor()
         curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
@@ -551,8 +554,8 @@ async def trio_proc(
             # killing the process too early.
             if proc:
                 log.cancel(f'Hard reap sequence starting for {subactor.uid}')
-
                 with trio.CancelScope(shield=True):
+
                     # don't clobber an ongoing pdb
                     if cancelled_during_spawn:
                         # Try again to avoid TTY clobbering.

View File

@@ -124,15 +124,9 @@ _ctxvar_Context: ContextVar[Context] = ContextVar(
 )


-def current_ipc_ctx(
-    error_on_not_set: bool = False,
-) -> Context|None:
+def current_ipc_ctx() -> Context:
     ctx: Context = _ctxvar_Context.get()
-
-    if (
-        not ctx
-        and error_on_not_set
-    ):
+    if not ctx:
         from ._exceptions import InternalError
         raise InternalError(
             'No IPC context has been allocated for this task yet?\n'
View File

@@ -52,7 +52,6 @@ from tractor.msg import (

 if TYPE_CHECKING:
     from ._context import Context
-    from ._ipc import Channel

 log = get_logger(__name__)
@@ -66,10 +65,10 @@ log = get_logger(__name__)
 class MsgStream(trio.abc.Channel):
     '''
     A bidirectional message stream for receiving logically sequenced
-    values over an inter-actor IPC `Channel`.
+    values over an inter-actor IPC ``Channel``.

     This is the type returned to a local task which entered either
-    `Portal.open_stream_from()` or `Context.open_stream()`.
+    ``Portal.open_stream_from()`` or ``Context.open_stream()``.

     Termination rules:
@@ -96,22 +95,6 @@ class MsgStream(trio.abc.Channel):
         self._eoc: bool|trio.EndOfChannel = False
         self._closed: bool|trio.ClosedResourceError = False

-    @property
-    def ctx(self) -> Context:
-        '''
-        This stream's IPC `Context` ref.
-
-        '''
-        return self._ctx
-
-    @property
-    def chan(self) -> Channel:
-        '''
-        Ref to the containing `Context`'s transport `Channel`.
-
-        '''
-        return self._ctx.chan
-
     # TODO: could we make this a direct method bind to `PldRx`?
     # -> receive_nowait = PldRx.recv_pld
     # |_ means latter would have to accept `MsgStream`-as-`self`?
@@ -126,7 +109,7 @@ class MsgStream(trio.abc.Channel):
     ):
         ctx: Context = self._ctx
         return ctx._pld_rx.recv_pld_nowait(
-            ipc=self,
+            ctx=ctx,
             expect_msg=expect_msg,
         )
@@ -165,7 +148,7 @@ class MsgStream(trio.abc.Channel):
         try:
             ctx: Context = self._ctx
-            return await ctx._pld_rx.recv_pld(ipc=self)
+            return await ctx._pld_rx.recv_pld(ctx=ctx)

         # XXX: the stream terminates on either of:
         # - via `self._rx_chan.receive()` raising after manual closure

View File

@@ -84,7 +84,6 @@ class ActorNursery:
         ria_nursery: trio.Nursery,
         da_nursery: trio.Nursery,
         errors: dict[tuple[str, str], BaseException],
-
     ) -> None:
         # self.supervisor = supervisor  # TODO
         self._actor: Actor = actor
@@ -106,7 +105,6 @@ class ActorNursery:
         self._at_least_one_child_in_debug: bool = False
         self.errors = errors
         self.exited = trio.Event()
-        self._scope_error: BaseException|None = None

         # NOTE: when no explicit call is made to
         # `.open_root_actor()` by application code,
@@ -119,9 +117,7 @@ class ActorNursery:
     async def start_actor(
         self,
         name: str,
-
         *,
-
         bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
         rpc_module_paths: list[str]|None = None,
         enable_modules: list[str]|None = None,
@@ -129,7 +125,6 @@ class ActorNursery:
         nursery: trio.Nursery|None = None,
         debug_mode: bool|None = None,
         infect_asyncio: bool = False,
-
     ) -> Portal:
         '''
         Start a (daemon) actor: an process that has no designated
@@ -194,13 +189,6 @@ class ActorNursery:
             )
         )

-    # TODO: DEPRECATE THIS:
-    # -[ ] impl instead as a hilevel wrapper on
-    #     top of a `@context` style invocation.
-    #    |_ dynamic @context decoration on child side
-    #    |_ implicit `Portal.open_context() as (ctx, first):`
-    #      and `return first` on parent side.
-    #    -[ ] use @api_frame on the wrapper
     async def run_in_actor(
         self,
@@ -233,7 +221,7 @@ class ActorNursery:
         # use the explicit function name if not provided
         name = fn.__name__

-        portal: Portal = await self.start_actor(
+        portal = await self.start_actor(
             name,
             enable_modules=[mod_path] + (
                 enable_modules or rpc_module_paths or []
@@ -262,7 +250,6 @@ class ActorNursery:
         )
         return portal

-    # @api_frame
     async def cancel(
         self,
         hard_kill: bool = False,
@@ -360,11 +347,8 @@ async def _open_and_supervise_one_cancels_all_nursery(

 ) -> typing.AsyncGenerator[ActorNursery, None]:

-    # normally don't need to show user by default
-    __tracebackhide__: bool = True
-
-    outer_err: BaseException|None = None
-    inner_err: BaseException|None = None
+    # TODO: yay or nay?
+    __tracebackhide__ = True

     # the collection of errors retreived from spawned sub-actors
     errors: dict[tuple[str, str], BaseException] = {}
@@ -374,7 +358,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
     # handling errors that are generated by the inner nursery in
     # a supervisor strategy **before** blocking indefinitely to wait for
     # actors spawned in "daemon mode" (aka started using
-    # `ActorNursery.start_actor()`).
+    # ``ActorNursery.start_actor()``).

     # errors from this daemon actor nursery bubble up to caller
     async with trio.open_nursery() as da_nursery:
@@ -409,8 +393,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
                     )
                     an._join_procs.set()

-                except BaseException as _inner_err:
-                    inner_err = _inner_err
+                except BaseException as inner_err:
                     errors[actor.uid] = inner_err

                     # If we error in the root but the debugger is
@@ -488,10 +471,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
                 Exception,
                 BaseExceptionGroup,
                 trio.Cancelled
-            ) as _outer_err:
-                outer_err = _outer_err
-                an._scope_error = outer_err or inner_err
+            ) as err:

                 # XXX: yet another guard before allowing the cancel
                 # sequence in case a (single) child is in debug.
@@ -506,7 +487,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
                 if an._children:
                     log.cancel(
                         'Actor-nursery cancelling due error type:\n'
-                        f'{outer_err}\n'
+                        f'{err}\n'
                     )
                     with trio.CancelScope(shield=True):
                         await an.cancel()
@@ -533,19 +514,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
             else:
                 raise list(errors.values())[0]

-        # show frame on any (likely) internal error
-        if (
-            not an.cancelled
-            and an._scope_error
-        ):
-            __tracebackhide__: bool = False
-
         # da_nursery scope end - nursery checkpoint
     # final exit


 @acm
-# @api_frame
 async def open_nursery(
     **kwargs,
@@ -565,7 +538,6 @@ async def open_nursery(
     which cancellation scopes correspond to each spawned subactor set.

     '''
-    __tracebackhide__: bool = True
     implicit_runtime: bool = False
     actor: Actor = current_actor(err_on_no_runtime=False)
     an: ActorNursery|None = None
@@ -616,14 +588,6 @@ async def open_nursery(
             an.exited.set()

     finally:
-        # show frame on any internal runtime-scope error
-        if (
-            an
-            and not an.cancelled
-            and an._scope_error
-        ):
-            __tracebackhide__: bool = False
-
         msg: str = (
             'Actor-nursery exited\n'
             f'|_{an}\n'

View File

@ -20,8 +20,11 @@ as it pertains to improving the grok-ability of our runtime!
''' '''
from __future__ import annotations from __future__ import annotations
from functools import partial
import inspect import inspect
# import msgspec
# from pprint import pformat
import textwrap
import traceback
from types import ( from types import (
FrameType, FrameType,
FunctionType, FunctionType,
@ -29,8 +32,9 @@ from types import (
# CodeType, # CodeType,
) )
from typing import ( from typing import (
Any, # Any,
Callable, Callable,
# TYPE_CHECKING,
Type, Type,
) )
@ -38,7 +42,6 @@ from tractor.msg import (
pretty_struct, pretty_struct,
NamespacePath, NamespacePath,
) )
import wrapt
# TODO: yeah, i don't love this and we should prolly just # TODO: yeah, i don't love this and we should prolly just
@ -80,31 +83,6 @@ def get_class_from_frame(fr: FrameType) -> (
return None return None
def get_ns_and_func_from_frame(
frame: FrameType,
) -> Callable:
'''
Return the corresponding function object reference from
a `FrameType`, and return it and it's parent namespace `dict`.
'''
ns: dict[str, Any]
# for a method, go up a frame and lookup the name in locals()
if '.' in (qualname := frame.f_code.co_qualname):
cls_name, _, func_name = qualname.partition('.')
ns = frame.f_back.f_locals[cls_name].__dict__
else:
func_name: str = frame.f_code.co_name
ns = frame.f_globals
return (
ns,
ns[func_name],
)
def func_ref_from_frame( def func_ref_from_frame(
frame: FrameType, frame: FrameType,
) -> Callable: ) -> Callable:
@ -120,63 +98,34 @@ def func_ref_from_frame(
) )
# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct): class CallerInfo(pretty_struct.Struct):
# https://docs.python.org/dev/reference/datamodel.html#frame-objects rt_fi: inspect.FrameInfo
# https://docs.python.org/dev/library/inspect.html#the-interpreter-stack call_frame: FrameType
_api_frame: FrameType
@property @property
def api_frame(self) -> FrameType: def api_func_ref(self) -> Callable|None:
try: return func_ref_from_frame(self.rt_fi.frame)
self._api_frame.clear()
except RuntimeError:
# log.warning(
print(
f'Frame {self._api_frame} for {self.api_func} is still active!'
)
return self._api_frame
_api_func: Callable
@property
def api_func(self) -> Callable:
return self._api_func
_caller_frames_up: int|None = 1
_caller_frame: FrameType|None = None # cached after first stack scan
@property @property
def api_nsp(self) -> NamespacePath|None: def api_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func func: FunctionType = self.api_func_ref
if func: if func:
return NamespacePath.from_ref(func) return NamespacePath.from_ref(func)
return '<unknown>' return '<unknown>'
@property @property
def caller_frame(self) -> FrameType: def caller_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.call_frame)
# if not already cached, scan up stack explicitly by
# configured count.
if not self._caller_frame:
if self._caller_frames_up:
for _ in range(self._caller_frames_up):
caller_frame: FrameType|None = self.api_frame.f_back
if not caller_frame:
raise ValueError(
f'No frame exists {self._caller_frames_up} up from\n'
f'{self.api_frame} @ {self.api_nsp}\n'
)
self._caller_frame = caller_frame
return self._caller_frame
@property @property
def caller_nsp(self) -> NamespacePath|None: def caller_nsp(self) -> NamespacePath|None:
func: FunctionType = func_ref_from_frame(self.caller_frame) func: FunctionType = self.caller_func_ref
if func: if func:
return NamespacePath.from_ref(func) return NamespacePath.from_ref(func)
@ -223,66 +172,108 @@ def find_caller_info(
call_frame = call_frame.f_back call_frame = call_frame.f_back
return CallerInfo( return CallerInfo(
_api_frame=rt_frame, rt_fi=fi,
_api_func=func_ref_from_frame(rt_frame), call_frame=call_frame,
_caller_frames_up=go_up_iframes,
) )
return None return None
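The `.f_back`-walking idea behind `caller_frame` in stdlib-only form, a toy sketch:

import inspect
from types import FrameType

def n_frames_up(frames_up: int = 1) -> FrameType:
    # start from the caller of this helper, then walk `frames_up` more
    frame: FrameType = inspect.currentframe().f_back
    for _ in range(frames_up):
        if frame.f_back is None:
            raise ValueError(
                f'No frame exists {frames_up} up from {frame}'
            )
        frame = frame.f_back
    return frame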
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} def pformat_boxed_tb(
tb_str: str,
fields_str: str|None = None,
field_prefix: str = ' |_',
tb_box_indent: int|None = None,
tb_body_indent: int = 1,
# TODO: -[x] move all this into new `.devx._code`! ) -> str:
# -[ ] consider rename to _callstack? '''
# -[ ] prolly create a `@runtime_api` dec? Create a "boxed" looking traceback string.
# |_ @api_frame seems better?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
def api_frame(
wrapped: Callable|None = None,
*,
caller_frames_up: int = 1,
) -> Callable: Useful for emphasizing traceback text content as being an
embedded attribute of some other object (like
a `RemoteActorError` or other boxing remote error shuttle
container).
# handle the decorator called WITHOUT () case, Any other parent/container "fields" can be passed in the
# i.e. just @api_frame, NOT @api_frame(extra=<blah>) `fields_str` input along with other prefix/indent settings.
if wrapped is None:
return partial( '''
api_frame, if (
caller_frames_up=caller_frames_up, fields_str
and
field_prefix
):
fields: str = textwrap.indent(
fields_str,
prefix=field_prefix,
)
else:
fields = fields_str or ''
tb_body = tb_str
if tb_body_indent:
tb_body: str = textwrap.indent(
tb_str,
prefix=tb_body_indent * ' ',
) )
@wrapt.decorator tb_box: str = (
async def wrapper(
wrapped: Callable,
instance: object,
args: tuple,
kwargs: dict,
):
# maybe cache the API frame for this call
global _frame2callerinfo_cache
this_frame: FrameType = inspect.currentframe()
api_frame: FrameType = this_frame.f_back
if not _frame2callerinfo_cache.get(api_frame): # orig
_frame2callerinfo_cache[api_frame] = CallerInfo( # f' |\n'
_api_frame=api_frame, # f' ------ - ------\n\n'
_api_func=wrapped, # f'{tb_str}\n'
_caller_frames_up=caller_frames_up, # f' ------ - ------\n'
) # f' _|\n'
return wrapped(*args, **kwargs) f'|\n'
f' ------ - ------\n\n'
# f'{tb_str}\n'
f'{tb_body}'
f' ------ - ------\n'
f'_|\n'
)
tb_box_indent: str = (
tb_box_indent
or
1
# annotate the function as an "api function", meaning it is # (len(field_prefix))
# a function for which the function above it in the call stack should be # ? ^-TODO-^ ? if you wanted another indent level
# non-`tractor` code aka "user code". )
# if tb_box_indent > 0:
# in the global frame cache for easy lookup from a given tb_box: str = textwrap.indent(
# func-instance tb_box,
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache prefix=tb_box_indent * ' ',
wrapped.__api_func__: bool = True )
return wrapper(wrapped)
return (
fields
+
tb_box
)
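Example rendering with `pformat_boxed_tb()` as defined above (the `fields_str` value here is made up):

import traceback

try:
    1/0
except ZeroDivisionError:
    tb_str: str = traceback.format_exc()

print(
    pformat_boxed_tb(
        tb_str=tb_str,
        fields_str="src_uid: ('root', '1234')\n",  # hypothetical parent field
    )
)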
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,
) -> str:
'''
Capture and return the traceback text content from
`stack_limit` call frames up.
'''
tb_str: str = (
'\n'.join(
traceback.format_stack(limit=stack_limit)
)
)
if box_tb:
tb_str: str = pformat_boxed_tb(
tb_str=tb_str,
field_prefix=' ',
tb_body_indent=0,
)
return tb_str
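Handy for one-off call-site logging, eg. a sketch:

def deprecated_api():
    # report who called us, one frame up
    print(
        'Called from:\n'
        +
        pformat_caller_frame(stack_limit=2)
    )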
(File diff suppressed because it is too large.)
@ -57,8 +57,8 @@ LEVELS: dict[str, int] = {
'TRANSPORT': 5, 'TRANSPORT': 5,
'RUNTIME': 15, 'RUNTIME': 15,
'CANCEL': 16, 'CANCEL': 16,
'DEVX': 400,
'PDB': 500, 'PDB': 500,
'DEVX': 600,
} }
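These custom names ride on stdlib `logging`'s numeric levels; a sketch of how such a table is typically registered (values taken from one side of the diff above):

import logging

for name, level in {
    'TRANSPORT': 5,
    'RUNTIME': 15,
    'CANCEL': 16,
    'DEVX': 400,
    'PDB': 500,
}.items():
    logging.addLevelName(level, name)

assert logging.getLevelName(400) == 'DEVX'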
# _custom_levels: set[str] = { # _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys() # lvlname.lower for lvlname in LEVELS.keys()
@ -137,7 +137,7 @@ class StackLevelAdapter(LoggerAdapter):
"Developer experience" sub-sys statuses. "Developer experience" sub-sys statuses.
''' '''
return self.log(400, msg) return self.log(600, msg)
def log( def log(
self, self,
@ -202,29 +202,8 @@ class StackLevelAdapter(LoggerAdapter):
) )
# TODO IDEA:
# -[ ] do per task-name and actor-name color coding
# -[ ] unique color per task-id and actor-uuid
def pformat_task_uid(
id_part: str = 'tail'
):
'''
Return a `str`-ified unique id for a `trio.Task` via a combo of its
`.name: str` and `id()` truncated output.
'''
task: trio.Task = trio.lowlevel.current_task()
tid: str = str(id(task))
if id_part == 'tail':
tid_part: str = tid[-6:]
else:
tid_part: str = tid[:6]
return f'{task.name}[{tid_part}]'
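Inside any `trio` task this renders ids like `main[839264]`; eg. assuming the def above:

import trio

async def main():
    print(pformat_task_uid())        # tail of `id()` by default
    print(pformat_task_uid('head'))  # or the head slice

trio.run(main)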
_conc_name_getters = { _conc_name_getters = {
'task': pformat_task_uid, 'task': lambda: trio.lowlevel.current_task().name,
'actor': lambda: current_actor(), 'actor': lambda: current_actor(),
'actor_name': lambda: current_actor().name, 'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6], 'actor_uid': lambda: current_actor().uid[1][:6],
@ -232,10 +211,7 @@ _conc_name_getters = {
class ActorContextInfo(Mapping): class ActorContextInfo(Mapping):
''' "Dynamic lookup for local actor and task names"
Dynamic lookup for local actor and task names.
'''
_context_keys = ( _context_keys = (
'task', 'task',
'actor', 'actor',
@ -44,7 +44,7 @@ from ._codec import (
# ) # )
from .types import ( from .types import (
PayloadMsg as PayloadMsg, Msg as Msg,
Aid as Aid, Aid as Aid,
SpawnSpec as SpawnSpec, SpawnSpec as SpawnSpec,
@ -432,7 +432,7 @@ class MsgCodec(Struct):
# ) -> Any|Struct: # ) -> Any|Struct:
# msg: PayloadMsg = codec.dec.decode(msg) # msg: Msg = codec.dec.decode(msg)
# payload_tag: str = msg.header.payload_tag # payload_tag: str = msg.header.payload_tag
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag] # payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
# return payload_dec.decode(msg.pld) # return payload_dec.decode(msg.pld)
@ -22,9 +22,10 @@ operational helpers for processing transaction flows.
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, # asynccontextmanager as acm,
contextmanager as cm, contextmanager as cm,
) )
from contextvars import ContextVar
from typing import ( from typing import (
Any, Any,
Type, Type,
@ -49,7 +50,6 @@ from tractor._exceptions import (
_mk_msg_type_err, _mk_msg_type_err,
pack_from_raise, pack_from_raise,
) )
from tractor._state import current_ipc_ctx
from ._codec import ( from ._codec import (
mk_dec, mk_dec,
MsgDec, MsgDec,
@ -75,7 +75,7 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
_def_any_pldec: MsgDec[Any] = mk_dec() _def_any_pldec: MsgDec = mk_dec()
class PldRx(Struct): class PldRx(Struct):
@ -104,19 +104,15 @@ class PldRx(Struct):
''' '''
# TODO: better to bind it here? # TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel # _rx_mc: trio.MemoryReceiveChannel
_pld_dec: MsgDec _pldec: MsgDec
_ctx: Context|None = None
_ipc: Context|MsgStream|None = None _ipc: Context|MsgStream|None = None
@property @property
def pld_dec(self) -> MsgDec: def pld_dec(self) -> MsgDec:
return self._pld_dec return self._pldec
# TODO: a better name?
# -[ ] when would this be used as it avoids needing to pass the
# ipc prim to every method
@cm @cm
def wraps_ipc( def apply_to_ipc(
self, self,
ipc_prim: Context|MsgStream, ipc_prim: Context|MsgStream,
@ -144,50 +140,49 @@ class PldRx(Struct):
exit. exit.
''' '''
orig_dec: MsgDec = self._pld_dec orig_dec: MsgDec = self._pldec
limit_dec: MsgDec = mk_dec(spec=spec) limit_dec: MsgDec = mk_dec(spec=spec)
try: try:
self._pld_dec = limit_dec self._pldec = limit_dec
yield limit_dec yield limit_dec
finally: finally:
self._pld_dec = orig_dec self._pldec = orig_dec
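The override is just a scoped attribute swap with guaranteed restore; the bare pattern:

from contextlib import contextmanager

@contextmanager
def swap_attr(obj, name: str, new_val):
    # install `new_val` for the block, always restore the original
    orig = getattr(obj, name)
    setattr(obj, name, new_val)
    try:
        yield new_val
    finally:
        setattr(obj, name, orig)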
@property @property
def dec(self) -> msgpack.Decoder: def dec(self) -> msgpack.Decoder:
return self._pld_dec.dec return self._pldec.dec
def recv_pld_nowait( def recv_pld_nowait(
self, self,
# TODO: make this `MsgStream` compat as well, see above^ # TODO: make this `MsgStream` compat as well, see above^
# ipc_prim: Context|MsgStream, # ipc_prim: Context|MsgStream,
ipc: Context|MsgStream, ctx: Context,
ipc_msg: MsgType|None = None, ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None, expect_msg: Type[MsgType]|None = None,
hide_tb: bool = False,
**dec_msg_kwargs, **dec_msg_kwargs,
) -> Any|Raw: ) -> Any|Raw:
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = True
msg: MsgType = ( msg: MsgType = (
ipc_msg ipc_msg
or or
# sync-rx msg from underlying IPC feeder (mem-)chan # sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait() ctx._rx_chan.receive_nowait()
) )
return self.dec_msg( return self.dec_msg(
msg, msg,
ipc=ipc, ctx=ctx,
expect_msg=expect_msg, expect_msg=expect_msg,
hide_tb=hide_tb,
**dec_msg_kwargs, **dec_msg_kwargs,
) )
async def recv_pld( async def recv_pld(
self, self,
ipc: Context|MsgStream, ctx: Context,
ipc_msg: MsgType|None = None, ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None, expect_msg: Type[MsgType]|None = None,
hide_tb: bool = True, hide_tb: bool = True,
@ -205,11 +200,11 @@ class PldRx(Struct):
or or
# async-rx msg from underlying IPC feeder (mem-)chan # async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive() await ctx._rx_chan.receive()
) )
return self.dec_msg( return self.dec_msg(
msg=msg, msg=msg,
ipc=ipc, ctx=ctx,
expect_msg=expect_msg, expect_msg=expect_msg,
**dec_msg_kwargs, **dec_msg_kwargs,
) )
@ -217,7 +212,7 @@ class PldRx(Struct):
def dec_msg( def dec_msg(
self, self,
msg: MsgType, msg: MsgType,
ipc: Context|MsgStream, ctx: Context,
expect_msg: Type[MsgType]|None, expect_msg: Type[MsgType]|None,
raise_error: bool = True, raise_error: bool = True,
@ -230,9 +225,6 @@ class PldRx(Struct):
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
_src_err = None
src_err: BaseException|None = None
match msg: match msg:
# payload-data shuttle msg; deliver the `.pld` value # payload-data shuttle msg; deliver the `.pld` value
# directly to IPC (primitive) client-consumer code. # directly to IPC (primitive) client-consumer code.
@ -242,7 +234,7 @@ class PldRx(Struct):
|Return(pld=pld) # termination phase |Return(pld=pld) # termination phase
): ):
try: try:
pld: PayloadT = self._pld_dec.decode(pld) pld: PayloadT = self._pldec.decode(pld)
log.runtime( log.runtime(
'Decoded msg payload\n\n' 'Decoded msg payload\n\n'
f'{msg}\n\n' f'{msg}\n\n'
@ -251,30 +243,25 @@ class PldRx(Struct):
) )
return pld return pld
# XXX pld-value type failure # XXX pld-type failure
except ValidationError as valerr: except ValidationError as src_err:
# pack mgterr into error-msg for
# reraise below; ensure remote-actor-err
# info is displayed nicely?
msgterr: MsgTypeError = _mk_msg_type_err( msgterr: MsgTypeError = _mk_msg_type_err(
msg=msg, msg=msg,
codec=self.pld_dec, codec=self.pld_dec,
src_validation_error=valerr, src_validation_error=src_err,
is_invalid_payload=True, is_invalid_payload=True,
) )
msg: Error = pack_from_raise( msg: Error = pack_from_raise(
local_err=msgterr, local_err=msgterr,
cid=msg.cid, cid=msg.cid,
src_uid=ipc.chan.uid, src_uid=ctx.chan.uid,
) )
src_err = valerr
# XXX some other decoder specific failure? # XXX some other decoder specific failure?
# except TypeError as src_error: # except TypeError as src_error:
# from .devx import mk_pdb # from .devx import mk_pdb
# mk_pdb().set_trace() # mk_pdb().set_trace()
# raise src_error # raise src_error
# ^-TODO-^ can remove?
# a runtime-internal RPC endpoint response. # a runtime-internal RPC endpoint response.
# always passthrough since (internal) runtime # always passthrough since (internal) runtime
@ -312,7 +299,6 @@ class PldRx(Struct):
return src_err return src_err
case Stop(cid=cid): case Stop(cid=cid):
ctx: Context = getattr(ipc, 'ctx', ipc)
message: str = ( message: str = (
f'{ctx.side!r}-side of ctx received stream-`Stop` from ' f'{ctx.side!r}-side of ctx received stream-`Stop` from '
f'{ctx.peer_side!r} peer ?\n' f'{ctx.peer_side!r} peer ?\n'
@ -355,21 +341,14 @@ class PldRx(Struct):
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note # |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
# #
# fallthrough and raise from `src_err` # fallthrough and raise from `src_err`
try: _raise_from_unexpected_msg(
_raise_from_unexpected_msg( ctx=ctx,
ctx=getattr(ipc, 'ctx', ipc), msg=msg,
msg=msg, src_err=src_err,
src_err=src_err, log=log,
log=log, expect_msg=expect_msg,
expect_msg=expect_msg, hide_tb=hide_tb,
hide_tb=hide_tb, )
)
except UnboundLocalError:
# XXX if there's an internal lookup error in the above
# code (prolly on `src_err`) we want to show this frame
# in the tb!
__tracebackhide__: bool = False
raise
async def recv_msg_w_pld( async def recv_msg_w_pld(
self, self,
@ -399,13 +378,52 @@ class PldRx(Struct):
# msg instance? # msg instance?
pld: PayloadT = self.dec_msg( pld: PayloadT = self.dec_msg(
msg, msg,
ipc=ipc, ctx=ipc,
expect_msg=expect_msg, expect_msg=expect_msg,
**kwargs, **kwargs,
) )
return msg, pld return msg, pld
# Always maintain a task-context-global `PldRx`
_def_pld_rx: PldRx = PldRx(
_pldec=_def_any_pldec,
)
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
'pld_rx',
default=_def_pld_rx,
)
def current_pldrx() -> PldRx:
'''
Return the current `trio.Task.context`'s msg-payload-receiver.
A payload receiver is the IPC-msg processing sub-sys which
filters inter-actor-task communicated payload data, i.e. the
`PayloadMsg.pld: PayloadT` field value, AFTER its container
shuttle msg (eg. `Started`/`Yield`/`Return`) has been delivered
up from `tractor`'s transport layer but BEFORE the data is
yielded to application code, normally via an IPC primitive API
like, for ex., `pld_data: PayloadT = await MsgStream.receive()`.
Modification of the current payload spec via `limit_plds()`
allows a `tractor` application to contextually filter IPC
payload content with a type specification as supported by
the interchange backend.
- for `msgspec` see <PUTLINKHERE>.
NOTE that the `PldRx` itself is a per-`Context` global sub-system
that normally does not change other than the applied pld-spec
for the current `trio.Task`.
'''
# ctx: context = current_ipc_ctx()
# return ctx._pld_rx
return _ctxvar_PldRx.get()
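The task-local lookup is plain `contextvars` machinery; a condensed sketch of the same mechanism:

from contextvars import ContextVar

_spec: ContextVar[type] = ContextVar('pld_spec', default=object)

def current_spec() -> type:
    # each task (really each `contextvars.Context`) sees its own value
    return _spec.get()

token = _spec.set(int)
try:
    assert current_spec() is int
finally:
    _spec.reset(token)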
@cm @cm
def limit_plds( def limit_plds(
spec: Union[Type[Struct]], spec: Union[Type[Struct]],
@ -421,55 +439,29 @@ def limit_plds(
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = True
try: try:
curr_ctx: Context = current_ipc_ctx() # sanity on orig settings
rx: PldRx = curr_ctx._pld_rx orig_pldrx: PldRx = current_pldrx()
orig_pldec: MsgDec = rx.pld_dec orig_pldec: MsgDec = orig_pldrx.pld_dec
with rx.limit_plds( with orig_pldrx.limit_plds(
spec=spec, spec=spec,
**kwargs, **kwargs,
) as pldec: ) as pldec:
log.runtime( log.info(
'Applying payload-decoder\n\n' 'Applying payload-decoder\n\n'
f'{pldec}\n' f'{pldec}\n'
) )
yield pldec yield pldec
finally: finally:
log.runtime( log.info(
'Reverted to previous payload-decoder\n\n' 'Reverted to previous payload-decoder\n\n'
f'{orig_pldec}\n' f'{orig_pldec}\n'
) )
# sanity on orig settings assert (
assert rx.pld_dec is orig_pldec (pldrx := current_pldrx()) is orig_pldrx
and
pldrx.pld_dec is orig_pldec
@acm )
async def maybe_limit_plds(
ctx: Context,
spec: Union[Type[Struct]]|None = None,
**kwargs,
) -> MsgDec|None:
'''
Async compat maybe-payload type limiter.
Mostly for use inside other internal `@acm`s such that a separate
indent block isn't needed when an async one is already being
used.
'''
if spec is None:
yield None
return
# sanity on scoping
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
yield msgdec
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
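So an internal `@acm` can compose it without an extra indent level for the no-spec case, eg. a sketch assuming an ambient IPC `ctx` and some app-defined `pld_spec`:

from contextlib import asynccontextmanager as acm

@acm
async def open_phase(ctx, pld_spec=None):
    # body is identical whether or not a spec gets applied
    async with maybe_limit_plds(ctx, spec=pld_spec):
        yield ctx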
async def drain_to_final_msg( async def drain_to_final_msg(
@ -551,12 +543,21 @@ async def drain_to_final_msg(
match msg: match msg:
# final result arrived! # final result arrived!
case Return(): case Return(
# cid=cid,
# pld=res,
):
# ctx._result: Any = res
ctx._result: Any = pld
log.runtime( log.runtime(
'Context delivered final draining msg:\n' 'Context delivered final draining msg:\n'
f'{pretty_struct.pformat(msg)}' f'{pretty_struct.pformat(msg)}'
) )
ctx._result: Any = pld # XXX: only close the rx mem chan AFTER
# a final result is retrieved.
# if ctx._rx_chan:
# await ctx._rx_chan.aclose()
# TODO: ^ we don't need it right?
result_msg = msg result_msg = msg
break break
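The `case Return(pld=pld)` capture is standard keyword class-pattern matching over `msgspec.Struct`s; schematically:

import msgspec

class Return(msgspec.Struct, tag=True):
    cid: str
    pld: msgspec.Raw

def extract_result(msg) -> msgspec.Raw|None:
    match msg:
        case Return(pld=pld):
            # keyword patterns bind by attribute lookup
            return pld
        case _:
            return None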
@ -663,6 +664,24 @@ async def drain_to_final_msg(
result_msg = msg result_msg = msg
break # OOOOOF, yeah obvi we need this.. break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
# right! since `._deliver_msg()` should
# always have detected an {'error': ..}
# msg and already called this right!?!
# elif error := unpack_error(
# msg=msg,
# chan=ctx._portal.channel,
# hide_tb=False,
# ):
# log.critical('SHOULD NEVER GET HERE!?')
# assert msg is ctx._cancel_msg
# assert error.msgdata == ctx._remote_error.msgdata
# assert error.ipc_msg == ctx._remote_error.ipc_msg
# from .devx._debug import pause
# await pause()
# ctx._maybe_cancel_and_set_remote_error(error)
# ctx._maybe_raise_remote_err(error)
else: else:
# bubble the original src key error # bubble the original src key error
raise raise
@ -56,7 +56,8 @@ log = get_logger('tractor.msgspec')
PayloadT = TypeVar('PayloadT') PayloadT = TypeVar('PayloadT')
class PayloadMsg( # TODO: PayloadMsg
class Msg(
Struct, Struct,
Generic[PayloadT], Generic[PayloadT],
@ -109,10 +110,6 @@ class PayloadMsg(
pld: Raw pld: Raw
# TODO: complete rename
Msg = PayloadMsg
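A trimmed sketch of the generic, `Raw`-payload base shape (field set reduced for brevity):

from typing import Generic, TypeVar
from msgspec import Raw, Struct

PayloadT = TypeVar('PayloadT')

class PayloadMsg(Struct, Generic[PayloadT], tag=True):
    cid: str
    pld: Raw  # lazily decoded to `PayloadT` by a second-stage decoder

Msg = PayloadMsg  # back-compat alias during the rename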
class Aid( class Aid(
Struct, Struct,
tag=True, tag=True,
@ -302,7 +299,7 @@ class StartAck(
class Started( class Started(
PayloadMsg, Msg,
Generic[PayloadT], Generic[PayloadT],
): ):
''' '''
@ -316,12 +313,12 @@ class Started(
# TODO: instead of using our existing `Start` # TODO: instead of using our existing `Start`
# for this (as we did with the original `{'cmd': ..}` style) # for this (as we did with the original `{'cmd': ..}` style)
# class Cancel: # class Cancel(Msg):
# cid: str # cid: str
class Yield( class Yield(
PayloadMsg, Msg,
Generic[PayloadT], Generic[PayloadT],
): ):
''' '''
@ -348,7 +345,7 @@ class Stop(
# TODO: is `Result` or `Out[come]` a better name? # TODO: is `Result` or `Out[come]` a better name?
class Return( class Return(
PayloadMsg, Msg,
Generic[PayloadT], Generic[PayloadT],
): ):
''' '''
@ -360,7 +357,7 @@ class Return(
class CancelAck( class CancelAck(
PayloadMsg, Msg,
Generic[PayloadT], Generic[PayloadT],
): ):
''' '''
@ -466,14 +463,14 @@ def from_dict_msg(
# TODO: should be make a msg version of `ContextCancelled?` # TODO: should be make a msg version of `ContextCancelled?`
# and/or with a scope field or a full `ActorCancelled`? # and/or with a scope field or a full `ActorCancelled`?
# class Cancelled(MsgType): # class Cancelled(Msg):
# cid: str # cid: str
# TODO what about overruns? # TODO what about overruns?
# class Overrun(MsgType): # class Overrun(Msg):
# cid: str # cid: str
_runtime_msgs: list[Struct] = [ _runtime_msgs: list[Msg] = [
# identity handshake on first IPC `Channel` contact. # identity handshake on first IPC `Channel` contact.
Aid, Aid,
@ -499,9 +496,9 @@ _runtime_msgs: list[Struct] = [
] ]
# the no-outcome-yet IAC (inter-actor-communication) sub-set which # the no-outcome-yet IAC (inter-actor-communication) sub-set which
# can be `PayloadMsg.pld` payload field type-limited by application code # can be `Msg.pld` payload field type-limited by application code
# using `apply_codec()` and `limit_msg_spec()`. # using `apply_codec()` and `limit_msg_spec()`.
_payload_msgs: list[PayloadMsg] = [ _payload_msgs: list[Msg] = [
# first <value> from `Context.started(<value>)` # first <value> from `Context.started(<value>)`
Started, Started,
@ -544,8 +541,8 @@ def mk_msg_spec(
] = 'indexed_generics', ] = 'indexed_generics',
) -> tuple[ ) -> tuple[
Union[MsgType], Union[Type[Msg]],
list[MsgType], list[Type[Msg]],
]: ]:
''' '''
Create a payload-(data-)type-parameterized IPC message specification. Create a payload-(data-)type-parameterized IPC message specification.
@ -557,7 +554,7 @@ def mk_msg_spec(
determined by the input `payload_type_union: Union[Type]`. determined by the input `payload_type_union: Union[Type]`.
''' '''
submsg_types: list[MsgType] = Msg.__subclasses__() submsg_types: list[Type[Msg]] = Msg.__subclasses__()
bases: tuple = ( bases: tuple = (
# XXX NOTE XXX the below generic-parameterization seems to # XXX NOTE XXX the below generic-parameterization seems to
# be THE ONLY way to get this to work correctly in terms # be THE ONLY way to get this to work correctly in terms
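'indexed_generics' boils down to subscripting each generic payload msg with the union, roughly (assuming the `Started`/`Yield`/`Return` types above and a hypothetical `PldSpec`):

from typing import Union

PldSpec = Union[int, str]  # hypothetical application payload spec

ipc_spec = Union[
    Started[PldSpec],
    Yield[PldSpec],
    Return[PldSpec],
]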