Compare commits
21 Commits
343b7c9712
...
b22f7dcae0
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | b22f7dcae0 | |
Tyler Goodlet | fde62c72be | |
Tyler Goodlet | 4ef77bb64f | |
Tyler Goodlet | e78fdf2f69 | |
Tyler Goodlet | 13bc3c308d | |
Tyler Goodlet | 60fc43e530 | |
Tyler Goodlet | 30afcd2b6b | |
Tyler Goodlet | c80f020ebc | |
Tyler Goodlet | 262a0e36c6 | |
Tyler Goodlet | d93135acd8 | |
Tyler Goodlet | b23780c102 | |
Tyler Goodlet | 31de5f6648 | |
Tyler Goodlet | 236083b6e4 | |
Tyler Goodlet | d2dee87b36 | |
Tyler Goodlet | 5cb0cc0f0b | |
Tyler Goodlet | fc075e96c6 | |
Tyler Goodlet | d6ca4771ce | |
Tyler Goodlet | c5a0cfc639 | |
Tyler Goodlet | f85314ecab | |
Tyler Goodlet | c929bc15c9 | |
Tyler Goodlet | 6690968236 |
|
@ -1,6 +1,11 @@
|
|||
import time
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import (
|
||||
ActorNursery,
|
||||
MsgStream,
|
||||
Portal,
|
||||
)
|
||||
|
||||
|
||||
# this is the first 2 actors, streamer_1 and streamer_2
|
||||
|
@ -12,14 +17,18 @@ async def stream_data(seed):
|
|||
|
||||
# this is the third actor; the aggregator
|
||||
async def aggregate(seed):
|
||||
"""Ensure that the two streams we receive match but only stream
|
||||
'''
|
||||
Ensure that the two streams we receive match but only stream
|
||||
a single set of values to the parent.
|
||||
"""
|
||||
async with tractor.open_nursery() as nursery:
|
||||
portals = []
|
||||
|
||||
'''
|
||||
an: ActorNursery
|
||||
async with tractor.open_nursery() as an:
|
||||
portals: list[Portal] = []
|
||||
for i in range(1, 3):
|
||||
# fork point
|
||||
portal = await nursery.start_actor(
|
||||
|
||||
# fork/spawn call
|
||||
portal = await an.start_actor(
|
||||
name=f'streamer_{i}',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
@ -43,7 +52,11 @@ async def aggregate(seed):
|
|||
async with trio.open_nursery() as n:
|
||||
|
||||
for portal in portals:
|
||||
n.start_soon(push_to_chan, portal, send_chan.clone())
|
||||
n.start_soon(
|
||||
push_to_chan,
|
||||
portal,
|
||||
send_chan.clone(),
|
||||
)
|
||||
|
||||
# close this local task's reference to send side
|
||||
await send_chan.aclose()
|
||||
|
@ -60,7 +73,7 @@ async def aggregate(seed):
|
|||
|
||||
print("FINISHED ITERATING in aggregator")
|
||||
|
||||
await nursery.cancel()
|
||||
await an.cancel()
|
||||
print("WAITING on `ActorNursery` to finish")
|
||||
print("AGGREGATOR COMPLETE!")
|
||||
|
||||
|
@ -75,18 +88,21 @@ async def main() -> list[int]:
|
|||
|
||||
'''
|
||||
# yes, a nursery which spawns `trio`-"actors" B)
|
||||
nursery: tractor.ActorNursery
|
||||
async with tractor.open_nursery() as nursery:
|
||||
an: ActorNursery
|
||||
async with tractor.open_nursery(
|
||||
loglevel='cancel',
|
||||
debug_mode=True,
|
||||
) as an:
|
||||
|
||||
seed = int(1e3)
|
||||
pre_start = time.time()
|
||||
|
||||
portal: tractor.Portal = await nursery.start_actor(
|
||||
portal: Portal = await an.start_actor(
|
||||
name='aggregator',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
stream: tractor.MsgStream
|
||||
stream: MsgStream
|
||||
async with portal.open_stream_from(
|
||||
aggregate,
|
||||
seed=seed,
|
||||
|
@ -95,11 +111,12 @@ async def main() -> list[int]:
|
|||
start = time.time()
|
||||
# the portal call returns exactly what you'd expect
|
||||
# as if the remote "aggregate" function was called locally
|
||||
result_stream = []
|
||||
result_stream: list[int] = []
|
||||
async for value in stream:
|
||||
result_stream.append(value)
|
||||
|
||||
await portal.cancel_actor()
|
||||
cancelled: bool = await portal.cancel_actor()
|
||||
assert cancelled
|
||||
|
||||
print(f"STREAM TIME = {time.time() - start}")
|
||||
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
|
||||
|
|
|
@ -55,6 +55,7 @@ xontrib-vox = "^0.0.1"
|
|||
optional = false
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
pytest = "^8.2.0"
|
||||
pexpect = "^4.9.0"
|
||||
|
||||
# only for xonsh as sh..
|
||||
xontrib-vox = "^0.0.1"
|
||||
|
|
|
@ -97,6 +97,7 @@ def test_ipc_channel_break_during_stream(
|
|||
examples_dir() / 'advanced_faults'
|
||||
/ 'ipc_failure_during_stream.py',
|
||||
root=examples_dir(),
|
||||
consider_namespace_packages=False,
|
||||
)
|
||||
|
||||
# by def we expect KBI from user after a simulated "hang
|
||||
|
|
|
@ -89,17 +89,30 @@ def test_remote_error(reg_addr, args_err):
|
|||
assert excinfo.value.boxed_type == errtype
|
||||
|
||||
else:
|
||||
# the root task will also error on the `.result()` call
|
||||
# so we expect an error from there AND the child.
|
||||
with pytest.raises(BaseExceptionGroup) as excinfo:
|
||||
# the root task will also error on the `Portal.result()`
|
||||
# call so we expect an error from there AND the child.
|
||||
# |_ tho seems like on new `trio` this doesn't always
|
||||
# happen?
|
||||
with pytest.raises((
|
||||
BaseExceptionGroup,
|
||||
tractor.RemoteActorError,
|
||||
)) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
# ensure boxed errors
|
||||
for exc in excinfo.value.exceptions:
|
||||
# ensure boxed errors are `errtype`
|
||||
err: BaseException = excinfo.value
|
||||
if isinstance(err, BaseExceptionGroup):
|
||||
suberrs: list[BaseException] = err.exceptions
|
||||
else:
|
||||
suberrs: list[BaseException] = [err]
|
||||
|
||||
for exc in suberrs:
|
||||
assert exc.boxed_type == errtype
|
||||
|
||||
|
||||
def test_multierror(reg_addr):
|
||||
def test_multierror(
|
||||
reg_addr: tuple[str, int],
|
||||
):
|
||||
'''
|
||||
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
|
||||
more then one actor errors.
|
||||
|
|
|
@ -444,6 +444,7 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
|||
infect_asyncio=True,
|
||||
fan_out=fan_out,
|
||||
)
|
||||
# should raise RAE diectly
|
||||
await portal.result()
|
||||
|
||||
trio.run(main)
|
||||
|
@ -461,12 +462,11 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
|||
# should trigger remote actor error
|
||||
await portal.result()
|
||||
|
||||
with pytest.raises(BaseExceptionGroup) as excinfo:
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
# ensure boxed errors
|
||||
for exc in excinfo.value.exceptions:
|
||||
assert exc.boxed_type == Exception
|
||||
# ensure boxed error type
|
||||
excinfo.value.boxed_type == Exception
|
||||
|
||||
|
||||
def test_trio_closes_early_and_channel_exits(reg_addr):
|
||||
|
@ -477,7 +477,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr):
|
|||
exit_early=True,
|
||||
infect_asyncio=True,
|
||||
)
|
||||
# should trigger remote actor error
|
||||
# should raise RAE diectly
|
||||
await portal.result()
|
||||
|
||||
# should be a quiet exit on a simple channel exit
|
||||
|
@ -492,15 +492,17 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
|||
aio_raise_err=True,
|
||||
infect_asyncio=True,
|
||||
)
|
||||
# should trigger remote actor error
|
||||
# should trigger RAE directly, not an eg.
|
||||
await portal.result()
|
||||
|
||||
with pytest.raises(BaseExceptionGroup) as excinfo:
|
||||
with pytest.raises(
|
||||
# NOTE: bc we directly wait on `Portal.result()` instead
|
||||
# of capturing it inside the `ActorNursery` machinery.
|
||||
expected_exception=RemoteActorError,
|
||||
) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
# ensure boxed errors
|
||||
for exc in excinfo.value.exceptions:
|
||||
assert exc.boxed_type == Exception
|
||||
excinfo.value.boxed_type == Exception
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
|
|
@ -55,9 +55,10 @@ from tractor._testing import (
|
|||
|
||||
|
||||
@tractor.context
|
||||
async def sleep_forever(
|
||||
async def open_stream_then_sleep_forever(
|
||||
ctx: Context,
|
||||
expect_ctxc: bool = False,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Sync the context, open a stream then just sleep.
|
||||
|
@ -67,6 +68,10 @@ async def sleep_forever(
|
|||
'''
|
||||
try:
|
||||
await ctx.started()
|
||||
|
||||
# NOTE: the below means this child will send a `Stop`
|
||||
# to it's parent-side task despite that side never
|
||||
# opening a stream itself.
|
||||
async with ctx.open_stream():
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
@ -100,7 +105,7 @@ async def error_before_started(
|
|||
'''
|
||||
async with tractor.wait_for_actor('sleeper') as p2:
|
||||
async with (
|
||||
p2.open_context(sleep_forever) as (peer_ctx, first),
|
||||
p2.open_context(open_stream_then_sleep_forever) as (peer_ctx, first),
|
||||
peer_ctx.open_stream(),
|
||||
):
|
||||
# NOTE: this WAS inside an @acm body but i factored it
|
||||
|
@ -204,9 +209,13 @@ async def stream_ints(
|
|||
@tractor.context
|
||||
async def stream_from_peer(
|
||||
ctx: Context,
|
||||
debug_mode: bool,
|
||||
peer_name: str = 'sleeper',
|
||||
) -> None:
|
||||
|
||||
# sanity
|
||||
assert tractor._state.debug_mode() == debug_mode
|
||||
|
||||
peer: Portal
|
||||
try:
|
||||
async with (
|
||||
|
@ -240,26 +249,54 @@ async def stream_from_peer(
|
|||
assert msg is not None
|
||||
print(msg)
|
||||
|
||||
# NOTE: cancellation of the (sleeper) peer should always
|
||||
# cause a `ContextCancelled` raise in this streaming
|
||||
# actor.
|
||||
except ContextCancelled as ctxc:
|
||||
ctxerr = ctxc
|
||||
# NOTE: cancellation of the (sleeper) peer should always cause
|
||||
# a `ContextCancelled` raise in this streaming actor.
|
||||
except ContextCancelled as _ctxc:
|
||||
ctxc = _ctxc
|
||||
|
||||
assert peer_ctx._remote_error is ctxerr
|
||||
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
||||
# print("TRYING TO ENTER PAUSSE!!!")
|
||||
# await tractor.pause(shield=True)
|
||||
re: ContextCancelled = peer_ctx._remote_error
|
||||
|
||||
# XXX YES, bc exact same msg instances
|
||||
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
|
||||
# XXX YES XXX, remote error should be unpacked only once!
|
||||
assert (
|
||||
re
|
||||
is
|
||||
peer_ctx.maybe_error
|
||||
is
|
||||
ctxc
|
||||
is
|
||||
peer_ctx._local_error
|
||||
)
|
||||
# NOTE: these errors should all match!
|
||||
# ------ - ------
|
||||
# XXX [2024-05-03] XXX
|
||||
# ------ - ------
|
||||
# broke this due to a re-raise inside `.msg._ops.drain_to_final_msg()`
|
||||
# where the `Error()` msg was directly raising the ctxc
|
||||
# instead of just returning up to the caller inside
|
||||
# `Context.return()` which would results in a diff instance of
|
||||
# the same remote error bubbling out above vs what was
|
||||
# already unpacked and set inside `Context.
|
||||
assert (
|
||||
peer_ctx._remote_error.msgdata
|
||||
==
|
||||
ctxc.msgdata
|
||||
)
|
||||
# ^-XXX-^ notice the data is of course the exact same.. so
|
||||
# the above larger assert makes sense to also always be true!
|
||||
|
||||
# XXX NO, bc new one always created for property accesss
|
||||
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg
|
||||
# XXX YES XXX, bc should be exact same msg instances
|
||||
assert peer_ctx._remote_error._ipc_msg is ctxc._ipc_msg
|
||||
|
||||
# XXX NO XXX, bc new one always created for property accesss
|
||||
assert peer_ctx._remote_error.ipc_msg != ctxc.ipc_msg
|
||||
|
||||
# the peer ctx is the canceller even though it's canceller
|
||||
# is the "canceller" XD
|
||||
assert peer_name in peer_ctx.canceller
|
||||
|
||||
assert "canceller" in ctxerr.canceller
|
||||
assert "canceller" in ctxc.canceller
|
||||
|
||||
# caller peer should not be the cancel requester
|
||||
assert not ctx.cancel_called
|
||||
|
@ -283,12 +320,13 @@ async def stream_from_peer(
|
|||
|
||||
# TODO / NOTE `.canceller` won't have been set yet
|
||||
# here because that machinery is inside
|
||||
# `.open_context().__aexit__()` BUT, if we had
|
||||
# `Portal.open_context().__aexit__()` BUT, if we had
|
||||
# a way to know immediately (from the last
|
||||
# checkpoint) that cancellation was due to
|
||||
# a remote, we COULD assert this here..see,
|
||||
# https://github.com/goodboy/tractor/issues/368
|
||||
#
|
||||
# await tractor.pause()
|
||||
# assert 'canceller' in ctx.canceller
|
||||
|
||||
# root/parent actor task should NEVER HAVE cancelled us!
|
||||
|
@ -392,12 +430,13 @@ def test_peer_canceller(
|
|||
try:
|
||||
async with (
|
||||
sleeper.open_context(
|
||||
sleep_forever,
|
||||
open_stream_then_sleep_forever,
|
||||
expect_ctxc=True,
|
||||
) as (sleeper_ctx, sent),
|
||||
|
||||
just_caller.open_context(
|
||||
stream_from_peer,
|
||||
debug_mode=debug_mode,
|
||||
) as (caller_ctx, sent),
|
||||
|
||||
canceller.open_context(
|
||||
|
@ -423,10 +462,11 @@ def test_peer_canceller(
|
|||
|
||||
# should always raise since this root task does
|
||||
# not request the sleeper cancellation ;)
|
||||
except ContextCancelled as ctxerr:
|
||||
except ContextCancelled as _ctxc:
|
||||
ctxc = _ctxc
|
||||
print(
|
||||
'CAUGHT REMOTE CONTEXT CANCEL\n\n'
|
||||
f'{ctxerr}\n'
|
||||
f'{ctxc}\n'
|
||||
)
|
||||
|
||||
# canceller and caller peers should not
|
||||
|
@ -437,7 +477,7 @@ def test_peer_canceller(
|
|||
# we were not the actor, our peer was
|
||||
assert not sleeper_ctx.cancel_acked
|
||||
|
||||
assert ctxerr.canceller[0] == 'canceller'
|
||||
assert ctxc.canceller[0] == 'canceller'
|
||||
|
||||
# XXX NOTE XXX: since THIS `ContextCancelled`
|
||||
# HAS NOT YET bubbled up to the
|
||||
|
@ -448,7 +488,7 @@ def test_peer_canceller(
|
|||
|
||||
# CASE_1: error-during-ctxc-handling,
|
||||
if error_during_ctxerr_handling:
|
||||
raise RuntimeError('Simulated error during teardown')
|
||||
raise RuntimeError('Simulated RTE re-raise during ctxc handling')
|
||||
|
||||
# CASE_2: standard teardown inside in `.open_context()` block
|
||||
raise
|
||||
|
@ -513,6 +553,9 @@ def test_peer_canceller(
|
|||
# should be cancelled by US.
|
||||
#
|
||||
if error_during_ctxerr_handling:
|
||||
print(f'loc_err: {_loc_err}\n')
|
||||
assert isinstance(loc_err, RuntimeError)
|
||||
|
||||
# since we do a rte reraise above, the
|
||||
# `.open_context()` error handling should have
|
||||
# raised a local rte, thus the internal
|
||||
|
@ -521,9 +564,6 @@ def test_peer_canceller(
|
|||
# a `trio.Cancelled` due to a local
|
||||
# `._scope.cancel()` call.
|
||||
assert not sleeper_ctx._scope.cancelled_caught
|
||||
|
||||
assert isinstance(loc_err, RuntimeError)
|
||||
print(f'_loc_err: {_loc_err}\n')
|
||||
# assert sleeper_ctx._local_error is _loc_err
|
||||
# assert sleeper_ctx._local_error is _loc_err
|
||||
assert not (
|
||||
|
@ -560,10 +600,13 @@ def test_peer_canceller(
|
|||
|
||||
else: # the other 2 ctxs
|
||||
assert (
|
||||
isinstance(re, ContextCancelled)
|
||||
and (
|
||||
re.canceller
|
||||
==
|
||||
canceller.channel.uid
|
||||
)
|
||||
)
|
||||
|
||||
# since the sleeper errors while handling a
|
||||
# peer-cancelled (by ctxc) scenario, we expect
|
||||
|
@ -811,8 +854,7 @@ async def serve_subactors(
|
|||
async with open_nursery() as an:
|
||||
|
||||
# sanity
|
||||
if debug_mode:
|
||||
assert tractor._state.debug_mode()
|
||||
assert tractor._state.debug_mode() == debug_mode
|
||||
|
||||
await ctx.started(peer_name)
|
||||
async with ctx.open_stream() as ipc:
|
||||
|
@ -1091,7 +1133,6 @@ def test_peer_spawns_and_cancels_service_subactor(
|
|||
'-> root checking `client_ctx.result()`,\n'
|
||||
f'-> checking that sub-spawn {peer_name} is down\n'
|
||||
)
|
||||
# else:
|
||||
|
||||
try:
|
||||
res = await client_ctx.result(hide_tb=False)
|
||||
|
|
|
@ -2,7 +2,9 @@
|
|||
Spawning basics
|
||||
|
||||
"""
|
||||
from typing import Optional
|
||||
from typing import (
|
||||
Any,
|
||||
)
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
|
@ -25,13 +27,11 @@ async def spawn(
|
|||
async with tractor.open_root_actor(
|
||||
arbiter_addr=reg_addr,
|
||||
):
|
||||
|
||||
actor = tractor.current_actor()
|
||||
assert actor.is_arbiter == is_arbiter
|
||||
data = data_to_pass_down
|
||||
|
||||
if actor.is_arbiter:
|
||||
|
||||
async with tractor.open_nursery() as nursery:
|
||||
|
||||
# forks here
|
||||
|
@ -95,7 +95,9 @@ async def test_movie_theatre_convo(start_method):
|
|||
await portal.cancel_actor()
|
||||
|
||||
|
||||
async def cellar_door(return_value: Optional[str]):
|
||||
async def cellar_door(
|
||||
return_value: str|None,
|
||||
):
|
||||
return return_value
|
||||
|
||||
|
||||
|
@ -105,16 +107,18 @@ async def cellar_door(return_value: Optional[str]):
|
|||
)
|
||||
@tractor_test
|
||||
async def test_most_beautiful_word(
|
||||
start_method,
|
||||
return_value
|
||||
start_method: str,
|
||||
return_value: Any,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
The main ``tractor`` routine.
|
||||
|
||||
'''
|
||||
with trio.fail_after(1):
|
||||
async with tractor.open_nursery() as n:
|
||||
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as n:
|
||||
portal = await n.run_in_actor(
|
||||
cellar_door,
|
||||
return_value=return_value,
|
||||
|
|
|
@ -42,6 +42,7 @@ from ._supervise import (
|
|||
from ._state import (
|
||||
current_actor as current_actor,
|
||||
is_root_process as is_root_process,
|
||||
current_ipc_ctx as current_ipc_ctx,
|
||||
)
|
||||
from ._exceptions import (
|
||||
ContextCancelled as ContextCancelled,
|
||||
|
|
|
@ -41,6 +41,7 @@ from typing import (
|
|||
Callable,
|
||||
Mapping,
|
||||
Type,
|
||||
TypeAlias,
|
||||
TYPE_CHECKING,
|
||||
Union,
|
||||
)
|
||||
|
@ -94,7 +95,7 @@ if TYPE_CHECKING:
|
|||
from ._portal import Portal
|
||||
from ._runtime import Actor
|
||||
from ._ipc import MsgTransport
|
||||
from .devx._code import (
|
||||
from .devx._frame_stack import (
|
||||
CallerInfo,
|
||||
)
|
||||
|
||||
|
@ -155,6 +156,41 @@ class Context:
|
|||
# payload receiver
|
||||
_pld_rx: msgops.PldRx
|
||||
|
||||
@property
|
||||
def pld_rx(self) -> msgops.PldRx:
|
||||
'''
|
||||
The current `tractor.Context`'s msg-payload-receiver.
|
||||
|
||||
A payload receiver is the IPC-msg processing sub-sys which
|
||||
filters inter-actor-task communicated payload data, i.e. the
|
||||
`PayloadMsg.pld: PayloadT` field value, AFTER its container
|
||||
shuttlle msg (eg. `Started`/`Yield`/`Return) has been
|
||||
delivered up from `tractor`'s transport layer but BEFORE the
|
||||
data is yielded to `tractor` application code.
|
||||
|
||||
The "IPC-primitive API" is normally one of a `Context` (this)` or a `MsgStream`
|
||||
or some higher level API using one of them.
|
||||
|
||||
For ex. `pld_data: PayloadT = MsgStream.receive()` implicitly
|
||||
calls into the stream's parent `Context.pld_rx.recv_pld().` to
|
||||
receive the latest `PayloadMsg.pld` value.
|
||||
|
||||
Modification of the current payload spec via `limit_plds()`
|
||||
allows a `tractor` application to contextually filter IPC
|
||||
payload content with a type specification as supported by the
|
||||
interchange backend.
|
||||
|
||||
- for `msgspec` see <PUTLINKHERE>.
|
||||
|
||||
Note that the `PldRx` itself is a per-`Context` instance that
|
||||
normally only changes when some (sub-)task, on a given "side"
|
||||
of the IPC ctx (either a "child"-side RPC or inside
|
||||
a "parent"-side `Portal.open_context()` block), modifies it
|
||||
using the `.msg._ops.limit_plds()` API.
|
||||
|
||||
'''
|
||||
return self._pld_rx
|
||||
|
||||
# full "namespace-path" to target RPC function
|
||||
_nsf: NamespacePath
|
||||
|
||||
|
@ -231,6 +267,8 @@ class Context:
|
|||
|
||||
# init and streaming state
|
||||
_started_called: bool = False
|
||||
_started_msg: MsgType|None = None
|
||||
_started_pld: Any = None
|
||||
_stream_opened: bool = False
|
||||
_stream: MsgStream|None = None
|
||||
|
||||
|
@ -623,7 +661,7 @@ class Context:
|
|||
log.runtime(
|
||||
'Setting remote error for ctx\n\n'
|
||||
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
||||
f'=> {self.side!r}\n\n'
|
||||
f'=> {self.side!r}: {self._actor.uid}\n\n'
|
||||
f'{error}'
|
||||
)
|
||||
self._remote_error: BaseException = error
|
||||
|
@ -678,7 +716,7 @@ class Context:
|
|||
log.error(
|
||||
f'Remote context error:\n\n'
|
||||
# f'{pformat(self)}\n'
|
||||
f'{error}\n'
|
||||
f'{error}'
|
||||
)
|
||||
|
||||
if self._canceller is None:
|
||||
|
@ -724,8 +762,10 @@ class Context:
|
|||
)
|
||||
else:
|
||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||
# from .devx import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
|
||||
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
|
||||
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
|
||||
if (
|
||||
cs
|
||||
and
|
||||
|
@ -805,6 +845,7 @@ class Context:
|
|||
# f'{ci.api_nsp}()\n'
|
||||
# )
|
||||
|
||||
# TODO: use `.dev._frame_stack` scanning to find caller!
|
||||
return 'Portal.open_context()'
|
||||
|
||||
async def cancel(
|
||||
|
@ -1304,17 +1345,6 @@ class Context:
|
|||
ctx=self,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
for msg in drained_msgs:
|
||||
|
||||
# TODO: mask this by default..
|
||||
if isinstance(msg, Return):
|
||||
# from .devx import pause
|
||||
# await pause()
|
||||
# raise InternalError(
|
||||
log.warning(
|
||||
'Final `return` msg should never be drained !?!?\n\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
drained_status: str = (
|
||||
'Ctx drained to final outcome msg\n\n'
|
||||
|
@ -1435,6 +1465,10 @@ class Context:
|
|||
self._result
|
||||
)
|
||||
|
||||
@property
|
||||
def has_outcome(self) -> bool:
|
||||
return bool(self.maybe_error) or self._final_result_is_set()
|
||||
|
||||
# @property
|
||||
def repr_outcome(
|
||||
self,
|
||||
|
@ -1637,8 +1671,6 @@ class Context:
|
|||
)
|
||||
|
||||
if rt_started != started_msg:
|
||||
# TODO: break these methods out from the struct subtype?
|
||||
|
||||
# TODO: make that one a mod func too..
|
||||
diff = pretty_struct.Struct.__sub__(
|
||||
rt_started,
|
||||
|
@ -1674,6 +1706,8 @@ class Context:
|
|||
) from verr
|
||||
|
||||
self._started_called = True
|
||||
self._started_msg = started_msg
|
||||
self._started_pld = value
|
||||
|
||||
async def _drain_overflows(
|
||||
self,
|
||||
|
@ -1961,6 +1995,7 @@ async def open_context_from_portal(
|
|||
portal: Portal,
|
||||
func: Callable,
|
||||
|
||||
pld_spec: TypeAlias|None = None,
|
||||
allow_overruns: bool = False,
|
||||
|
||||
# TODO: if we set this the wrapping `@acm` body will
|
||||
|
@ -2026,7 +2061,7 @@ async def open_context_from_portal(
|
|||
# XXX NOTE XXX: currenly we do NOT allow opening a contex
|
||||
# with "self" since the local feeder mem-chan processing
|
||||
# is not built for it.
|
||||
if portal.channel.uid == portal.actor.uid:
|
||||
if (uid := portal.channel.uid) == portal.actor.uid:
|
||||
raise RuntimeError(
|
||||
'** !! Invalid Operation !! **\n'
|
||||
'Can not open an IPC ctx with the local actor!\n'
|
||||
|
@ -2054,6 +2089,21 @@ async def open_context_from_portal(
|
|||
assert ctx._caller_info
|
||||
_ctxvar_Context.set(ctx)
|
||||
|
||||
# placeholder for any exception raised in the runtime
|
||||
# or by user tasks which cause this context's closure.
|
||||
scope_err: BaseException|None = None
|
||||
ctxc_from_callee: ContextCancelled|None = None
|
||||
try:
|
||||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
msgops.maybe_limit_plds(
|
||||
ctx=ctx,
|
||||
spec=pld_spec,
|
||||
) as maybe_msgdec,
|
||||
):
|
||||
if maybe_msgdec:
|
||||
assert maybe_msgdec.pld_spec == pld_spec
|
||||
|
||||
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
||||
# `Started`-msg any cancellation triggered
|
||||
# in `._maybe_cancel_and_set_remote_error()` will
|
||||
|
@ -2061,25 +2111,23 @@ async def open_context_from_portal(
|
|||
# -> it's expected that if there is an error in this phase of
|
||||
# the dialog, the `Error` msg should be raised from the `msg`
|
||||
# handling block below.
|
||||
first: Any = await ctx._pld_rx.recv_pld(
|
||||
ctx=ctx,
|
||||
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
|
||||
ipc=ctx,
|
||||
expect_msg=Started,
|
||||
passthrough_non_pld_msgs=False,
|
||||
)
|
||||
|
||||
# from .devx import pause
|
||||
# await pause()
|
||||
ctx._started_called: bool = True
|
||||
ctx._started_msg: bool = started_msg
|
||||
ctx._started_pld: bool = first
|
||||
|
||||
uid: tuple = portal.channel.uid
|
||||
cid: str = ctx.cid
|
||||
|
||||
# placeholder for any exception raised in the runtime
|
||||
# or by user tasks which cause this context's closure.
|
||||
scope_err: BaseException|None = None
|
||||
ctxc_from_callee: ContextCancelled|None = None
|
||||
try:
|
||||
async with trio.open_nursery() as nurse:
|
||||
|
||||
# NOTE: used to start overrun queuing tasks
|
||||
ctx._scope_nursery: trio.Nursery = nurse
|
||||
ctx._scope: trio.CancelScope = nurse.cancel_scope
|
||||
# NOTE: this in an implicit runtime nursery used to,
|
||||
# - start overrun queuing tasks when as well as
|
||||
# for cancellation of the scope opened by the user.
|
||||
ctx._scope_nursery: trio.Nursery = tn
|
||||
ctx._scope: trio.CancelScope = tn.cancel_scope
|
||||
|
||||
# deliver context instance and .started() msg value
|
||||
# in enter tuple.
|
||||
|
@ -2126,13 +2174,13 @@ async def open_context_from_portal(
|
|||
|
||||
# when in allow_overruns mode there may be
|
||||
# lingering overflow sender tasks remaining?
|
||||
if nurse.child_tasks:
|
||||
if tn.child_tasks:
|
||||
# XXX: ensure we are in overrun state
|
||||
# with ``._allow_overruns=True`` bc otherwise
|
||||
# there should be no tasks in this nursery!
|
||||
if (
|
||||
not ctx._allow_overruns
|
||||
or len(nurse.child_tasks) > 1
|
||||
or len(tn.child_tasks) > 1
|
||||
):
|
||||
raise InternalError(
|
||||
'Context has sub-tasks but is '
|
||||
|
@ -2304,8 +2352,8 @@ async def open_context_from_portal(
|
|||
):
|
||||
log.warning(
|
||||
'IPC connection for context is broken?\n'
|
||||
f'task:{cid}\n'
|
||||
f'actor:{uid}'
|
||||
f'task: {ctx.cid}\n'
|
||||
f'actor: {uid}'
|
||||
)
|
||||
|
||||
raise # duh
|
||||
|
@ -2455,9 +2503,8 @@ async def open_context_from_portal(
|
|||
and ctx.cancel_acked
|
||||
):
|
||||
log.cancel(
|
||||
'Context cancelled by {ctx.side!r}-side task\n'
|
||||
f'Context cancelled by {ctx.side!r}-side task\n'
|
||||
f'|_{ctx._task}\n\n'
|
||||
|
||||
f'{repr(scope_err)}\n'
|
||||
)
|
||||
|
||||
|
@ -2485,7 +2532,7 @@ async def open_context_from_portal(
|
|||
f'cid: {ctx.cid}\n'
|
||||
)
|
||||
portal.actor._contexts.pop(
|
||||
(uid, cid),
|
||||
(uid, ctx.cid),
|
||||
None,
|
||||
)
|
||||
|
||||
|
@ -2513,11 +2560,12 @@ def mk_context(
|
|||
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
|
||||
|
||||
# TODO: only scan caller-info if log level so high!
|
||||
from .devx._code import find_caller_info
|
||||
from .devx._frame_stack import find_caller_info
|
||||
caller_info: CallerInfo|None = find_caller_info()
|
||||
|
||||
# TODO: when/how do we apply `.limit_plds()` from here?
|
||||
pld_rx: msgops.PldRx = msgops.current_pldrx()
|
||||
pld_rx = msgops.PldRx(
|
||||
_pld_dec=msgops._def_any_pldec,
|
||||
)
|
||||
|
||||
ctx = Context(
|
||||
chan=chan,
|
||||
|
@ -2531,13 +2579,16 @@ def mk_context(
|
|||
_caller_info=caller_info,
|
||||
**kwargs,
|
||||
)
|
||||
pld_rx._ctx = ctx
|
||||
ctx._result = Unresolved
|
||||
return ctx
|
||||
|
||||
|
||||
# TODO: use the new type-parameters to annotate this in 3.13?
|
||||
# -[ ] https://peps.python.org/pep-0718/#unknown-types
|
||||
def context(func: Callable) -> Callable:
|
||||
def context(
|
||||
func: Callable,
|
||||
) -> Callable:
|
||||
'''
|
||||
Mark an (async) function as an SC-supervised, inter-`Actor`,
|
||||
child-`trio.Task`, IPC endpoint otherwise known more
|
||||
|
|
|
@ -716,4 +716,5 @@ async def _connect_chan(
|
|||
chan = Channel((host, port))
|
||||
await chan.connect()
|
||||
yield chan
|
||||
with trio.CancelScope(shield=True):
|
||||
await chan.aclose()
|
||||
|
|
|
@ -47,6 +47,7 @@ from ._ipc import Channel
|
|||
from .log import get_logger
|
||||
from .msg import (
|
||||
# Error,
|
||||
PayloadMsg,
|
||||
NamespacePath,
|
||||
Return,
|
||||
)
|
||||
|
@ -98,7 +99,8 @@ class Portal:
|
|||
|
||||
self.chan = channel
|
||||
# during the portal's lifetime
|
||||
self._final_result: Any|None = None
|
||||
self._final_result_pld: Any|None = None
|
||||
self._final_result_msg: PayloadMsg|None = None
|
||||
|
||||
# When set to a ``Context`` (when _submit_for_result is called)
|
||||
# it is expected that ``result()`` will be awaited at some
|
||||
|
@ -132,7 +134,7 @@ class Portal:
|
|||
'A pending main result has already been submitted'
|
||||
)
|
||||
|
||||
self._expect_result_ctx = await self.actor.start_remote_task(
|
||||
self._expect_result_ctx: Context = await self.actor.start_remote_task(
|
||||
self.channel,
|
||||
nsf=NamespacePath(f'{ns}:{func}'),
|
||||
kwargs=kwargs,
|
||||
|
@ -163,13 +165,22 @@ class Portal:
|
|||
# expecting a "main" result
|
||||
assert self._expect_result_ctx
|
||||
|
||||
if self._final_result is None:
|
||||
self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
|
||||
ctx=self._expect_result_ctx,
|
||||
if self._final_result_msg is None:
|
||||
try:
|
||||
(
|
||||
self._final_result_msg,
|
||||
self._final_result_pld,
|
||||
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
|
||||
ipc=self._expect_result_ctx,
|
||||
expect_msg=Return,
|
||||
)
|
||||
except BaseException as err:
|
||||
# TODO: wrap this into `@api_frame` optionally with
|
||||
# some kinda filtering mechanism like log levels?
|
||||
__tracebackhide__: bool = False
|
||||
raise err
|
||||
|
||||
return self._final_result
|
||||
return self._final_result_pld
|
||||
|
||||
async def _cancel_streams(self):
|
||||
# terminate all locally running async generator
|
||||
|
@ -301,7 +312,7 @@ class Portal:
|
|||
portal=self,
|
||||
)
|
||||
return await ctx._pld_rx.recv_pld(
|
||||
ctx=ctx,
|
||||
ipc=ctx,
|
||||
expect_msg=Return,
|
||||
)
|
||||
|
||||
|
@ -320,6 +331,8 @@ class Portal:
|
|||
remote rpc task or a local async generator instance.
|
||||
|
||||
'''
|
||||
__runtimeframe__: int = 1 # noqa
|
||||
|
||||
if isinstance(func, str):
|
||||
warnings.warn(
|
||||
"`Portal.run(namespace: str, funcname: str)` is now"
|
||||
|
@ -353,7 +366,7 @@ class Portal:
|
|||
portal=self,
|
||||
)
|
||||
return await ctx._pld_rx.recv_pld(
|
||||
ctx=ctx,
|
||||
ipc=ctx,
|
||||
expect_msg=Return,
|
||||
)
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
Root actor runtime ignition(s).
|
||||
|
||||
'''
|
||||
from contextlib import asynccontextmanager
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from functools import partial
|
||||
import importlib
|
||||
import logging
|
||||
|
@ -60,7 +60,7 @@ _default_lo_addrs: list[tuple[str, int]] = [(
|
|||
logger = log.get_logger('tractor')
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def open_root_actor(
|
||||
|
||||
*,
|
||||
|
@ -96,6 +96,7 @@ async def open_root_actor(
|
|||
Runtime init entry point for ``tractor``.
|
||||
|
||||
'''
|
||||
__tracebackhide__ = True
|
||||
# TODO: stick this in a `@cm` defined in `devx._debug`?
|
||||
#
|
||||
# Override the global debugger hook to make it play nice with
|
||||
|
@ -358,7 +359,11 @@ async def open_root_actor(
|
|||
BaseExceptionGroup,
|
||||
) as err:
|
||||
|
||||
entered: bool = await _debug._maybe_enter_pm(err)
|
||||
import inspect
|
||||
entered: bool = await _debug._maybe_enter_pm(
|
||||
err,
|
||||
api_frame=inspect.currentframe(),
|
||||
)
|
||||
|
||||
if (
|
||||
not entered
|
||||
|
|
|
@ -70,7 +70,6 @@ from .msg import (
|
|||
from tractor.msg.types import (
|
||||
CancelAck,
|
||||
Error,
|
||||
Msg,
|
||||
MsgType,
|
||||
Return,
|
||||
Start,
|
||||
|
@ -250,10 +249,17 @@ async def _errors_relayed_via_ipc(
|
|||
] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
# NOTE: we normally always hide this frame in call-stack tracebacks
|
||||
# if the crash originated from an RPC task (since normally the
|
||||
# user is only going to care about their own code not this
|
||||
# internal runtime frame) and we DID NOT
|
||||
# fail due to an IPC transport error!
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
# TODO: a debug nursery when in debug mode!
|
||||
# async with maybe_open_debugger_nursery() as debug_tn:
|
||||
# => see matching comment in side `._debug._pause()`
|
||||
rpc_err: BaseException|None = None
|
||||
try:
|
||||
yield # run RPC invoke body
|
||||
|
||||
|
@ -264,16 +270,7 @@ async def _errors_relayed_via_ipc(
|
|||
BaseExceptionGroup,
|
||||
KeyboardInterrupt,
|
||||
) as err:
|
||||
|
||||
# NOTE: always hide this frame from debug REPL call stack
|
||||
# if the crash originated from an RPC task and we DID NOT
|
||||
# fail due to an IPC transport error!
|
||||
if (
|
||||
is_rpc
|
||||
and
|
||||
chan.connected()
|
||||
):
|
||||
__tracebackhide__: bool = hide_tb
|
||||
rpc_err = err
|
||||
|
||||
# TODO: maybe we'll want different "levels" of debugging
|
||||
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
||||
|
@ -318,11 +315,19 @@ async def _errors_relayed_via_ipc(
|
|||
api_frame=inspect.currentframe(),
|
||||
)
|
||||
if not entered_debug:
|
||||
# if we prolly should have entered the REPL but
|
||||
# didn't, maybe there was an internal error in
|
||||
# the above code and we do want to show this
|
||||
# frame!
|
||||
if _state.debug_mode():
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
log.exception(
|
||||
'RPC task crashed\n'
|
||||
f'|_{ctx}'
|
||||
)
|
||||
|
||||
|
||||
# ALWAYS try to ship RPC errors back to parent/caller task
|
||||
if is_rpc:
|
||||
|
||||
|
@ -355,6 +360,20 @@ async def _errors_relayed_via_ipc(
|
|||
# `Actor._service_n`, we add "handles" to each such that
|
||||
# they can be individually ccancelled.
|
||||
finally:
|
||||
|
||||
# if the error is not from user code and instead a failure
|
||||
# of a runtime RPC or transport failure we do prolly want to
|
||||
# show this frame
|
||||
if (
|
||||
rpc_err
|
||||
and (
|
||||
not is_rpc
|
||||
or
|
||||
not chan.connected()
|
||||
)
|
||||
):
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
try:
|
||||
ctx: Context
|
||||
func: Callable
|
||||
|
@ -444,9 +463,10 @@ async def _invoke(
|
|||
# open the stream with this option.
|
||||
# allow_overruns=True,
|
||||
)
|
||||
context: bool = False
|
||||
context_ep_func: bool = False
|
||||
|
||||
assert not _state._ctxvar_Context.get()
|
||||
# set the current IPC ctx var for this RPC task
|
||||
_state._ctxvar_Context.set(ctx)
|
||||
|
||||
# TODO: deprecate this style..
|
||||
if getattr(func, '_tractor_stream_function', False):
|
||||
|
@ -475,7 +495,7 @@ async def _invoke(
|
|||
# handle decorated ``@tractor.context`` async function
|
||||
elif getattr(func, '_tractor_context_function', False):
|
||||
kwargs['ctx'] = ctx
|
||||
context = True
|
||||
context_ep_func = True
|
||||
|
||||
# errors raised inside this block are propgated back to caller
|
||||
async with _errors_relayed_via_ipc(
|
||||
|
@ -501,7 +521,7 @@ async def _invoke(
|
|||
raise
|
||||
|
||||
# TODO: impl all these cases in terms of the `Context` one!
|
||||
if not context:
|
||||
if not context_ep_func:
|
||||
await _invoke_non_context(
|
||||
actor,
|
||||
cancel_scope,
|
||||
|
@ -571,7 +591,6 @@ async def _invoke(
|
|||
async with trio.open_nursery() as tn:
|
||||
ctx._scope_nursery = tn
|
||||
ctx._scope = tn.cancel_scope
|
||||
_state._ctxvar_Context.set(ctx)
|
||||
task_status.started(ctx)
|
||||
|
||||
# TODO: should would be nice to have our
|
||||
|
@ -831,7 +850,7 @@ async def process_messages(
|
|||
(as utilized inside `Portal.cancel_actor()` ).
|
||||
|
||||
'''
|
||||
assert actor._service_n # state sanity
|
||||
assert actor._service_n # runtime state sanity
|
||||
|
||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||
# should use it?
|
||||
|
@ -844,7 +863,7 @@ async def process_messages(
|
|||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
|
||||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
|
||||
nursery_cancelled_before_task: bool = False
|
||||
msg: Msg|None = None
|
||||
msg: MsgType|None = None
|
||||
try:
|
||||
# NOTE: this internal scope allows for keeping this
|
||||
# message loop running despite the current task having
|
||||
|
|
|
@ -644,7 +644,7 @@ class Actor:
|
|||
peers_str: str = ''
|
||||
for uid, chans in self._peers.items():
|
||||
peers_str += (
|
||||
f'|_ uid: {uid}\n'
|
||||
f'uid: {uid}\n'
|
||||
)
|
||||
for i, chan in enumerate(chans):
|
||||
peers_str += (
|
||||
|
@ -678,10 +678,12 @@ class Actor:
|
|||
# XXX => YES IT DOES, when i was testing ctl-c
|
||||
# from broken debug TTY locking due to
|
||||
# msg-spec races on application using RunVar...
|
||||
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
|
||||
if (
|
||||
pdb_user_uid
|
||||
and local_nursery
|
||||
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
||||
and
|
||||
(pdb_user_uid := ctx_in_debug.chan.uid)
|
||||
and
|
||||
local_nursery
|
||||
):
|
||||
entry: tuple|None = local_nursery._children.get(
|
||||
tuple(pdb_user_uid)
|
||||
|
@ -1169,13 +1171,17 @@ class Actor:
|
|||
|
||||
# kill any debugger request task to avoid deadlock
|
||||
# with the root actor in this tree
|
||||
dbcs = _debug.DebugStatus.req_cs
|
||||
if dbcs is not None:
|
||||
debug_req = _debug.DebugStatus
|
||||
lock_req_ctx: Context = debug_req.req_ctx
|
||||
if lock_req_ctx is not None:
|
||||
msg += (
|
||||
'-> Cancelling active debugger request..\n'
|
||||
f'|_{_debug.Lock.pformat()}'
|
||||
f'|_{_debug.Lock.repr()}\n\n'
|
||||
f'|_{lock_req_ctx}\n\n'
|
||||
)
|
||||
dbcs.cancel()
|
||||
# lock_req_ctx._scope.cancel()
|
||||
# TODO: wrap this in a method-API..
|
||||
debug_req.req_cs.cancel()
|
||||
|
||||
# self-cancel **all** ongoing RPC tasks
|
||||
await self.cancel_rpc_tasks(
|
||||
|
@ -1375,15 +1381,17 @@ class Actor:
|
|||
"IPC channel's "
|
||||
)
|
||||
rent_chan_repr: str = (
|
||||
f'|_{parent_chan}'
|
||||
f' |_{parent_chan}\n\n'
|
||||
if parent_chan
|
||||
else ''
|
||||
)
|
||||
log.cancel(
|
||||
f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
|
||||
f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n'
|
||||
f' {rent_chan_repr}\n'
|
||||
# f'{self}\n'
|
||||
f'Cancelling {descr} RPC tasks\n\n'
|
||||
f'<= canceller: {req_uid}\n'
|
||||
f'{rent_chan_repr}'
|
||||
f'=> cancellee: {self.uid}\n'
|
||||
f' |_{self}.cancel_rpc_tasks()\n'
|
||||
f' |_tasks: {len(tasks)}\n'
|
||||
# f'{tasks_str}'
|
||||
)
|
||||
for (
|
||||
|
@ -1413,7 +1421,7 @@ class Actor:
|
|||
if tasks:
|
||||
log.cancel(
|
||||
'Waiting for remaining rpc tasks to complete\n'
|
||||
f'|_{tasks}'
|
||||
f'|_{tasks_str}'
|
||||
)
|
||||
await self._ongoing_rpc_tasks.wait()
|
||||
|
||||
|
@ -1466,7 +1474,10 @@ class Actor:
|
|||
assert self._parent_chan, "No parent channel for this actor?"
|
||||
return Portal(self._parent_chan)
|
||||
|
||||
def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
|
||||
def get_chans(
|
||||
self,
|
||||
uid: tuple[str, str],
|
||||
) -> list[Channel]:
|
||||
'''
|
||||
Return all IPC channels to the actor with provided `uid`.
|
||||
|
||||
|
@ -1626,7 +1637,9 @@ async def async_main(
|
|||
# tranport address bind errors - normally it's
|
||||
# something silly like the wrong socket-address
|
||||
# passed via a config or CLI Bo
|
||||
entered_debug = await _debug._maybe_enter_pm(oserr)
|
||||
entered_debug = await _debug._maybe_enter_pm(
|
||||
oserr,
|
||||
)
|
||||
if entered_debug:
|
||||
log.runtime('Exited debug REPL..')
|
||||
raise
|
||||
|
|
|
@ -142,7 +142,9 @@ async def exhaust_portal(
|
|||
'''
|
||||
__tracebackhide__ = True
|
||||
try:
|
||||
log.debug(f"Waiting on final result from {actor.uid}")
|
||||
log.debug(
|
||||
f'Waiting on final result from {actor.uid}'
|
||||
)
|
||||
|
||||
# XXX: streams should never be reaped here since they should
|
||||
# always be established and shutdown using a context manager api
|
||||
|
@ -195,7 +197,10 @@ async def cancel_on_completion(
|
|||
# if this call errors we store the exception for later
|
||||
# in ``errors`` which will be reraised inside
|
||||
# an exception group and we still send out a cancel request
|
||||
result: Any|Exception = await exhaust_portal(portal, actor)
|
||||
result: Any|Exception = await exhaust_portal(
|
||||
portal,
|
||||
actor,
|
||||
)
|
||||
if isinstance(result, Exception):
|
||||
errors[actor.uid]: Exception = result
|
||||
log.cancel(
|
||||
|
@ -503,14 +508,6 @@ async def trio_proc(
|
|||
)
|
||||
)
|
||||
|
||||
# await chan.send({
|
||||
# '_parent_main_data': subactor._parent_main_data,
|
||||
# 'enable_modules': subactor.enable_modules,
|
||||
# 'reg_addrs': subactor.reg_addrs,
|
||||
# 'bind_addrs': bind_addrs,
|
||||
# '_runtime_vars': _runtime_vars,
|
||||
# })
|
||||
|
||||
# track subactor in current nursery
|
||||
curr_actor: Actor = current_actor()
|
||||
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
||||
|
@ -554,8 +551,8 @@ async def trio_proc(
|
|||
# killing the process too early.
|
||||
if proc:
|
||||
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
|
||||
with trio.CancelScope(shield=True):
|
||||
|
||||
with trio.CancelScope(shield=True):
|
||||
# don't clobber an ongoing pdb
|
||||
if cancelled_during_spawn:
|
||||
# Try again to avoid TTY clobbering.
|
||||
|
|
|
@ -124,9 +124,15 @@ _ctxvar_Context: ContextVar[Context] = ContextVar(
|
|||
)
|
||||
|
||||
|
||||
def current_ipc_ctx() -> Context:
|
||||
def current_ipc_ctx(
|
||||
error_on_not_set: bool = False,
|
||||
) -> Context|None:
|
||||
ctx: Context = _ctxvar_Context.get()
|
||||
if not ctx:
|
||||
|
||||
if (
|
||||
not ctx
|
||||
and error_on_not_set
|
||||
):
|
||||
from ._exceptions import InternalError
|
||||
raise InternalError(
|
||||
'No IPC context has been allocated for this task yet?\n'
|
||||
|
|
|
@ -52,6 +52,7 @@ from tractor.msg import (
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from ._context import Context
|
||||
from ._ipc import Channel
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
@ -65,10 +66,10 @@ log = get_logger(__name__)
|
|||
class MsgStream(trio.abc.Channel):
|
||||
'''
|
||||
A bidirectional message stream for receiving logically sequenced
|
||||
values over an inter-actor IPC ``Channel``.
|
||||
values over an inter-actor IPC `Channel`.
|
||||
|
||||
This is the type returned to a local task which entered either
|
||||
``Portal.open_stream_from()`` or ``Context.open_stream()``.
|
||||
`Portal.open_stream_from()` or `Context.open_stream()`.
|
||||
|
||||
Termination rules:
|
||||
|
||||
|
@ -95,6 +96,22 @@ class MsgStream(trio.abc.Channel):
|
|||
self._eoc: bool|trio.EndOfChannel = False
|
||||
self._closed: bool|trio.ClosedResourceError = False
|
||||
|
||||
@property
|
||||
def ctx(self) -> Context:
|
||||
'''
|
||||
This stream's IPC `Context` ref.
|
||||
|
||||
'''
|
||||
return self._ctx
|
||||
|
||||
@property
|
||||
def chan(self) -> Channel:
|
||||
'''
|
||||
Ref to the containing `Context`'s transport `Channel`.
|
||||
|
||||
'''
|
||||
return self._ctx.chan
|
||||
|
||||
# TODO: could we make this a direct method bind to `PldRx`?
|
||||
# -> receive_nowait = PldRx.recv_pld
|
||||
# |_ means latter would have to accept `MsgStream`-as-`self`?
|
||||
|
@ -109,7 +126,7 @@ class MsgStream(trio.abc.Channel):
|
|||
):
|
||||
ctx: Context = self._ctx
|
||||
return ctx._pld_rx.recv_pld_nowait(
|
||||
ctx=ctx,
|
||||
ipc=self,
|
||||
expect_msg=expect_msg,
|
||||
)
|
||||
|
||||
|
@ -148,7 +165,7 @@ class MsgStream(trio.abc.Channel):
|
|||
try:
|
||||
|
||||
ctx: Context = self._ctx
|
||||
return await ctx._pld_rx.recv_pld(ctx=ctx)
|
||||
return await ctx._pld_rx.recv_pld(ipc=self)
|
||||
|
||||
# XXX: the stream terminates on either of:
|
||||
# - via `self._rx_chan.receive()` raising after manual closure
|
||||
|
|
|
@ -84,6 +84,7 @@ class ActorNursery:
|
|||
ria_nursery: trio.Nursery,
|
||||
da_nursery: trio.Nursery,
|
||||
errors: dict[tuple[str, str], BaseException],
|
||||
|
||||
) -> None:
|
||||
# self.supervisor = supervisor # TODO
|
||||
self._actor: Actor = actor
|
||||
|
@ -105,6 +106,7 @@ class ActorNursery:
|
|||
self._at_least_one_child_in_debug: bool = False
|
||||
self.errors = errors
|
||||
self.exited = trio.Event()
|
||||
self._scope_error: BaseException|None = None
|
||||
|
||||
# NOTE: when no explicit call is made to
|
||||
# `.open_root_actor()` by application code,
|
||||
|
@ -117,7 +119,9 @@ class ActorNursery:
|
|||
async def start_actor(
|
||||
self,
|
||||
name: str,
|
||||
|
||||
*,
|
||||
|
||||
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
|
||||
rpc_module_paths: list[str]|None = None,
|
||||
enable_modules: list[str]|None = None,
|
||||
|
@ -125,6 +129,7 @@ class ActorNursery:
|
|||
nursery: trio.Nursery|None = None,
|
||||
debug_mode: bool|None = None,
|
||||
infect_asyncio: bool = False,
|
||||
|
||||
) -> Portal:
|
||||
'''
|
||||
Start a (daemon) actor: an process that has no designated
|
||||
|
@ -189,6 +194,13 @@ class ActorNursery:
|
|||
)
|
||||
)
|
||||
|
||||
# TODO: DEPRECATE THIS:
|
||||
# -[ ] impl instead as a hilevel wrapper on
|
||||
# top of a `@context` style invocation.
|
||||
# |_ dynamic @context decoration on child side
|
||||
# |_ implicit `Portal.open_context() as (ctx, first):`
|
||||
# and `return first` on parent side.
|
||||
# -[ ] use @api_frame on the wrapper
|
||||
async def run_in_actor(
|
||||
self,
|
||||
|
||||
|
@ -221,7 +233,7 @@ class ActorNursery:
|
|||
# use the explicit function name if not provided
|
||||
name = fn.__name__
|
||||
|
||||
portal = await self.start_actor(
|
||||
portal: Portal = await self.start_actor(
|
||||
name,
|
||||
enable_modules=[mod_path] + (
|
||||
enable_modules or rpc_module_paths or []
|
||||
|
@ -250,6 +262,7 @@ class ActorNursery:
|
|||
)
|
||||
return portal
|
||||
|
||||
# @api_frame
|
||||
async def cancel(
|
||||
self,
|
||||
hard_kill: bool = False,
|
||||
|
@ -347,8 +360,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
|
||||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||
|
||||
# TODO: yay or nay?
|
||||
__tracebackhide__ = True
|
||||
# normally don't need to show user by default
|
||||
__tracebackhide__: bool = True
|
||||
|
||||
outer_err: BaseException|None = None
|
||||
inner_err: BaseException|None = None
|
||||
|
||||
# the collection of errors retreived from spawned sub-actors
|
||||
errors: dict[tuple[str, str], BaseException] = {}
|
||||
|
@ -358,7 +374,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
# handling errors that are generated by the inner nursery in
|
||||
# a supervisor strategy **before** blocking indefinitely to wait for
|
||||
# actors spawned in "daemon mode" (aka started using
|
||||
# ``ActorNursery.start_actor()``).
|
||||
# `ActorNursery.start_actor()`).
|
||||
|
||||
# errors from this daemon actor nursery bubble up to caller
|
||||
async with trio.open_nursery() as da_nursery:
|
||||
|
@ -393,7 +409,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
)
|
||||
an._join_procs.set()
|
||||
|
||||
except BaseException as inner_err:
|
||||
except BaseException as _inner_err:
|
||||
inner_err = _inner_err
|
||||
errors[actor.uid] = inner_err
|
||||
|
||||
# If we error in the root but the debugger is
|
||||
|
@ -471,8 +488,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
Exception,
|
||||
BaseExceptionGroup,
|
||||
trio.Cancelled
|
||||
) as _outer_err:
|
||||
outer_err = _outer_err
|
||||
|
||||
) as err:
|
||||
an._scope_error = outer_err or inner_err
|
||||
|
||||
# XXX: yet another guard before allowing the cancel
|
||||
# sequence in case a (single) child is in debug.
|
||||
|
@ -487,7 +506,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
if an._children:
|
||||
log.cancel(
|
||||
'Actor-nursery cancelling due error type:\n'
|
||||
f'{err}\n'
|
||||
f'{outer_err}\n'
|
||||
)
|
||||
with trio.CancelScope(shield=True):
|
||||
await an.cancel()
|
||||
|
@ -514,11 +533,19 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
else:
|
||||
raise list(errors.values())[0]
|
||||
|
||||
# show frame on any (likely) internal error
|
||||
if (
|
||||
not an.cancelled
|
||||
and an._scope_error
|
||||
):
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
# da_nursery scope end - nursery checkpoint
|
||||
# final exit
|
||||
|
||||
|
||||
@acm
|
||||
# @api_frame
|
||||
async def open_nursery(
|
||||
**kwargs,
|
||||
|
||||
|
@ -538,6 +565,7 @@ async def open_nursery(
|
|||
which cancellation scopes correspond to each spawned subactor set.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = True
|
||||
implicit_runtime: bool = False
|
||||
actor: Actor = current_actor(err_on_no_runtime=False)
|
||||
an: ActorNursery|None = None
|
||||
|
@ -588,6 +616,14 @@ async def open_nursery(
|
|||
an.exited.set()
|
||||
|
||||
finally:
|
||||
# show frame on any internal runtime-scope error
|
||||
if (
|
||||
an
|
||||
and not an.cancelled
|
||||
and an._scope_error
|
||||
):
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
msg: str = (
|
||||
'Actor-nursery exited\n'
|
||||
f'|_{an}\n'
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -20,11 +20,8 @@ as it pertains to improving the grok-ability of our runtime!
|
|||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from functools import partial
|
||||
import inspect
|
||||
# import msgspec
|
||||
# from pprint import pformat
|
||||
import textwrap
|
||||
import traceback
|
||||
from types import (
|
||||
FrameType,
|
||||
FunctionType,
|
||||
|
@ -32,9 +29,8 @@ from types import (
|
|||
# CodeType,
|
||||
)
|
||||
from typing import (
|
||||
# Any,
|
||||
Any,
|
||||
Callable,
|
||||
# TYPE_CHECKING,
|
||||
Type,
|
||||
)
|
||||
|
||||
|
@ -42,6 +38,7 @@ from tractor.msg import (
|
|||
pretty_struct,
|
||||
NamespacePath,
|
||||
)
|
||||
import wrapt
|
||||
|
||||
|
||||
# TODO: yeah, i don't love this and we should prolly just
|
||||
|
@ -83,6 +80,31 @@ def get_class_from_frame(fr: FrameType) -> (
|
|||
return None
|
||||
|
||||
|
||||
def get_ns_and_func_from_frame(
|
||||
frame: FrameType,
|
||||
) -> Callable:
|
||||
'''
|
||||
Return the corresponding function object reference from
|
||||
a `FrameType`, and return it and it's parent namespace `dict`.
|
||||
|
||||
'''
|
||||
ns: dict[str, Any]
|
||||
|
||||
# for a method, go up a frame and lookup the name in locals()
|
||||
if '.' in (qualname := frame.f_code.co_qualname):
|
||||
cls_name, _, func_name = qualname.partition('.')
|
||||
ns = frame.f_back.f_locals[cls_name].__dict__
|
||||
|
||||
else:
|
||||
func_name: str = frame.f_code.co_name
|
||||
ns = frame.f_globals
|
||||
|
||||
return (
|
||||
ns,
|
||||
ns[func_name],
|
||||
)
|
||||
|
||||
|
||||
def func_ref_from_frame(
|
||||
frame: FrameType,
|
||||
) -> Callable:
|
||||
|
@ -98,34 +120,63 @@ def func_ref_from_frame(
|
|||
)
|
||||
|
||||
|
||||
# TODO: move all this into new `.devx._code`!
|
||||
# -[ ] prolly create a `@runtime_api` dec?
|
||||
# -[ ] ^- make it capture and/or accept buncha optional
|
||||
# meta-data like a fancier version of `@pdbp.hideframe`.
|
||||
#
|
||||
class CallerInfo(pretty_struct.Struct):
|
||||
rt_fi: inspect.FrameInfo
|
||||
call_frame: FrameType
|
||||
# https://docs.python.org/dev/reference/datamodel.html#frame-objects
|
||||
# https://docs.python.org/dev/library/inspect.html#the-interpreter-stack
|
||||
_api_frame: FrameType
|
||||
|
||||
@property
|
||||
def api_func_ref(self) -> Callable|None:
|
||||
return func_ref_from_frame(self.rt_fi.frame)
|
||||
def api_frame(self) -> FrameType:
|
||||
try:
|
||||
self._api_frame.clear()
|
||||
except RuntimeError:
|
||||
# log.warning(
|
||||
print(
|
||||
f'Frame {self._api_frame} for {self.api_func} is still active!'
|
||||
)
|
||||
|
||||
return self._api_frame
|
||||
|
||||
_api_func: Callable
|
||||
|
||||
@property
|
||||
def api_func(self) -> Callable:
|
||||
return self._api_func
|
||||
|
||||
_caller_frames_up: int|None = 1
|
||||
_caller_frame: FrameType|None = None # cached after first stack scan
|
||||
|
||||
@property
|
||||
def api_nsp(self) -> NamespacePath|None:
|
||||
func: FunctionType = self.api_func_ref
|
||||
func: FunctionType = self.api_func
|
||||
if func:
|
||||
return NamespacePath.from_ref(func)
|
||||
|
||||
return '<unknown>'
|
||||
|
||||
@property
|
||||
def caller_func_ref(self) -> Callable|None:
|
||||
return func_ref_from_frame(self.call_frame)
|
||||
def caller_frame(self) -> FrameType:
|
||||
|
||||
# if not already cached, scan up stack explicitly by
|
||||
# configured count.
|
||||
if not self._caller_frame:
|
||||
if self._caller_frames_up:
|
||||
for _ in range(self._caller_frames_up):
|
||||
caller_frame: FrameType|None = self.api_frame.f_back
|
||||
|
||||
if not caller_frame:
|
||||
raise ValueError(
|
||||
'No frame exists {self._caller_frames_up} up from\n'
|
||||
f'{self.api_frame} @ {self.api_nsp}\n'
|
||||
)
|
||||
|
||||
self._caller_frame = caller_frame
|
||||
|
||||
return self._caller_frame
|
||||
|
||||
@property
|
||||
def caller_nsp(self) -> NamespacePath|None:
|
||||
func: FunctionType = self.caller_func_ref
|
||||
func: FunctionType = self.api_func
|
||||
if func:
|
||||
return NamespacePath.from_ref(func)
|
||||
|
||||
|
@ -172,108 +223,66 @@ def find_caller_info(
|
|||
call_frame = call_frame.f_back
|
||||
|
||||
return CallerInfo(
|
||||
rt_fi=fi,
|
||||
call_frame=call_frame,
|
||||
_api_frame=rt_frame,
|
||||
_api_func=func_ref_from_frame(rt_frame),
|
||||
_caller_frames_up=go_up_iframes,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def pformat_boxed_tb(
|
||||
tb_str: str,
|
||||
fields_str: str|None = None,
|
||||
field_prefix: str = ' |_',
|
||||
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
|
||||
|
||||
tb_box_indent: int|None = None,
|
||||
tb_body_indent: int = 1,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
Create a "boxed" looking traceback string.
|
||||
# TODO: -[x] move all this into new `.devx._code`!
|
||||
# -[ ] consider rename to _callstack?
|
||||
# -[ ] prolly create a `@runtime_api` dec?
|
||||
# |_ @api_frame seems better?
|
||||
# -[ ] ^- make it capture and/or accept buncha optional
|
||||
# meta-data like a fancier version of `@pdbp.hideframe`.
|
||||
#
|
||||
def api_frame(
|
||||
wrapped: Callable|None = None,
|
||||
*,
|
||||
caller_frames_up: int = 1,
|
||||
|
||||
Useful for emphasizing traceback text content as being an
|
||||
embedded attribute of some other object (like
|
||||
a `RemoteActorError` or other boxing remote error shuttle
|
||||
container).
|
||||
) -> Callable:
|
||||
|
||||
Any other parent/container "fields" can be passed in the
|
||||
`fields_str` input along with other prefix/indent settings.
|
||||
# handle the decorator called WITHOUT () case,
|
||||
# i.e. just @api_frame, NOT @api_frame(extra=<blah>)
|
||||
if wrapped is None:
|
||||
return partial(
|
||||
api_frame,
|
||||
caller_frames_up=caller_frames_up,
|
||||
)
|
||||
|
||||
'''
|
||||
if (
|
||||
fields_str
|
||||
and
|
||||
field_prefix
|
||||
@wrapt.decorator
|
||||
async def wrapper(
|
||||
wrapped: Callable,
|
||||
instance: object,
|
||||
args: tuple,
|
||||
kwargs: dict,
|
||||
):
|
||||
fields: str = textwrap.indent(
|
||||
fields_str,
|
||||
prefix=field_prefix,
|
||||
)
|
||||
else:
|
||||
fields = fields_str or ''
|
||||
# maybe cache the API frame for this call
|
||||
global _frame2callerinfo_cache
|
||||
this_frame: FrameType = inspect.currentframe()
|
||||
api_frame: FrameType = this_frame.f_back
|
||||
|
||||
tb_body = tb_str
|
||||
if tb_body_indent:
|
||||
tb_body: str = textwrap.indent(
|
||||
tb_str,
|
||||
prefix=tb_body_indent * ' ',
|
||||
if not _frame2callerinfo_cache.get(api_frame):
|
||||
_frame2callerinfo_cache[api_frame] = CallerInfo(
|
||||
_api_frame=api_frame,
|
||||
_api_func=wrapped,
|
||||
_caller_frames_up=caller_frames_up,
|
||||
)
|
||||
|
||||
tb_box: str = (
|
||||
return wrapped(*args, **kwargs)
|
||||
|
||||
# orig
|
||||
# f' |\n'
|
||||
# f' ------ - ------\n\n'
|
||||
# f'{tb_str}\n'
|
||||
# f' ------ - ------\n'
|
||||
# f' _|\n'
|
||||
|
||||
f'|\n'
|
||||
f' ------ - ------\n\n'
|
||||
# f'{tb_str}\n'
|
||||
f'{tb_body}'
|
||||
f' ------ - ------\n'
|
||||
f'_|\n'
|
||||
)
|
||||
tb_box_indent: str = (
|
||||
tb_box_indent
|
||||
or
|
||||
1
|
||||
|
||||
# (len(field_prefix))
|
||||
# ? ^-TODO-^ ? if you wanted another indent level
|
||||
)
|
||||
if tb_box_indent > 0:
|
||||
tb_box: str = textwrap.indent(
|
||||
tb_box,
|
||||
prefix=tb_box_indent * ' ',
|
||||
)
|
||||
|
||||
return (
|
||||
fields
|
||||
+
|
||||
tb_box
|
||||
)
|
||||
|
||||
|
||||
def pformat_caller_frame(
|
||||
stack_limit: int = 1,
|
||||
box_tb: bool = True,
|
||||
) -> str:
|
||||
'''
|
||||
Capture and return the traceback text content from
|
||||
`stack_limit` call frames up.
|
||||
|
||||
'''
|
||||
tb_str: str = (
|
||||
'\n'.join(
|
||||
traceback.format_stack(limit=stack_limit)
|
||||
)
|
||||
)
|
||||
if box_tb:
|
||||
tb_str: str = pformat_boxed_tb(
|
||||
tb_str=tb_str,
|
||||
field_prefix=' ',
|
||||
indent='',
|
||||
)
|
||||
return tb_str
|
||||
# annotate the function as a "api function", meaning it is
|
||||
# a function for which the function above it in the call stack should be
|
||||
# non-`tractor` code aka "user code".
|
||||
#
|
||||
# in the global frame cache for easy lookup from a given
|
||||
# func-instance
|
||||
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
|
||||
wrapped.__api_func__: bool = True
|
||||
return wrapper(wrapped)
|
|
@ -57,8 +57,8 @@ LEVELS: dict[str, int] = {
|
|||
'TRANSPORT': 5,
|
||||
'RUNTIME': 15,
|
||||
'CANCEL': 16,
|
||||
'DEVX': 400,
|
||||
'PDB': 500,
|
||||
'DEVX': 600,
|
||||
}
|
||||
# _custom_levels: set[str] = {
|
||||
# lvlname.lower for lvlname in LEVELS.keys()
|
||||
|
@ -137,7 +137,7 @@ class StackLevelAdapter(LoggerAdapter):
|
|||
"Developer experience" sub-sys statuses.
|
||||
|
||||
'''
|
||||
return self.log(600, msg)
|
||||
return self.log(400, msg)
|
||||
|
||||
def log(
|
||||
self,
|
||||
|
@ -202,8 +202,29 @@ class StackLevelAdapter(LoggerAdapter):
|
|||
)
|
||||
|
||||
|
||||
# TODO IDEA:
|
||||
# -[ ] do per task-name and actor-name color coding
|
||||
# -[ ] unique color per task-id and actor-uuid
|
||||
def pformat_task_uid(
|
||||
id_part: str = 'tail'
|
||||
):
|
||||
'''
|
||||
Return `str`-ified unique for a `trio.Task` via a combo of its
|
||||
`.name: str` and `id()` truncated output.
|
||||
|
||||
'''
|
||||
task: trio.Task = trio.lowlevel.current_task()
|
||||
tid: str = str(id(task))
|
||||
if id_part == 'tail':
|
||||
tid_part: str = tid[-6:]
|
||||
else:
|
||||
tid_part: str = tid[:6]
|
||||
|
||||
return f'{task.name}[{tid_part}]'
|
||||
|
||||
|
||||
_conc_name_getters = {
|
||||
'task': lambda: trio.lowlevel.current_task().name,
|
||||
'task': pformat_task_uid,
|
||||
'actor': lambda: current_actor(),
|
||||
'actor_name': lambda: current_actor().name,
|
||||
'actor_uid': lambda: current_actor().uid[1][:6],
|
||||
|
@ -211,7 +232,10 @@ _conc_name_getters = {
|
|||
|
||||
|
||||
class ActorContextInfo(Mapping):
|
||||
"Dyanmic lookup for local actor and task names"
|
||||
'''
|
||||
Dyanmic lookup for local actor and task names.
|
||||
|
||||
'''
|
||||
_context_keys = (
|
||||
'task',
|
||||
'actor',
|
||||
|
|
|
@ -44,7 +44,7 @@ from ._codec import (
|
|||
# )
|
||||
|
||||
from .types import (
|
||||
Msg as Msg,
|
||||
PayloadMsg as PayloadMsg,
|
||||
|
||||
Aid as Aid,
|
||||
SpawnSpec as SpawnSpec,
|
||||
|
|
|
@ -432,7 +432,7 @@ class MsgCodec(Struct):
|
|||
|
||||
# ) -> Any|Struct:
|
||||
|
||||
# msg: Msg = codec.dec.decode(msg)
|
||||
# msg: PayloadMsg = codec.dec.decode(msg)
|
||||
# payload_tag: str = msg.header.payload_tag
|
||||
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
|
||||
# return payload_dec.decode(msg.pld)
|
||||
|
|
|
@ -22,10 +22,9 @@ operational helpers for processing transaction flows.
|
|||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import (
|
||||
# asynccontextmanager as acm,
|
||||
asynccontextmanager as acm,
|
||||
contextmanager as cm,
|
||||
)
|
||||
from contextvars import ContextVar
|
||||
from typing import (
|
||||
Any,
|
||||
Type,
|
||||
|
@ -50,6 +49,7 @@ from tractor._exceptions import (
|
|||
_mk_msg_type_err,
|
||||
pack_from_raise,
|
||||
)
|
||||
from tractor._state import current_ipc_ctx
|
||||
from ._codec import (
|
||||
mk_dec,
|
||||
MsgDec,
|
||||
|
@ -75,7 +75,7 @@ if TYPE_CHECKING:
|
|||
log = get_logger(__name__)
|
||||
|
||||
|
||||
_def_any_pldec: MsgDec = mk_dec()
|
||||
_def_any_pldec: MsgDec[Any] = mk_dec()
|
||||
|
||||
|
||||
class PldRx(Struct):
|
||||
|
@ -104,15 +104,19 @@ class PldRx(Struct):
|
|||
'''
|
||||
# TODO: better to bind it here?
|
||||
# _rx_mc: trio.MemoryReceiveChannel
|
||||
_pldec: MsgDec
|
||||
_pld_dec: MsgDec
|
||||
_ctx: Context|None = None
|
||||
_ipc: Context|MsgStream|None = None
|
||||
|
||||
@property
|
||||
def pld_dec(self) -> MsgDec:
|
||||
return self._pldec
|
||||
return self._pld_dec
|
||||
|
||||
# TODO: a better name?
|
||||
# -[ ] when would this be used as it avoids needingn to pass the
|
||||
# ipc prim to every method
|
||||
@cm
|
||||
def apply_to_ipc(
|
||||
def wraps_ipc(
|
||||
self,
|
||||
ipc_prim: Context|MsgStream,
|
||||
|
||||
|
@ -140,49 +144,50 @@ class PldRx(Struct):
|
|||
exit.
|
||||
|
||||
'''
|
||||
orig_dec: MsgDec = self._pldec
|
||||
orig_dec: MsgDec = self._pld_dec
|
||||
limit_dec: MsgDec = mk_dec(spec=spec)
|
||||
try:
|
||||
self._pldec = limit_dec
|
||||
self._pld_dec = limit_dec
|
||||
yield limit_dec
|
||||
finally:
|
||||
self._pldec = orig_dec
|
||||
self._pld_dec = orig_dec
|
||||
|
||||
@property
|
||||
def dec(self) -> msgpack.Decoder:
|
||||
return self._pldec.dec
|
||||
return self._pld_dec.dec
|
||||
|
||||
def recv_pld_nowait(
|
||||
self,
|
||||
# TODO: make this `MsgStream` compat as well, see above^
|
||||
# ipc_prim: Context|MsgStream,
|
||||
ctx: Context,
|
||||
ipc: Context|MsgStream,
|
||||
|
||||
ipc_msg: MsgType|None = None,
|
||||
expect_msg: Type[MsgType]|None = None,
|
||||
|
||||
hide_tb: bool = False,
|
||||
**dec_msg_kwargs,
|
||||
|
||||
) -> Any|Raw:
|
||||
__tracebackhide__: bool = True
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
msg: MsgType = (
|
||||
ipc_msg
|
||||
or
|
||||
|
||||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||
ctx._rx_chan.receive_nowait()
|
||||
ipc._rx_chan.receive_nowait()
|
||||
)
|
||||
return self.dec_msg(
|
||||
msg,
|
||||
ctx=ctx,
|
||||
ipc=ipc,
|
||||
expect_msg=expect_msg,
|
||||
hide_tb=hide_tb,
|
||||
**dec_msg_kwargs,
|
||||
)
|
||||
|
||||
async def recv_pld(
|
||||
self,
|
||||
ctx: Context,
|
||||
ipc: Context|MsgStream,
|
||||
ipc_msg: MsgType|None = None,
|
||||
expect_msg: Type[MsgType]|None = None,
|
||||
hide_tb: bool = True,
|
||||
|
@ -200,11 +205,11 @@ class PldRx(Struct):
|
|||
or
|
||||
|
||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||
await ctx._rx_chan.receive()
|
||||
await ipc._rx_chan.receive()
|
||||
)
|
||||
return self.dec_msg(
|
||||
msg=msg,
|
||||
ctx=ctx,
|
||||
ipc=ipc,
|
||||
expect_msg=expect_msg,
|
||||
**dec_msg_kwargs,
|
||||
)
|
||||
|
@ -212,7 +217,7 @@ class PldRx(Struct):
|
|||
def dec_msg(
|
||||
self,
|
||||
msg: MsgType,
|
||||
ctx: Context,
|
||||
ipc: Context|MsgStream,
|
||||
expect_msg: Type[MsgType]|None,
|
||||
|
||||
raise_error: bool = True,
|
||||
|
@ -225,6 +230,9 @@ class PldRx(Struct):
|
|||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
_src_err = None
|
||||
src_err: BaseException|None = None
|
||||
match msg:
|
||||
# payload-data shuttle msg; deliver the `.pld` value
|
||||
# directly to IPC (primitive) client-consumer code.
|
||||
|
@ -234,7 +242,7 @@ class PldRx(Struct):
|
|||
|Return(pld=pld) # termination phase
|
||||
):
|
||||
try:
|
||||
pld: PayloadT = self._pldec.decode(pld)
|
||||
pld: PayloadT = self._pld_dec.decode(pld)
|
||||
log.runtime(
|
||||
'Decoded msg payload\n\n'
|
||||
f'{msg}\n\n'
|
||||
|
@ -243,25 +251,30 @@ class PldRx(Struct):
|
|||
)
|
||||
return pld
|
||||
|
||||
# XXX pld-type failure
|
||||
except ValidationError as src_err:
|
||||
# XXX pld-value type failure
|
||||
except ValidationError as valerr:
|
||||
# pack mgterr into error-msg for
|
||||
# reraise below; ensure remote-actor-err
|
||||
# info is displayed nicely?
|
||||
msgterr: MsgTypeError = _mk_msg_type_err(
|
||||
msg=msg,
|
||||
codec=self.pld_dec,
|
||||
src_validation_error=src_err,
|
||||
src_validation_error=valerr,
|
||||
is_invalid_payload=True,
|
||||
)
|
||||
msg: Error = pack_from_raise(
|
||||
local_err=msgterr,
|
||||
cid=msg.cid,
|
||||
src_uid=ctx.chan.uid,
|
||||
src_uid=ipc.chan.uid,
|
||||
)
|
||||
src_err = valerr
|
||||
|
||||
# XXX some other decoder specific failure?
|
||||
# except TypeError as src_error:
|
||||
# from .devx import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
# raise src_error
|
||||
# ^-TODO-^ can remove?
|
||||
|
||||
# a runtime-internal RPC endpoint response.
|
||||
# always passthrough since (internal) runtime
|
||||
|
@ -299,6 +312,7 @@ class PldRx(Struct):
|
|||
return src_err
|
||||
|
||||
case Stop(cid=cid):
|
||||
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||
message: str = (
|
||||
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
|
||||
f'{ctx.peer_side!r} peer ?\n'
|
||||
|
@ -341,14 +355,21 @@ class PldRx(Struct):
|
|||
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
|
||||
#
|
||||
# fallthrough and raise from `src_err`
|
||||
try:
|
||||
_raise_from_unexpected_msg(
|
||||
ctx=ctx,
|
||||
ctx=getattr(ipc, 'ctx', ipc),
|
||||
msg=msg,
|
||||
src_err=src_err,
|
||||
log=log,
|
||||
expect_msg=expect_msg,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
except UnboundLocalError:
|
||||
# XXX if there's an internal lookup error in the above
|
||||
# code (prolly on `src_err`) we want to show this frame
|
||||
# in the tb!
|
||||
__tracebackhide__: bool = False
|
||||
raise
|
||||
|
||||
async def recv_msg_w_pld(
|
||||
self,
|
||||
|
@ -378,52 +399,13 @@ class PldRx(Struct):
|
|||
# msg instance?
|
||||
pld: PayloadT = self.dec_msg(
|
||||
msg,
|
||||
ctx=ipc,
|
||||
ipc=ipc,
|
||||
expect_msg=expect_msg,
|
||||
**kwargs,
|
||||
)
|
||||
return msg, pld
|
||||
|
||||
|
||||
# Always maintain a task-context-global `PldRx`
|
||||
_def_pld_rx: PldRx = PldRx(
|
||||
_pldec=_def_any_pldec,
|
||||
)
|
||||
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
|
||||
'pld_rx',
|
||||
default=_def_pld_rx,
|
||||
)
|
||||
|
||||
|
||||
def current_pldrx() -> PldRx:
|
||||
'''
|
||||
Return the current `trio.Task.context`'s msg-payload-receiver.
|
||||
|
||||
A payload receiver is the IPC-msg processing sub-sys which
|
||||
filters inter-actor-task communicated payload data, i.e. the
|
||||
`PayloadMsg.pld: PayloadT` field value, AFTER it's container
|
||||
shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered
|
||||
up from `tractor`'s transport layer but BEFORE the data is
|
||||
yielded to application code, normally via an IPC primitive API
|
||||
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
|
||||
|
||||
Modification of the current payload spec via `limit_plds()`
|
||||
allows a `tractor` application to contextually filter IPC
|
||||
payload content with a type specification as supported by
|
||||
the interchange backend.
|
||||
|
||||
- for `msgspec` see <PUTLINKHERE>.
|
||||
|
||||
NOTE that the `PldRx` itself is a per-`Context` global sub-system
|
||||
that normally does not change other then the applied pld-spec
|
||||
for the current `trio.Task`.
|
||||
|
||||
'''
|
||||
# ctx: context = current_ipc_ctx()
|
||||
# return ctx._pld_rx
|
||||
return _ctxvar_PldRx.get()
|
||||
|
||||
|
||||
@cm
|
||||
def limit_plds(
|
||||
spec: Union[Type[Struct]],
|
||||
|
@ -439,29 +421,55 @@ def limit_plds(
|
|||
'''
|
||||
__tracebackhide__: bool = True
|
||||
try:
|
||||
# sanity on orig settings
|
||||
orig_pldrx: PldRx = current_pldrx()
|
||||
orig_pldec: MsgDec = orig_pldrx.pld_dec
|
||||
curr_ctx: Context = current_ipc_ctx()
|
||||
rx: PldRx = curr_ctx._pld_rx
|
||||
orig_pldec: MsgDec = rx.pld_dec
|
||||
|
||||
with orig_pldrx.limit_plds(
|
||||
with rx.limit_plds(
|
||||
spec=spec,
|
||||
**kwargs,
|
||||
) as pldec:
|
||||
log.info(
|
||||
log.runtime(
|
||||
'Applying payload-decoder\n\n'
|
||||
f'{pldec}\n'
|
||||
)
|
||||
yield pldec
|
||||
finally:
|
||||
log.info(
|
||||
log.runtime(
|
||||
'Reverted to previous payload-decoder\n\n'
|
||||
f'{orig_pldec}\n'
|
||||
)
|
||||
assert (
|
||||
(pldrx := current_pldrx()) is orig_pldrx
|
||||
and
|
||||
pldrx.pld_dec is orig_pldec
|
||||
)
|
||||
# sanity on orig settings
|
||||
assert rx.pld_dec is orig_pldec
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_limit_plds(
|
||||
ctx: Context,
|
||||
spec: Union[Type[Struct]]|None = None,
|
||||
**kwargs,
|
||||
) -> MsgDec|None:
|
||||
'''
|
||||
Async compat maybe-payload type limiter.
|
||||
|
||||
Mostly for use inside other internal `@acm`s such that a separate
|
||||
indent block isn't needed when an async one is already being
|
||||
used.
|
||||
|
||||
'''
|
||||
if spec is None:
|
||||
yield None
|
||||
return
|
||||
|
||||
# sanity on scoping
|
||||
curr_ctx: Context = current_ipc_ctx()
|
||||
assert ctx is curr_ctx
|
||||
|
||||
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
|
||||
yield msgdec
|
||||
|
||||
curr_ctx: Context = current_ipc_ctx()
|
||||
assert ctx is curr_ctx
|
||||
|
||||
|
||||
async def drain_to_final_msg(
|
||||
|
@ -543,21 +551,12 @@ async def drain_to_final_msg(
|
|||
match msg:
|
||||
|
||||
# final result arrived!
|
||||
case Return(
|
||||
# cid=cid,
|
||||
# pld=res,
|
||||
):
|
||||
# ctx._result: Any = res
|
||||
ctx._result: Any = pld
|
||||
case Return():
|
||||
log.runtime(
|
||||
'Context delivered final draining msg:\n'
|
||||
f'{pretty_struct.pformat(msg)}'
|
||||
)
|
||||
# XXX: only close the rx mem chan AFTER
|
||||
# a final result is retreived.
|
||||
# if ctx._rx_chan:
|
||||
# await ctx._rx_chan.aclose()
|
||||
# TODO: ^ we don't need it right?
|
||||
ctx._result: Any = pld
|
||||
result_msg = msg
|
||||
break
|
||||
|
||||
|
@ -664,24 +663,6 @@ async def drain_to_final_msg(
|
|||
result_msg = msg
|
||||
break # OOOOOF, yeah obvi we need this..
|
||||
|
||||
# XXX we should never really get here
|
||||
# right! since `._deliver_msg()` should
|
||||
# always have detected an {'error': ..}
|
||||
# msg and already called this right!?!
|
||||
# elif error := unpack_error(
|
||||
# msg=msg,
|
||||
# chan=ctx._portal.channel,
|
||||
# hide_tb=False,
|
||||
# ):
|
||||
# log.critical('SHOULD NEVER GET HERE!?')
|
||||
# assert msg is ctx._cancel_msg
|
||||
# assert error.msgdata == ctx._remote_error.msgdata
|
||||
# assert error.ipc_msg == ctx._remote_error.ipc_msg
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
# ctx._maybe_cancel_and_set_remote_error(error)
|
||||
# ctx._maybe_raise_remote_err(error)
|
||||
|
||||
else:
|
||||
# bubble the original src key error
|
||||
raise
|
||||
|
|
|
@ -56,8 +56,7 @@ log = get_logger('tractor.msgspec')
|
|||
PayloadT = TypeVar('PayloadT')
|
||||
|
||||
|
||||
# TODO: PayloadMsg
|
||||
class Msg(
|
||||
class PayloadMsg(
|
||||
Struct,
|
||||
Generic[PayloadT],
|
||||
|
||||
|
@ -110,6 +109,10 @@ class Msg(
|
|||
pld: Raw
|
||||
|
||||
|
||||
# TODO: complete rename
|
||||
Msg = PayloadMsg
|
||||
|
||||
|
||||
class Aid(
|
||||
Struct,
|
||||
tag=True,
|
||||
|
@ -299,7 +302,7 @@ class StartAck(
|
|||
|
||||
|
||||
class Started(
|
||||
Msg,
|
||||
PayloadMsg,
|
||||
Generic[PayloadT],
|
||||
):
|
||||
'''
|
||||
|
@ -313,12 +316,12 @@ class Started(
|
|||
|
||||
# TODO: instead of using our existing `Start`
|
||||
# for this (as we did with the original `{'cmd': ..}` style)
|
||||
# class Cancel(Msg):
|
||||
# class Cancel:
|
||||
# cid: str
|
||||
|
||||
|
||||
class Yield(
|
||||
Msg,
|
||||
PayloadMsg,
|
||||
Generic[PayloadT],
|
||||
):
|
||||
'''
|
||||
|
@ -345,7 +348,7 @@ class Stop(
|
|||
|
||||
# TODO: is `Result` or `Out[come]` a better name?
|
||||
class Return(
|
||||
Msg,
|
||||
PayloadMsg,
|
||||
Generic[PayloadT],
|
||||
):
|
||||
'''
|
||||
|
@ -357,7 +360,7 @@ class Return(
|
|||
|
||||
|
||||
class CancelAck(
|
||||
Msg,
|
||||
PayloadMsg,
|
||||
Generic[PayloadT],
|
||||
):
|
||||
'''
|
||||
|
@ -463,14 +466,14 @@ def from_dict_msg(
|
|||
|
||||
# TODO: should be make a msg version of `ContextCancelled?`
|
||||
# and/or with a scope field or a full `ActorCancelled`?
|
||||
# class Cancelled(Msg):
|
||||
# class Cancelled(MsgType):
|
||||
# cid: str
|
||||
|
||||
# TODO what about overruns?
|
||||
# class Overrun(Msg):
|
||||
# class Overrun(MsgType):
|
||||
# cid: str
|
||||
|
||||
_runtime_msgs: list[Msg] = [
|
||||
_runtime_msgs: list[Struct] = [
|
||||
|
||||
# identity handshake on first IPC `Channel` contact.
|
||||
Aid,
|
||||
|
@ -496,9 +499,9 @@ _runtime_msgs: list[Msg] = [
|
|||
]
|
||||
|
||||
# the no-outcome-yet IAC (inter-actor-communication) sub-set which
|
||||
# can be `Msg.pld` payload field type-limited by application code
|
||||
# can be `PayloadMsg.pld` payload field type-limited by application code
|
||||
# using `apply_codec()` and `limit_msg_spec()`.
|
||||
_payload_msgs: list[Msg] = [
|
||||
_payload_msgs: list[PayloadMsg] = [
|
||||
# first <value> from `Context.started(<value>)`
|
||||
Started,
|
||||
|
||||
|
@ -541,8 +544,8 @@ def mk_msg_spec(
|
|||
] = 'indexed_generics',
|
||||
|
||||
) -> tuple[
|
||||
Union[Type[Msg]],
|
||||
list[Type[Msg]],
|
||||
Union[MsgType],
|
||||
list[MsgType],
|
||||
]:
|
||||
'''
|
||||
Create a payload-(data-)type-parameterized IPC message specification.
|
||||
|
@ -554,7 +557,7 @@ def mk_msg_spec(
|
|||
determined by the input `payload_type_union: Union[Type]`.
|
||||
|
||||
'''
|
||||
submsg_types: list[Type[Msg]] = Msg.__subclasses__()
|
||||
submsg_types: list[MsgType] = Msg.__subclasses__()
|
||||
bases: tuple = (
|
||||
# XXX NOTE XXX the below generic-parameterization seems to
|
||||
# be THE ONLY way to get this to work correctly in terms
|
||||
|
|
Loading…
Reference in New Issue