Compare commits
No commits in common. "b22f7dcae042dae0a9d068021a76f2c818489d7d" and "343b7c971249b25480c6a59d8974856107e736b7" have entirely different histories.
b22f7dcae0
...
343b7c9712
|
@ -1,11 +1,6 @@
|
||||||
import time
|
import time
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor import (
|
|
||||||
ActorNursery,
|
|
||||||
MsgStream,
|
|
||||||
Portal,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# this is the first 2 actors, streamer_1 and streamer_2
|
# this is the first 2 actors, streamer_1 and streamer_2
|
||||||
|
@ -17,18 +12,14 @@ async def stream_data(seed):
|
||||||
|
|
||||||
# this is the third actor; the aggregator
|
# this is the third actor; the aggregator
|
||||||
async def aggregate(seed):
|
async def aggregate(seed):
|
||||||
'''
|
"""Ensure that the two streams we receive match but only stream
|
||||||
Ensure that the two streams we receive match but only stream
|
|
||||||
a single set of values to the parent.
|
a single set of values to the parent.
|
||||||
|
"""
|
||||||
'''
|
async with tractor.open_nursery() as nursery:
|
||||||
an: ActorNursery
|
portals = []
|
||||||
async with tractor.open_nursery() as an:
|
|
||||||
portals: list[Portal] = []
|
|
||||||
for i in range(1, 3):
|
for i in range(1, 3):
|
||||||
|
# fork point
|
||||||
# fork/spawn call
|
portal = await nursery.start_actor(
|
||||||
portal = await an.start_actor(
|
|
||||||
name=f'streamer_{i}',
|
name=f'streamer_{i}',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
@ -52,11 +43,7 @@ async def aggregate(seed):
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
|
|
||||||
for portal in portals:
|
for portal in portals:
|
||||||
n.start_soon(
|
n.start_soon(push_to_chan, portal, send_chan.clone())
|
||||||
push_to_chan,
|
|
||||||
portal,
|
|
||||||
send_chan.clone(),
|
|
||||||
)
|
|
||||||
|
|
||||||
# close this local task's reference to send side
|
# close this local task's reference to send side
|
||||||
await send_chan.aclose()
|
await send_chan.aclose()
|
||||||
|
@ -73,7 +60,7 @@ async def aggregate(seed):
|
||||||
|
|
||||||
print("FINISHED ITERATING in aggregator")
|
print("FINISHED ITERATING in aggregator")
|
||||||
|
|
||||||
await an.cancel()
|
await nursery.cancel()
|
||||||
print("WAITING on `ActorNursery` to finish")
|
print("WAITING on `ActorNursery` to finish")
|
||||||
print("AGGREGATOR COMPLETE!")
|
print("AGGREGATOR COMPLETE!")
|
||||||
|
|
||||||
|
@ -88,21 +75,18 @@ async def main() -> list[int]:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# yes, a nursery which spawns `trio`-"actors" B)
|
# yes, a nursery which spawns `trio`-"actors" B)
|
||||||
an: ActorNursery
|
nursery: tractor.ActorNursery
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery() as nursery:
|
||||||
loglevel='cancel',
|
|
||||||
debug_mode=True,
|
|
||||||
) as an:
|
|
||||||
|
|
||||||
seed = int(1e3)
|
seed = int(1e3)
|
||||||
pre_start = time.time()
|
pre_start = time.time()
|
||||||
|
|
||||||
portal: Portal = await an.start_actor(
|
portal: tractor.Portal = await nursery.start_actor(
|
||||||
name='aggregator',
|
name='aggregator',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
||||||
stream: MsgStream
|
stream: tractor.MsgStream
|
||||||
async with portal.open_stream_from(
|
async with portal.open_stream_from(
|
||||||
aggregate,
|
aggregate,
|
||||||
seed=seed,
|
seed=seed,
|
||||||
|
@ -111,12 +95,11 @@ async def main() -> list[int]:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
# the portal call returns exactly what you'd expect
|
# the portal call returns exactly what you'd expect
|
||||||
# as if the remote "aggregate" function was called locally
|
# as if the remote "aggregate" function was called locally
|
||||||
result_stream: list[int] = []
|
result_stream = []
|
||||||
async for value in stream:
|
async for value in stream:
|
||||||
result_stream.append(value)
|
result_stream.append(value)
|
||||||
|
|
||||||
cancelled: bool = await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
assert cancelled
|
|
||||||
|
|
||||||
print(f"STREAM TIME = {time.time() - start}")
|
print(f"STREAM TIME = {time.time() - start}")
|
||||||
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
|
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
|
||||||
|
|
|
@ -55,7 +55,6 @@ xontrib-vox = "^0.0.1"
|
||||||
optional = false
|
optional = false
|
||||||
[tool.poetry.group.dev.dependencies]
|
[tool.poetry.group.dev.dependencies]
|
||||||
pytest = "^8.2.0"
|
pytest = "^8.2.0"
|
||||||
pexpect = "^4.9.0"
|
|
||||||
|
|
||||||
# only for xonsh as sh..
|
# only for xonsh as sh..
|
||||||
xontrib-vox = "^0.0.1"
|
xontrib-vox = "^0.0.1"
|
||||||
|
|
|
@ -97,7 +97,6 @@ def test_ipc_channel_break_during_stream(
|
||||||
examples_dir() / 'advanced_faults'
|
examples_dir() / 'advanced_faults'
|
||||||
/ 'ipc_failure_during_stream.py',
|
/ 'ipc_failure_during_stream.py',
|
||||||
root=examples_dir(),
|
root=examples_dir(),
|
||||||
consider_namespace_packages=False,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# by def we expect KBI from user after a simulated "hang
|
# by def we expect KBI from user after a simulated "hang
|
||||||
|
|
|
@ -89,30 +89,17 @@ def test_remote_error(reg_addr, args_err):
|
||||||
assert excinfo.value.boxed_type == errtype
|
assert excinfo.value.boxed_type == errtype
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# the root task will also error on the `Portal.result()`
|
# the root task will also error on the `.result()` call
|
||||||
# call so we expect an error from there AND the child.
|
# so we expect an error from there AND the child.
|
||||||
# |_ tho seems like on new `trio` this doesn't always
|
with pytest.raises(BaseExceptionGroup) as excinfo:
|
||||||
# happen?
|
|
||||||
with pytest.raises((
|
|
||||||
BaseExceptionGroup,
|
|
||||||
tractor.RemoteActorError,
|
|
||||||
)) as excinfo:
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed errors are `errtype`
|
# ensure boxed errors
|
||||||
err: BaseException = excinfo.value
|
for exc in excinfo.value.exceptions:
|
||||||
if isinstance(err, BaseExceptionGroup):
|
|
||||||
suberrs: list[BaseException] = err.exceptions
|
|
||||||
else:
|
|
||||||
suberrs: list[BaseException] = [err]
|
|
||||||
|
|
||||||
for exc in suberrs:
|
|
||||||
assert exc.boxed_type == errtype
|
assert exc.boxed_type == errtype
|
||||||
|
|
||||||
|
|
||||||
def test_multierror(
|
def test_multierror(reg_addr):
|
||||||
reg_addr: tuple[str, int],
|
|
||||||
):
|
|
||||||
'''
|
'''
|
||||||
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
|
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
|
||||||
more then one actor errors.
|
more then one actor errors.
|
||||||
|
|
|
@ -444,7 +444,6 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
fan_out=fan_out,
|
fan_out=fan_out,
|
||||||
)
|
)
|
||||||
# should raise RAE diectly
|
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
@ -462,11 +461,12 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
|
||||||
# should trigger remote actor error
|
# should trigger remote actor error
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
with pytest.raises(BaseExceptionGroup) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed error type
|
# ensure boxed errors
|
||||||
excinfo.value.boxed_type == Exception
|
for exc in excinfo.value.exceptions:
|
||||||
|
assert exc.boxed_type == Exception
|
||||||
|
|
||||||
|
|
||||||
def test_trio_closes_early_and_channel_exits(reg_addr):
|
def test_trio_closes_early_and_channel_exits(reg_addr):
|
||||||
|
@ -477,7 +477,7 @@ def test_trio_closes_early_and_channel_exits(reg_addr):
|
||||||
exit_early=True,
|
exit_early=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should raise RAE diectly
|
# should trigger remote actor error
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
# should be a quiet exit on a simple channel exit
|
# should be a quiet exit on a simple channel exit
|
||||||
|
@ -492,17 +492,15 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
||||||
aio_raise_err=True,
|
aio_raise_err=True,
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# should trigger RAE directly, not an eg.
|
# should trigger remote actor error
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(BaseExceptionGroup) as excinfo:
|
||||||
# NOTE: bc we directly wait on `Portal.result()` instead
|
|
||||||
# of capturing it inside the `ActorNursery` machinery.
|
|
||||||
expected_exception=RemoteActorError,
|
|
||||||
) as excinfo:
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
excinfo.value.boxed_type == Exception
|
# ensure boxed errors
|
||||||
|
for exc in excinfo.value.exceptions:
|
||||||
|
assert exc.boxed_type == Exception
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -55,10 +55,9 @@ from tractor._testing import (
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def open_stream_then_sleep_forever(
|
async def sleep_forever(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
expect_ctxc: bool = False,
|
expect_ctxc: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Sync the context, open a stream then just sleep.
|
Sync the context, open a stream then just sleep.
|
||||||
|
@ -68,10 +67,6 @@ async def open_stream_then_sleep_forever(
|
||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
|
||||||
# NOTE: the below means this child will send a `Stop`
|
|
||||||
# to it's parent-side task despite that side never
|
|
||||||
# opening a stream itself.
|
|
||||||
async with ctx.open_stream():
|
async with ctx.open_stream():
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
@ -105,7 +100,7 @@ async def error_before_started(
|
||||||
'''
|
'''
|
||||||
async with tractor.wait_for_actor('sleeper') as p2:
|
async with tractor.wait_for_actor('sleeper') as p2:
|
||||||
async with (
|
async with (
|
||||||
p2.open_context(open_stream_then_sleep_forever) as (peer_ctx, first),
|
p2.open_context(sleep_forever) as (peer_ctx, first),
|
||||||
peer_ctx.open_stream(),
|
peer_ctx.open_stream(),
|
||||||
):
|
):
|
||||||
# NOTE: this WAS inside an @acm body but i factored it
|
# NOTE: this WAS inside an @acm body but i factored it
|
||||||
|
@ -209,13 +204,9 @@ async def stream_ints(
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def stream_from_peer(
|
async def stream_from_peer(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
debug_mode: bool,
|
|
||||||
peer_name: str = 'sleeper',
|
peer_name: str = 'sleeper',
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# sanity
|
|
||||||
assert tractor._state.debug_mode() == debug_mode
|
|
||||||
|
|
||||||
peer: Portal
|
peer: Portal
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
|
@ -249,54 +240,26 @@ async def stream_from_peer(
|
||||||
assert msg is not None
|
assert msg is not None
|
||||||
print(msg)
|
print(msg)
|
||||||
|
|
||||||
# NOTE: cancellation of the (sleeper) peer should always cause
|
# NOTE: cancellation of the (sleeper) peer should always
|
||||||
# a `ContextCancelled` raise in this streaming actor.
|
# cause a `ContextCancelled` raise in this streaming
|
||||||
except ContextCancelled as _ctxc:
|
# actor.
|
||||||
ctxc = _ctxc
|
except ContextCancelled as ctxc:
|
||||||
|
ctxerr = ctxc
|
||||||
|
|
||||||
# print("TRYING TO ENTER PAUSSE!!!")
|
assert peer_ctx._remote_error is ctxerr
|
||||||
# await tractor.pause(shield=True)
|
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
|
||||||
re: ContextCancelled = peer_ctx._remote_error
|
|
||||||
|
|
||||||
# XXX YES XXX, remote error should be unpacked only once!
|
# XXX YES, bc exact same msg instances
|
||||||
assert (
|
assert peer_ctx._remote_error._ipc_msg is ctxerr._ipc_msg
|
||||||
re
|
|
||||||
is
|
|
||||||
peer_ctx.maybe_error
|
|
||||||
is
|
|
||||||
ctxc
|
|
||||||
is
|
|
||||||
peer_ctx._local_error
|
|
||||||
)
|
|
||||||
# NOTE: these errors should all match!
|
|
||||||
# ------ - ------
|
|
||||||
# XXX [2024-05-03] XXX
|
|
||||||
# ------ - ------
|
|
||||||
# broke this due to a re-raise inside `.msg._ops.drain_to_final_msg()`
|
|
||||||
# where the `Error()` msg was directly raising the ctxc
|
|
||||||
# instead of just returning up to the caller inside
|
|
||||||
# `Context.return()` which would results in a diff instance of
|
|
||||||
# the same remote error bubbling out above vs what was
|
|
||||||
# already unpacked and set inside `Context.
|
|
||||||
assert (
|
|
||||||
peer_ctx._remote_error.msgdata
|
|
||||||
==
|
|
||||||
ctxc.msgdata
|
|
||||||
)
|
|
||||||
# ^-XXX-^ notice the data is of course the exact same.. so
|
|
||||||
# the above larger assert makes sense to also always be true!
|
|
||||||
|
|
||||||
# XXX YES XXX, bc should be exact same msg instances
|
# XXX NO, bc new one always created for property accesss
|
||||||
assert peer_ctx._remote_error._ipc_msg is ctxc._ipc_msg
|
assert peer_ctx._remote_error.ipc_msg != ctxerr.ipc_msg
|
||||||
|
|
||||||
# XXX NO XXX, bc new one always created for property accesss
|
|
||||||
assert peer_ctx._remote_error.ipc_msg != ctxc.ipc_msg
|
|
||||||
|
|
||||||
# the peer ctx is the canceller even though it's canceller
|
# the peer ctx is the canceller even though it's canceller
|
||||||
# is the "canceller" XD
|
# is the "canceller" XD
|
||||||
assert peer_name in peer_ctx.canceller
|
assert peer_name in peer_ctx.canceller
|
||||||
|
|
||||||
assert "canceller" in ctxc.canceller
|
assert "canceller" in ctxerr.canceller
|
||||||
|
|
||||||
# caller peer should not be the cancel requester
|
# caller peer should not be the cancel requester
|
||||||
assert not ctx.cancel_called
|
assert not ctx.cancel_called
|
||||||
|
@ -320,13 +283,12 @@ async def stream_from_peer(
|
||||||
|
|
||||||
# TODO / NOTE `.canceller` won't have been set yet
|
# TODO / NOTE `.canceller` won't have been set yet
|
||||||
# here because that machinery is inside
|
# here because that machinery is inside
|
||||||
# `Portal.open_context().__aexit__()` BUT, if we had
|
# `.open_context().__aexit__()` BUT, if we had
|
||||||
# a way to know immediately (from the last
|
# a way to know immediately (from the last
|
||||||
# checkpoint) that cancellation was due to
|
# checkpoint) that cancellation was due to
|
||||||
# a remote, we COULD assert this here..see,
|
# a remote, we COULD assert this here..see,
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
#
|
#
|
||||||
# await tractor.pause()
|
|
||||||
# assert 'canceller' in ctx.canceller
|
# assert 'canceller' in ctx.canceller
|
||||||
|
|
||||||
# root/parent actor task should NEVER HAVE cancelled us!
|
# root/parent actor task should NEVER HAVE cancelled us!
|
||||||
|
@ -430,13 +392,12 @@ def test_peer_canceller(
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
sleeper.open_context(
|
sleeper.open_context(
|
||||||
open_stream_then_sleep_forever,
|
sleep_forever,
|
||||||
expect_ctxc=True,
|
expect_ctxc=True,
|
||||||
) as (sleeper_ctx, sent),
|
) as (sleeper_ctx, sent),
|
||||||
|
|
||||||
just_caller.open_context(
|
just_caller.open_context(
|
||||||
stream_from_peer,
|
stream_from_peer,
|
||||||
debug_mode=debug_mode,
|
|
||||||
) as (caller_ctx, sent),
|
) as (caller_ctx, sent),
|
||||||
|
|
||||||
canceller.open_context(
|
canceller.open_context(
|
||||||
|
@ -462,11 +423,10 @@ def test_peer_canceller(
|
||||||
|
|
||||||
# should always raise since this root task does
|
# should always raise since this root task does
|
||||||
# not request the sleeper cancellation ;)
|
# not request the sleeper cancellation ;)
|
||||||
except ContextCancelled as _ctxc:
|
except ContextCancelled as ctxerr:
|
||||||
ctxc = _ctxc
|
|
||||||
print(
|
print(
|
||||||
'CAUGHT REMOTE CONTEXT CANCEL\n\n'
|
'CAUGHT REMOTE CONTEXT CANCEL\n\n'
|
||||||
f'{ctxc}\n'
|
f'{ctxerr}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# canceller and caller peers should not
|
# canceller and caller peers should not
|
||||||
|
@ -477,7 +437,7 @@ def test_peer_canceller(
|
||||||
# we were not the actor, our peer was
|
# we were not the actor, our peer was
|
||||||
assert not sleeper_ctx.cancel_acked
|
assert not sleeper_ctx.cancel_acked
|
||||||
|
|
||||||
assert ctxc.canceller[0] == 'canceller'
|
assert ctxerr.canceller[0] == 'canceller'
|
||||||
|
|
||||||
# XXX NOTE XXX: since THIS `ContextCancelled`
|
# XXX NOTE XXX: since THIS `ContextCancelled`
|
||||||
# HAS NOT YET bubbled up to the
|
# HAS NOT YET bubbled up to the
|
||||||
|
@ -488,7 +448,7 @@ def test_peer_canceller(
|
||||||
|
|
||||||
# CASE_1: error-during-ctxc-handling,
|
# CASE_1: error-during-ctxc-handling,
|
||||||
if error_during_ctxerr_handling:
|
if error_during_ctxerr_handling:
|
||||||
raise RuntimeError('Simulated RTE re-raise during ctxc handling')
|
raise RuntimeError('Simulated error during teardown')
|
||||||
|
|
||||||
# CASE_2: standard teardown inside in `.open_context()` block
|
# CASE_2: standard teardown inside in `.open_context()` block
|
||||||
raise
|
raise
|
||||||
|
@ -553,9 +513,6 @@ def test_peer_canceller(
|
||||||
# should be cancelled by US.
|
# should be cancelled by US.
|
||||||
#
|
#
|
||||||
if error_during_ctxerr_handling:
|
if error_during_ctxerr_handling:
|
||||||
print(f'loc_err: {_loc_err}\n')
|
|
||||||
assert isinstance(loc_err, RuntimeError)
|
|
||||||
|
|
||||||
# since we do a rte reraise above, the
|
# since we do a rte reraise above, the
|
||||||
# `.open_context()` error handling should have
|
# `.open_context()` error handling should have
|
||||||
# raised a local rte, thus the internal
|
# raised a local rte, thus the internal
|
||||||
|
@ -564,6 +521,9 @@ def test_peer_canceller(
|
||||||
# a `trio.Cancelled` due to a local
|
# a `trio.Cancelled` due to a local
|
||||||
# `._scope.cancel()` call.
|
# `._scope.cancel()` call.
|
||||||
assert not sleeper_ctx._scope.cancelled_caught
|
assert not sleeper_ctx._scope.cancelled_caught
|
||||||
|
|
||||||
|
assert isinstance(loc_err, RuntimeError)
|
||||||
|
print(f'_loc_err: {_loc_err}\n')
|
||||||
# assert sleeper_ctx._local_error is _loc_err
|
# assert sleeper_ctx._local_error is _loc_err
|
||||||
# assert sleeper_ctx._local_error is _loc_err
|
# assert sleeper_ctx._local_error is _loc_err
|
||||||
assert not (
|
assert not (
|
||||||
|
@ -600,13 +560,10 @@ def test_peer_canceller(
|
||||||
|
|
||||||
else: # the other 2 ctxs
|
else: # the other 2 ctxs
|
||||||
assert (
|
assert (
|
||||||
isinstance(re, ContextCancelled)
|
|
||||||
and (
|
|
||||||
re.canceller
|
re.canceller
|
||||||
==
|
==
|
||||||
canceller.channel.uid
|
canceller.channel.uid
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
# since the sleeper errors while handling a
|
# since the sleeper errors while handling a
|
||||||
# peer-cancelled (by ctxc) scenario, we expect
|
# peer-cancelled (by ctxc) scenario, we expect
|
||||||
|
@ -854,7 +811,8 @@ async def serve_subactors(
|
||||||
async with open_nursery() as an:
|
async with open_nursery() as an:
|
||||||
|
|
||||||
# sanity
|
# sanity
|
||||||
assert tractor._state.debug_mode() == debug_mode
|
if debug_mode:
|
||||||
|
assert tractor._state.debug_mode()
|
||||||
|
|
||||||
await ctx.started(peer_name)
|
await ctx.started(peer_name)
|
||||||
async with ctx.open_stream() as ipc:
|
async with ctx.open_stream() as ipc:
|
||||||
|
@ -1133,6 +1091,7 @@ def test_peer_spawns_and_cancels_service_subactor(
|
||||||
'-> root checking `client_ctx.result()`,\n'
|
'-> root checking `client_ctx.result()`,\n'
|
||||||
f'-> checking that sub-spawn {peer_name} is down\n'
|
f'-> checking that sub-spawn {peer_name} is down\n'
|
||||||
)
|
)
|
||||||
|
# else:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res = await client_ctx.result(hide_tb=False)
|
res = await client_ctx.result(hide_tb=False)
|
||||||
|
|
|
@ -2,9 +2,7 @@
|
||||||
Spawning basics
|
Spawning basics
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from typing import (
|
from typing import Optional
|
||||||
Any,
|
|
||||||
)
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
|
@ -27,11 +25,13 @@ async def spawn(
|
||||||
async with tractor.open_root_actor(
|
async with tractor.open_root_actor(
|
||||||
arbiter_addr=reg_addr,
|
arbiter_addr=reg_addr,
|
||||||
):
|
):
|
||||||
|
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
assert actor.is_arbiter == is_arbiter
|
assert actor.is_arbiter == is_arbiter
|
||||||
data = data_to_pass_down
|
data = data_to_pass_down
|
||||||
|
|
||||||
if actor.is_arbiter:
|
if actor.is_arbiter:
|
||||||
|
|
||||||
async with tractor.open_nursery() as nursery:
|
async with tractor.open_nursery() as nursery:
|
||||||
|
|
||||||
# forks here
|
# forks here
|
||||||
|
@ -95,9 +95,7 @@ async def test_movie_theatre_convo(start_method):
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
async def cellar_door(
|
async def cellar_door(return_value: Optional[str]):
|
||||||
return_value: str|None,
|
|
||||||
):
|
|
||||||
return return_value
|
return return_value
|
||||||
|
|
||||||
|
|
||||||
|
@ -107,18 +105,16 @@ async def cellar_door(
|
||||||
)
|
)
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_most_beautiful_word(
|
async def test_most_beautiful_word(
|
||||||
start_method: str,
|
start_method,
|
||||||
return_value: Any,
|
return_value
|
||||||
debug_mode: bool,
|
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
The main ``tractor`` routine.
|
The main ``tractor`` routine.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(1):
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery() as n:
|
||||||
debug_mode=debug_mode,
|
|
||||||
) as n:
|
|
||||||
portal = await n.run_in_actor(
|
portal = await n.run_in_actor(
|
||||||
cellar_door,
|
cellar_door,
|
||||||
return_value=return_value,
|
return_value=return_value,
|
||||||
|
|
|
@ -42,7 +42,6 @@ from ._supervise import (
|
||||||
from ._state import (
|
from ._state import (
|
||||||
current_actor as current_actor,
|
current_actor as current_actor,
|
||||||
is_root_process as is_root_process,
|
is_root_process as is_root_process,
|
||||||
current_ipc_ctx as current_ipc_ctx,
|
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled as ContextCancelled,
|
ContextCancelled as ContextCancelled,
|
||||||
|
|
|
@ -41,7 +41,6 @@ from typing import (
|
||||||
Callable,
|
Callable,
|
||||||
Mapping,
|
Mapping,
|
||||||
Type,
|
Type,
|
||||||
TypeAlias,
|
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
@ -95,7 +94,7 @@ if TYPE_CHECKING:
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
from ._ipc import MsgTransport
|
from ._ipc import MsgTransport
|
||||||
from .devx._frame_stack import (
|
from .devx._code import (
|
||||||
CallerInfo,
|
CallerInfo,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -156,41 +155,6 @@ class Context:
|
||||||
# payload receiver
|
# payload receiver
|
||||||
_pld_rx: msgops.PldRx
|
_pld_rx: msgops.PldRx
|
||||||
|
|
||||||
@property
|
|
||||||
def pld_rx(self) -> msgops.PldRx:
|
|
||||||
'''
|
|
||||||
The current `tractor.Context`'s msg-payload-receiver.
|
|
||||||
|
|
||||||
A payload receiver is the IPC-msg processing sub-sys which
|
|
||||||
filters inter-actor-task communicated payload data, i.e. the
|
|
||||||
`PayloadMsg.pld: PayloadT` field value, AFTER its container
|
|
||||||
shuttlle msg (eg. `Started`/`Yield`/`Return) has been
|
|
||||||
delivered up from `tractor`'s transport layer but BEFORE the
|
|
||||||
data is yielded to `tractor` application code.
|
|
||||||
|
|
||||||
The "IPC-primitive API" is normally one of a `Context` (this)` or a `MsgStream`
|
|
||||||
or some higher level API using one of them.
|
|
||||||
|
|
||||||
For ex. `pld_data: PayloadT = MsgStream.receive()` implicitly
|
|
||||||
calls into the stream's parent `Context.pld_rx.recv_pld().` to
|
|
||||||
receive the latest `PayloadMsg.pld` value.
|
|
||||||
|
|
||||||
Modification of the current payload spec via `limit_plds()`
|
|
||||||
allows a `tractor` application to contextually filter IPC
|
|
||||||
payload content with a type specification as supported by the
|
|
||||||
interchange backend.
|
|
||||||
|
|
||||||
- for `msgspec` see <PUTLINKHERE>.
|
|
||||||
|
|
||||||
Note that the `PldRx` itself is a per-`Context` instance that
|
|
||||||
normally only changes when some (sub-)task, on a given "side"
|
|
||||||
of the IPC ctx (either a "child"-side RPC or inside
|
|
||||||
a "parent"-side `Portal.open_context()` block), modifies it
|
|
||||||
using the `.msg._ops.limit_plds()` API.
|
|
||||||
|
|
||||||
'''
|
|
||||||
return self._pld_rx
|
|
||||||
|
|
||||||
# full "namespace-path" to target RPC function
|
# full "namespace-path" to target RPC function
|
||||||
_nsf: NamespacePath
|
_nsf: NamespacePath
|
||||||
|
|
||||||
|
@ -267,8 +231,6 @@ class Context:
|
||||||
|
|
||||||
# init and streaming state
|
# init and streaming state
|
||||||
_started_called: bool = False
|
_started_called: bool = False
|
||||||
_started_msg: MsgType|None = None
|
|
||||||
_started_pld: Any = None
|
|
||||||
_stream_opened: bool = False
|
_stream_opened: bool = False
|
||||||
_stream: MsgStream|None = None
|
_stream: MsgStream|None = None
|
||||||
|
|
||||||
|
@ -661,7 +623,7 @@ class Context:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Setting remote error for ctx\n\n'
|
'Setting remote error for ctx\n\n'
|
||||||
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
||||||
f'=> {self.side!r}: {self._actor.uid}\n\n'
|
f'=> {self.side!r}\n\n'
|
||||||
f'{error}'
|
f'{error}'
|
||||||
)
|
)
|
||||||
self._remote_error: BaseException = error
|
self._remote_error: BaseException = error
|
||||||
|
@ -716,7 +678,7 @@ class Context:
|
||||||
log.error(
|
log.error(
|
||||||
f'Remote context error:\n\n'
|
f'Remote context error:\n\n'
|
||||||
# f'{pformat(self)}\n'
|
# f'{pformat(self)}\n'
|
||||||
f'{error}'
|
f'{error}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._canceller is None:
|
if self._canceller is None:
|
||||||
|
@ -762,10 +724,8 @@ class Context:
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||||
# from .devx import mk_pdb
|
|
||||||
# mk_pdb().set_trace()
|
|
||||||
|
|
||||||
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
|
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?'
|
||||||
if (
|
if (
|
||||||
cs
|
cs
|
||||||
and
|
and
|
||||||
|
@ -845,7 +805,6 @@ class Context:
|
||||||
# f'{ci.api_nsp}()\n'
|
# f'{ci.api_nsp}()\n'
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# TODO: use `.dev._frame_stack` scanning to find caller!
|
|
||||||
return 'Portal.open_context()'
|
return 'Portal.open_context()'
|
||||||
|
|
||||||
async def cancel(
|
async def cancel(
|
||||||
|
@ -1345,6 +1304,17 @@ class Context:
|
||||||
ctx=self,
|
ctx=self,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
|
for msg in drained_msgs:
|
||||||
|
|
||||||
|
# TODO: mask this by default..
|
||||||
|
if isinstance(msg, Return):
|
||||||
|
# from .devx import pause
|
||||||
|
# await pause()
|
||||||
|
# raise InternalError(
|
||||||
|
log.warning(
|
||||||
|
'Final `return` msg should never be drained !?!?\n\n'
|
||||||
|
f'{msg}\n'
|
||||||
|
)
|
||||||
|
|
||||||
drained_status: str = (
|
drained_status: str = (
|
||||||
'Ctx drained to final outcome msg\n\n'
|
'Ctx drained to final outcome msg\n\n'
|
||||||
|
@ -1465,10 +1435,6 @@ class Context:
|
||||||
self._result
|
self._result
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
|
||||||
def has_outcome(self) -> bool:
|
|
||||||
return bool(self.maybe_error) or self._final_result_is_set()
|
|
||||||
|
|
||||||
# @property
|
# @property
|
||||||
def repr_outcome(
|
def repr_outcome(
|
||||||
self,
|
self,
|
||||||
|
@ -1671,6 +1637,8 @@ class Context:
|
||||||
)
|
)
|
||||||
|
|
||||||
if rt_started != started_msg:
|
if rt_started != started_msg:
|
||||||
|
# TODO: break these methods out from the struct subtype?
|
||||||
|
|
||||||
# TODO: make that one a mod func too..
|
# TODO: make that one a mod func too..
|
||||||
diff = pretty_struct.Struct.__sub__(
|
diff = pretty_struct.Struct.__sub__(
|
||||||
rt_started,
|
rt_started,
|
||||||
|
@ -1706,8 +1674,6 @@ class Context:
|
||||||
) from verr
|
) from verr
|
||||||
|
|
||||||
self._started_called = True
|
self._started_called = True
|
||||||
self._started_msg = started_msg
|
|
||||||
self._started_pld = value
|
|
||||||
|
|
||||||
async def _drain_overflows(
|
async def _drain_overflows(
|
||||||
self,
|
self,
|
||||||
|
@ -1995,7 +1961,6 @@ async def open_context_from_portal(
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
func: Callable,
|
func: Callable,
|
||||||
|
|
||||||
pld_spec: TypeAlias|None = None,
|
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
|
|
||||||
# TODO: if we set this the wrapping `@acm` body will
|
# TODO: if we set this the wrapping `@acm` body will
|
||||||
|
@ -2061,7 +2026,7 @@ async def open_context_from_portal(
|
||||||
# XXX NOTE XXX: currenly we do NOT allow opening a contex
|
# XXX NOTE XXX: currenly we do NOT allow opening a contex
|
||||||
# with "self" since the local feeder mem-chan processing
|
# with "self" since the local feeder mem-chan processing
|
||||||
# is not built for it.
|
# is not built for it.
|
||||||
if (uid := portal.channel.uid) == portal.actor.uid:
|
if portal.channel.uid == portal.actor.uid:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
'** !! Invalid Operation !! **\n'
|
'** !! Invalid Operation !! **\n'
|
||||||
'Can not open an IPC ctx with the local actor!\n'
|
'Can not open an IPC ctx with the local actor!\n'
|
||||||
|
@ -2089,21 +2054,6 @@ async def open_context_from_portal(
|
||||||
assert ctx._caller_info
|
assert ctx._caller_info
|
||||||
_ctxvar_Context.set(ctx)
|
_ctxvar_Context.set(ctx)
|
||||||
|
|
||||||
# placeholder for any exception raised in the runtime
|
|
||||||
# or by user tasks which cause this context's closure.
|
|
||||||
scope_err: BaseException|None = None
|
|
||||||
ctxc_from_callee: ContextCancelled|None = None
|
|
||||||
try:
|
|
||||||
async with (
|
|
||||||
trio.open_nursery() as tn,
|
|
||||||
msgops.maybe_limit_plds(
|
|
||||||
ctx=ctx,
|
|
||||||
spec=pld_spec,
|
|
||||||
) as maybe_msgdec,
|
|
||||||
):
|
|
||||||
if maybe_msgdec:
|
|
||||||
assert maybe_msgdec.pld_spec == pld_spec
|
|
||||||
|
|
||||||
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
|
||||||
# `Started`-msg any cancellation triggered
|
# `Started`-msg any cancellation triggered
|
||||||
# in `._maybe_cancel_and_set_remote_error()` will
|
# in `._maybe_cancel_and_set_remote_error()` will
|
||||||
|
@ -2111,23 +2061,25 @@ async def open_context_from_portal(
|
||||||
# -> it's expected that if there is an error in this phase of
|
# -> it's expected that if there is an error in this phase of
|
||||||
# the dialog, the `Error` msg should be raised from the `msg`
|
# the dialog, the `Error` msg should be raised from the `msg`
|
||||||
# handling block below.
|
# handling block below.
|
||||||
started_msg, first = await ctx._pld_rx.recv_msg_w_pld(
|
first: Any = await ctx._pld_rx.recv_pld(
|
||||||
ipc=ctx,
|
ctx=ctx,
|
||||||
expect_msg=Started,
|
expect_msg=Started,
|
||||||
passthrough_non_pld_msgs=False,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# from .devx import pause
|
|
||||||
# await pause()
|
|
||||||
ctx._started_called: bool = True
|
ctx._started_called: bool = True
|
||||||
ctx._started_msg: bool = started_msg
|
|
||||||
ctx._started_pld: bool = first
|
|
||||||
|
|
||||||
# NOTE: this in an implicit runtime nursery used to,
|
uid: tuple = portal.channel.uid
|
||||||
# - start overrun queuing tasks when as well as
|
cid: str = ctx.cid
|
||||||
# for cancellation of the scope opened by the user.
|
|
||||||
ctx._scope_nursery: trio.Nursery = tn
|
# placeholder for any exception raised in the runtime
|
||||||
ctx._scope: trio.CancelScope = tn.cancel_scope
|
# or by user tasks which cause this context's closure.
|
||||||
|
scope_err: BaseException|None = None
|
||||||
|
ctxc_from_callee: ContextCancelled|None = None
|
||||||
|
try:
|
||||||
|
async with trio.open_nursery() as nurse:
|
||||||
|
|
||||||
|
# NOTE: used to start overrun queuing tasks
|
||||||
|
ctx._scope_nursery: trio.Nursery = nurse
|
||||||
|
ctx._scope: trio.CancelScope = nurse.cancel_scope
|
||||||
|
|
||||||
# deliver context instance and .started() msg value
|
# deliver context instance and .started() msg value
|
||||||
# in enter tuple.
|
# in enter tuple.
|
||||||
|
@ -2174,13 +2126,13 @@ async def open_context_from_portal(
|
||||||
|
|
||||||
# when in allow_overruns mode there may be
|
# when in allow_overruns mode there may be
|
||||||
# lingering overflow sender tasks remaining?
|
# lingering overflow sender tasks remaining?
|
||||||
if tn.child_tasks:
|
if nurse.child_tasks:
|
||||||
# XXX: ensure we are in overrun state
|
# XXX: ensure we are in overrun state
|
||||||
# with ``._allow_overruns=True`` bc otherwise
|
# with ``._allow_overruns=True`` bc otherwise
|
||||||
# there should be no tasks in this nursery!
|
# there should be no tasks in this nursery!
|
||||||
if (
|
if (
|
||||||
not ctx._allow_overruns
|
not ctx._allow_overruns
|
||||||
or len(tn.child_tasks) > 1
|
or len(nurse.child_tasks) > 1
|
||||||
):
|
):
|
||||||
raise InternalError(
|
raise InternalError(
|
||||||
'Context has sub-tasks but is '
|
'Context has sub-tasks but is '
|
||||||
|
@ -2352,8 +2304,8 @@ async def open_context_from_portal(
|
||||||
):
|
):
|
||||||
log.warning(
|
log.warning(
|
||||||
'IPC connection for context is broken?\n'
|
'IPC connection for context is broken?\n'
|
||||||
f'task: {ctx.cid}\n'
|
f'task:{cid}\n'
|
||||||
f'actor: {uid}'
|
f'actor:{uid}'
|
||||||
)
|
)
|
||||||
|
|
||||||
raise # duh
|
raise # duh
|
||||||
|
@ -2503,8 +2455,9 @@ async def open_context_from_portal(
|
||||||
and ctx.cancel_acked
|
and ctx.cancel_acked
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Context cancelled by {ctx.side!r}-side task\n'
|
'Context cancelled by {ctx.side!r}-side task\n'
|
||||||
f'|_{ctx._task}\n\n'
|
f'|_{ctx._task}\n\n'
|
||||||
|
|
||||||
f'{repr(scope_err)}\n'
|
f'{repr(scope_err)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2532,7 +2485,7 @@ async def open_context_from_portal(
|
||||||
f'cid: {ctx.cid}\n'
|
f'cid: {ctx.cid}\n'
|
||||||
)
|
)
|
||||||
portal.actor._contexts.pop(
|
portal.actor._contexts.pop(
|
||||||
(uid, ctx.cid),
|
(uid, cid),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2560,12 +2513,11 @@ def mk_context(
|
||||||
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
|
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
|
||||||
|
|
||||||
# TODO: only scan caller-info if log level so high!
|
# TODO: only scan caller-info if log level so high!
|
||||||
from .devx._frame_stack import find_caller_info
|
from .devx._code import find_caller_info
|
||||||
caller_info: CallerInfo|None = find_caller_info()
|
caller_info: CallerInfo|None = find_caller_info()
|
||||||
|
|
||||||
pld_rx = msgops.PldRx(
|
# TODO: when/how do we apply `.limit_plds()` from here?
|
||||||
_pld_dec=msgops._def_any_pldec,
|
pld_rx: msgops.PldRx = msgops.current_pldrx()
|
||||||
)
|
|
||||||
|
|
||||||
ctx = Context(
|
ctx = Context(
|
||||||
chan=chan,
|
chan=chan,
|
||||||
|
@ -2579,16 +2531,13 @@ def mk_context(
|
||||||
_caller_info=caller_info,
|
_caller_info=caller_info,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
pld_rx._ctx = ctx
|
|
||||||
ctx._result = Unresolved
|
ctx._result = Unresolved
|
||||||
return ctx
|
return ctx
|
||||||
|
|
||||||
|
|
||||||
# TODO: use the new type-parameters to annotate this in 3.13?
|
# TODO: use the new type-parameters to annotate this in 3.13?
|
||||||
# -[ ] https://peps.python.org/pep-0718/#unknown-types
|
# -[ ] https://peps.python.org/pep-0718/#unknown-types
|
||||||
def context(
|
def context(func: Callable) -> Callable:
|
||||||
func: Callable,
|
|
||||||
) -> Callable:
|
|
||||||
'''
|
'''
|
||||||
Mark an (async) function as an SC-supervised, inter-`Actor`,
|
Mark an (async) function as an SC-supervised, inter-`Actor`,
|
||||||
child-`trio.Task`, IPC endpoint otherwise known more
|
child-`trio.Task`, IPC endpoint otherwise known more
|
||||||
|
|
|
@ -716,5 +716,4 @@ async def _connect_chan(
|
||||||
chan = Channel((host, port))
|
chan = Channel((host, port))
|
||||||
await chan.connect()
|
await chan.connect()
|
||||||
yield chan
|
yield chan
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await chan.aclose()
|
await chan.aclose()
|
||||||
|
|
|
@ -47,7 +47,6 @@ from ._ipc import Channel
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from .msg import (
|
from .msg import (
|
||||||
# Error,
|
# Error,
|
||||||
PayloadMsg,
|
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
Return,
|
Return,
|
||||||
)
|
)
|
||||||
|
@ -99,8 +98,7 @@ class Portal:
|
||||||
|
|
||||||
self.chan = channel
|
self.chan = channel
|
||||||
# during the portal's lifetime
|
# during the portal's lifetime
|
||||||
self._final_result_pld: Any|None = None
|
self._final_result: Any|None = None
|
||||||
self._final_result_msg: PayloadMsg|None = None
|
|
||||||
|
|
||||||
# When set to a ``Context`` (when _submit_for_result is called)
|
# When set to a ``Context`` (when _submit_for_result is called)
|
||||||
# it is expected that ``result()`` will be awaited at some
|
# it is expected that ``result()`` will be awaited at some
|
||||||
|
@ -134,7 +132,7 @@ class Portal:
|
||||||
'A pending main result has already been submitted'
|
'A pending main result has already been submitted'
|
||||||
)
|
)
|
||||||
|
|
||||||
self._expect_result_ctx: Context = await self.actor.start_remote_task(
|
self._expect_result_ctx = await self.actor.start_remote_task(
|
||||||
self.channel,
|
self.channel,
|
||||||
nsf=NamespacePath(f'{ns}:{func}'),
|
nsf=NamespacePath(f'{ns}:{func}'),
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
|
@ -165,22 +163,13 @@ class Portal:
|
||||||
# expecting a "main" result
|
# expecting a "main" result
|
||||||
assert self._expect_result_ctx
|
assert self._expect_result_ctx
|
||||||
|
|
||||||
if self._final_result_msg is None:
|
if self._final_result is None:
|
||||||
try:
|
self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
|
||||||
(
|
ctx=self._expect_result_ctx,
|
||||||
self._final_result_msg,
|
|
||||||
self._final_result_pld,
|
|
||||||
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
|
|
||||||
ipc=self._expect_result_ctx,
|
|
||||||
expect_msg=Return,
|
expect_msg=Return,
|
||||||
)
|
)
|
||||||
except BaseException as err:
|
|
||||||
# TODO: wrap this into `@api_frame` optionally with
|
|
||||||
# some kinda filtering mechanism like log levels?
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
raise err
|
|
||||||
|
|
||||||
return self._final_result_pld
|
return self._final_result
|
||||||
|
|
||||||
async def _cancel_streams(self):
|
async def _cancel_streams(self):
|
||||||
# terminate all locally running async generator
|
# terminate all locally running async generator
|
||||||
|
@ -312,7 +301,7 @@ class Portal:
|
||||||
portal=self,
|
portal=self,
|
||||||
)
|
)
|
||||||
return await ctx._pld_rx.recv_pld(
|
return await ctx._pld_rx.recv_pld(
|
||||||
ipc=ctx,
|
ctx=ctx,
|
||||||
expect_msg=Return,
|
expect_msg=Return,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -331,8 +320,6 @@ class Portal:
|
||||||
remote rpc task or a local async generator instance.
|
remote rpc task or a local async generator instance.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__runtimeframe__: int = 1 # noqa
|
|
||||||
|
|
||||||
if isinstance(func, str):
|
if isinstance(func, str):
|
||||||
warnings.warn(
|
warnings.warn(
|
||||||
"`Portal.run(namespace: str, funcname: str)` is now"
|
"`Portal.run(namespace: str, funcname: str)` is now"
|
||||||
|
@ -366,7 +353,7 @@ class Portal:
|
||||||
portal=self,
|
portal=self,
|
||||||
)
|
)
|
||||||
return await ctx._pld_rx.recv_pld(
|
return await ctx._pld_rx.recv_pld(
|
||||||
ipc=ctx,
|
ctx=ctx,
|
||||||
expect_msg=Return,
|
expect_msg=Return,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
Root actor runtime ignition(s).
|
Root actor runtime ignition(s).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import importlib
|
import importlib
|
||||||
import logging
|
import logging
|
||||||
|
@ -60,7 +60,7 @@ _default_lo_addrs: list[tuple[str, int]] = [(
|
||||||
logger = log.get_logger('tractor')
|
logger = log.get_logger('tractor')
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@asynccontextmanager
|
||||||
async def open_root_actor(
|
async def open_root_actor(
|
||||||
|
|
||||||
*,
|
*,
|
||||||
|
@ -96,7 +96,6 @@ async def open_root_actor(
|
||||||
Runtime init entry point for ``tractor``.
|
Runtime init entry point for ``tractor``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = True
|
|
||||||
# TODO: stick this in a `@cm` defined in `devx._debug`?
|
# TODO: stick this in a `@cm` defined in `devx._debug`?
|
||||||
#
|
#
|
||||||
# Override the global debugger hook to make it play nice with
|
# Override the global debugger hook to make it play nice with
|
||||||
|
@ -359,11 +358,7 @@ async def open_root_actor(
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
) as err:
|
) as err:
|
||||||
|
|
||||||
import inspect
|
entered: bool = await _debug._maybe_enter_pm(err)
|
||||||
entered: bool = await _debug._maybe_enter_pm(
|
|
||||||
err,
|
|
||||||
api_frame=inspect.currentframe(),
|
|
||||||
)
|
|
||||||
|
|
||||||
if (
|
if (
|
||||||
not entered
|
not entered
|
||||||
|
|
|
@ -70,6 +70,7 @@ from .msg import (
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
CancelAck,
|
CancelAck,
|
||||||
Error,
|
Error,
|
||||||
|
Msg,
|
||||||
MsgType,
|
MsgType,
|
||||||
Return,
|
Return,
|
||||||
Start,
|
Start,
|
||||||
|
@ -249,17 +250,10 @@ async def _errors_relayed_via_ipc(
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# NOTE: we normally always hide this frame in call-stack tracebacks
|
|
||||||
# if the crash originated from an RPC task (since normally the
|
|
||||||
# user is only going to care about their own code not this
|
|
||||||
# internal runtime frame) and we DID NOT
|
|
||||||
# fail due to an IPC transport error!
|
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
# TODO: a debug nursery when in debug mode!
|
# TODO: a debug nursery when in debug mode!
|
||||||
# async with maybe_open_debugger_nursery() as debug_tn:
|
# async with maybe_open_debugger_nursery() as debug_tn:
|
||||||
# => see matching comment in side `._debug._pause()`
|
# => see matching comment in side `._debug._pause()`
|
||||||
rpc_err: BaseException|None = None
|
|
||||||
try:
|
try:
|
||||||
yield # run RPC invoke body
|
yield # run RPC invoke body
|
||||||
|
|
||||||
|
@ -270,7 +264,16 @@ async def _errors_relayed_via_ipc(
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
KeyboardInterrupt,
|
KeyboardInterrupt,
|
||||||
) as err:
|
) as err:
|
||||||
rpc_err = err
|
|
||||||
|
# NOTE: always hide this frame from debug REPL call stack
|
||||||
|
# if the crash originated from an RPC task and we DID NOT
|
||||||
|
# fail due to an IPC transport error!
|
||||||
|
if (
|
||||||
|
is_rpc
|
||||||
|
and
|
||||||
|
chan.connected()
|
||||||
|
):
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
# TODO: maybe we'll want different "levels" of debugging
|
# TODO: maybe we'll want different "levels" of debugging
|
||||||
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
||||||
|
@ -315,19 +318,11 @@ async def _errors_relayed_via_ipc(
|
||||||
api_frame=inspect.currentframe(),
|
api_frame=inspect.currentframe(),
|
||||||
)
|
)
|
||||||
if not entered_debug:
|
if not entered_debug:
|
||||||
# if we prolly should have entered the REPL but
|
|
||||||
# didn't, maybe there was an internal error in
|
|
||||||
# the above code and we do want to show this
|
|
||||||
# frame!
|
|
||||||
if _state.debug_mode():
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
|
|
||||||
log.exception(
|
log.exception(
|
||||||
'RPC task crashed\n'
|
'RPC task crashed\n'
|
||||||
f'|_{ctx}'
|
f'|_{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# ALWAYS try to ship RPC errors back to parent/caller task
|
# ALWAYS try to ship RPC errors back to parent/caller task
|
||||||
if is_rpc:
|
if is_rpc:
|
||||||
|
|
||||||
|
@ -360,20 +355,6 @@ async def _errors_relayed_via_ipc(
|
||||||
# `Actor._service_n`, we add "handles" to each such that
|
# `Actor._service_n`, we add "handles" to each such that
|
||||||
# they can be individually ccancelled.
|
# they can be individually ccancelled.
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
# if the error is not from user code and instead a failure
|
|
||||||
# of a runtime RPC or transport failure we do prolly want to
|
|
||||||
# show this frame
|
|
||||||
if (
|
|
||||||
rpc_err
|
|
||||||
and (
|
|
||||||
not is_rpc
|
|
||||||
or
|
|
||||||
not chan.connected()
|
|
||||||
)
|
|
||||||
):
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ctx: Context
|
ctx: Context
|
||||||
func: Callable
|
func: Callable
|
||||||
|
@ -463,10 +444,9 @@ async def _invoke(
|
||||||
# open the stream with this option.
|
# open the stream with this option.
|
||||||
# allow_overruns=True,
|
# allow_overruns=True,
|
||||||
)
|
)
|
||||||
context_ep_func: bool = False
|
context: bool = False
|
||||||
|
|
||||||
# set the current IPC ctx var for this RPC task
|
assert not _state._ctxvar_Context.get()
|
||||||
_state._ctxvar_Context.set(ctx)
|
|
||||||
|
|
||||||
# TODO: deprecate this style..
|
# TODO: deprecate this style..
|
||||||
if getattr(func, '_tractor_stream_function', False):
|
if getattr(func, '_tractor_stream_function', False):
|
||||||
|
@ -495,7 +475,7 @@ async def _invoke(
|
||||||
# handle decorated ``@tractor.context`` async function
|
# handle decorated ``@tractor.context`` async function
|
||||||
elif getattr(func, '_tractor_context_function', False):
|
elif getattr(func, '_tractor_context_function', False):
|
||||||
kwargs['ctx'] = ctx
|
kwargs['ctx'] = ctx
|
||||||
context_ep_func = True
|
context = True
|
||||||
|
|
||||||
# errors raised inside this block are propgated back to caller
|
# errors raised inside this block are propgated back to caller
|
||||||
async with _errors_relayed_via_ipc(
|
async with _errors_relayed_via_ipc(
|
||||||
|
@ -521,7 +501,7 @@ async def _invoke(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# TODO: impl all these cases in terms of the `Context` one!
|
# TODO: impl all these cases in terms of the `Context` one!
|
||||||
if not context_ep_func:
|
if not context:
|
||||||
await _invoke_non_context(
|
await _invoke_non_context(
|
||||||
actor,
|
actor,
|
||||||
cancel_scope,
|
cancel_scope,
|
||||||
|
@ -591,6 +571,7 @@ async def _invoke(
|
||||||
async with trio.open_nursery() as tn:
|
async with trio.open_nursery() as tn:
|
||||||
ctx._scope_nursery = tn
|
ctx._scope_nursery = tn
|
||||||
ctx._scope = tn.cancel_scope
|
ctx._scope = tn.cancel_scope
|
||||||
|
_state._ctxvar_Context.set(ctx)
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
|
|
||||||
# TODO: should would be nice to have our
|
# TODO: should would be nice to have our
|
||||||
|
@ -850,7 +831,7 @@ async def process_messages(
|
||||||
(as utilized inside `Portal.cancel_actor()` ).
|
(as utilized inside `Portal.cancel_actor()` ).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
assert actor._service_n # runtime state sanity
|
assert actor._service_n # state sanity
|
||||||
|
|
||||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||||
# should use it?
|
# should use it?
|
||||||
|
@ -863,7 +844,7 @@ async def process_messages(
|
||||||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
|
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
|
||||||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
|
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
|
||||||
nursery_cancelled_before_task: bool = False
|
nursery_cancelled_before_task: bool = False
|
||||||
msg: MsgType|None = None
|
msg: Msg|None = None
|
||||||
try:
|
try:
|
||||||
# NOTE: this internal scope allows for keeping this
|
# NOTE: this internal scope allows for keeping this
|
||||||
# message loop running despite the current task having
|
# message loop running despite the current task having
|
||||||
|
|
|
@ -644,7 +644,7 @@ class Actor:
|
||||||
peers_str: str = ''
|
peers_str: str = ''
|
||||||
for uid, chans in self._peers.items():
|
for uid, chans in self._peers.items():
|
||||||
peers_str += (
|
peers_str += (
|
||||||
f'uid: {uid}\n'
|
f'|_ uid: {uid}\n'
|
||||||
)
|
)
|
||||||
for i, chan in enumerate(chans):
|
for i, chan in enumerate(chans):
|
||||||
peers_str += (
|
peers_str += (
|
||||||
|
@ -678,12 +678,10 @@ class Actor:
|
||||||
# XXX => YES IT DOES, when i was testing ctl-c
|
# XXX => YES IT DOES, when i was testing ctl-c
|
||||||
# from broken debug TTY locking due to
|
# from broken debug TTY locking due to
|
||||||
# msg-spec races on application using RunVar...
|
# msg-spec races on application using RunVar...
|
||||||
|
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
|
||||||
if (
|
if (
|
||||||
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
pdb_user_uid
|
||||||
and
|
and local_nursery
|
||||||
(pdb_user_uid := ctx_in_debug.chan.uid)
|
|
||||||
and
|
|
||||||
local_nursery
|
|
||||||
):
|
):
|
||||||
entry: tuple|None = local_nursery._children.get(
|
entry: tuple|None = local_nursery._children.get(
|
||||||
tuple(pdb_user_uid)
|
tuple(pdb_user_uid)
|
||||||
|
@ -1171,17 +1169,13 @@ class Actor:
|
||||||
|
|
||||||
# kill any debugger request task to avoid deadlock
|
# kill any debugger request task to avoid deadlock
|
||||||
# with the root actor in this tree
|
# with the root actor in this tree
|
||||||
debug_req = _debug.DebugStatus
|
dbcs = _debug.DebugStatus.req_cs
|
||||||
lock_req_ctx: Context = debug_req.req_ctx
|
if dbcs is not None:
|
||||||
if lock_req_ctx is not None:
|
|
||||||
msg += (
|
msg += (
|
||||||
'-> Cancelling active debugger request..\n'
|
'-> Cancelling active debugger request..\n'
|
||||||
f'|_{_debug.Lock.repr()}\n\n'
|
f'|_{_debug.Lock.pformat()}'
|
||||||
f'|_{lock_req_ctx}\n\n'
|
|
||||||
)
|
)
|
||||||
# lock_req_ctx._scope.cancel()
|
dbcs.cancel()
|
||||||
# TODO: wrap this in a method-API..
|
|
||||||
debug_req.req_cs.cancel()
|
|
||||||
|
|
||||||
# self-cancel **all** ongoing RPC tasks
|
# self-cancel **all** ongoing RPC tasks
|
||||||
await self.cancel_rpc_tasks(
|
await self.cancel_rpc_tasks(
|
||||||
|
@ -1381,17 +1375,15 @@ class Actor:
|
||||||
"IPC channel's "
|
"IPC channel's "
|
||||||
)
|
)
|
||||||
rent_chan_repr: str = (
|
rent_chan_repr: str = (
|
||||||
f' |_{parent_chan}\n\n'
|
f'|_{parent_chan}'
|
||||||
if parent_chan
|
if parent_chan
|
||||||
else ''
|
else ''
|
||||||
)
|
)
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Cancelling {descr} RPC tasks\n\n'
|
f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
|
||||||
f'<= canceller: {req_uid}\n'
|
f'<= `Actor.cancel_rpc_tasks()`: {req_uid}\n'
|
||||||
f'{rent_chan_repr}'
|
f' {rent_chan_repr}\n'
|
||||||
f'=> cancellee: {self.uid}\n'
|
# f'{self}\n'
|
||||||
f' |_{self}.cancel_rpc_tasks()\n'
|
|
||||||
f' |_tasks: {len(tasks)}\n'
|
|
||||||
# f'{tasks_str}'
|
# f'{tasks_str}'
|
||||||
)
|
)
|
||||||
for (
|
for (
|
||||||
|
@ -1421,7 +1413,7 @@ class Actor:
|
||||||
if tasks:
|
if tasks:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Waiting for remaining rpc tasks to complete\n'
|
'Waiting for remaining rpc tasks to complete\n'
|
||||||
f'|_{tasks_str}'
|
f'|_{tasks}'
|
||||||
)
|
)
|
||||||
await self._ongoing_rpc_tasks.wait()
|
await self._ongoing_rpc_tasks.wait()
|
||||||
|
|
||||||
|
@ -1474,10 +1466,7 @@ class Actor:
|
||||||
assert self._parent_chan, "No parent channel for this actor?"
|
assert self._parent_chan, "No parent channel for this actor?"
|
||||||
return Portal(self._parent_chan)
|
return Portal(self._parent_chan)
|
||||||
|
|
||||||
def get_chans(
|
def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
|
||||||
self,
|
|
||||||
uid: tuple[str, str],
|
|
||||||
) -> list[Channel]:
|
|
||||||
'''
|
'''
|
||||||
Return all IPC channels to the actor with provided `uid`.
|
Return all IPC channels to the actor with provided `uid`.
|
||||||
|
|
||||||
|
@ -1637,9 +1626,7 @@ async def async_main(
|
||||||
# tranport address bind errors - normally it's
|
# tranport address bind errors - normally it's
|
||||||
# something silly like the wrong socket-address
|
# something silly like the wrong socket-address
|
||||||
# passed via a config or CLI Bo
|
# passed via a config or CLI Bo
|
||||||
entered_debug = await _debug._maybe_enter_pm(
|
entered_debug = await _debug._maybe_enter_pm(oserr)
|
||||||
oserr,
|
|
||||||
)
|
|
||||||
if entered_debug:
|
if entered_debug:
|
||||||
log.runtime('Exited debug REPL..')
|
log.runtime('Exited debug REPL..')
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -142,9 +142,7 @@ async def exhaust_portal(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
try:
|
try:
|
||||||
log.debug(
|
log.debug(f"Waiting on final result from {actor.uid}")
|
||||||
f'Waiting on final result from {actor.uid}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX: streams should never be reaped here since they should
|
# XXX: streams should never be reaped here since they should
|
||||||
# always be established and shutdown using a context manager api
|
# always be established and shutdown using a context manager api
|
||||||
|
@ -197,10 +195,7 @@ async def cancel_on_completion(
|
||||||
# if this call errors we store the exception for later
|
# if this call errors we store the exception for later
|
||||||
# in ``errors`` which will be reraised inside
|
# in ``errors`` which will be reraised inside
|
||||||
# an exception group and we still send out a cancel request
|
# an exception group and we still send out a cancel request
|
||||||
result: Any|Exception = await exhaust_portal(
|
result: Any|Exception = await exhaust_portal(portal, actor)
|
||||||
portal,
|
|
||||||
actor,
|
|
||||||
)
|
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
errors[actor.uid]: Exception = result
|
errors[actor.uid]: Exception = result
|
||||||
log.cancel(
|
log.cancel(
|
||||||
|
@ -508,6 +503,14 @@ async def trio_proc(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# await chan.send({
|
||||||
|
# '_parent_main_data': subactor._parent_main_data,
|
||||||
|
# 'enable_modules': subactor.enable_modules,
|
||||||
|
# 'reg_addrs': subactor.reg_addrs,
|
||||||
|
# 'bind_addrs': bind_addrs,
|
||||||
|
# '_runtime_vars': _runtime_vars,
|
||||||
|
# })
|
||||||
|
|
||||||
# track subactor in current nursery
|
# track subactor in current nursery
|
||||||
curr_actor: Actor = current_actor()
|
curr_actor: Actor = current_actor()
|
||||||
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
||||||
|
@ -551,8 +554,8 @@ async def trio_proc(
|
||||||
# killing the process too early.
|
# killing the process too early.
|
||||||
if proc:
|
if proc:
|
||||||
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
|
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
|
|
||||||
# don't clobber an ongoing pdb
|
# don't clobber an ongoing pdb
|
||||||
if cancelled_during_spawn:
|
if cancelled_during_spawn:
|
||||||
# Try again to avoid TTY clobbering.
|
# Try again to avoid TTY clobbering.
|
||||||
|
|
|
@ -124,15 +124,9 @@ _ctxvar_Context: ContextVar[Context] = ContextVar(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def current_ipc_ctx(
|
def current_ipc_ctx() -> Context:
|
||||||
error_on_not_set: bool = False,
|
|
||||||
) -> Context|None:
|
|
||||||
ctx: Context = _ctxvar_Context.get()
|
ctx: Context = _ctxvar_Context.get()
|
||||||
|
if not ctx:
|
||||||
if (
|
|
||||||
not ctx
|
|
||||||
and error_on_not_set
|
|
||||||
):
|
|
||||||
from ._exceptions import InternalError
|
from ._exceptions import InternalError
|
||||||
raise InternalError(
|
raise InternalError(
|
||||||
'No IPC context has been allocated for this task yet?\n'
|
'No IPC context has been allocated for this task yet?\n'
|
||||||
|
|
|
@ -52,7 +52,6 @@ from tractor.msg import (
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._context import Context
|
from ._context import Context
|
||||||
from ._ipc import Channel
|
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -66,10 +65,10 @@ log = get_logger(__name__)
|
||||||
class MsgStream(trio.abc.Channel):
|
class MsgStream(trio.abc.Channel):
|
||||||
'''
|
'''
|
||||||
A bidirectional message stream for receiving logically sequenced
|
A bidirectional message stream for receiving logically sequenced
|
||||||
values over an inter-actor IPC `Channel`.
|
values over an inter-actor IPC ``Channel``.
|
||||||
|
|
||||||
This is the type returned to a local task which entered either
|
This is the type returned to a local task which entered either
|
||||||
`Portal.open_stream_from()` or `Context.open_stream()`.
|
``Portal.open_stream_from()`` or ``Context.open_stream()``.
|
||||||
|
|
||||||
Termination rules:
|
Termination rules:
|
||||||
|
|
||||||
|
@ -96,22 +95,6 @@ class MsgStream(trio.abc.Channel):
|
||||||
self._eoc: bool|trio.EndOfChannel = False
|
self._eoc: bool|trio.EndOfChannel = False
|
||||||
self._closed: bool|trio.ClosedResourceError = False
|
self._closed: bool|trio.ClosedResourceError = False
|
||||||
|
|
||||||
@property
|
|
||||||
def ctx(self) -> Context:
|
|
||||||
'''
|
|
||||||
This stream's IPC `Context` ref.
|
|
||||||
|
|
||||||
'''
|
|
||||||
return self._ctx
|
|
||||||
|
|
||||||
@property
|
|
||||||
def chan(self) -> Channel:
|
|
||||||
'''
|
|
||||||
Ref to the containing `Context`'s transport `Channel`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
return self._ctx.chan
|
|
||||||
|
|
||||||
# TODO: could we make this a direct method bind to `PldRx`?
|
# TODO: could we make this a direct method bind to `PldRx`?
|
||||||
# -> receive_nowait = PldRx.recv_pld
|
# -> receive_nowait = PldRx.recv_pld
|
||||||
# |_ means latter would have to accept `MsgStream`-as-`self`?
|
# |_ means latter would have to accept `MsgStream`-as-`self`?
|
||||||
|
@ -126,7 +109,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
):
|
):
|
||||||
ctx: Context = self._ctx
|
ctx: Context = self._ctx
|
||||||
return ctx._pld_rx.recv_pld_nowait(
|
return ctx._pld_rx.recv_pld_nowait(
|
||||||
ipc=self,
|
ctx=ctx,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -165,7 +148,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
try:
|
try:
|
||||||
|
|
||||||
ctx: Context = self._ctx
|
ctx: Context = self._ctx
|
||||||
return await ctx._pld_rx.recv_pld(ipc=self)
|
return await ctx._pld_rx.recv_pld(ctx=ctx)
|
||||||
|
|
||||||
# XXX: the stream terminates on either of:
|
# XXX: the stream terminates on either of:
|
||||||
# - via `self._rx_chan.receive()` raising after manual closure
|
# - via `self._rx_chan.receive()` raising after manual closure
|
||||||
|
|
|
@ -84,7 +84,6 @@ class ActorNursery:
|
||||||
ria_nursery: trio.Nursery,
|
ria_nursery: trio.Nursery,
|
||||||
da_nursery: trio.Nursery,
|
da_nursery: trio.Nursery,
|
||||||
errors: dict[tuple[str, str], BaseException],
|
errors: dict[tuple[str, str], BaseException],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# self.supervisor = supervisor # TODO
|
# self.supervisor = supervisor # TODO
|
||||||
self._actor: Actor = actor
|
self._actor: Actor = actor
|
||||||
|
@ -106,7 +105,6 @@ class ActorNursery:
|
||||||
self._at_least_one_child_in_debug: bool = False
|
self._at_least_one_child_in_debug: bool = False
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
self.exited = trio.Event()
|
self.exited = trio.Event()
|
||||||
self._scope_error: BaseException|None = None
|
|
||||||
|
|
||||||
# NOTE: when no explicit call is made to
|
# NOTE: when no explicit call is made to
|
||||||
# `.open_root_actor()` by application code,
|
# `.open_root_actor()` by application code,
|
||||||
|
@ -119,9 +117,7 @@ class ActorNursery:
|
||||||
async def start_actor(
|
async def start_actor(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
|
||||||
*,
|
*,
|
||||||
|
|
||||||
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
|
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
|
||||||
rpc_module_paths: list[str]|None = None,
|
rpc_module_paths: list[str]|None = None,
|
||||||
enable_modules: list[str]|None = None,
|
enable_modules: list[str]|None = None,
|
||||||
|
@ -129,7 +125,6 @@ class ActorNursery:
|
||||||
nursery: trio.Nursery|None = None,
|
nursery: trio.Nursery|None = None,
|
||||||
debug_mode: bool|None = None,
|
debug_mode: bool|None = None,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
|
||||||
) -> Portal:
|
) -> Portal:
|
||||||
'''
|
'''
|
||||||
Start a (daemon) actor: an process that has no designated
|
Start a (daemon) actor: an process that has no designated
|
||||||
|
@ -194,13 +189,6 @@ class ActorNursery:
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: DEPRECATE THIS:
|
|
||||||
# -[ ] impl instead as a hilevel wrapper on
|
|
||||||
# top of a `@context` style invocation.
|
|
||||||
# |_ dynamic @context decoration on child side
|
|
||||||
# |_ implicit `Portal.open_context() as (ctx, first):`
|
|
||||||
# and `return first` on parent side.
|
|
||||||
# -[ ] use @api_frame on the wrapper
|
|
||||||
async def run_in_actor(
|
async def run_in_actor(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
|
@ -233,7 +221,7 @@ class ActorNursery:
|
||||||
# use the explicit function name if not provided
|
# use the explicit function name if not provided
|
||||||
name = fn.__name__
|
name = fn.__name__
|
||||||
|
|
||||||
portal: Portal = await self.start_actor(
|
portal = await self.start_actor(
|
||||||
name,
|
name,
|
||||||
enable_modules=[mod_path] + (
|
enable_modules=[mod_path] + (
|
||||||
enable_modules or rpc_module_paths or []
|
enable_modules or rpc_module_paths or []
|
||||||
|
@ -262,7 +250,6 @@ class ActorNursery:
|
||||||
)
|
)
|
||||||
return portal
|
return portal
|
||||||
|
|
||||||
# @api_frame
|
|
||||||
async def cancel(
|
async def cancel(
|
||||||
self,
|
self,
|
||||||
hard_kill: bool = False,
|
hard_kill: bool = False,
|
||||||
|
@ -360,11 +347,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
|
|
||||||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
|
|
||||||
# normally don't need to show user by default
|
# TODO: yay or nay?
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__ = True
|
||||||
|
|
||||||
outer_err: BaseException|None = None
|
|
||||||
inner_err: BaseException|None = None
|
|
||||||
|
|
||||||
# the collection of errors retreived from spawned sub-actors
|
# the collection of errors retreived from spawned sub-actors
|
||||||
errors: dict[tuple[str, str], BaseException] = {}
|
errors: dict[tuple[str, str], BaseException] = {}
|
||||||
|
@ -374,7 +358,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# handling errors that are generated by the inner nursery in
|
# handling errors that are generated by the inner nursery in
|
||||||
# a supervisor strategy **before** blocking indefinitely to wait for
|
# a supervisor strategy **before** blocking indefinitely to wait for
|
||||||
# actors spawned in "daemon mode" (aka started using
|
# actors spawned in "daemon mode" (aka started using
|
||||||
# `ActorNursery.start_actor()`).
|
# ``ActorNursery.start_actor()``).
|
||||||
|
|
||||||
# errors from this daemon actor nursery bubble up to caller
|
# errors from this daemon actor nursery bubble up to caller
|
||||||
async with trio.open_nursery() as da_nursery:
|
async with trio.open_nursery() as da_nursery:
|
||||||
|
@ -409,8 +393,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
)
|
)
|
||||||
an._join_procs.set()
|
an._join_procs.set()
|
||||||
|
|
||||||
except BaseException as _inner_err:
|
except BaseException as inner_err:
|
||||||
inner_err = _inner_err
|
|
||||||
errors[actor.uid] = inner_err
|
errors[actor.uid] = inner_err
|
||||||
|
|
||||||
# If we error in the root but the debugger is
|
# If we error in the root but the debugger is
|
||||||
|
@ -488,10 +471,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
Exception,
|
Exception,
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
trio.Cancelled
|
trio.Cancelled
|
||||||
) as _outer_err:
|
|
||||||
outer_err = _outer_err
|
|
||||||
|
|
||||||
an._scope_error = outer_err or inner_err
|
) as err:
|
||||||
|
|
||||||
# XXX: yet another guard before allowing the cancel
|
# XXX: yet another guard before allowing the cancel
|
||||||
# sequence in case a (single) child is in debug.
|
# sequence in case a (single) child is in debug.
|
||||||
|
@ -506,7 +487,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
if an._children:
|
if an._children:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Actor-nursery cancelling due error type:\n'
|
'Actor-nursery cancelling due error type:\n'
|
||||||
f'{outer_err}\n'
|
f'{err}\n'
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await an.cancel()
|
await an.cancel()
|
||||||
|
@ -533,19 +514,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
else:
|
else:
|
||||||
raise list(errors.values())[0]
|
raise list(errors.values())[0]
|
||||||
|
|
||||||
# show frame on any (likely) internal error
|
|
||||||
if (
|
|
||||||
not an.cancelled
|
|
||||||
and an._scope_error
|
|
||||||
):
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
|
|
||||||
# da_nursery scope end - nursery checkpoint
|
# da_nursery scope end - nursery checkpoint
|
||||||
# final exit
|
# final exit
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
# @api_frame
|
|
||||||
async def open_nursery(
|
async def open_nursery(
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
@ -565,7 +538,6 @@ async def open_nursery(
|
||||||
which cancellation scopes correspond to each spawned subactor set.
|
which cancellation scopes correspond to each spawned subactor set.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = True
|
|
||||||
implicit_runtime: bool = False
|
implicit_runtime: bool = False
|
||||||
actor: Actor = current_actor(err_on_no_runtime=False)
|
actor: Actor = current_actor(err_on_no_runtime=False)
|
||||||
an: ActorNursery|None = None
|
an: ActorNursery|None = None
|
||||||
|
@ -616,14 +588,6 @@ async def open_nursery(
|
||||||
an.exited.set()
|
an.exited.set()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# show frame on any internal runtime-scope error
|
|
||||||
if (
|
|
||||||
an
|
|
||||||
and not an.cancelled
|
|
||||||
and an._scope_error
|
|
||||||
):
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
|
|
||||||
msg: str = (
|
msg: str = (
|
||||||
'Actor-nursery exited\n'
|
'Actor-nursery exited\n'
|
||||||
f'|_{an}\n'
|
f'|_{an}\n'
|
||||||
|
|
|
@ -20,8 +20,11 @@ as it pertains to improving the grok-ability of our runtime!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from functools import partial
|
|
||||||
import inspect
|
import inspect
|
||||||
|
# import msgspec
|
||||||
|
# from pprint import pformat
|
||||||
|
import textwrap
|
||||||
|
import traceback
|
||||||
from types import (
|
from types import (
|
||||||
FrameType,
|
FrameType,
|
||||||
FunctionType,
|
FunctionType,
|
||||||
|
@ -29,8 +32,9 @@ from types import (
|
||||||
# CodeType,
|
# CodeType,
|
||||||
)
|
)
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
# Any,
|
||||||
Callable,
|
Callable,
|
||||||
|
# TYPE_CHECKING,
|
||||||
Type,
|
Type,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -38,7 +42,6 @@ from tractor.msg import (
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
)
|
)
|
||||||
import wrapt
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: yeah, i don't love this and we should prolly just
|
# TODO: yeah, i don't love this and we should prolly just
|
||||||
|
@ -80,31 +83,6 @@ def get_class_from_frame(fr: FrameType) -> (
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_ns_and_func_from_frame(
|
|
||||||
frame: FrameType,
|
|
||||||
) -> Callable:
|
|
||||||
'''
|
|
||||||
Return the corresponding function object reference from
|
|
||||||
a `FrameType`, and return it and it's parent namespace `dict`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
ns: dict[str, Any]
|
|
||||||
|
|
||||||
# for a method, go up a frame and lookup the name in locals()
|
|
||||||
if '.' in (qualname := frame.f_code.co_qualname):
|
|
||||||
cls_name, _, func_name = qualname.partition('.')
|
|
||||||
ns = frame.f_back.f_locals[cls_name].__dict__
|
|
||||||
|
|
||||||
else:
|
|
||||||
func_name: str = frame.f_code.co_name
|
|
||||||
ns = frame.f_globals
|
|
||||||
|
|
||||||
return (
|
|
||||||
ns,
|
|
||||||
ns[func_name],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def func_ref_from_frame(
|
def func_ref_from_frame(
|
||||||
frame: FrameType,
|
frame: FrameType,
|
||||||
) -> Callable:
|
) -> Callable:
|
||||||
|
@ -120,63 +98,34 @@ def func_ref_from_frame(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: move all this into new `.devx._code`!
|
||||||
|
# -[ ] prolly create a `@runtime_api` dec?
|
||||||
|
# -[ ] ^- make it capture and/or accept buncha optional
|
||||||
|
# meta-data like a fancier version of `@pdbp.hideframe`.
|
||||||
|
#
|
||||||
class CallerInfo(pretty_struct.Struct):
|
class CallerInfo(pretty_struct.Struct):
|
||||||
# https://docs.python.org/dev/reference/datamodel.html#frame-objects
|
rt_fi: inspect.FrameInfo
|
||||||
# https://docs.python.org/dev/library/inspect.html#the-interpreter-stack
|
call_frame: FrameType
|
||||||
_api_frame: FrameType
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def api_frame(self) -> FrameType:
|
def api_func_ref(self) -> Callable|None:
|
||||||
try:
|
return func_ref_from_frame(self.rt_fi.frame)
|
||||||
self._api_frame.clear()
|
|
||||||
except RuntimeError:
|
|
||||||
# log.warning(
|
|
||||||
print(
|
|
||||||
f'Frame {self._api_frame} for {self.api_func} is still active!'
|
|
||||||
)
|
|
||||||
|
|
||||||
return self._api_frame
|
|
||||||
|
|
||||||
_api_func: Callable
|
|
||||||
|
|
||||||
@property
|
|
||||||
def api_func(self) -> Callable:
|
|
||||||
return self._api_func
|
|
||||||
|
|
||||||
_caller_frames_up: int|None = 1
|
|
||||||
_caller_frame: FrameType|None = None # cached after first stack scan
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def api_nsp(self) -> NamespacePath|None:
|
def api_nsp(self) -> NamespacePath|None:
|
||||||
func: FunctionType = self.api_func
|
func: FunctionType = self.api_func_ref
|
||||||
if func:
|
if func:
|
||||||
return NamespacePath.from_ref(func)
|
return NamespacePath.from_ref(func)
|
||||||
|
|
||||||
return '<unknown>'
|
return '<unknown>'
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def caller_frame(self) -> FrameType:
|
def caller_func_ref(self) -> Callable|None:
|
||||||
|
return func_ref_from_frame(self.call_frame)
|
||||||
# if not already cached, scan up stack explicitly by
|
|
||||||
# configured count.
|
|
||||||
if not self._caller_frame:
|
|
||||||
if self._caller_frames_up:
|
|
||||||
for _ in range(self._caller_frames_up):
|
|
||||||
caller_frame: FrameType|None = self.api_frame.f_back
|
|
||||||
|
|
||||||
if not caller_frame:
|
|
||||||
raise ValueError(
|
|
||||||
'No frame exists {self._caller_frames_up} up from\n'
|
|
||||||
f'{self.api_frame} @ {self.api_nsp}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
self._caller_frame = caller_frame
|
|
||||||
|
|
||||||
return self._caller_frame
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def caller_nsp(self) -> NamespacePath|None:
|
def caller_nsp(self) -> NamespacePath|None:
|
||||||
func: FunctionType = self.api_func
|
func: FunctionType = self.caller_func_ref
|
||||||
if func:
|
if func:
|
||||||
return NamespacePath.from_ref(func)
|
return NamespacePath.from_ref(func)
|
||||||
|
|
||||||
|
@ -223,66 +172,108 @@ def find_caller_info(
|
||||||
call_frame = call_frame.f_back
|
call_frame = call_frame.f_back
|
||||||
|
|
||||||
return CallerInfo(
|
return CallerInfo(
|
||||||
_api_frame=rt_frame,
|
rt_fi=fi,
|
||||||
_api_func=func_ref_from_frame(rt_frame),
|
call_frame=call_frame,
|
||||||
_caller_frames_up=go_up_iframes,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
|
def pformat_boxed_tb(
|
||||||
|
tb_str: str,
|
||||||
|
fields_str: str|None = None,
|
||||||
|
field_prefix: str = ' |_',
|
||||||
|
|
||||||
|
tb_box_indent: int|None = None,
|
||||||
|
tb_body_indent: int = 1,
|
||||||
|
|
||||||
# TODO: -[x] move all this into new `.devx._code`!
|
) -> str:
|
||||||
# -[ ] consider rename to _callstack?
|
'''
|
||||||
# -[ ] prolly create a `@runtime_api` dec?
|
Create a "boxed" looking traceback string.
|
||||||
# |_ @api_frame seems better?
|
|
||||||
# -[ ] ^- make it capture and/or accept buncha optional
|
|
||||||
# meta-data like a fancier version of `@pdbp.hideframe`.
|
|
||||||
#
|
|
||||||
def api_frame(
|
|
||||||
wrapped: Callable|None = None,
|
|
||||||
*,
|
|
||||||
caller_frames_up: int = 1,
|
|
||||||
|
|
||||||
) -> Callable:
|
Useful for emphasizing traceback text content as being an
|
||||||
|
embedded attribute of some other object (like
|
||||||
|
a `RemoteActorError` or other boxing remote error shuttle
|
||||||
|
container).
|
||||||
|
|
||||||
# handle the decorator called WITHOUT () case,
|
Any other parent/container "fields" can be passed in the
|
||||||
# i.e. just @api_frame, NOT @api_frame(extra=<blah>)
|
`fields_str` input along with other prefix/indent settings.
|
||||||
if wrapped is None:
|
|
||||||
return partial(
|
|
||||||
api_frame,
|
|
||||||
caller_frames_up=caller_frames_up,
|
|
||||||
)
|
|
||||||
|
|
||||||
@wrapt.decorator
|
'''
|
||||||
async def wrapper(
|
if (
|
||||||
wrapped: Callable,
|
fields_str
|
||||||
instance: object,
|
and
|
||||||
args: tuple,
|
field_prefix
|
||||||
kwargs: dict,
|
|
||||||
):
|
):
|
||||||
# maybe cache the API frame for this call
|
fields: str = textwrap.indent(
|
||||||
global _frame2callerinfo_cache
|
fields_str,
|
||||||
this_frame: FrameType = inspect.currentframe()
|
prefix=field_prefix,
|
||||||
api_frame: FrameType = this_frame.f_back
|
)
|
||||||
|
else:
|
||||||
|
fields = fields_str or ''
|
||||||
|
|
||||||
if not _frame2callerinfo_cache.get(api_frame):
|
tb_body = tb_str
|
||||||
_frame2callerinfo_cache[api_frame] = CallerInfo(
|
if tb_body_indent:
|
||||||
_api_frame=api_frame,
|
tb_body: str = textwrap.indent(
|
||||||
_api_func=wrapped,
|
tb_str,
|
||||||
_caller_frames_up=caller_frames_up,
|
prefix=tb_body_indent * ' ',
|
||||||
)
|
)
|
||||||
|
|
||||||
return wrapped(*args, **kwargs)
|
tb_box: str = (
|
||||||
|
|
||||||
# annotate the function as a "api function", meaning it is
|
# orig
|
||||||
# a function for which the function above it in the call stack should be
|
# f' |\n'
|
||||||
# non-`tractor` code aka "user code".
|
# f' ------ - ------\n\n'
|
||||||
#
|
# f'{tb_str}\n'
|
||||||
# in the global frame cache for easy lookup from a given
|
# f' ------ - ------\n'
|
||||||
# func-instance
|
# f' _|\n'
|
||||||
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
|
|
||||||
wrapped.__api_func__: bool = True
|
f'|\n'
|
||||||
return wrapper(wrapped)
|
f' ------ - ------\n\n'
|
||||||
|
# f'{tb_str}\n'
|
||||||
|
f'{tb_body}'
|
||||||
|
f' ------ - ------\n'
|
||||||
|
f'_|\n'
|
||||||
|
)
|
||||||
|
tb_box_indent: str = (
|
||||||
|
tb_box_indent
|
||||||
|
or
|
||||||
|
1
|
||||||
|
|
||||||
|
# (len(field_prefix))
|
||||||
|
# ? ^-TODO-^ ? if you wanted another indent level
|
||||||
|
)
|
||||||
|
if tb_box_indent > 0:
|
||||||
|
tb_box: str = textwrap.indent(
|
||||||
|
tb_box,
|
||||||
|
prefix=tb_box_indent * ' ',
|
||||||
|
)
|
||||||
|
|
||||||
|
return (
|
||||||
|
fields
|
||||||
|
+
|
||||||
|
tb_box
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def pformat_caller_frame(
|
||||||
|
stack_limit: int = 1,
|
||||||
|
box_tb: bool = True,
|
||||||
|
) -> str:
|
||||||
|
'''
|
||||||
|
Capture and return the traceback text content from
|
||||||
|
`stack_limit` call frames up.
|
||||||
|
|
||||||
|
'''
|
||||||
|
tb_str: str = (
|
||||||
|
'\n'.join(
|
||||||
|
traceback.format_stack(limit=stack_limit)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if box_tb:
|
||||||
|
tb_str: str = pformat_boxed_tb(
|
||||||
|
tb_str=tb_str,
|
||||||
|
field_prefix=' ',
|
||||||
|
indent='',
|
||||||
|
)
|
||||||
|
return tb_str
|
File diff suppressed because it is too large
Load Diff
|
@ -57,8 +57,8 @@ LEVELS: dict[str, int] = {
|
||||||
'TRANSPORT': 5,
|
'TRANSPORT': 5,
|
||||||
'RUNTIME': 15,
|
'RUNTIME': 15,
|
||||||
'CANCEL': 16,
|
'CANCEL': 16,
|
||||||
'DEVX': 400,
|
|
||||||
'PDB': 500,
|
'PDB': 500,
|
||||||
|
'DEVX': 600,
|
||||||
}
|
}
|
||||||
# _custom_levels: set[str] = {
|
# _custom_levels: set[str] = {
|
||||||
# lvlname.lower for lvlname in LEVELS.keys()
|
# lvlname.lower for lvlname in LEVELS.keys()
|
||||||
|
@ -137,7 +137,7 @@ class StackLevelAdapter(LoggerAdapter):
|
||||||
"Developer experience" sub-sys statuses.
|
"Developer experience" sub-sys statuses.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self.log(400, msg)
|
return self.log(600, msg)
|
||||||
|
|
||||||
def log(
|
def log(
|
||||||
self,
|
self,
|
||||||
|
@ -202,29 +202,8 @@ class StackLevelAdapter(LoggerAdapter):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# TODO IDEA:
|
|
||||||
# -[ ] do per task-name and actor-name color coding
|
|
||||||
# -[ ] unique color per task-id and actor-uuid
|
|
||||||
def pformat_task_uid(
|
|
||||||
id_part: str = 'tail'
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Return `str`-ified unique for a `trio.Task` via a combo of its
|
|
||||||
`.name: str` and `id()` truncated output.
|
|
||||||
|
|
||||||
'''
|
|
||||||
task: trio.Task = trio.lowlevel.current_task()
|
|
||||||
tid: str = str(id(task))
|
|
||||||
if id_part == 'tail':
|
|
||||||
tid_part: str = tid[-6:]
|
|
||||||
else:
|
|
||||||
tid_part: str = tid[:6]
|
|
||||||
|
|
||||||
return f'{task.name}[{tid_part}]'
|
|
||||||
|
|
||||||
|
|
||||||
_conc_name_getters = {
|
_conc_name_getters = {
|
||||||
'task': pformat_task_uid,
|
'task': lambda: trio.lowlevel.current_task().name,
|
||||||
'actor': lambda: current_actor(),
|
'actor': lambda: current_actor(),
|
||||||
'actor_name': lambda: current_actor().name,
|
'actor_name': lambda: current_actor().name,
|
||||||
'actor_uid': lambda: current_actor().uid[1][:6],
|
'actor_uid': lambda: current_actor().uid[1][:6],
|
||||||
|
@ -232,10 +211,7 @@ _conc_name_getters = {
|
||||||
|
|
||||||
|
|
||||||
class ActorContextInfo(Mapping):
|
class ActorContextInfo(Mapping):
|
||||||
'''
|
"Dyanmic lookup for local actor and task names"
|
||||||
Dyanmic lookup for local actor and task names.
|
|
||||||
|
|
||||||
'''
|
|
||||||
_context_keys = (
|
_context_keys = (
|
||||||
'task',
|
'task',
|
||||||
'actor',
|
'actor',
|
||||||
|
|
|
@ -44,7 +44,7 @@ from ._codec import (
|
||||||
# )
|
# )
|
||||||
|
|
||||||
from .types import (
|
from .types import (
|
||||||
PayloadMsg as PayloadMsg,
|
Msg as Msg,
|
||||||
|
|
||||||
Aid as Aid,
|
Aid as Aid,
|
||||||
SpawnSpec as SpawnSpec,
|
SpawnSpec as SpawnSpec,
|
||||||
|
|
|
@ -432,7 +432,7 @@ class MsgCodec(Struct):
|
||||||
|
|
||||||
# ) -> Any|Struct:
|
# ) -> Any|Struct:
|
||||||
|
|
||||||
# msg: PayloadMsg = codec.dec.decode(msg)
|
# msg: Msg = codec.dec.decode(msg)
|
||||||
# payload_tag: str = msg.header.payload_tag
|
# payload_tag: str = msg.header.payload_tag
|
||||||
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
|
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
|
||||||
# return payload_dec.decode(msg.pld)
|
# return payload_dec.decode(msg.pld)
|
||||||
|
|
|
@ -22,9 +22,10 @@ operational helpers for processing transaction flows.
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
# asynccontextmanager as acm,
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
)
|
)
|
||||||
|
from contextvars import ContextVar
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Type,
|
Type,
|
||||||
|
@ -49,7 +50,6 @@ from tractor._exceptions import (
|
||||||
_mk_msg_type_err,
|
_mk_msg_type_err,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
)
|
)
|
||||||
from tractor._state import current_ipc_ctx
|
|
||||||
from ._codec import (
|
from ._codec import (
|
||||||
mk_dec,
|
mk_dec,
|
||||||
MsgDec,
|
MsgDec,
|
||||||
|
@ -75,7 +75,7 @@ if TYPE_CHECKING:
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
_def_any_pldec: MsgDec[Any] = mk_dec()
|
_def_any_pldec: MsgDec = mk_dec()
|
||||||
|
|
||||||
|
|
||||||
class PldRx(Struct):
|
class PldRx(Struct):
|
||||||
|
@ -104,19 +104,15 @@ class PldRx(Struct):
|
||||||
'''
|
'''
|
||||||
# TODO: better to bind it here?
|
# TODO: better to bind it here?
|
||||||
# _rx_mc: trio.MemoryReceiveChannel
|
# _rx_mc: trio.MemoryReceiveChannel
|
||||||
_pld_dec: MsgDec
|
_pldec: MsgDec
|
||||||
_ctx: Context|None = None
|
|
||||||
_ipc: Context|MsgStream|None = None
|
_ipc: Context|MsgStream|None = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pld_dec(self) -> MsgDec:
|
def pld_dec(self) -> MsgDec:
|
||||||
return self._pld_dec
|
return self._pldec
|
||||||
|
|
||||||
# TODO: a better name?
|
|
||||||
# -[ ] when would this be used as it avoids needingn to pass the
|
|
||||||
# ipc prim to every method
|
|
||||||
@cm
|
@cm
|
||||||
def wraps_ipc(
|
def apply_to_ipc(
|
||||||
self,
|
self,
|
||||||
ipc_prim: Context|MsgStream,
|
ipc_prim: Context|MsgStream,
|
||||||
|
|
||||||
|
@ -144,50 +140,49 @@ class PldRx(Struct):
|
||||||
exit.
|
exit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
orig_dec: MsgDec = self._pld_dec
|
orig_dec: MsgDec = self._pldec
|
||||||
limit_dec: MsgDec = mk_dec(spec=spec)
|
limit_dec: MsgDec = mk_dec(spec=spec)
|
||||||
try:
|
try:
|
||||||
self._pld_dec = limit_dec
|
self._pldec = limit_dec
|
||||||
yield limit_dec
|
yield limit_dec
|
||||||
finally:
|
finally:
|
||||||
self._pld_dec = orig_dec
|
self._pldec = orig_dec
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def dec(self) -> msgpack.Decoder:
|
def dec(self) -> msgpack.Decoder:
|
||||||
return self._pld_dec.dec
|
return self._pldec.dec
|
||||||
|
|
||||||
def recv_pld_nowait(
|
def recv_pld_nowait(
|
||||||
self,
|
self,
|
||||||
# TODO: make this `MsgStream` compat as well, see above^
|
# TODO: make this `MsgStream` compat as well, see above^
|
||||||
# ipc_prim: Context|MsgStream,
|
# ipc_prim: Context|MsgStream,
|
||||||
ipc: Context|MsgStream,
|
ctx: Context,
|
||||||
|
|
||||||
ipc_msg: MsgType|None = None,
|
ipc_msg: MsgType|None = None,
|
||||||
expect_msg: Type[MsgType]|None = None,
|
expect_msg: Type[MsgType]|None = None,
|
||||||
hide_tb: bool = False,
|
|
||||||
**dec_msg_kwargs,
|
**dec_msg_kwargs,
|
||||||
|
|
||||||
) -> Any|Raw:
|
) -> Any|Raw:
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = True
|
||||||
|
|
||||||
msg: MsgType = (
|
msg: MsgType = (
|
||||||
ipc_msg
|
ipc_msg
|
||||||
or
|
or
|
||||||
|
|
||||||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||||
ipc._rx_chan.receive_nowait()
|
ctx._rx_chan.receive_nowait()
|
||||||
)
|
)
|
||||||
return self.dec_msg(
|
return self.dec_msg(
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ctx=ctx,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
hide_tb=hide_tb,
|
|
||||||
**dec_msg_kwargs,
|
**dec_msg_kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def recv_pld(
|
async def recv_pld(
|
||||||
self,
|
self,
|
||||||
ipc: Context|MsgStream,
|
ctx: Context,
|
||||||
ipc_msg: MsgType|None = None,
|
ipc_msg: MsgType|None = None,
|
||||||
expect_msg: Type[MsgType]|None = None,
|
expect_msg: Type[MsgType]|None = None,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
@ -205,11 +200,11 @@ class PldRx(Struct):
|
||||||
or
|
or
|
||||||
|
|
||||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||||
await ipc._rx_chan.receive()
|
await ctx._rx_chan.receive()
|
||||||
)
|
)
|
||||||
return self.dec_msg(
|
return self.dec_msg(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
ipc=ipc,
|
ctx=ctx,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
**dec_msg_kwargs,
|
**dec_msg_kwargs,
|
||||||
)
|
)
|
||||||
|
@ -217,7 +212,7 @@ class PldRx(Struct):
|
||||||
def dec_msg(
|
def dec_msg(
|
||||||
self,
|
self,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
ipc: Context|MsgStream,
|
ctx: Context,
|
||||||
expect_msg: Type[MsgType]|None,
|
expect_msg: Type[MsgType]|None,
|
||||||
|
|
||||||
raise_error: bool = True,
|
raise_error: bool = True,
|
||||||
|
@ -230,9 +225,6 @@ class PldRx(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
_src_err = None
|
|
||||||
src_err: BaseException|None = None
|
|
||||||
match msg:
|
match msg:
|
||||||
# payload-data shuttle msg; deliver the `.pld` value
|
# payload-data shuttle msg; deliver the `.pld` value
|
||||||
# directly to IPC (primitive) client-consumer code.
|
# directly to IPC (primitive) client-consumer code.
|
||||||
|
@ -242,7 +234,7 @@ class PldRx(Struct):
|
||||||
|Return(pld=pld) # termination phase
|
|Return(pld=pld) # termination phase
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
pld: PayloadT = self._pld_dec.decode(pld)
|
pld: PayloadT = self._pldec.decode(pld)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Decoded msg payload\n\n'
|
'Decoded msg payload\n\n'
|
||||||
f'{msg}\n\n'
|
f'{msg}\n\n'
|
||||||
|
@ -251,30 +243,25 @@ class PldRx(Struct):
|
||||||
)
|
)
|
||||||
return pld
|
return pld
|
||||||
|
|
||||||
# XXX pld-value type failure
|
# XXX pld-type failure
|
||||||
except ValidationError as valerr:
|
except ValidationError as src_err:
|
||||||
# pack mgterr into error-msg for
|
|
||||||
# reraise below; ensure remote-actor-err
|
|
||||||
# info is displayed nicely?
|
|
||||||
msgterr: MsgTypeError = _mk_msg_type_err(
|
msgterr: MsgTypeError = _mk_msg_type_err(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
codec=self.pld_dec,
|
codec=self.pld_dec,
|
||||||
src_validation_error=valerr,
|
src_validation_error=src_err,
|
||||||
is_invalid_payload=True,
|
is_invalid_payload=True,
|
||||||
)
|
)
|
||||||
msg: Error = pack_from_raise(
|
msg: Error = pack_from_raise(
|
||||||
local_err=msgterr,
|
local_err=msgterr,
|
||||||
cid=msg.cid,
|
cid=msg.cid,
|
||||||
src_uid=ipc.chan.uid,
|
src_uid=ctx.chan.uid,
|
||||||
)
|
)
|
||||||
src_err = valerr
|
|
||||||
|
|
||||||
# XXX some other decoder specific failure?
|
# XXX some other decoder specific failure?
|
||||||
# except TypeError as src_error:
|
# except TypeError as src_error:
|
||||||
# from .devx import mk_pdb
|
# from .devx import mk_pdb
|
||||||
# mk_pdb().set_trace()
|
# mk_pdb().set_trace()
|
||||||
# raise src_error
|
# raise src_error
|
||||||
# ^-TODO-^ can remove?
|
|
||||||
|
|
||||||
# a runtime-internal RPC endpoint response.
|
# a runtime-internal RPC endpoint response.
|
||||||
# always passthrough since (internal) runtime
|
# always passthrough since (internal) runtime
|
||||||
|
@ -312,7 +299,6 @@ class PldRx(Struct):
|
||||||
return src_err
|
return src_err
|
||||||
|
|
||||||
case Stop(cid=cid):
|
case Stop(cid=cid):
|
||||||
ctx: Context = getattr(ipc, 'ctx', ipc)
|
|
||||||
message: str = (
|
message: str = (
|
||||||
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
|
f'{ctx.side!r}-side of ctx received stream-`Stop` from '
|
||||||
f'{ctx.peer_side!r} peer ?\n'
|
f'{ctx.peer_side!r} peer ?\n'
|
||||||
|
@ -355,21 +341,14 @@ class PldRx(Struct):
|
||||||
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
|
# |_https://docs.python.org/3.11/library/exceptions.html#BaseException.add_note
|
||||||
#
|
#
|
||||||
# fallthrough and raise from `src_err`
|
# fallthrough and raise from `src_err`
|
||||||
try:
|
|
||||||
_raise_from_unexpected_msg(
|
_raise_from_unexpected_msg(
|
||||||
ctx=getattr(ipc, 'ctx', ipc),
|
ctx=ctx,
|
||||||
msg=msg,
|
msg=msg,
|
||||||
src_err=src_err,
|
src_err=src_err,
|
||||||
log=log,
|
log=log,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
except UnboundLocalError:
|
|
||||||
# XXX if there's an internal lookup error in the above
|
|
||||||
# code (prolly on `src_err`) we want to show this frame
|
|
||||||
# in the tb!
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def recv_msg_w_pld(
|
async def recv_msg_w_pld(
|
||||||
self,
|
self,
|
||||||
|
@ -399,13 +378,52 @@ class PldRx(Struct):
|
||||||
# msg instance?
|
# msg instance?
|
||||||
pld: PayloadT = self.dec_msg(
|
pld: PayloadT = self.dec_msg(
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ctx=ipc,
|
||||||
expect_msg=expect_msg,
|
expect_msg=expect_msg,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
return msg, pld
|
return msg, pld
|
||||||
|
|
||||||
|
|
||||||
|
# Always maintain a task-context-global `PldRx`
|
||||||
|
_def_pld_rx: PldRx = PldRx(
|
||||||
|
_pldec=_def_any_pldec,
|
||||||
|
)
|
||||||
|
_ctxvar_PldRx: ContextVar[PldRx] = ContextVar(
|
||||||
|
'pld_rx',
|
||||||
|
default=_def_pld_rx,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def current_pldrx() -> PldRx:
|
||||||
|
'''
|
||||||
|
Return the current `trio.Task.context`'s msg-payload-receiver.
|
||||||
|
|
||||||
|
A payload receiver is the IPC-msg processing sub-sys which
|
||||||
|
filters inter-actor-task communicated payload data, i.e. the
|
||||||
|
`PayloadMsg.pld: PayloadT` field value, AFTER it's container
|
||||||
|
shuttlle msg (eg. `Started`/`Yield`/`Return) has been delivered
|
||||||
|
up from `tractor`'s transport layer but BEFORE the data is
|
||||||
|
yielded to application code, normally via an IPC primitive API
|
||||||
|
like, for ex., `pld_data: PayloadT = MsgStream.receive()`.
|
||||||
|
|
||||||
|
Modification of the current payload spec via `limit_plds()`
|
||||||
|
allows a `tractor` application to contextually filter IPC
|
||||||
|
payload content with a type specification as supported by
|
||||||
|
the interchange backend.
|
||||||
|
|
||||||
|
- for `msgspec` see <PUTLINKHERE>.
|
||||||
|
|
||||||
|
NOTE that the `PldRx` itself is a per-`Context` global sub-system
|
||||||
|
that normally does not change other then the applied pld-spec
|
||||||
|
for the current `trio.Task`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# ctx: context = current_ipc_ctx()
|
||||||
|
# return ctx._pld_rx
|
||||||
|
return _ctxvar_PldRx.get()
|
||||||
|
|
||||||
|
|
||||||
@cm
|
@cm
|
||||||
def limit_plds(
|
def limit_plds(
|
||||||
spec: Union[Type[Struct]],
|
spec: Union[Type[Struct]],
|
||||||
|
@ -421,55 +439,29 @@ def limit_plds(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = True
|
||||||
try:
|
try:
|
||||||
curr_ctx: Context = current_ipc_ctx()
|
# sanity on orig settings
|
||||||
rx: PldRx = curr_ctx._pld_rx
|
orig_pldrx: PldRx = current_pldrx()
|
||||||
orig_pldec: MsgDec = rx.pld_dec
|
orig_pldec: MsgDec = orig_pldrx.pld_dec
|
||||||
|
|
||||||
with rx.limit_plds(
|
with orig_pldrx.limit_plds(
|
||||||
spec=spec,
|
spec=spec,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) as pldec:
|
) as pldec:
|
||||||
log.runtime(
|
log.info(
|
||||||
'Applying payload-decoder\n\n'
|
'Applying payload-decoder\n\n'
|
||||||
f'{pldec}\n'
|
f'{pldec}\n'
|
||||||
)
|
)
|
||||||
yield pldec
|
yield pldec
|
||||||
finally:
|
finally:
|
||||||
log.runtime(
|
log.info(
|
||||||
'Reverted to previous payload-decoder\n\n'
|
'Reverted to previous payload-decoder\n\n'
|
||||||
f'{orig_pldec}\n'
|
f'{orig_pldec}\n'
|
||||||
)
|
)
|
||||||
# sanity on orig settings
|
assert (
|
||||||
assert rx.pld_dec is orig_pldec
|
(pldrx := current_pldrx()) is orig_pldrx
|
||||||
|
and
|
||||||
|
pldrx.pld_dec is orig_pldec
|
||||||
@acm
|
)
|
||||||
async def maybe_limit_plds(
|
|
||||||
ctx: Context,
|
|
||||||
spec: Union[Type[Struct]]|None = None,
|
|
||||||
**kwargs,
|
|
||||||
) -> MsgDec|None:
|
|
||||||
'''
|
|
||||||
Async compat maybe-payload type limiter.
|
|
||||||
|
|
||||||
Mostly for use inside other internal `@acm`s such that a separate
|
|
||||||
indent block isn't needed when an async one is already being
|
|
||||||
used.
|
|
||||||
|
|
||||||
'''
|
|
||||||
if spec is None:
|
|
||||||
yield None
|
|
||||||
return
|
|
||||||
|
|
||||||
# sanity on scoping
|
|
||||||
curr_ctx: Context = current_ipc_ctx()
|
|
||||||
assert ctx is curr_ctx
|
|
||||||
|
|
||||||
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
|
|
||||||
yield msgdec
|
|
||||||
|
|
||||||
curr_ctx: Context = current_ipc_ctx()
|
|
||||||
assert ctx is curr_ctx
|
|
||||||
|
|
||||||
|
|
||||||
async def drain_to_final_msg(
|
async def drain_to_final_msg(
|
||||||
|
@ -551,12 +543,21 @@ async def drain_to_final_msg(
|
||||||
match msg:
|
match msg:
|
||||||
|
|
||||||
# final result arrived!
|
# final result arrived!
|
||||||
case Return():
|
case Return(
|
||||||
|
# cid=cid,
|
||||||
|
# pld=res,
|
||||||
|
):
|
||||||
|
# ctx._result: Any = res
|
||||||
|
ctx._result: Any = pld
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Context delivered final draining msg:\n'
|
'Context delivered final draining msg:\n'
|
||||||
f'{pretty_struct.pformat(msg)}'
|
f'{pretty_struct.pformat(msg)}'
|
||||||
)
|
)
|
||||||
ctx._result: Any = pld
|
# XXX: only close the rx mem chan AFTER
|
||||||
|
# a final result is retreived.
|
||||||
|
# if ctx._rx_chan:
|
||||||
|
# await ctx._rx_chan.aclose()
|
||||||
|
# TODO: ^ we don't need it right?
|
||||||
result_msg = msg
|
result_msg = msg
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -663,6 +664,24 @@ async def drain_to_final_msg(
|
||||||
result_msg = msg
|
result_msg = msg
|
||||||
break # OOOOOF, yeah obvi we need this..
|
break # OOOOOF, yeah obvi we need this..
|
||||||
|
|
||||||
|
# XXX we should never really get here
|
||||||
|
# right! since `._deliver_msg()` should
|
||||||
|
# always have detected an {'error': ..}
|
||||||
|
# msg and already called this right!?!
|
||||||
|
# elif error := unpack_error(
|
||||||
|
# msg=msg,
|
||||||
|
# chan=ctx._portal.channel,
|
||||||
|
# hide_tb=False,
|
||||||
|
# ):
|
||||||
|
# log.critical('SHOULD NEVER GET HERE!?')
|
||||||
|
# assert msg is ctx._cancel_msg
|
||||||
|
# assert error.msgdata == ctx._remote_error.msgdata
|
||||||
|
# assert error.ipc_msg == ctx._remote_error.ipc_msg
|
||||||
|
# from .devx._debug import pause
|
||||||
|
# await pause()
|
||||||
|
# ctx._maybe_cancel_and_set_remote_error(error)
|
||||||
|
# ctx._maybe_raise_remote_err(error)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# bubble the original src key error
|
# bubble the original src key error
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -56,7 +56,8 @@ log = get_logger('tractor.msgspec')
|
||||||
PayloadT = TypeVar('PayloadT')
|
PayloadT = TypeVar('PayloadT')
|
||||||
|
|
||||||
|
|
||||||
class PayloadMsg(
|
# TODO: PayloadMsg
|
||||||
|
class Msg(
|
||||||
Struct,
|
Struct,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
|
|
||||||
|
@ -109,10 +110,6 @@ class PayloadMsg(
|
||||||
pld: Raw
|
pld: Raw
|
||||||
|
|
||||||
|
|
||||||
# TODO: complete rename
|
|
||||||
Msg = PayloadMsg
|
|
||||||
|
|
||||||
|
|
||||||
class Aid(
|
class Aid(
|
||||||
Struct,
|
Struct,
|
||||||
tag=True,
|
tag=True,
|
||||||
|
@ -302,7 +299,7 @@ class StartAck(
|
||||||
|
|
||||||
|
|
||||||
class Started(
|
class Started(
|
||||||
PayloadMsg,
|
Msg,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -316,12 +313,12 @@ class Started(
|
||||||
|
|
||||||
# TODO: instead of using our existing `Start`
|
# TODO: instead of using our existing `Start`
|
||||||
# for this (as we did with the original `{'cmd': ..}` style)
|
# for this (as we did with the original `{'cmd': ..}` style)
|
||||||
# class Cancel:
|
# class Cancel(Msg):
|
||||||
# cid: str
|
# cid: str
|
||||||
|
|
||||||
|
|
||||||
class Yield(
|
class Yield(
|
||||||
PayloadMsg,
|
Msg,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -348,7 +345,7 @@ class Stop(
|
||||||
|
|
||||||
# TODO: is `Result` or `Out[come]` a better name?
|
# TODO: is `Result` or `Out[come]` a better name?
|
||||||
class Return(
|
class Return(
|
||||||
PayloadMsg,
|
Msg,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -360,7 +357,7 @@ class Return(
|
||||||
|
|
||||||
|
|
||||||
class CancelAck(
|
class CancelAck(
|
||||||
PayloadMsg,
|
Msg,
|
||||||
Generic[PayloadT],
|
Generic[PayloadT],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -466,14 +463,14 @@ def from_dict_msg(
|
||||||
|
|
||||||
# TODO: should be make a msg version of `ContextCancelled?`
|
# TODO: should be make a msg version of `ContextCancelled?`
|
||||||
# and/or with a scope field or a full `ActorCancelled`?
|
# and/or with a scope field or a full `ActorCancelled`?
|
||||||
# class Cancelled(MsgType):
|
# class Cancelled(Msg):
|
||||||
# cid: str
|
# cid: str
|
||||||
|
|
||||||
# TODO what about overruns?
|
# TODO what about overruns?
|
||||||
# class Overrun(MsgType):
|
# class Overrun(Msg):
|
||||||
# cid: str
|
# cid: str
|
||||||
|
|
||||||
_runtime_msgs: list[Struct] = [
|
_runtime_msgs: list[Msg] = [
|
||||||
|
|
||||||
# identity handshake on first IPC `Channel` contact.
|
# identity handshake on first IPC `Channel` contact.
|
||||||
Aid,
|
Aid,
|
||||||
|
@ -499,9 +496,9 @@ _runtime_msgs: list[Struct] = [
|
||||||
]
|
]
|
||||||
|
|
||||||
# the no-outcome-yet IAC (inter-actor-communication) sub-set which
|
# the no-outcome-yet IAC (inter-actor-communication) sub-set which
|
||||||
# can be `PayloadMsg.pld` payload field type-limited by application code
|
# can be `Msg.pld` payload field type-limited by application code
|
||||||
# using `apply_codec()` and `limit_msg_spec()`.
|
# using `apply_codec()` and `limit_msg_spec()`.
|
||||||
_payload_msgs: list[PayloadMsg] = [
|
_payload_msgs: list[Msg] = [
|
||||||
# first <value> from `Context.started(<value>)`
|
# first <value> from `Context.started(<value>)`
|
||||||
Started,
|
Started,
|
||||||
|
|
||||||
|
@ -544,8 +541,8 @@ def mk_msg_spec(
|
||||||
] = 'indexed_generics',
|
] = 'indexed_generics',
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
Union[MsgType],
|
Union[Type[Msg]],
|
||||||
list[MsgType],
|
list[Type[Msg]],
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Create a payload-(data-)type-parameterized IPC message specification.
|
Create a payload-(data-)type-parameterized IPC message specification.
|
||||||
|
@ -557,7 +554,7 @@ def mk_msg_spec(
|
||||||
determined by the input `payload_type_union: Union[Type]`.
|
determined by the input `payload_type_union: Union[Type]`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
submsg_types: list[MsgType] = Msg.__subclasses__()
|
submsg_types: list[Type[Msg]] = Msg.__subclasses__()
|
||||||
bases: tuple = (
|
bases: tuple = (
|
||||||
# XXX NOTE XXX the below generic-parameterization seems to
|
# XXX NOTE XXX the below generic-parameterization seems to
|
||||||
# be THE ONLY way to get this to work correctly in terms
|
# be THE ONLY way to get this to work correctly in terms
|
||||||
|
|
Loading…
Reference in New Issue