Compare commits
21 Commits
343b7c9712
...
b22f7dcae0
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | b22f7dcae0 | |
Tyler Goodlet | fde62c72be | |
Tyler Goodlet | 4ef77bb64f | |
Tyler Goodlet | e78fdf2f69 | |
Tyler Goodlet | 13bc3c308d | |
Tyler Goodlet | 60fc43e530 | |
Tyler Goodlet | 30afcd2b6b | |
Tyler Goodlet | c80f020ebc | |
Tyler Goodlet | 262a0e36c6 | |
Tyler Goodlet | d93135acd8 | |
Tyler Goodlet | b23780c102 | |
Tyler Goodlet | 31de5f6648 | |
Tyler Goodlet | 236083b6e4 | |
Tyler Goodlet | d2dee87b36 | |
Tyler Goodlet | 5cb0cc0f0b | |
Tyler Goodlet | fc075e96c6 | |
Tyler Goodlet | d6ca4771ce | |
Tyler Goodlet | c5a0cfc639 | |
Tyler Goodlet | f85314ecab | |
Tyler Goodlet | c929bc15c9 | |
Tyler Goodlet | 6690968236 |
|
@ -1,6 +1,11 @@
|
||||||
import time
|
import time
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
from tractor import (
|
||||||
|
ActorNursery,
|
||||||
|
MsgStream,
|
||||||
|
Portal,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# this is the first 2 actors, streamer_1 and streamer_2
|
# this is the first 2 actors, streamer_1 and streamer_2
|
||||||
|
@ -12,14 +17,18 @@ async def stream_data(seed):
|
||||||
|
|
||||||
# this is the third actor; the aggregator
|
# this is the third actor; the aggregator
|
||||||
async def aggregate(seed):
|
async def aggregate(seed):
|
||||||
"""Ensure that the two streams we receive match but only stream
|
'''
|
||||||
|
Ensure that the two streams we receive match but only stream
|
||||||
a single set of values to the parent.
|
a single set of values to the parent.
|
||||||
"""
|
|
||||||
async with tractor.open_nursery() as nursery:
|
'''
|
||||||
portals = []
|
an: ActorNursery
|
||||||
|
async with tractor.open_nursery() as an:
|
||||||
|
portals: list[Portal] = []
|
||||||
for i in range(1, 3):
|
for i in range(1, 3):
|
||||||
# fork point
|
|
||||||
portal = await nursery.start_actor(
|
# fork/spawn call
|
||||||
|
portal = await an.start_actor(
|
||||||
name=f'streamer_{i}',
|
name=f'streamer_{i}',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
@ -43,7 +52,11 @@ async def aggregate(seed):
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
|
|
||||||
for portal in portals:
|
for portal in portals:
|
||||||
n.start_soon(push_to_chan, portal, send_chan.clone())
|
n.start_soon(
|
||||||
|
push_to_chan,
|
||||||
|
portal,
|
||||||
|
send_chan.clone(),
|
||||||
|
)
|
||||||
|
|
||||||
# close this local task's reference to send side
|
# close this local task's reference to send side
|
||||||
await send_chan.aclose()
|
await send_chan.aclose()
|
||||||
|
@ -60,7 +73,7 @@ async def aggregate(seed):
|
||||||
|
|
||||||
print("FINISHED ITERATING in aggregator")
|
print("FINISHED ITERATING in aggregator")
|
||||||
|
|
||||||
await nursery.cancel()
|
await an.cancel()
|
||||||
print("WAITING on `ActorNursery` to finish")
|
print("WAITING on `ActorNursery` to finish")
|
||||||
print("AGGREGATOR COMPLETE!")
|
print("AGGREGATOR COMPLETE!")
|
||||||
|
|
||||||
|
@ -75,18 +88,21 @@ async def main() -> list[int]:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# yes, a nursery which spawns `trio`-"actors" B)
|
# yes, a nursery which spawns `trio`-"actors" B)
|
||||||
nursery: tractor.ActorNursery
|
an: ActorNursery
|
||||||
async with tractor.open_nursery() as nursery:
|
async with tractor.open_nursery(
|
||||||
|
loglevel='cancel',
|
||||||
|
debug_mode=True,
|
||||||
|
) as an:
|
||||||
|
|
||||||
seed = int(1e3)
|
seed = int(1e3)
|
||||||
pre_start = time.time()
|
pre_start = time.time()
|
||||||
|
|
||||||
portal: tractor.Portal = await nursery.start_actor(
|
portal: Portal = await an.start_actor(
|
||||||
name='aggregator',
|
name='aggregator',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
||||||
stream: tractor.MsgStream
|
stream: MsgStream
|
||||||
async with portal.open_stream_from(
|
async with portal.open_stream_from(
|
||||||
aggregate,
|
aggregate,
|
||||||
seed=seed,
|
seed=seed,
|
||||||
|
@ -95,11 +111,12 @@ async def main() -> list[int]:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
# the portal call returns exactly what you'd expect
|
# the portal call returns exactly what you'd expect
|
||||||
# as if the remote "aggregate" function was called locally
|
# as if the remote "aggregate" function was called locally
|
||||||
result_stream = []
|
result_stream: list[int] = []
|
||||||
async for value in stream:
|
async for value in stream:
|
||||||
result_stream.append(value)
|
result_stream.append(value)
|
||||||
|
|
||||||
await portal.cancel_actor()
|
cancelled: bool = await portal.cancel_actor()
|
||||||
|
assert cancelled
|
||||||
|
|
||||||
print(f"STREAM TIME = {time.time() - start}")
|
print(f"STREAM TIME = {time.time() - start}")
|
||||||
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
|
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
|
||||||
|
|
|
@ -55,6 +55,7 @@ xontrib-vox = "^0.0.1"
|
||||||
optional = false
|
optional = false
|
||||||
[tool.poetry.group.dev.dependencies]
|
[tool.poetry.group.dev.dependencies]
|
||||||
pytest = "^8.2.0"
|
pytest = "^8.2.0"
|
||||||
|
pexpect = "^4.9.0"
|
||||||
|
|
||||||
# only for xonsh as sh..
|
# only for xonsh as sh..
|
||||||
xontrib-vox = "^0.0.1"
|
xontrib-vox = "^0.0.1"
|
||||||
|
|
|
@ -97,6 +97,7 @@ def test_ipc_channel_break_during_stream(
|
||||||
examples_dir() / 'advanced_faults'
|
examples_dir() / 'advanced_faults'
|
||||||
/ 'ipc_failure_during_stream.py',
|
/ 'ipc_failure_during_stream.py',
|
||||||
root=examples_dir(),
|
root=examples_dir(),
|
||||||
|
consider_namespace_packages=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
# by def we expect KBI from user after a simulated "hang
|
# by def we expect KBI from user after a simulated "hang
|
||||||
|
|
|
@ -89,17 +89,30 @@ def test_remote_error(reg_addr, args_err):
|
||||||
assert excinfo.value.boxed_type == errtype
|
assert excinfo.value.boxed_type == errtype
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# the root task will also error on the `.result()` call
|
# the root task will also error on the `Portal.result()`
|
||||||
# so we expect an error from there AND the child.
|
# call so we expect an error from there AND the child.
|
||||||
with pytest.raises(BaseExceptionGroup) as excinfo:
|
# |_ tho seems like on new `trio` this doesn't always
|
||||||
|
# happen?
|
||||||
|
with pytest.raises((
|
||||||
|
BaseExceptionGroup,
|
||||||
|
tractor.RemoteActorError,
|
||||||
|
)) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed errors
|
# ensure boxed errors are `errtype`
|
||||||
for exc in excinfo.value.exceptions:
|
err: BaseException = excinfo.value
|
||||||
|
if isinstance(err, BaseExceptionGroup):
|
||||||
|
suberrs: list[BaseException] = err.exceptions
|
||||||
|
else:
|
||||||
|
suberrs: list[BaseException] = [err]
|
||||||
|
|
||||||
|
for exc in suberrs:
|
||||||
assert exc.boxed_type == errtype
|
assert exc.boxed_type == errtype
|
||||||
|
|
||||||
|
|
||||||
def test_multierror(reg_addr):
|
def test_multierror(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
|
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
|
||||||
more then one actor errors.
|
more then one actor errors.
|
||||||
|
|
|
@ -444,6 +444,7 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
fan_out=fan_out,
|
fan_out=fan_out,
|
||||||
)
|
)
|
||||||
|
# should raise RAE diectly
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
@ -461,12 +462,11 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
# should trigger remote actor error
|
# should trigger remote actor error
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
with pytest.raises(BaseExceptionGroup) as excinfo:
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed errors
|
# ensure boxed error type
|
||||||
for exc in excinfo.value.exceptions:
|
excinfo.value.boxed_type == Exception
|
||||||
assert exc.boxed_type == Exception
|
|
||||||
|
|
||||||
|
|
||||||
def test_trio_closes_early_and_channel_exits(reg_addr):
|
def test_trio_closes_early_and_channel_exits(reg_addr):
|
||||||
|
@ -477,7 +477,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr):
|
||||||
exit_early=True,
|
exit_early=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should trigger remote actor error
|
# should raise RAE diectly
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
# should be a quiet exit on a simple channel exit
|
# should be a quiet exit on a simple channel exit
|
||||||
|
@ -492,15 +492,17 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
||||||
aio_raise_err=True,
|
aio_raise_err=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should trigger remote actor error
|
# should trigger RAE directly, not an eg.
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
with pytest.raises(BaseExceptionGroup) as excinfo:
|
with pytest.raises(
|
||||||
|
# NOTE: bc we directly wait on `Portal.result()` instead
|
||||||
|
# of capturing it inside the `ActorNursery` machinery.
|
||||||
|
expected_exception=RemoteActorError,
|
||||||
|
) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed errors
|
excinfo.value.boxed_type == Exception
|
||||||
for exc in excinfo.value.exceptions:
|
|
||||||
assert exc.boxed_type == Exception
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -55,9 +55,10 @@ from tractor._testing import (
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def sleep_forever(
|
async def open_stream_then_sleep_forever(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
expect_ctxc: bool = False,
|
expect_ctxc: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Sync the context, open a stream then just sleep.
|
Sync the context, open a stream then just sleep.
|
||||||
|
@ -67,6 +68,10 @@ async def sleep_forever(
|
||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
|
||||||
|
# NOTE: the below means this child will send a `Stop`
|
||||||
|
# to it's parent-side task despite that side never
|
||||||
|
# opening a stream itself.
|
||||||
async with ctx.open_stream():
|
async with ctx.open_stream():
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
@ -100,7 +105,7 @@ async def error_before_started(
|
||||||
'''
|
'''
|
||||||
async with tractor.wait_for_actor('sleeper') as p2:
|
async with tractor.wait_for_actor('sleeper') as p2:
|
||||||
async with (
|
async with (
|
||||||
p2.open_context(sleep_forever) as (peer_ctx, first),
|
p2.open_context(open_stream_then_sleep_forever) as (peer_ctx, first),
|
||||||
peer_ctx.open_stream(),
|
peer_ctx.open_stream(),
|
||||||
):
|
):
|
||||||
# NOTE: this WAS inside an @acm body but i factored it
|
# NOTE: this WAS inside an @acm body but i factored it
|
||||||
|
@ -204,9 +209,13 @@ async def stream_ints(
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def stream_from_peer(
|
async def stream_from_peer(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
debug_mode: bool,
|
||||||
peer_name: str = 'sleeper',
|
peer_name: str = 'sleeper',
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
# sanity
|
||||||
|
assert tractor._state.debug_mode() == debug_mode
|
||||||
|
|
||||||
peer: Portal
|
peer: Portal
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
|
@ -240,26 +249,54 @@ async def stream_from_peer(
|
||||||
assert msg is not None
|
assert msg is not None
|
||||||
print(msg)
|
print(msg)
|
||||||
|
|
||||||
# NOTE: cancellation of the (sleeper) peer should always
|
# NOTE: cancellation of the (sleeper) peer should always cause
|
||||||
# cause a `ContextCancelled` raise in this streaming
|
# a `ContextCancelled` raise in this streaming actor.
|
||||||
# actor.
|
except ContextCancelled as _ctxc:
|
||||||
except ContextCancelled as ctxc:
|
ctxc = _ctxc
|
||||||
ctxerr = ctxc
|
|
||||||
|
|
||||||
assert peer_ctx._remote_error is ctxerr
|
# print("TRYING TO ENTER PAUSSE!!!")
|
||||||
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
# await tractor.pause(shield=True)
|
||||||
|
re: ContextCancelled = peer_ctx._remote_error
|
||||||
|
|
||||||
# XXX YES, bc exact same msg instances
|
# XXX YES XXX, remote error should be unpacked only once!
|
||||||
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
|
assert (
|
||||||
|
re
|
||||||
|
is
|
||||||
|
peer_ctx.maybe_error
|
||||||
|
is
|
||||||
|
ctxc
|
||||||
|
is
|
||||||
|
peer_ctx._local_error
|
||||||
|
)
|
||||||
|
# NOTE: these errors should all match!
|
||||||
|
# ------ - ------
|
||||||
|
# XXX [2024-05-03] XXX
|
||||||
|
# ------ - ------
|
||||||
|
# broke this due to a re-raise inside `.msg._ops.drain_to_final_msg()`
|
||||||
|
# where the `Error()` msg was directly raising the ctxc
|
||||||
|
# instead of just returning up to the caller inside
|
||||||
|
# `Context.return()` which would results in a diff instance of
|
||||||
|
# the same remote error bubbling out above vs what was
|
||||||
|
# already unpacked and set inside `Context.
|
||||||
|
assert (
|
||||||
|
peer_ctx._remote_error.msgdata
|
||||||
|
==
|
||||||
|
ctxc.msgdata
|
||||||
|
)
|
||||||
|
# ^-XXX-^ notice the data is of course the exact same.. so
|
||||||
|
# the above larger assert makes sense to also always be true!
|
||||||
|
|
||||||
# XXX NO, bc new one always created for property accesss
|
# XXX YES XXX, bc should be exact same msg instances
|
||||||
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg
|
assert peer_ctx._remote_error._ipc_msg is ctxc._ipc_msg
|
||||||
|
|
||||||
|
# XXX NO XXX, bc new one always created for property accesss
|
||||||
|
assert peer_ctx._remote_error.ipc_msg != ctxc.ipc_msg
|
||||||
|
|
||||||
# the peer ctx is the canceller even though it's canceller
|
# the peer ctx is the canceller even though it's canceller
|
||||||
# is the "canceller" XD
|
# is the "canceller" XD
|
||||||
assert peer_name in peer_ctx.canceller
|
assert peer_name in peer_ctx.canceller
|
||||||
|
|
||||||
assert "canceller" in ctxerr.canceller
|
assert "canceller" in ctxc.canceller
|
||||||
|
|
||||||
# caller peer should not be the cancel requester
|
# caller peer should not be the cancel requester
|
||||||
assert not ctx.cancel_called
|
assert not ctx.cancel_called
|
||||||
|
@ -283,12 +320,13 @@ async def stream_from_peer(
|
||||||
|
|
||||||
# TODO / NOTE `.canceller` won't have been set yet
|
# TODO / NOTE `.canceller` won't have been set yet
|
||||||
# here because that machinery is inside
|
# here because that machinery is inside
|
||||||
# `.open_context().__aexit__()` BUT, if we had
|
# `Portal.open_context().__aexit__()` BUT, if we had
|
||||||
# a way to know immediately (from the last
|
# a way to know immediately (from the last
|
||||||
# checkpoint) that cancellation was due to
|
# checkpoint) that cancellation was due to
|
||||||
# a remote, we COULD assert this here..see,
|
# a remote, we COULD assert this here..see,
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
#
|
#
|
||||||
|
# await tractor.pause()
|
||||||
# assert 'canceller' in ctx.canceller
|
# assert 'canceller' in ctx.canceller
|
||||||
|
|
||||||
# root/parent actor task should NEVER HAVE cancelled us!
|
# root/parent actor task should NEVER HAVE cancelled us!
|
||||||
|
@ -392,12 +430,13 @@ def test_peer_canceller(
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
sleeper.open_context(
|
sleeper.open_context(
|
||||||
sleep_forever,
|
open_stream_then_sleep_forever,
|
||||||
expect_ctxc=True,
|
expect_ctxc=True,
|
||||||
) as (sleeper_ctx, sent),
|
) as (sleeper_ctx, sent),
|
||||||
|
|
||||||
just_caller.open_context(
|
just_caller.open_context(
|
||||||
stream_from_peer,
|
stream_from_peer,
|
||||||
|
debug_mode=debug_mode,
|
||||||
) as (caller_ctx, sent),
|
) as (caller_ctx, sent),
|
||||||
|
|
||||||
canceller.open_context(
|
canceller.open_context(
|
||||||
|
@ -423,10 +462,11 @@ def test_peer_canceller(
|
||||||
|
|
||||||
# should always raise since this root task does
|
# should always raise since this root task does
|
||||||
# not request the sleeper cancellation ;)
|
# not request the sleeper cancellation ;)
|
||||||
except ContextCancelled as ctxerr:
|
except ContextCancelled as _ctxc:
|
||||||
|
ctxc = _ctxc
|
||||||
print(
|
print(
|
||||||
'CAUGHT REMOTE CONTEXT CANCEL\n\n'
|
'CAUGHT REMOTE CONTEXT CANCEL\n\n'
|
||||||
f'{ctxerr}\n'
|
f'{ctxc}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# canceller and caller peers should not
|
# canceller and caller peers should not
|
||||||
|
@ -437,7 +477,7 @@ def test_peer_canceller(
|
||||||
# we were not the actor, our peer was
|
# we were not the actor, our peer was
|
||||||
assert not sleeper_ctx.cancel_acked
|
assert not sleeper_ctx.cancel_acked
|
||||||
|
|
||||||
assert ctxerr.canceller[0] == 'canceller'
|
assert ctxc.canceller[0] == 'canceller'
|
||||||
|
|
||||||
# XXX NOTE XXX: since THIS `ContextCancelled`
|
# XXX NOTE XXX: since THIS `ContextCancelled`
|
||||||
# HAS NOT YET bubbled up to the
|
# HAS NOT YET bubbled up to the
|
||||||
|
@ -448,7 +488,7 @@ def test_peer_canceller(
|
||||||
|
|
||||||
# CASE_1: error-during-ctxc-handling,
|
# CASE_1: error-during-ctxc-handling,
|
||||||
if error_during_ctxerr_handling:
|
if error_during_ctxerr_handling:
|
||||||
raise RuntimeError('Simulated error during teardown')
|
raise RuntimeError('Simulated RTE re-raise during ctxc handling')
|
||||||
|
|
||||||
# CASE_2: standard teardown inside in `.open_context()` block
|
# CASE_2: standard teardown inside in `.open_context()` block
|
||||||
raise
|
raise
|
||||||
|
@ -513,6 +553,9 @@ def test_peer_canceller(
|
||||||
# should be cancelled by US.
|
# should be cancelled by US.
|
||||||
#
|
#
|
||||||
if error_during_ctxerr_handling:
|
if error_during_ctxerr_handling:
|
||||||
|
print(f'loc_err: {_loc_err}\n')
|
||||||
|
assert isinstance(loc_err, RuntimeError)
|
||||||
|
|
||||||
# since we do a rte reraise above, the
|
# since we do a rte reraise above, the
|
||||||
# `.open_context()` error handling should have
|
# `.open_context()` error handling should have
|
||||||
# raised a local rte, thus the internal
|
# raised a local rte, thus the internal
|
||||||
|
@ -521,9 +564,6 @@ def test_peer_canceller(
|
||||||
# a `trio.Cancelled` due to a local
|
# a `trio.Cancelled` due to a local
|
||||||
# `._scope.cancel()` call.
|
# `._scope.cancel()` call.
|
||||||
assert not sleeper_ctx._scope.cancelled_caught
|
assert not sleeper_ctx._scope.cancelled_caught
|
||||||
|
|
||||||
assert isinstance(loc_err, RuntimeError)
|
|
||||||
print(f'_loc_err: {_loc_err}\n')
|
|
||||||
# assert sleeper_ctx._local_error is _loc_err
|
# assert sleeper_ctx._local_error is _loc_err
|
||||||
# assert sleeper_ctx._local_error is _loc_err
|
# assert sleeper_ctx._local_error is _loc_err
|
||||||
assert not (
|
assert not (
|
||||||
|
@ -560,10 +600,13 @@ def test_peer_canceller(
|
||||||
|
|
||||||
else: # the other 2 ctxs
|
else: # the other 2 ctxs
|
||||||
assert (
|
assert (
|
||||||
|
isinstance(re, ContextCancelled)
|
||||||
|
and (
|
||||||
re.canceller
|
re.canceller
|
||||||
==
|
==
|
||||||
canceller.channel.uid
|
canceller.channel.uid
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# since the sleeper errors while handling a
|
# since the sleeper errors while handling a
|
||||||
# peer-cancelled (by ctxc) scenario, we expect
|
# peer-cancelled (by ctxc) scenario, we expect
|
||||||
|
@ -811,8 +854,7 @@ async def serve_subactors(
|
||||||
async with open_nursery() as an:
|
async with open_nursery() as an:
|
||||||
|
|
||||||
# sanity
|
# sanity
|
||||||
if debug_mode:
|
assert tractor._state.debug_mode() == debug_mode
|
||||||
assert tractor._state.debug_mode()
|
|
||||||
|
|
||||||
await ctx.started(peer_name)
|
await ctx.started(peer_name)
|
||||||
async with ctx.open_stream() as ipc:
|
async with ctx.open_stream() as ipc:
|
||||||
|
@ -1091,7 +1133,6 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
'-> root checking `client_ctx.result()`,\n'
|
'-> root checking `client_ctx.result()`,\n'
|
||||||
f'-> checking that sub-spawn {peer_name} is down\n'
|
f'-> checking that sub-spawn {peer_name} is down\n'
|
||||||
)
|
)
|
||||||
# else:
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res = await client_ctx.result(hide_tb=False)
|
res = await client_ctx.result(hide_tb=False)
|
||||||
|
|
|
@ -2,7 +2,9 @@
|
||||||
Spawning basics
|
Spawning basics
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from typing import Optional
|
from typing import (
|
||||||
|
Any,
|
||||||
|
)
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
|
@ -25,13 +27,11 @@ async def spawn(
|
||||||
async with tractor.open_root_actor(
|
async with tractor.open_root_actor(
|
||||||
arbiter_addr=reg_addr,
|
arbiter_addr=reg_addr,
|
||||||
):
|
):
|
||||||
|
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
assert actor.is_arbiter == is_arbiter
|
assert actor.is_arbiter == is_arbiter
|
||||||
data = data_to_pass_down
|
data = data_to_pass_down
|
||||||
|
|
||||||
if actor.is_arbiter:
|
if actor.is_arbiter:
|
||||||
|
|
||||||
async with tractor.open_nursery() as nursery:
|
async with tractor.open_nursery() as nursery:
|
||||||
|
|
||||||
# forks here
|
# forks here
|
||||||
|
@ -95,7 +95,9 @@ async def test_movie_theatre_convo(start_method):
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
async def cellar_door(return_value: Optional[str]):
|
async def cellar_door(
|
||||||
|
return_value: str|None,
|
||||||
|
):
|
||||||
return return_value
|
return return_value
|
||||||
|
|
||||||
|
|
||||||
|
@ -105,16 +107,18 @@ async def cellar_door(return_value: Optional[str]):
|
||||||
)
|
)
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_most_beautiful_word(
|
async def test_most_beautiful_word(
|
||||||
start_method,
|
start_method: str,
|
||||||
return_value
|
return_value: Any,
|
||||||
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
The main ``tractor`` routine.
|
The main ``tractor`` routine.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(1):
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
) as n:
|
||||||
portal = await n.run_in_actor(
|
portal = await n.run_in_actor(
|
||||||
cellar_door,
|
cellar_door,
|
||||||
return_value=return_value,
|
return_value=return_value,
|
||||||
|
|
|
@ -42,6 +42,7 @@ from ._supervise import (
|
||||||
from ._state import (
|
from ._state import (
|
||||||
current_actor as current_actor,
|
current_actor as current_actor,
|
||||||
is_root_process as is_root_process,
|
is_root_process as is_root_process,
|
||||||
|
current_ipc_ctx as current_ipc_ctx,
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled as ContextCancelled,
|
ContextCancelled as ContextCancelled,
|
||||||
|
|
|
@ -41,6 +41,7 @@ from typing import (
|
||||||
Callable,
|
Callable,
|
||||||
Mapping,
|
Mapping,
|
||||||
Type,
|
Type,
|
||||||
|
TypeAlias,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
@ -94,7 +95,7 @@ if TYPE_CHECKING:
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
from ._ipc import MsgTransport
|
from ._ipc import MsgTransport
|
||||||
from .devx._code import (
|
from .devx._frame_stack import (
|
||||||
CallerInfo,
|
CallerInfo,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -155,6 +156,41 @@ class Context:
|
||||||
# payload receiver
|
# payload receiver
|
||||||
_pld_rx: msgops.PldRx
|
_pld_rx: msgops.PldRx
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pld_rx(self) -> msgops.PldRx:
|
||||||
|
'''
|
||||||
|
The current `tractor.Context`'s msg-payload-receiver.
|
||||||
|
|
||||||
|
A payload receiver is the IPC-msg processing sub-sys which
|
||||||
|
filters inter-actor-task communicated payload data, i.e. the
|
||||||
|
`PayloadMsg.pld: PayloadT` field value, AFTER its container
|
||||||
|
shuttlle msg (eg. `Started`/`Yield`/`Return) has been
|
||||||
|
delivered up from `tractor`'s transport layer but BEFORE the
|
||||||
|
data is yielded to `tractor` application code.
|
||||||
|
|
||||||
|
The "IPC-primitive API" is normally one of a `Context` (this)` or a `MsgStream`
|
||||||
|
or some higher level API using one of them.
|
||||||
|
|
||||||
|
For ex. `pld_data: PayloadT = MsgStream.receive()` implicitly
|
||||||
|
calls into the stream's parent `Context.pld_rx.recv_pld().` to
|
||||||
|
receive the latest `PayloadMsg.pld` value.
|
||||||
|
|
||||||
|
Modification of the current payload spec via `limit_plds()`
|
||||||
|
allows a `tractor` application to contextually filter IPC
|
||||||
|
payload content with a type specification as supported by the
|
||||||
|
interchange backend.
|
||||||
|
|
||||||
|
- for `msgspec` see <PUTLINKHERE>.
|
||||||
|
|
||||||
|
Note that the `PldRx` itself is a per-`Context` instance that
|
||||||
|
normally only changes when some (sub-)task, on a given "side"
|
||||||
|
of the IPC ctx (either a "child"-side RPC or inside
|
||||||
|
a "parent"-side `Portal.open_context()` block), modifies it
|
||||||
|
using the `.msg._ops.limit_plds()` API.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._pld_rx
|
||||||
|
|
||||||
# full "namespace-path" to target RPC function
|
# full "namespace-path" to target RPC function
|
||||||
_nsf: NamespacePath
|
_nsf: NamespacePath
|
||||||
|
|
||||||
|
@ -231,6 +267,8 @@ class Context:
|
||||||
|
|
||||||
# init and streaming state
|
# init and streaming state
|
||||||
_started_called: bool = False
|
_started_called: bool = False
|
||||||
|
_started_msg: MsgType|None = None
|
||||||
|
_started_pld: Any = None
|
||||||
_stream_opened: bool = False
|
_stream_opened: bool = False
|
||||||
_stream: MsgStream|None = None
|
_stream: MsgStream|None = None
|
||||||
|
|
||||||
|
@ -623,7 +661,7 @@ class Context:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Setting remote error for ctx\n\n'
|
'Setting remote error for ctx\n\n'
|
||||||
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
||||||
f'=> {self.side!r}\n\n'
|
f'=> {self.side!r}: {self._actor.uid}\n\n'
|
||||||
f'{error}'
|
f'{error}'
|
||||||
)
|
)
|
||||||
self._remote_error: BaseException = error
|
self._remote_error: BaseException = error
|
||||||
|
@ -678,7 +716,7 @@ class Context:
|
||||||
log.error(
|
log.error(
|
||||||
f'Remote context error:\n\n'
|
f'Remote context error:\n\n'
|
||||||
# f'{pformat(self)}\n'
|
# f'{pformat(self)}\n'
|
||||||
f'{error}\n'
|
f'{error}'
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._canceller is None:
|
if self._canceller is None:
|
||||||
|
@ -724,8 +762,10 @@ class Context:
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||||
|
# from .devx import mk_pdb
|
||||||
|
# mk_pdb().set_trace()
|
||||||
|
|
||||||
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
|
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
|
||||||
if (
|
if (
|
||||||
cs
|
cs
|
||||||
and
|
and
|
||||||
|
@ -805,6 +845,7 @@ class Context:
|
||||||
# f'{ci.api_nsp}()\n'
|
# f'{ci.api_nsp}()\n'
|
||||||
# )
|
# )
|
||||||
|
|
||||||
|
# TODO: use `.dev._frame_stack` scanning to find caller!
|
||||||
return 'Portal.open_context()'
|
return 'Portal.open_context()'
|
||||||
|
|
||||||
async def cancel(
|
async def cancel(
|
||||||
|
@ -1304,17 +1345,6 @@ class Context:
|
||||||
ctx=self,
|
ctx=self,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
for msg in drained_msgs:
|
|
||||||
|
|
||||||
# TODO: mask this by default..
|
|
||||||
if isinstance(msg, Return):
|
|
||||||
# from .devx import pause
|
|
||||||
# await pause()
|
|
||||||
# raise InternalError(
|
|
||||||
log.warning(
|
|
||||||
'Final `return` msg should never be drained !?!?\n\n'
|
|
||||||
f'{msg}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
drained_status: str = (
|
drained_status: str = (
|
||||||
'Ctx drained to final outcome msg\n\n'
|
'Ctx drained to final outcome msg\n\n'
|
||||||
|
@ -1435,6 +1465,10 @@ class Context:
|
||||||
self._result
|
self._result
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def has_outcome(self) -> bool:
|
||||||
|
return bool(self.maybe_error) or self._final_result_is_set()
|
||||||
|
|
||||||
# @property
|
# @property
|
||||||
def repr_outcome(
|
def repr_outcome(
|
||||||
self,
|
self,
|
||||||
|
@ -1637,8 +1671,6 @@ class Context:
|
||||||
)
|
)
|
||||||
|
|
||||||
if rt_started != started_msg:
|
if rt_started != started_msg:
|
||||||
# TODO: break these methods out from the struct subtype?
|
|
||||||
|
|
||||||
# TODO: make that one a mod func too..
|
# TODO: make that one a mod func too..
|
||||||
diff = pretty_struct.Struct.__sub__(
|
diff = pretty_struct.Struct.__sub__(
|
||||||
rt_started,
|
rt_started,
|
||||||
|
@ -1674,6 +1706,8 @@ class Context:
|
||||||
) from verr
|
) from verr
|
||||||
|
|
||||||
self._started_called = True
|
self._started_called = True
|
||||||
|
self._started_msg = started_msg
|
||||||
|
self._started_pld = value
|
||||||
|
|
||||||
async def _drain_overflows(
|
async def _drain_overflows(
|
||||||
self,
|
self,
|
||||||
|
@ -1961,6 +1995,7 @@ async def open_context_from_portal(
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
func: Callable,
|
func: Callable,
|
||||||
|
|
||||||
|
pld_spec: TypeAlias|None = None,
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
|
|
||||||
# TODO: if we set this the wrapping `@acm` body will
|
# TODO: if we set this the wrapping `@acm` body will
|
||||||
|
@ -2026,7 +2061,7 @@ async def open_context_from_portal(
|
||||||
# XXX NOTE XXX: currenly we do NOT allow opening a contex
|
# XXX NOTE XXX: currenly we do NOT allow opening a contex
|
||||||
# with "self" since the local feeder mem-chan processing
|
# with "self" since the local feeder mem-chan processing
|
||||||
# is not built for it.
|
# is not built for it.
|
||||||
if portal.channel.uid == portal.actor.uid:
|
if (uid := portal.channel.uid) == portal.actor.uid:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
'** !! Invalid Operation !! **\n'
|
'** !! Invalid Operation !! **\n'
|
||||||
'Can not open an IPC ctx with the local actor!\n'
|
'Can not open an IPC ctx with the local actor!\n'
|
||||||
|
@ -2054,6 +2089,21 @@ async def open_context_from_portal(
|
||||||
assert ctx._caller_info
|
assert ctx._caller_info
|
||||||
_ctxvar_Context.set(ctx)
|
_ctxvar_Context.set(ctx)
|
||||||
|
|
||||||
|
# placeholder for any exception raised in the runtime
|
||||||
|
# or by user tasks which cause this context's closure.
|
||||||
|
scope_err: BaseException|None = None
|
||||||
|
ctxc_from_callee: ContextCancelled|None = None
|
||||||
|
try:
|
||||||
|
async with (
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
msgops.maybe_limit_plds(
|
||||||
|
ctx=ctx,
|
||||||
|
spec=pld_spec,
|
||||||
|
) as maybe_msgdec,
|
||||||
|
):
|
||||||
|
if maybe_msgdec:
|
||||||
|
assert maybe_msgdec.pld_spec == pld_spec
|
||||||
|
|
||||||
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
||||||
# `Started`-msg any cancellation triggered
|
# `Started`-msg any cancellation triggered
|
||||||
# in `._maybe_cancel_and_set_remote_error()` will
|
# in `._maybe_cancel_and_set_remote_error()` will
|
||||||
|
@ -2061,25 +2111,23 @@ async def open_context_from_portal(
|
||||||
# -> it's expected that if there is an error in this phase of
|
# -> it's expected that if there is an error in this phase of
|
||||||
# the dialog, the `Error` msg should be raised from the `msg`
|
# the dialog, the `Error` msg should be raised from the `msg`
|
||||||
# handling block below.
|
# handling block below.
|
||||||
first: Any = await ctx._pld_rx.recv_pld(
|
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
|
||||||
ctx=ctx,
|
ipc=ctx,
|
||||||
expect_msg=Started,
|
expect_msg=Started,
|
||||||
|
passthrough_non_pld_msgs=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# from .devx import pause
|
||||||
|
# await pause()
|
||||||
ctx._started_called: bool = True
|
ctx._started_called: bool = True
|
||||||
|
ctx._started_msg: bool = started_msg
|
||||||
|
ctx._started_pld: bool = first
|
||||||
|
|
||||||
uid: tuple = portal.channel.uid
|
# NOTE: this in an implicit runtime nursery used to,
|
||||||
cid: str = ctx.cid
|
# - start overrun queuing tasks when as well as
|
||||||
|
# for cancellation of the scope opened by the user.
|
||||||
# placeholder for any exception raised in the runtime
|
ctx._scope_nursery: trio.Nursery = tn
|
||||||
# or by user tasks which cause this context's closure.
|
ctx._scope: trio.CancelScope = tn.cancel_scope
|
||||||
scope_err: BaseException|None = None
|
|
||||||
ctxc_from_callee: ContextCancelled|None = None
|
|
||||||
try:
|
|
||||||
async with trio.open_nursery() as nurse:
|
|
||||||
|
|
||||||
# NOTE: used to start overrun queuing tasks
|
|
||||||
ctx._scope_nursery: trio.Nursery = nurse
|
|
||||||
ctx._scope: trio.CancelScope = nurse.cancel_scope
|
|
||||||
|
|
||||||
# deliver context instance and .started() msg value
|
# deliver context instance and .started() msg value
|
||||||
# in enter tuple.
|
# in enter tuple.
|
||||||
|
@ -2126,13 +2174,13 @@ async def open_context_from_portal(
|
||||||
|
|
||||||
# when in allow_overruns mode there may be
|
# when in allow_overruns mode there may be
|
||||||
# lingering overflow sender tasks remaining?
|
# lingering overflow sender tasks remaining?
|
||||||
if nurse.child_tasks:
|
if tn.child_tasks:
|
||||||
# XXX: ensure we are in overrun state
|
# XXX: ensure we are in overrun state
|
||||||
# with ``._allow_overruns=True`` bc otherwise
|
# with ``._allow_overruns=True`` bc otherwise
|
||||||
# there should be no tasks in this nursery!
|
# there should be no tasks in this nursery!
|
||||||
if (
|
if (
|
||||||
not ctx._allow_overruns
|
not ctx._allow_overruns
|
||||||
or len(nurse.child_tasks) > 1
|
or len(tn.child_tasks) > 1
|
||||||
):
|
):
|
||||||
raise InternalError(
|
raise InternalError(
|
||||||
'Context has sub-tasks but is '
|
'Context has sub-tasks but is '
|
||||||
|
@ -2304,7 +2352,7 @@ async def open_context_from_portal(
|
||||||
):
|
):
|
||||||
log.warning(
|
log.warning(
|
||||||
'IPC connection for context is broken?\n'
|
'IPC connection for context is broken?\n'
|
||||||
f'task:{cid}\n'
|
f'task: {ctx.cid}\n'
|
||||||
f'actor: {uid}'
|
f'actor: {uid}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2455,9 +2503,8 @@ async def open_context_from_portal(
|
||||||
and ctx.cancel_acked
|
and ctx.cancel_acked
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Context cancelled by {ctx.side!r}-side task\n'
|
f'Context cancelled by {ctx.side!r}-side task\n'
|
||||||
f'|_{ctx._task}\n\n'
|
f'|_{ctx._task}\n\n'
|
||||||
|
|
||||||
f'{repr(scope_err)}\n'
|
f'{repr(scope_err)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2485,7 +2532,7 @@ async def open_context_from_portal(
|
||||||
f'cid: {ctx.cid}\n'
|
f'cid: {ctx.cid}\n'
|
||||||
)
|
)
|
||||||
portal.actor._contexts.pop(
|
portal.actor._contexts.pop(
|
||||||
(uid, cid),
|
(uid, ctx.cid),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2513,11 +2560,12 @@ def mk_context(
|
||||||
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
|
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
|
||||||
|
|
||||||
# TODO: only scan caller-info if log level so high!
|
# TODO: only scan caller-info if log level so high!
|
||||||
from .devx._code import find_caller_info
|
from .devx._frame_stack import find_caller_info
|
||||||
caller_info: CallerInfo|None = find_caller_info()
|
caller_info: CallerInfo|None = find_caller_info()
|
||||||
|
|
||||||
# TODO: when/how do we apply `.limit_plds()` from here?
|
pld_rx = msgops.PldRx(
|
||||||
pld_rx: msgops.PldRx = msgops.current_pldrx()
|
_pld_dec=msgops._def_any_pldec,
|
||||||
|
)
|
||||||
|
|
||||||
ctx = Context(
|
ctx = Context(
|
||||||
chan=chan,
|
chan=chan,
|
||||||
|
@ -2531,13 +2579,16 @@ def mk_context(
|
||||||
_caller_info=caller_info,
|
_caller_info=caller_info,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
pld_rx._ctx = ctx
|
||||||
ctx._result = Unresolved
|
ctx._result = Unresolved
|
||||||
return ctx
|
return ctx
|
||||||
|
|
||||||
|
|
||||||
# TODO: use the new type-parameters to annotate this in 3.13?
|
# TODO: use the new type-parameters to annotate this in 3.13?
|
||||||
# -[ ] https://peps.python.org/pep-0718/#unknown-types
|
# -[ ] https://peps.python.org/pep-0718/#unknown-types
|
||||||
def context(func: Callable) -> Callable:
|
def context(
|
||||||
|
func: Callable,
|
||||||
|
) -> Callable:
|
||||||
'''
|
'''
|
||||||
Mark an (async) function as an SC-supervised, inter-`Actor`,
|
Mark an (async) function as an SC-supervised, inter-`Actor`,
|
||||||
child-`trio.Task`, IPC endpoint otherwise known more
|
child-`trio.Task`, IPC endpoint otherwise known more
|
||||||
|
|
|
@ -716,4 +716,5 @@ async def _connect_chan(
|
||||||
chan = Channel((host, port))
|
chan = Channel((host, port))
|
||||||
await chan.connect()
|
await chan.connect()
|
||||||
yield chan
|
yield chan
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
await chan.aclose()
|
await chan.aclose()
|
||||||
|
|
|
@ -47,6 +47,7 @@ from ._ipc import Channel
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from .msg import (
|
from .msg import (
|
||||||
# Error,
|
# Error,
|
||||||
|
PayloadMsg,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
Return,
|
Return,
|
||||||
)
|
)
|
||||||
|
@ -98,7 +99,8 @@ class Portal:
|
||||||
|
|
||||||
self.chan = channel
|
self.chan = channel
|
||||||
# during the portal's lifetime
|
# during the portal's lifetime
|
||||||
self._final_result: Any|None = None
|
self._final_result_pld: Any|None = None
|
||||||
|
self._final_result_msg: PayloadMsg|None = None
|
||||||
|
|
||||||
# When set to a ``Context`` (when _submit_for_result is called)
|
# When set to a ``Context`` (when _submit_for_result is called)
|
||||||
# it is expected that ``result()`` will be awaited at some
|
# it is expected that ``result()`` will be awaited at some
|
||||||
|
@ -132,7 +134,7 @@ class Portal:
|
||||||
'A pending main result has already been submitted'
|
'A pending main result has already been submitted'
|
||||||
)
|
)
|
||||||
|
|
||||||
self._expect_result_ctx = await self.actor.start_remote_task(
|
self._expect_result_ctx: Context = await self.actor.start_remote_task(
|
||||||
self.channel,
|
self.channel,
|
||||||
nsf=NamespacePath(f'{ns}:{func}'),
|
nsf=NamespacePath(f'{ns}:{func}'),
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
|
@ -163,13 +165,22 @@ class Portal:
|
||||||
# expecting a "main" result
|
# expecting a "main" result
|
||||||
assert self._expect_result_ctx
|
assert self._expect_result_ctx
|
||||||
|
|
||||||
if self._final_result is None:
|
if self._final_result_msg is None:
|
||||||
self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
|
try:
|
||||||
ctx=self._expect_result_ctx,
|
(
|
||||||
|
self._final_result_msg,
|
||||||
|
self._final_result_pld,
|
||||||
|
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
|
||||||
|
ipc=self._expect_result_ctx,
|
||||||
expect_msg=Return,
|
expect_msg=Return,
|
||||||
)
|
)
|
||||||
|
except BaseException as err:
|
||||||
|
# TODO: wrap this into `@api_frame` optionally with
|
||||||
|
# some kinda filtering mechanism like log levels?
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
raise err
|
||||||
|
|
||||||
return self._final_result
|
return self._final_result_pld
|
||||||
|
|
||||||
async def _cancel_streams(self):
|
async def _cancel_streams(self):
|
||||||
# terminate all locally running async generator
|
# terminate all locally running async generator
|
||||||
|
@ -301,7 +312,7 @@ class Portal:
|
||||||
portal=self,
|
portal=self,
|
||||||
)
|
)
|
||||||
return await ctx._pld_rx.recv_pld(
|
return await ctx._pld_rx.recv_pld(
|
||||||
ctx=ctx,
|
ipc=ctx,
|
||||||
expect_msg=Return,
|
expect_msg=Return,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -320,6 +331,8 @@ class Portal:
|
||||||
remote rpc task or a local async generator instance.
|
remote rpc task or a local async generator instance.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__runtimeframe__: int = 1 # noqa
|
||||||
|
|
||||||
if isinstance(func, str):
|
if isinstance(func, str):
|
||||||
warnings.warn(
|
warnings.warn(
|
||||||
"`Portal.run(namespace: str, funcname: str)` is now"
|
"`Portal.run(namespace: str, funcname: str)` is now"
|
||||||
|
@ -353,7 +366,7 @@ class Portal:
|
||||||
portal=self,
|
portal=self,
|
||||||
)
|
)
|
||||||
return await ctx._pld_rx.recv_pld(
|
return await ctx._pld_rx.recv_pld(
|
||||||
ctx=ctx,
|
ipc=ctx,
|
||||||
expect_msg=Return,
|
expect_msg=Return,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
Root actor runtime ignition(s).
|
Root actor runtime ignition(s).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager as acm
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import importlib
|
import importlib
|
||||||
import logging
|
import logging
|
||||||
|
@ -60,7 +60,7 @@ _default_lo_addrs: list[tuple[str, int]] = [(
|
||||||
logger = log.get_logger('tractor')
|
logger = log.get_logger('tractor')
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@acm
|
||||||
async def open_root_actor(
|
async def open_root_actor(
|
||||||
|
|
||||||
*,
|
*,
|
||||||
|
@ -96,6 +96,7 @@ async def open_root_actor(
|
||||||
Runtime init entry point for ``tractor``.
|
Runtime init entry point for ``tractor``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__tracebackhide__ = True
|
||||||
# TODO: stick this in a `@cm` defined in `devx._debug`?
|
# TODO: stick this in a `@cm` defined in `devx._debug`?
|
||||||
#
|
#
|
||||||
# Override the global debugger hook to make it play nice with
|
# Override the global debugger hook to make it play nice with
|
||||||
|
@ -358,7 +359,11 @@ async def open_root_actor(
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
) as err:
|
) as err:
|
||||||
|
|
||||||
entered: bool = await _debug._maybe_enter_pm(err)
|
import inspect
|
||||||
|
entered: bool = await _debug._maybe_enter_pm(
|
||||||
|
err,
|
||||||
|
api_frame=inspect.currentframe(),
|
||||||
|
)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
not entered
|
not entered
|
||||||
|
|
|
@ -70,7 +70,6 @@ from .msg import (
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
CancelAck,
|
CancelAck,
|
||||||
Error,
|
Error,
|
||||||
Msg,
|
|
||||||
MsgType,
|
MsgType,
|
||||||
Return,
|
Return,
|
||||||
Start,
|
Start,
|
||||||
|
@ -250,10 +249,17 @@ async def _errors_relayed_via_ipc(
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
# NOTE: we normally always hide this frame in call-stack tracebacks
|
||||||
|
# if the crash originated from an RPC task (since normally the
|
||||||
|
# user is only going to care about their own code not this
|
||||||
|
# internal runtime frame) and we DID NOT
|
||||||
|
# fail due to an IPC transport error!
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
# TODO: a debug nursery when in debug mode!
|
# TODO: a debug nursery when in debug mode!
|
||||||
# async with maybe_open_debugger_nursery() as debug_tn:
|
# async with maybe_open_debugger_nursery() as debug_tn:
|
||||||
# => see matching comment in side `._debug._pause()`
|
# => see matching comment in side `._debug._pause()`
|
||||||
|
rpc_err: BaseException|None = None
|
||||||
try:
|
try:
|
||||||
yield # run RPC invoke body
|
yield # run RPC invoke body
|
||||||
|
|
||||||
|
@ -264,16 +270,7 @@ async def _errors_relayed_via_ipc(
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
KeyboardInterrupt,
|
KeyboardInterrupt,
|
||||||
) as err:
|
) as err:
|
||||||
|
rpc_err = err
|
||||||
# NOTE: always hide this frame from debug REPL call stack
|
|
||||||
# if the crash originated from an RPC task and we DID NOT
|
|
||||||
# fail due to an IPC transport error!
|
|
||||||
if (
|
|
||||||
is_rpc
|
|
||||||
and
|
|
||||||
chan.connected()
|
|
||||||
):
|
|
||||||
__tracebackhide__: bool = hide_tb
|
|
||||||
|
|
||||||
# TODO: maybe we'll want different "levels" of debugging
|
# TODO: maybe we'll want different "levels" of debugging
|
||||||
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
||||||
|
@ -318,11 +315,19 @@ async def _errors_relayed_via_ipc(
|
||||||
api_frame=inspect.currentframe(),
|
api_frame=inspect.currentframe(),
|
||||||
)
|
)
|
||||||
if not entered_debug:
|
if not entered_debug:
|
||||||
|
# if we prolly should have entered the REPL but
|
||||||
|
# didn't, maybe there was an internal error in
|
||||||
|
# the above code and we do want to show this
|
||||||
|
# frame!
|
||||||
|
if _state.debug_mode():
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
log.exception(
|
log.exception(
|
||||||
'RPC task crashed\n'
|
'RPC task crashed\n'
|
||||||
f'|_{ctx}'
|
f'|_{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# ALWAYS try to ship RPC errors back to parent/caller task
|
# ALWAYS try to ship RPC errors back to parent/caller task
|
||||||
if is_rpc:
|
if is_rpc:
|
||||||
|
|
||||||
|
@ -355,6 +360,20 @@ async def _errors_relayed_via_ipc(
|
||||||
# `Actor._service_n`, we add "handles" to each such that
|
# `Actor._service_n`, we add "handles" to each such that
|
||||||
# they can be individually ccancelled.
|
# they can be individually ccancelled.
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
|
# if the error is not from user code and instead a failure
|
||||||
|
# of a runtime RPC or transport failure we do prolly want to
|
||||||
|
# show this frame
|
||||||
|
if (
|
||||||
|
rpc_err
|
||||||
|
and (
|
||||||
|
not is_rpc
|
||||||
|
or
|
||||||
|
not chan.connected()
|
||||||
|
)
|
||||||
|
):
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ctx: Context
|
ctx: Context
|
||||||
func: Callable
|
func: Callable
|
||||||
|
@ -444,9 +463,10 @@ async def _invoke(
|
||||||
# open the stream with this option.
|
# open the stream with this option.
|
||||||
# allow_overruns=True,
|
# allow_overruns=True,
|
||||||
)
|
)
|
||||||
context: bool = False
|
context_ep_func: bool = False
|
||||||
|
|
||||||
assert not _state._ctxvar_Context.get()
|
# set the current IPC ctx var for this RPC task
|
||||||
|
_state._ctxvar_Context.set(ctx)
|
||||||
|
|
||||||
# TODO: deprecate this style..
|
# TODO: deprecate this style..
|
||||||
if getattr(func, '_tractor_stream_function', False):
|
if getattr(func, '_tractor_stream_function', False):
|
||||||
|
@ -475,7 +495,7 @@ async def _invoke(
|
||||||
# handle decorated ``@tractor.context`` async function
|
# handle decorated ``@tractor.context`` async function
|
||||||
elif getattr(func, '_tractor_context_function', False):
|
elif getattr(func, '_tractor_context_function', False):
|
||||||
kwargs['ctx'] = ctx
|
kwargs['ctx'] = ctx
|
||||||
context = True
|
context_ep_func = True
|
||||||
|
|
||||||
# errors raised inside this block are propgated back to caller
|
# errors raised inside this block are propgated back to caller
|
||||||
async with _errors_relayed_via_ipc(
|
async with _errors_relayed_via_ipc(
|
||||||
|
@ -501,7 +521,7 @@ async def _invoke(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# TODO: impl all these cases in terms of the `Context` one!
|
# TODO: impl all these cases in terms of the `Context` one!
|
||||||
if not context:
|
if not context_ep_func:
|
||||||
await _invoke_non_context(
|
await _invoke_non_context(
|
||||||
actor,
|
actor,
|
||||||
cancel_scope,
|
cancel_scope,
|
||||||
|
@ -571,7 +591,6 @@ async def _invoke(
|
||||||
async with trio.open_nursery() as tn:
|
async with trio.open_nursery() as tn:
|
||||||
ctx._scope_nursery = tn
|
ctx._scope_nursery = tn
|
||||||
ctx._scope = tn.cancel_scope
|
ctx._scope = tn.cancel_scope
|
||||||
_state._ctxvar_Context.set(ctx)
|
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
|
|
||||||
# TODO: should would be nice to have our
|
# TODO: should would be nice to have our
|
||||||
|
@ -831,7 +850,7 @@ async def process_messages(
|
||||||
(as utilized inside `Portal.cancel_actor()` ).
|
(as utilized inside `Portal.cancel_actor()` ).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
assert actor._service_n # state sanity
|
assert actor._service_n # runtime state sanity
|
||||||
|
|
||||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||||
# should use it?
|
# should use it?
|
||||||
|
@ -844,7 +863,7 @@ async def process_messages(
|
||||||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
|
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
|
||||||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
|
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
|
||||||
nursery_cancelled_before_task: bool = False
|
nursery_cancelled_before_task: bool = False
|
||||||
msg: Msg|None = None
|
msg: MsgType|None = None
|
||||||
try:
|
try:
|
||||||
# NOTE: this internal scope allows for keeping this
|
# NOTE: this internal scope allows for keeping this
|
||||||
# message loop running despite the current task having
|
# message loop running despite the current task having
|
||||||
|
|
|
@ -644,7 +644,7 @@ class Actor:
|
||||||
peers_str: str = ''
|
peers_str: str = ''
|
||||||
for uid, chans in self._peers.items():
|
for uid, chans in self._peers.items():
|
||||||
peers_str += (
|
peers_str += (
|
||||||
f'|_ uid: {uid}\n'
|
f'uid: {uid}\n'
|
||||||
)
|
)
|
||||||
for i, chan in enumerate(chans):
|
for i, chan in enumerate(chans):
|
||||||
peers_str += (
|
peers_str += (
|
||||||
|
@ -678,10 +678,12 @@ class Actor:
|
||||||
# XXX => YES IT DOES, when i was testing ctl-c
|
# XXX => YES IT DOES, when i was testing ctl-c
|
||||||
# from broken debug TTY locking due to
|
# from broken debug TTY locking due to
|
||||||
# msg-spec races on application using RunVar...
|
# msg-spec races on application using RunVar...
|
||||||
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
|
|
||||||
if (
|
if (
|
||||||
pdb_user_uid
|
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
||||||
and local_nursery
|
and
|
||||||
|
(pdb_user_uid := ctx_in_debug.chan.uid)
|
||||||
|
and
|
||||||
|
local_nursery
|
||||||
):
|
):
|
||||||
entry: tuple|None = local_nursery._children.get(
|
entry: tuple|None = local_nursery._children.get(
|
||||||
tuple(pdb_user_uid)
|
tuple(pdb_user_uid)
|
||||||
|
@ -1169,13 +1171,17 @@ class Actor:
|
||||||
|
|
||||||
# kill any debugger request task to avoid deadlock
|
# kill any debugger request task to avoid deadlock
|
||||||
# with the root actor in this tree
|
# with the root actor in this tree
|
||||||
dbcs = _debug.DebugStatus.req_cs
|
debug_req = _debug.DebugStatus
|
||||||
if dbcs is not None:
|
lock_req_ctx: Context = debug_req.req_ctx
|
||||||
|
if lock_req_ctx is not None:
|
||||||
msg += (
|
msg += (
|
||||||
'-> Cancelling active debugger request..\n'
|
'-> Cancelling active debugger request..\n'
|
||||||
f'|_{_debug.Lock.pformat()}'
|
f'|_{_debug.Lock.repr()}\n\n'
|
||||||
|
f'|_{lock_req_ctx}\n\n'
|
||||||
)
|
)
|
||||||
dbcs.cancel()
|
# lock_req_ctx._scope.cancel()
|
||||||
|
# TODO: wrap this in a method-API..
|
||||||
|
debug_req.req_cs.cancel()
|
||||||
|
|
||||||
# self-cancel **all** ongoing RPC tasks
|
# self-cancel **all** ongoing RPC tasks
|
||||||
await self.cancel_rpc_tasks(
|
await self.cancel_rpc_tasks(
|
||||||
|
@ -1375,15 +1381,17 @@ class Actor:
|
||||||
"IPC channel's "
|
"IPC channel's "
|
||||||
)
|
)
|
||||||
rent_chan_repr: str = (
|
rent_chan_repr: str = (
|
||||||
f'|_{parent_chan}'
|
f' |_{parent_chan}\n\n'
|
||||||
if parent_chan
|
if parent_chan
|
||||||
else ''
|
else ''
|
||||||
)
|
)
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
|
f'Cancelling {descr} RPC tasks\n\n'
|
||||||
f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n'
|
f'<= canceller: {req_uid}\n'
|
||||||
f' {rent_chan_repr}\n'
|
f'{rent_chan_repr}'
|
||||||
# f'{self}\n'
|
f'=> cancellee: {self.uid}\n'
|
||||||
|
f' |_{self}.cancel_rpc_tasks()\n'
|
||||||
|
f' |_tasks: {len(tasks)}\n'
|
||||||
# f'{tasks_str}'
|
# f'{tasks_str}'
|
||||||
)
|
)
|
||||||
for (
|
for (
|
||||||
|
@ -1413,7 +1421,7 @@ class Actor:
|
||||||
if tasks:
|
if tasks:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Waiting for remaining rpc tasks to complete\n'
|
'Waiting for remaining rpc tasks to complete\n'
|
||||||
f'|_{tasks}'
|
f'|_{tasks_str}'
|
||||||
)
|
)
|
||||||
await self._ongoing_rpc_tasks.wait()
|
await self._ongoing_rpc_tasks.wait()
|
||||||
|
|
||||||
|
@ -1466,7 +1474,10 @@ class Actor:
|
||||||
assert self._parent_chan, "No parent channel for this actor?"
|
assert self._parent_chan, "No parent channel for this actor?"
|
||||||
return Portal(self._parent_chan)
|
return Portal(self._parent_chan)
|
||||||
|
|
||||||
def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
|
def get_chans(
|
||||||
|
self,
|
||||||
|
uid: tuple[str, str],
|
||||||
|
) -> list[Channel]:
|
||||||
'''
|
'''
|
||||||
Return all IPC channels to the actor with provided `uid`.
|
Return all IPC channels to the actor with provided `uid`.
|
||||||
|
|
||||||
|
@ -1626,7 +1637,9 @@ async def async_main(
|
||||||
# tranport address bind errors - normally it's
|
# tranport address bind errors - normally it's
|
||||||
# something silly like the wrong socket-address
|
# something silly like the wrong socket-address
|
||||||
# passed via a config or CLI Bo
|
# passed via a config or CLI Bo
|
||||||
entered_debug = await _debug._maybe_enter_pm(oserr)
|
entered_debug = await _debug._maybe_enter_pm(
|
||||||
|
oserr,
|
||||||
|
)
|
||||||
if entered_debug:
|
if entered_debug:
|
||||||
log.runtime('Exited debug REPL..')
|
log.runtime('Exited debug REPL..')
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -142,7 +142,9 @@ async def exhaust_portal(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
try:
|
try:
|
||||||
log.debug(f"Waiting on final result from {actor.uid}")
|
log.debug(
|
||||||
|
f'Waiting on final result from {actor.uid}'
|
||||||
|
)
|
||||||
|
|
||||||
# XXX: streams should never be reaped here since they should
|
# XXX: streams should never be reaped here since they should
|
||||||
# always be established and shutdown using a context manager api
|
# always be established and shutdown using a context manager api
|
||||||
|
@ -195,7 +197,10 @@ async def cancel_on_completion(
|
||||||
# if this call errors we store the exception for later
|
# if this call errors we store the exception for later
|
||||||
# in ``errors`` which will be reraised inside
|
# in ``errors`` which will be reraised inside
|
||||||
# an exception group and we still send out a cancel request
|
# an exception group and we still send out a cancel request
|
||||||
result: Any|Exception = await exhaust_portal(portal, actor)
|
result: Any|Exception = await exhaust_portal(
|
||||||
|
portal,
|
||||||
|
actor,
|
||||||
|
)
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
errors[actor.uid]: Exception = result
|
errors[actor.uid]: Exception = result
|
||||||
log.cancel(
|
log.cancel(
|
||||||
|
@ -503,14 +508,6 @@ async def trio_proc(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# await chan.send({
|
|
||||||
# '_parent_main_data': subactor._parent_main_data,
|
|
||||||
# 'enable_modules': subactor.enable_modules,
|
|
||||||
# 'reg_addrs': subactor.reg_addrs,
|
|
||||||
# 'bind_addrs': bind_addrs,
|
|
||||||
# '_runtime_vars': _runtime_vars,
|
|
||||||
# })
|
|
||||||
|
|
||||||
# track subactor in current nursery
|
# track subactor in current nursery
|
||||||
curr_actor: Actor = current_actor()
|
curr_actor: Actor = current_actor()
|
||||||
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
||||||
|
@ -554,8 +551,8 @@ async def trio_proc(
|
||||||
# killing the process too early.
|
# killing the process too early.
|
||||||
if proc:
|
if proc:
|
||||||
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
|
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
# don't clobber an ongoing pdb
|
# don't clobber an ongoing pdb
|
||||||
if cancelled_during_spawn:
|
if cancelled_during_spawn:
|
||||||
# Try again to avoid TTY clobbering.
|
# Try again to avoid TTY clobbering.
|
||||||
|
|
|
@ -124,9 +124,15 @@ _ctxvar_Context: ContextVar[Context] = ContextVar(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def current_ipc_ctx() -> Context:
|
def current_ipc_ctx(
|
||||||
|
error_on_not_set: bool = False,
|
||||||
|
) -> Context|None:
|
||||||
ctx: Context = _ctxvar_Context.get()
|
ctx: Context = _ctxvar_Context.get()
|
||||||
if not ctx:
|
|
||||||
|
if (
|
||||||
|
not ctx
|
||||||
|
and error_on_not_set
|
||||||
|
):
|
||||||
from ._exceptions import InternalError
|
from ._exceptions import InternalError
|
||||||
raise InternalError(
|
raise InternalError(
|
||||||
'No IPC context has been allocated for this task yet?\n'
|
'No IPC context has been allocated for this task yet?\n'
|
||||||
|
|
|
@ -52,6 +52,7 @@ from tractor.msg import (
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._context import Context
|
from ._context import Context
|
||||||
|
from ._ipc import Channel
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -65,10 +66,10 @@ log = get_logger(__name__)
|
||||||
class MsgStream(trio.abc.Channel):
|
class MsgStream(trio.abc.Channel):
|
||||||
'''
|
'''
|
||||||
A bidirectional message stream for receiving logically sequenced
|
A bidirectional message stream for receiving logically sequenced
|
||||||
values over an inter-actor IPC ``Channel``.
|
values over an inter-actor IPC `Channel`.
|
||||||
|
|
||||||
This is the type returned to a local task which entered either
|
This is the type returned to a local task which entered either
|
||||||
``Portal.open_stream_from()`` or ``Context.open_stream()``.
|
`Portal.open_stream_from()` or `Context.open_stream()`.
|
||||||
|
|
||||||
Termination rules:
|
Termination rules:
|
||||||
|
|
||||||
|
@ -95,6 +96,22 @@ class MsgStream(trio.abc.Channel):
|
||||||
self._eoc: bool|trio.EndOfChannel = False
|
self._eoc: bool|trio.EndOfChannel = False
|
||||||
self._closed: bool|trio.ClosedResourceError = False
|
self._closed: bool|trio.ClosedResourceError = False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def ctx(self) -> Context:
|
||||||
|
'''
|
||||||
|
This stream's IPC `Context` ref.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._ctx
|
||||||
|
|
||||||
|
@property
|
||||||
|
def chan(self) -> Channel:
|
||||||
|
'''
|
||||||
|
Ref to the containing `Context`'s transport `Channel`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._ctx.chan
|
||||||
|
|
||||||
# TODO: could we make this a direct method bind to `PldRx`?
|
# TODO: could we make this a direct method bind to `PldRx`?
|
||||||
# -> receive_nowait = PldRx.recv_pld
|
# -> receive_nowait = PldRx.recv_pld
|
||||||
# |_ means latter would have to accept `MsgStream`-as-`self`?
|
# |_ means latter would have to accept `MsgStream`-as-`self`?
|
||||||
|
@ -109,7 +126,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
):
|
):
|
||||||
ctx: Context = self._ctx
|
ctx: Context = self._ctx
|
||||||
return ctx._pld_rx.recv_pld_nowait(
|
return ctx._pld_rx.recv_pld_nowait(
|
||||||
ctx=ctx,
|
ipc=self,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -148,7 +165,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
try:
|
try:
|
||||||
|
|
||||||
ctx: Context = self._ctx
|
ctx: Context = self._ctx
|
||||||
return await ctx._pld_rx.recv_pld(ctx=ctx)
|
return await ctx._pld_rx.recv_pld(ipc=self)
|
||||||
|
|
||||||
# XXX: the stream terminates on either of:
|
# XXX: the stream terminates on either of:
|
||||||
# - via `self._rx_chan.receive()` raising after manual closure
|
# - via `self._rx_chan.receive()` raising after manual closure
|
||||||
|
|
|
@ -84,6 +84,7 @@ class ActorNursery:
|
||||||
ria_nursery: trio.Nursery,
|
ria_nursery: trio.Nursery,
|
||||||
da_nursery: trio.Nursery,
|
da_nursery: trio.Nursery,
|
||||||
errors: dict[tuple[str, str], BaseException],
|
errors: dict[tuple[str, str], BaseException],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# self.supervisor = supervisor # TODO
|
# self.supervisor = supervisor # TODO
|
||||||
self._actor: Actor = actor
|
self._actor: Actor = actor
|
||||||
|
@ -105,6 +106,7 @@ class ActorNursery:
|
||||||
self._at_least_one_child_in_debug: bool = False
|
self._at_least_one_child_in_debug: bool = False
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
self.exited = trio.Event()
|
self.exited = trio.Event()
|
||||||
|
self._scope_error: BaseException|None = None
|
||||||
|
|
||||||
# NOTE: when no explicit call is made to
|
# NOTE: when no explicit call is made to
|
||||||
# `.open_root_actor()` by application code,
|
# `.open_root_actor()` by application code,
|
||||||
|
@ -117,7 +119,9 @@ class ActorNursery:
|
||||||
async def start_actor(
|
async def start_actor(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
|
||||||
*,
|
*,
|
||||||
|
|
||||||
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
|
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
|
||||||
rpc_module_paths: list[str]|None = None,
|
rpc_module_paths: list[str]|None = None,
|
||||||
enable_modules: list[str]|None = None,
|
enable_modules: list[str]|None = None,
|
||||||
|
@ -125,6 +129,7 @@ class ActorNursery:
|
||||||
nursery: trio.Nursery|None = None,
|
nursery: trio.Nursery|None = None,
|
||||||
debug_mode: bool|None = None,
|
debug_mode: bool|None = None,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
|
||||||
) -> Portal:
|
) -> Portal:
|
||||||
'''
|
'''
|
||||||
Start a (daemon) actor: an process that has no designated
|
Start a (daemon) actor: an process that has no designated
|
||||||
|
@ -189,6 +194,13 @@ class ActorNursery:
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: DEPRECATE THIS:
|
||||||
|
# -[ ] impl instead as a hilevel wrapper on
|
||||||
|
# top of a `@context` style invocation.
|
||||||
|
# |_ dynamic @context decoration on child side
|
||||||
|
# |_ implicit `Portal.open_context() as (ctx, first):`
|
||||||
|
# and `return first` on parent side.
|
||||||
|
# -[ ] use @api_frame on the wrapper
|
||||||
async def run_in_actor(
|
async def run_in_actor(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
|
@ -221,7 +233,7 @@ class ActorNursery:
|
||||||
# use the explicit function name if not provided
|
# use the explicit function name if not provided
|
||||||
name = fn.__name__
|
name = fn.__name__
|
||||||
|
|
||||||
portal = await self.start_actor(
|
portal: Portal = await self.start_actor(
|
||||||
name,
|
name,
|
||||||
enable_modules=[mod_path] + (
|
enable_modules=[mod_path] + (
|
||||||
enable_modules or rpc_module_paths or []
|
enable_modules or rpc_module_paths or []
|
||||||
|
@ -250,6 +262,7 @@ class ActorNursery:
|
||||||
)
|
)
|
||||||
return portal
|
return portal
|
||||||
|
|
||||||
|
# @api_frame
|
||||||
async def cancel(
|
async def cancel(
|
||||||
self,
|
self,
|
||||||
hard_kill: bool = False,
|
hard_kill: bool = False,
|
||||||
|
@ -347,8 +360,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
|
|
||||||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
|
|
||||||
# TODO: yay or nay?
|
# normally don't need to show user by default
|
||||||
__tracebackhide__ = True
|
__tracebackhide__: bool = True
|
||||||
|
|
||||||
|
outer_err: BaseException|None = None
|
||||||
|
inner_err: BaseException|None = None
|
||||||
|
|
||||||
# the collection of errors retreived from spawned sub-actors
|
# the collection of errors retreived from spawned sub-actors
|
||||||
errors: dict[tuple[str, str], BaseException] = {}
|
errors: dict[tuple[str, str], BaseException] = {}
|
||||||
|
@ -358,7 +374,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# handling errors that are generated by the inner nursery in
|
# handling errors that are generated by the inner nursery in
|
||||||
# a supervisor strategy **before** blocking indefinitely to wait for
|
# a supervisor strategy **before** blocking indefinitely to wait for
|
||||||
# actors spawned in "daemon mode" (aka started using
|
# actors spawned in "daemon mode" (aka started using
|
||||||
# ``ActorNursery.start_actor()``).
|
# `ActorNursery.start_actor()`).
|
||||||
|
|
||||||
# errors from this daemon actor nursery bubble up to caller
|
# errors from this daemon actor nursery bubble up to caller
|
||||||
async with trio.open_nursery() as da_nursery:
|
async with trio.open_nursery() as da_nursery:
|
||||||
|
@ -393,7 +409,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
)
|
)
|
||||||
an._join_procs.set()
|
an._join_procs.set()
|
||||||
|
|
||||||
except BaseException as inner_err:
|
except BaseException as _inner_err:
|
||||||
|
inner_err = _inner_err
|
||||||
errors[actor.uid] = inner_err
|
errors[actor.uid] = inner_err
|
||||||
|
|
||||||
# If we error in the root but the debugger is
|
# If we error in the root but the debugger is
|
||||||
|
@ -471,8 +488,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
Exception,
|
Exception,
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
trio.Cancelled
|
trio.Cancelled
|
||||||
|
) as _outer_err:
|
||||||
|
outer_err = _outer_err
|
||||||
|
|
||||||
) as err:
|
an._scope_error = outer_err or inner_err
|
||||||
|
|
||||||
# XXX: yet another guard before allowing the cancel
|
# XXX: yet another guard before allowing the cancel
|
||||||
# sequence in case a (single) child is in debug.
|
# sequence in case a (single) child is in debug.
|
||||||
|
@ -487,7 +506,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
if an._children:
|
if an._children:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Actor-nursery cancelling due error type:\n'
|
'Actor-nursery cancelling due error type:\n'
|
||||||
f'{err}\n'
|
f'{outer_err}\n'
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await an.cancel()
|
await an.cancel()
|
||||||
|
@ -514,11 +533,19 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
else:
|
else:
|
||||||
raise list(errors.values())[0]
|
raise list(errors.values())[0]
|
||||||
|
|
||||||
|
# show frame on any (likely) internal error
|
||||||
|
if (
|
||||||
|
not an.cancelled
|
||||||
|
and an._scope_error
|
||||||
|
):
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
# da_nursery scope end - nursery checkpoint
|
# da_nursery scope end - nursery checkpoint
|
||||||
# final exit
|
# final exit
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
# @api_frame
|
||||||
async def open_nursery(
|
async def open_nursery(
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
@ -538,6 +565,7 @@ async def open_nursery(
|
||||||
which cancellation scopes correspond to each spawned subactor set.
|
which cancellation scopes correspond to each spawned subactor set.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__tracebackhide__: bool = True
|
||||||
implicit_runtime: bool = False
|
implicit_runtime: bool = False
|
||||||
actor: Actor = current_actor(err_on_no_runtime=False)
|
actor: Actor = current_actor(err_on_no_runtime=False)
|
||||||
an: ActorNursery|None = None
|
an: ActorNursery|None = None
|
||||||
|
@ -588,6 +616,14 @@ async def open_nursery(
|
||||||
an.exited.set()
|
an.exited.set()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
# show frame on any internal runtime-scope error
|
||||||
|
if (
|
||||||
|
an
|
||||||
|
and not an.cancelled
|
||||||
|
and an._scope_error
|
||||||
|
):
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
msg: str = (
|
msg: str = (
|
||||||
'Actor-nursery exited\n'
|
'Actor-nursery exited\n'
|
||||||
f'|_{an}\n'
|
f'|_{an}\n'
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -20,11 +20,8 @@ as it pertains to improving the grok-ability of our runtime!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from functools import partial
|
||||||
import inspect
|
import inspect
|
||||||
# import msgspec
|
|
||||||
# from pprint import pformat
|
|
||||||
import textwrap
|
|
||||||
import traceback
|
|
||||||
from types import (
|
from types import (
|
||||||
FrameType,
|
FrameType,
|
||||||
FunctionType,
|
FunctionType,
|
||||||
|
@ -32,9 +29,8 @@ from types import (
|
||||||
# CodeType,
|
# CodeType,
|
||||||
)
|
)
|
||||||
from typing import (
|
from typing import (
|
||||||
# Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
# TYPE_CHECKING,
|
|
||||||
Type,
|
Type,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -42,6 +38,7 @@ from tractor.msg import (
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
)
|
)
|
||||||
|
import wrapt
|
||||||
|
|
||||||
|
|
||||||
# TODO: yeah, i don't love this and we should prolly just
|
# TODO: yeah, i don't love this and we should prolly just
|
||||||
|
@ -83,6 +80,31 @@ def get_class_from_frame(fr: FrameType) -> (
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_ns_and_func_from_frame(
|
||||||
|
frame: FrameType,
|
||||||
|
) -> Callable:
|
||||||
|
'''
|
||||||
|
Return the corresponding function object reference from
|
||||||
|
a `FrameType`, and return it and it's parent namespace `dict`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
ns: dict[str, Any]
|
||||||
|
|
||||||
|
# for a method, go up a frame and lookup the name in locals()
|
||||||
|
if '.' in (qualname := frame.f_code.co_qualname):
|
||||||
|
cls_name, _, func_name = qualname.partition('.')
|
||||||
|
ns = frame.f_back.f_locals[cls_name].__dict__
|
||||||
|
|
||||||
|
else:
|
||||||
|
func_name: str = frame.f_code.co_name
|
||||||
|
ns = frame.f_globals
|
||||||
|
|
||||||
|
return (
|
||||||
|
ns,
|
||||||
|
ns[func_name],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def func_ref_from_frame(
|
def func_ref_from_frame(
|
||||||
frame: FrameType,
|
frame: FrameType,
|
||||||
) -> Callable:
|
) -> Callable:
|
||||||
|
@ -98,34 +120,63 @@ def func_ref_from_frame(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# TODO: move all this into new `.devx._code`!
|
|
||||||
# -[ ] prolly create a `@runtime_api` dec?
|
|
||||||
# -[ ] ^- make it capture and/or accept buncha optional
|
|
||||||
# meta-data like a fancier version of `@pdbp.hideframe`.
|
|
||||||
#
|
|
||||||
class CallerInfo(pretty_struct.Struct):
|
class CallerInfo(pretty_struct.Struct):
|
||||||
rt_fi: inspect.FrameInfo
|
# https://docs.python.org/dev/reference/datamodel.html#frame-objects
|
||||||
call_frame: FrameType
|
# https://docs.python.org/dev/library/inspect.html#the-interpreter-stack
|
||||||
|
_api_frame: FrameType
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def api_func_ref(self) -> Callable|None:
|
def api_frame(self) -> FrameType:
|
||||||
return func_ref_from_frame(self.rt_fi.frame)
|
try:
|
||||||
|
self._api_frame.clear()
|
||||||
|
except RuntimeError:
|
||||||
|
# log.warning(
|
||||||
|
print(
|
||||||
|
f'Frame {self._api_frame} for {self.api_func} is still active!'
|
||||||
|
)
|
||||||
|
|
||||||
|
return self._api_frame
|
||||||
|
|
||||||
|
_api_func: Callable
|
||||||
|
|
||||||
|
@property
|
||||||
|
def api_func(self) -> Callable:
|
||||||
|
return self._api_func
|
||||||
|
|
||||||
|
_caller_frames_up: int|None = 1
|
||||||
|
_caller_frame: FrameType|None = None # cached after first stack scan
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def api_nsp(self) -> NamespacePath|None:
|
def api_nsp(self) -> NamespacePath|None:
|
||||||
func: FunctionType = self.api_func_ref
|
func: FunctionType = self.api_func
|
||||||
if func:
|
if func:
|
||||||
return NamespacePath.from_ref(func)
|
return NamespacePath.from_ref(func)
|
||||||
|
|
||||||
return '<unknown>'
|
return '<unknown>'
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def caller_func_ref(self) -> Callable|None:
|
def caller_frame(self) -> FrameType:
|
||||||
return func_ref_from_frame(self.call_frame)
|
|
||||||
|
# if not already cached, scan up stack explicitly by
|
||||||
|
# configured count.
|
||||||
|
if not self._caller_frame:
|
||||||
|
if self._caller_frames_up:
|
||||||
|
for _ in range(self._caller_frames_up):
|
||||||
|
caller_frame: FrameType|None = self.api_frame.f_back
|
||||||
|
|
||||||
|
if not caller_frame:
|
||||||
|
raise ValueError(
|
||||||
|
'No frame exists {self._caller_frames_up} up from\n'
|
||||||
|
f'{self.api_frame} @ {self.api_nsp}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
self._caller_frame = caller_frame
|
||||||
|
|
||||||
|
return self._caller_frame
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def caller_nsp(self) -> NamespacePath|None:
|
def caller_nsp(self) -> NamespacePath|None:
|
||||||
func: FunctionType = self.caller_func_ref
|
func: FunctionType = self.api_func
|
||||||
if func:
|
if func:
|
||||||
return NamespacePath.from_ref(func)
|
return NamespacePath.from_ref(func)
|
||||||
|
|
||||||
|
@ -172,108 +223,66 @@ def find_caller_info(
|
||||||
call_frame = call_frame.f_back
|
call_frame = call_frame.f_back
|
||||||
|
|
||||||
return CallerInfo(
|
return CallerInfo(
|
||||||
rt_fi=fi,
|
_api_frame=rt_frame,
|
||||||
call_frame=call_frame,
|
_api_func=func_ref_from_frame(rt_frame),
|
||||||
|
_caller_frames_up=go_up_iframes,
|
||||||
)
|
)
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def pformat_boxed_tb(
|
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
|
||||||
tb_str: str,
|
|
||||||
fields_str: str|None = None,
|
|
||||||
field_prefix: str = ' |_',
|
|
||||||
|
|
||||||
tb_box_indent: int|None = None,
|
|
||||||
tb_body_indent: int = 1,
|
|
||||||
|
|
||||||
) -> str:
|
# TODO: -[x] move all this into new `.devx._code`!
|
||||||
'''
|
# -[ ] consider rename to _callstack?
|
||||||
Create a "boxed" looking traceback string.
|
# -[ ] prolly create a `@runtime_api` dec?
|
||||||
|
# |_ @api_frame seems better?
|
||||||
|
# -[ ] ^- make it capture and/or accept buncha optional
|
||||||
|
# meta-data like a fancier version of `@pdbp.hideframe`.
|
||||||
|
#
|
||||||
|
def api_frame(
|
||||||
|
wrapped: Callable|None = None,
|
||||||
|
*,
|
||||||
|
caller_frames_up: int = 1,
|
||||||
|
|
||||||
Useful for emphasizing traceback text content as being an
|
) -> Callable:
|
||||||
embedded attribute of some other object (like
|
|
||||||
a `RemoteActorError` or other boxing remote error shuttle
|
|
||||||
container).
|
|
||||||
|
|
||||||
Any other parent/container "fields" can be passed in the
|
# handle the decorator called WITHOUT () case,
|
||||||
`fields_str` input along with other prefix/indent settings.
|
# i.e. just @api_frame, NOT @api_frame(extra=<blah>)
|
||||||
|
if wrapped is None:
|
||||||
|
return partial(
|
||||||
|
api_frame,
|
||||||
|
caller_frames_up=caller_frames_up,
|
||||||
|
)
|
||||||
|
|
||||||
'''
|
@wrapt.decorator
|
||||||
if (
|
async def wrapper(
|
||||||
fields_str
|
wrapped: Callable,
|
||||||
and
|
instance: object,
|
||||||
field_prefix
|
args: tuple,
|
||||||
|
kwargs: dict,
|
||||||
):
|
):
|
||||||
fields: str = textwrap.indent(
|
# maybe cache the API frame for this call
|
||||||
fields_str,
|
global _frame2callerinfo_cache
|
||||||
prefix=field_prefix,
|
this_frame: FrameType = inspect.currentframe()
|
||||||
)
|
api_frame: FrameType = this_frame.f_back
|
||||||
else:
|
|
||||||
fields = fields_str or ''
|
|
||||||
|
|
||||||
tb_body = tb_str
|
if not _frame2callerinfo_cache.get(api_frame):
|
||||||
if tb_body_indent:
|
_frame2callerinfo_cache[api_frame] = CallerInfo(
|
||||||
tb_body: str = textwrap.indent(
|
_api_frame=api_frame,
|
||||||
tb_str,
|
_api_func=wrapped,
|
||||||
prefix=tb_body_indent * ' ',
|
_caller_frames_up=caller_frames_up,
|
||||||
)
|
)
|
||||||
|
|
||||||
tb_box: str = (
|
return wrapped(*args, **kwargs)
|
||||||
|
|
||||||
# orig
|
# annotate the function as a "api function", meaning it is
|
||||||
# f' |\n'
|
# a function for which the function above it in the call stack should be
|
||||||
# f' ------ - ------\n\n'
|
# non-`tractor` code aka "user code".
|
||||||
# f'{tb_str}\n'
|
#
|
||||||
# f' ------ - ------\n'
|
# in the global frame cache for easy lookup from a given
|
||||||
# f' _|\n'
|
# func-instance
|
||||||
|
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
|
||||||
f'|\n'
|
wrapped.__api_func__: bool = True
|
||||||
f' ------ - ------\n\n'
|
return wrapper(wrapped)
|
||||||
# f'{tb_str}\n'
|
|
||||||
f'{tb_body}'
|
|
||||||
f' ------ - ------\n'
|
|
||||||
f'_|\n'
|
|
||||||
)
|
|
||||||
tb_box_indent: str = (
|
|
||||||
tb_box_indent
|
|
||||||
or
|
|
||||||
1
|
|
||||||
|
|
||||||
# (len(field_prefix))
|
|
||||||
# ? ^-TODO-^ ? if you wanted another indent level
|
|
||||||
)
|
|
||||||
if tb_box_indent > 0:
|
|
||||||
tb_box: str = textwrap.indent(
|
|
||||||
tb_box,
|
|
||||||
prefix=tb_box_indent * ' ',
|
|
||||||
)
|
|
||||||
|
|
||||||
return (
|
|
||||||
fields
|
|
||||||
+
|
|
||||||
tb_box
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def pformat_caller_frame(
|
|
||||||
stack_limit: int = 1,
|
|
||||||
box_tb: bool = True,
|
|
||||||
) -> str:
|
|
||||||
'''
|
|
||||||
Capture and return the traceback text content from
|
|
||||||
`stack_limit` call frames up.
|
|
||||||
|
|
||||||
'''
|
|
||||||
tb_str: str = (
|
|
||||||
'\n'.join(
|
|
||||||
traceback.format_stack(limit=stack_limit)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if box_tb:
|
|
||||||
tb_str: str = pformat_boxed_tb(
|
|
||||||
tb_str=tb_str,
|
|
||||||
field_prefix=' ',
|
|
||||||
indent='',
|
|
||||||
)
|
|
||||||
return tb_str
|
|
|
@ -57,8 +57,8 @@ LEVELS: dict[str, int] = {
|
||||||
'TRANSPORT': 5,
|
'TRANSPORT': 5,
|
||||||
'RUNTIME': 15,
|
'RUNTIME': 15,
|
||||||
'CANCEL': 16,
|
'CANCEL': 16,
|
||||||
|
'DEVX': 400,
|
||||||
'PDB': 500,
|
'PDB': 500,
|
||||||
'DEVX': 600,
|
|
||||||
}
|
}
|
||||||
# _custom_levels: set[str] = {
|
# _custom_levels: set[str] = {
|
||||||
# lvlname.lower for lvlname in LEVELS.keys()
|
# lvlname.lower for lvlname in LEVELS.keys()
|
||||||
|
@ -137,7 +137,7 @@ class StackLevelAdapter(LoggerAdapter):
|
||||||
"Developer experience" sub-sys statuses.
|
"Developer experience" sub-sys statuses.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self.log(600, msg)
|
return self.log(400, msg)
|
||||||
|
|
||||||
def log(
|
def log(
|
||||||
self,
|
self,
|
||||||
|
@ -202,8 +202,29 @@ class StackLevelAdapter(LoggerAdapter):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO IDEA:
|
||||||
|
# -[ ] do per task-name and actor-name color coding
|
||||||
|
# -[ ] unique color per task-id and actor-uuid
|
||||||
|
def pformat_task_uid(
|
||||||
|
id_part: str = 'tail'
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Return `str`-ified unique for a `trio.Task` via a combo of its
|
||||||
|
`.name: str` and `id()` truncated output.
|
||||||
|
|
||||||
|
'''
|
||||||
|
task: trio.Task = trio.lowlevel.current_task()
|
||||||
|
tid: str = str(id(task))
|
||||||
|
if id_part == 'tail':
|
||||||
|
tid_part: str = tid[-6:]
|
||||||
|
else:
|
||||||
|
tid_part: str = tid[:6]
|
||||||
|
|
||||||
|
return f'{task.name}[{tid_part}]'
|
||||||
|
|
||||||
|
|
||||||
_conc_name_getters = {
|
_conc_name_getters = {
|
||||||
'task': lambda: trio.lowlevel.current_task().name,
|
'task': pformat_task_uid,
|
||||||
'actor': lambda: current_actor(),
|
'actor': lambda: current_actor(),
|
||||||
'actor_name': lambda: current_actor().name,
|
'actor_name': lambda: current_actor().name,
|
||||||
'actor_uid': lambda: current_actor().uid[1][:6],
|
'actor_uid': lambda: current_actor().uid[1][:6],
|
||||||
|
@ -211,7 +232,10 @@ _conc_name_getters = {
|
||||||
|
|
||||||
|
|
||||||
class ActorContextInfo(Mapping):
|
class ActorContextInfo(Mapping):
|
||||||
"Dyanmic lookup for local actor and task names"
|
'''
|
||||||
|
Dyanmic lookup for local actor and task names.
|
||||||
|
|
||||||
|
'''
|
||||||
_context_keys = (
|
_context_keys = (
|
||||||
'task',
|
'task',
|
||||||
'actor',
|
'actor',
|
||||||
|
|
|
@ -44,7 +44,7 @@ from ._codec import (
|
||||||
# )
|
# )
|
||||||
|
|
||||||
from .types import (
|
from .types import (
|
||||||
Msg as Msg,
|
PayloadMsg as PayloadMsg,
|
||||||
|
|
||||||
Aid as Aid,
|
Aid as Aid,
|
||||||
SpawnSpec as SpawnSpec,
|
SpawnSpec as SpawnSpec,
|
||||||
|
|
|
@ -432,7 +432,7 @@ class MsgCodec(Struct):
|
||||||
|
|
||||||
# ) -> Any|Struct:
|
# ) -> Any|Struct:
|
||||||
|
|
||||||
# msg: Msg = codec.dec.decode(msg)
|
# msg: PayloadMsg = codec.dec.decode(msg)
|
||||||
# payload_tag: str = msg.header.payload_tag
|
# payload_tag: str = msg.header.payload_tag
|
||||||
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
|
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
|
||||||
# return payload_dec.decode(msg.pld)
|
# return payload_dec.decode(msg.pld)
|
||||||
|
|
|
@ -22,10 +22,9 @@ operational helpers for processing transaction flows.
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
# asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
)
|
)
|
||||||
from contextvars import ContextVar
|
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Type,
|
Type,
|
||||||
|
@ -50,6 +49,7 @@ from tractor._exceptions import (
|
||||||
_mk_msg_type_err,
|
_mk_msg_type_err,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
)
|
)
|
||||||
|
from tractor._state import current_ipc_ctx
|
||||||
from ._codec import (
|
from ._codec import (
|
||||||
mk_dec,
|
mk_dec,
|
||||||
MsgDec,
|
MsgDec,
|
||||||
|
@ -75,7 +75,7 @@ if TYPE_CHECKING:
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
_def_any_pldec: MsgDec = mk_dec()
|
_def_any_pldec: MsgDec[Any] = mk_dec()
|
||||||
|
|
||||||
|
|
||||||
class PldRx(Struct):
|
class PldRx(Struct):
|
||||||
|
@ -104,15 +104,19 @@ class PldRx(Struct):
|
||||||
'''
|
'''
|
||||||
# TODO: better to bind it here?
|
# TODO: better to bind it here?
|
||||||
# _rx_mc: trio.MemoryReceiveChannel
|
# _rx_mc: trio.MemoryReceiveChannel
|
||||||
_pldec: MsgDec
|
_pld_dec: MsgDec
|
||||||
|
_ctx: Context|None = None
|
||||||
_ipc: Context|MsgStream|None = None
|
_ipc: Context|MsgStream|None = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pld_dec(self) -> MsgDec:
|
def pld_dec(self) -> MsgDec:
|
||||||
return self._pldec
|
return self._pld_dec
|
||||||
|
|
||||||
|
# TODO: a better name?
|
||||||
|
# -[ ] when would this be used as it avoids needingn to pass the
|
||||||
|
# ipc prim to every method
|
||||||
@cm
|
@cm
|
||||||
def apply_to_ipc(
|
def wraps_ipc(
|
||||||
self,
|
self,
|
||||||
ipc_prim: Context|MsgStream,
|
ipc_prim: Context|MsgStream,
|
||||||
|
|
||||||
|
@ -140,49 +144,50 @@ class PldRx(Struct):
|
||||||
exit.
|
exit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
orig_dec: MsgDec = self._pldec
|
orig_dec: MsgDec = self._pld_dec
|
||||||
limit_dec: MsgDec = mk_dec(spec=spec)
|
limit_dec: MsgDec = mk_dec(spec=spec)
|
||||||
try:
|
try:
|
||||||
self._pldec = limit_dec
|
self._pld_dec = limit_dec
|
||||||
yield limit_dec
|
yield limit_dec
|
||||||
finally:
|
finally:
|
||||||
self._pldec = orig_dec
|
self._pld_dec = orig_dec
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def dec(self) -> msgpack.Decoder:
|
def dec(self) -> msgpack.Decoder:
|
||||||
return self._pldec.dec
|
return self._pld_dec.dec
|
||||||
|
|
||||||
def recv_pld_nowait(
|
def recv_pld_nowait(
|
||||||
self,
|
self,
|
||||||
# TODO: make this `MsgStream` compat as well, see above^
|
# TODO: make this `MsgStream` compat as well, see above^
|
||||||
# ipc_prim: Context|MsgStream,
|
# ipc_prim: Context|MsgStream,
|
||||||
ctx: Context,
|
ipc: Context|MsgStream,
|
||||||
|
|
||||||
ipc_msg: MsgType|None = None,
|
ipc_msg: MsgType|None = None,
|
||||||
expect_msg: Type[MsgType]|None = None,
|
expect_msg: Type[MsgType]|None = None,
|
||||||
|
hide_tb: bool = False,
|
||||||
**dec_msg_kwargs,
|
**dec_msg_kwargs,
|
||||||
|
|
||||||
) -> Any|Raw:
|
) -> Any|Raw:
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
msg: MsgType = (
|
msg: MsgType = (
|
||||||
ipc_msg
|
ipc_msg
|
||||||
or
|
or
|
||||||
|
|
||||||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||||
ctx._rx_chan.receive_nowait()
|
ipc._rx_chan.receive_nowait()
|
||||||
)
|
)
|
||||||
return self.dec_msg(
|
return self.dec_msg(
|
||||||
msg,
|
msg,
|
||||||
ctx=ctx,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
|
hide_tb=hide_tb,
|
||||||
**dec_msg_kwargs,
|
**dec_msg_kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def recv_pld(
|
async def recv_pld(
|
||||||
self,
|
self,
|
||||||
ctx: Context,
|
ipc: Context|MsgStream,
|
||||||
ipc_msg: MsgType|None = None,
|
ipc_msg: MsgType|None = None,
|
||||||
expect_msg: Type[MsgType]|None = None,
|
expect_msg: Type[MsgType]|None = None,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
@ -200,11 +205,11 @@ class PldRx(Struct):
|
||||||
or
|
or
|
||||||
|
|
||||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||||
await ctx._rx_chan.receive()
|
await ipc._rx_chan.receive()
|
||||||
)
|
)
|
||||||
return self.dec_msg(
|
return self.dec_msg(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
ctx=ctx,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
**dec_msg_kwargs,
|
**dec_msg_kwargs,
|
||||||
)
|
)
|
||||||
|
@ -212,7 +217,7 @@ class PldRx(Struct):
|
||||||
def dec_msg(
|
def dec_msg(
|
||||||
self,
|
self,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
ctx: Context,
|
ipc: Context|MsgStream,
|
||||||
expect_msg: Type[MsgType]|None,
|
expect_msg: Type[MsgType]|None,
|
||||||
|
|
||||||
raise_error: bool = True,
|
raise_error: bool = True,
|
||||||
|
@ -225,6 +230,9 @@ class PldRx(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
|
_src_err = None
|
||||||
|
src_err: BaseException|None = None
|
||||||
match msg:
|
match msg:
|
||||||
# payload-data shuttle msg; deliver the `.pld` value
|
# payload-data shuttle msg; deliver the `.pld` value
|
||||||
# directly to IPC (primitive) client-consumer code.
|
# directly to IPC (primitive) client-consumer code.
|
||||||
|
@ -234,7 +242,7 @@ class PldRx(Struct):
|
||||||
|Return(pld=pld) # termination phase
|
|Return(pld=pld) # termination phase
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
pld: PayloadT = self._pldec.decode(pld)
|
pld: PayloadT = self._pld_dec.decode(pld)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Decoded msg payload\n\n'
|
'Decoded msg payload\n\n'
|
||||||
f'{msg}\n\n'
|
f'{msg}\n\n'
|
||||||
|
@ -243,25 +251,30 @@ class PldRx(Struct):
|
||||||
)
|
)
|
||||||
return pld
|
return pld
|
||||||
|
|
||||||
# XXX pld-type failure
|
# XXX pld-value type failure
|
||||||
except ValidationError as src_err:
|
except ValidationError as valerr:
|
||||||
|
# pack mgterr into error-msg for
|
||||||
|
# reraise below; ensure remote-actor-err
|
||||||
|
# info is displayed nicely?
|
||||||
msgterr: MsgTypeError = _mk_msg_type_err(
|
msgterr: MsgTypeError = _mk_msg_type_err(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
codec=self.pld_dec,
|
codec=self.pld_dec,
|
||||||
src_validation_error=src_err,
|
src_validation_error=valerr,
|
||||||
is_invalid_payload=True,
|
is_invalid_payload=True,
|
||||||
)
|
)
|
||||||
msg: Error = pack_from_raise(
|
msg: Error = pack_from_raise(
|
||||||
local_err=msgterr,
|
local_err=msgterr,
|
||||||
cid=msg.cid,
|
cid=msg.cid,
|
||||||
src_uid=ctx.chan.uid,
|
src_uid=ipc.chan.uid,
|
||||||
)
|
)
|
||||||
|
src_err = valerr
|
||||||
|
|
||||||
# XXX some other decoder specific failure?
|
# XXX some other decoder specific failure?
|
||||||
# except TypeError as src_error:
|
# except TypeError as src_error:
|
||||||
# from .devx import mk_pdb
|
# from .devx import mk_pdb
|
||||||
# mk_pdb().set_trace()
|
# mk_pdb().set_trace()
|
||||||
# raise src_error
|
# raise src_error
|
||||||
|
# ^-TODO-^ can remove?
|
||||||
|
|
||||||
# a runtime-internal RPC endpoint response.
|
# a runtime-internal RPC endpoint response.
|
||||||
# always passthrough since (internal) runtime
|
# always passthrough since (internal) runtime
|
||||||
|
@ -299,6 +312,7 @@ class PldRx(Struct):
|
||||||
return src_err
|
return src_err
|
||||||
|
|
||||||
case Stop(cid=cid):
|
case Stop(cid=cid):
|
||||||
|
ctx: Context = getattr(ipc, 'ctx', ipc)
|
||||||
message: str = (
|
message: str = (
|
||||||
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
|
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
|
||||||
f'{ctx.peer_side!r} peer ?\n'
|
f'{ctx.peer_side!r} peer ?\n'
|
||||||
|
@ -341,14 +355,21 @@ class PldRx(Struct):
|
||||||
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
|
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
|
||||||
#
|
#
|
||||||
# fallthrough and raise from `src_err`
|
# fallthrough and raise from `src_err`
|
||||||
|
try:
|
||||||
_raise_from_unexpected_msg(
|
_raise_from_unexpected_msg(
|
||||||
ctx=ctx,
|
ctx=getattr(ipc, 'ctx', ipc),
|
||||||
msg=msg,
|
msg=msg,
|
||||||
src_err=src_err,
|
src_err=src_err,
|
||||||
log=log,
|
log=log,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
|
except UnboundLocalError:
|
||||||
|
# XXX if there's an internal lookup error in the above
|
||||||
|
# code (prolly on `src_err`) we want to show this frame
|
||||||
|
# in the tb!
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
raise
|
||||||
|
|
||||||
async def recv_msg_w_pld(
|
async def recv_msg_w_pld(
|
||||||
self,
|
self,
|
||||||
|
@ -378,52 +399,13 @@ class PldRx(Struct):
|
||||||
# msg instance?
|
# msg instance?
|
||||||
pld: PayloadT = self.dec_msg(
|
pld: PayloadT = self.dec_msg(
|
||||||
msg,
|
msg,
|
||||||
ctx=ipc,
|
ipc=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
return msg, pld
|
return msg, pld
|
||||||
|
|
||||||
|
|
||||||
# Always maintain a task-context-global `PldRx`
|
|
||||||
_def_pld_rx: PldRx = PldRx(
|
|
||||||
_pldec=_def_any_pldec,
|
|
||||||
)
|
|
||||||
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
|
|
||||||
'pld_rx',
|
|
||||||
default=_def_pld_rx,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def current_pldrx() -> PldRx:
|
|
||||||
'''
|
|
||||||
Return the current `trio.Task.context`'s msg-payload-receiver.
|
|
||||||
|
|
||||||
A payload receiver is the IPC-msg processing sub-sys which
|
|
||||||
filters inter-actor-task communicated payload data, i.e. the
|
|
||||||
`PayloadMsg.pld: PayloadT` field value, AFTER it's container
|
|
||||||
shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered
|
|
||||||
up from `tractor`'s transport layer but BEFORE the data is
|
|
||||||
yielded to application code, normally via an IPC primitive API
|
|
||||||
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
|
|
||||||
|
|
||||||
Modification of the current payload spec via `limit_plds()`
|
|
||||||
allows a `tractor` application to contextually filter IPC
|
|
||||||
payload content with a type specification as supported by
|
|
||||||
the interchange backend.
|
|
||||||
|
|
||||||
- for `msgspec` see <PUTLINKHERE>.
|
|
||||||
|
|
||||||
NOTE that the `PldRx` itself is a per-`Context` global sub-system
|
|
||||||
that normally does not change other then the applied pld-spec
|
|
||||||
for the current `trio.Task`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# ctx: context = current_ipc_ctx()
|
|
||||||
# return ctx._pld_rx
|
|
||||||
return _ctxvar_PldRx.get()
|
|
||||||
|
|
||||||
|
|
||||||
@cm
|
@cm
|
||||||
def limit_plds(
|
def limit_plds(
|
||||||
spec: Union[Type[Struct]],
|
spec: Union[Type[Struct]],
|
||||||
|
@ -439,29 +421,55 @@ def limit_plds(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = True
|
||||||
try:
|
try:
|
||||||
# sanity on orig settings
|
curr_ctx: Context = current_ipc_ctx()
|
||||||
orig_pldrx: PldRx = current_pldrx()
|
rx: PldRx = curr_ctx._pld_rx
|
||||||
orig_pldec: MsgDec = orig_pldrx.pld_dec
|
orig_pldec: MsgDec = rx.pld_dec
|
||||||
|
|
||||||
with orig_pldrx.limit_plds(
|
with rx.limit_plds(
|
||||||
spec=spec,
|
spec=spec,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) as pldec:
|
) as pldec:
|
||||||
log.info(
|
log.runtime(
|
||||||
'Applying payload-decoder\n\n'
|
'Applying payload-decoder\n\n'
|
||||||
f'{pldec}\n'
|
f'{pldec}\n'
|
||||||
)
|
)
|
||||||
yield pldec
|
yield pldec
|
||||||
finally:
|
finally:
|
||||||
log.info(
|
log.runtime(
|
||||||
'Reverted to previous payload-decoder\n\n'
|
'Reverted to previous payload-decoder\n\n'
|
||||||
f'{orig_pldec}\n'
|
f'{orig_pldec}\n'
|
||||||
)
|
)
|
||||||
assert (
|
# sanity on orig settings
|
||||||
(pldrx := current_pldrx()) is orig_pldrx
|
assert rx.pld_dec is orig_pldec
|
||||||
and
|
|
||||||
pldrx.pld_dec is orig_pldec
|
|
||||||
)
|
@acm
|
||||||
|
async def maybe_limit_plds(
|
||||||
|
ctx: Context,
|
||||||
|
spec: Union[Type[Struct]]|None = None,
|
||||||
|
**kwargs,
|
||||||
|
) -> MsgDec|None:
|
||||||
|
'''
|
||||||
|
Async compat maybe-payload type limiter.
|
||||||
|
|
||||||
|
Mostly for use inside other internal `@acm`s such that a separate
|
||||||
|
indent block isn't needed when an async one is already being
|
||||||
|
used.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if spec is None:
|
||||||
|
yield None
|
||||||
|
return
|
||||||
|
|
||||||
|
# sanity on scoping
|
||||||
|
curr_ctx: Context = current_ipc_ctx()
|
||||||
|
assert ctx is curr_ctx
|
||||||
|
|
||||||
|
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
|
||||||
|
yield msgdec
|
||||||
|
|
||||||
|
curr_ctx: Context = current_ipc_ctx()
|
||||||
|
assert ctx is curr_ctx
|
||||||
|
|
||||||
|
|
||||||
async def drain_to_final_msg(
|
async def drain_to_final_msg(
|
||||||
|
@ -543,21 +551,12 @@ async def drain_to_final_msg(
|
||||||
match msg:
|
match msg:
|
||||||
|
|
||||||
# final result arrived!
|
# final result arrived!
|
||||||
case Return(
|
case Return():
|
||||||
# cid=cid,
|
|
||||||
# pld=res,
|
|
||||||
):
|
|
||||||
# ctx._result: Any = res
|
|
||||||
ctx._result: Any = pld
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Context delivered final draining msg:\n'
|
'Context delivered final draining msg:\n'
|
||||||
f'{pretty_struct.pformat(msg)}'
|
f'{pretty_struct.pformat(msg)}'
|
||||||
)
|
)
|
||||||
# XXX: only close the rx mem chan AFTER
|
ctx._result: Any = pld
|
||||||
# a final result is retreived.
|
|
||||||
# if ctx._rx_chan:
|
|
||||||
# await ctx._rx_chan.aclose()
|
|
||||||
# TODO: ^ we don't need it right?
|
|
||||||
result_msg = msg
|
result_msg = msg
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -664,24 +663,6 @@ async def drain_to_final_msg(
|
||||||
result_msg = msg
|
result_msg = msg
|
||||||
break # OOOOOF, yeah obvi we need this..
|
break # OOOOOF, yeah obvi we need this..
|
||||||
|
|
||||||
# XXX we should never really get here
|
|
||||||
# right! since `._deliver_msg()` should
|
|
||||||
# always have detected an {'error': ..}
|
|
||||||
# msg and already called this right!?!
|
|
||||||
# elif error := unpack_error(
|
|
||||||
# msg=msg,
|
|
||||||
# chan=ctx._portal.channel,
|
|
||||||
# hide_tb=False,
|
|
||||||
# ):
|
|
||||||
# log.critical('SHOULD NEVER GET HERE!?')
|
|
||||||
# assert msg is ctx._cancel_msg
|
|
||||||
# assert error.msgdata == ctx._remote_error.msgdata
|
|
||||||
# assert error.ipc_msg == ctx._remote_error.ipc_msg
|
|
||||||
# from .devx._debug import pause
|
|
||||||
# await pause()
|
|
||||||
# ctx._maybe_cancel_and_set_remote_error(error)
|
|
||||||
# ctx._maybe_raise_remote_err(error)
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# bubble the original src key error
|
# bubble the original src key error
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -56,8 +56,7 @@ log = get_logger('tractor.msgspec')
|
||||||
PayloadT = TypeVar('PayloadT')
|
PayloadT = TypeVar('PayloadT')
|
||||||
|
|
||||||
|
|
||||||
# TODO: PayloadMsg
|
class PayloadMsg(
|
||||||
class Msg(
|
|
||||||
Struct,
|
Struct,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
|
|
||||||
|
@ -110,6 +109,10 @@ class Msg(
|
||||||
pld: Raw
|
pld: Raw
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: complete rename
|
||||||
|
Msg = PayloadMsg
|
||||||
|
|
||||||
|
|
||||||
class Aid(
|
class Aid(
|
||||||
Struct,
|
Struct,
|
||||||
tag=True,
|
tag=True,
|
||||||
|
@ -299,7 +302,7 @@ class StartAck(
|
||||||
|
|
||||||
|
|
||||||
class Started(
|
class Started(
|
||||||
Msg,
|
PayloadMsg,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -313,12 +316,12 @@ class Started(
|
||||||
|
|
||||||
# TODO: instead of using our existing `Start`
|
# TODO: instead of using our existing `Start`
|
||||||
# for this (as we did with the original `{'cmd': ..}` style)
|
# for this (as we did with the original `{'cmd': ..}` style)
|
||||||
# class Cancel(Msg):
|
# class Cancel:
|
||||||
# cid: str
|
# cid: str
|
||||||
|
|
||||||
|
|
||||||
class Yield(
|
class Yield(
|
||||||
Msg,
|
PayloadMsg,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -345,7 +348,7 @@ class Stop(
|
||||||
|
|
||||||
# TODO: is `Result` or `Out[come]` a better name?
|
# TODO: is `Result` or `Out[come]` a better name?
|
||||||
class Return(
|
class Return(
|
||||||
Msg,
|
PayloadMsg,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -357,7 +360,7 @@ class Return(
|
||||||
|
|
||||||
|
|
||||||
class CancelAck(
|
class CancelAck(
|
||||||
Msg,
|
PayloadMsg,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -463,14 +466,14 @@ def from_dict_msg(
|
||||||
|
|
||||||
# TODO: should be make a msg version of `ContextCancelled?`
|
# TODO: should be make a msg version of `ContextCancelled?`
|
||||||
# and/or with a scope field or a full `ActorCancelled`?
|
# and/or with a scope field or a full `ActorCancelled`?
|
||||||
# class Cancelled(Msg):
|
# class Cancelled(MsgType):
|
||||||
# cid: str
|
# cid: str
|
||||||
|
|
||||||
# TODO what about overruns?
|
# TODO what about overruns?
|
||||||
# class Overrun(Msg):
|
# class Overrun(MsgType):
|
||||||
# cid: str
|
# cid: str
|
||||||
|
|
||||||
_runtime_msgs: list[Msg] = [
|
_runtime_msgs: list[Struct] = [
|
||||||
|
|
||||||
# identity handshake on first IPC `Channel` contact.
|
# identity handshake on first IPC `Channel` contact.
|
||||||
Aid,
|
Aid,
|
||||||
|
@ -496,9 +499,9 @@ _runtime_msgs: list[Msg] = [
|
||||||
]
|
]
|
||||||
|
|
||||||
# the no-outcome-yet IAC (inter-actor-communication) sub-set which
|
# the no-outcome-yet IAC (inter-actor-communication) sub-set which
|
||||||
# can be `Msg.pld` payload field type-limited by application code
|
# can be `PayloadMsg.pld` payload field type-limited by application code
|
||||||
# using `apply_codec()` and `limit_msg_spec()`.
|
# using `apply_codec()` and `limit_msg_spec()`.
|
||||||
_payload_msgs: list[Msg] = [
|
_payload_msgs: list[PayloadMsg] = [
|
||||||
# first <value> from `Context.started(<value>)`
|
# first <value> from `Context.started(<value>)`
|
||||||
Started,
|
Started,
|
||||||
|
|
||||||
|
@ -541,8 +544,8 @@ def mk_msg_spec(
|
||||||
] = 'indexed_generics',
|
] = 'indexed_generics',
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
Union[Type[Msg]],
|
Union[MsgType],
|
||||||
list[Type[Msg]],
|
list[MsgType],
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Create a payload-(data-)type-parameterized IPC message specification.
|
Create a payload-(data-)type-parameterized IPC message specification.
|
||||||
|
@ -554,7 +557,7 @@ def mk_msg_spec(
|
||||||
determined by the input `payload_type_union: Union[Type]`.
|
determined by the input `payload_type_union: Union[Type]`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
submsg_types: list[Type[Msg]] = Msg.__subclasses__()
|
submsg_types: list[MsgType] = Msg.__subclasses__()
|
||||||
bases: tuple = (
|
bases: tuple = (
|
||||||
# XXX NOTE XXX the below generic-parameterization seems to
|
# XXX NOTE XXX the below generic-parameterization seems to
|
||||||
# be THE ONLY way to get this to work correctly in terms
|
# be THE ONLY way to get this to work correctly in terms
|
||||||
|
|
Loading…
Reference in New Issue