Compare commits
78 Commits
main
...
POST_final
Author | SHA1 | Date |
---|---|---|
|
d2ad58d5d6 | |
|
f7ca31c0e4 | |
|
e50ba3e8a6 | |
|
5fc64107e5 | |
|
6348c83d28 | |
|
8b8390e83c | |
|
f5c6fc2f02 | |
|
444b9bfc22 | |
|
79e70a9b08 | |
|
23240c31e3 | |
|
6a82bab627 | |
|
b485297411 | |
|
dd23ef1d95 | |
|
2ec3ff46cd | |
|
967d0e4836 | |
|
5ccb36af57 | |
|
28f8546ac5 | |
|
0ff0971aca | |
|
dc1091016b | |
|
69bba30557 | |
|
da9bc1237d | |
|
ab11ee4fbe | |
|
466dce8aed | |
|
808dd9d73c | |
|
aef306465d | |
|
7459a4127c | |
|
fc77e6eca5 | |
|
26526b86c3 | |
|
d079675dd4 | |
|
c2acc4f55c | |
|
326b258fd5 | |
|
4f4c7e6b67 | |
|
c05d08e426 | |
|
02062c5dc0 | |
|
72c4a9d20b | |
|
ccc3b1fce1 | |
|
11c4e65757 | |
|
33ac3ca99f | |
|
9ada628a57 | |
|
d2c3e32bf1 | |
|
51944a0b99 | |
|
024e8015da | |
|
aaed3a4a37 | |
|
edffd5e367 | |
|
4ca81e39e6 | |
|
dd7aca539f | |
|
735dc9056a | |
|
e949839edf | |
|
6194ac891c | |
|
6554e324f2 | |
|
076caeb596 | |
|
faa678e209 | |
|
c5d68f6b58 | |
|
506aefb917 | |
|
7436d52f37 | |
|
80b074e3e7 | |
|
e97efb7099 | |
|
81b11fd665 | |
|
aa2b1fbf8b | |
|
82c12253e5 | |
|
7f451409ec | |
|
9be6f6d3e9 | |
|
9d2c7ae3cf | |
|
a81a1be40c | |
|
c85575e6ce | |
|
aa98cbd848 | |
|
a890e9aa83 | |
|
1592f7e6be | |
|
1c9293e69d | |
|
ec13c1b31d | |
|
7ce366097d | |
|
6cedda008a | |
|
207175d78e | |
|
57b5e51099 | |
|
b72c8dce9b | |
|
bfa4d71009 | |
|
434e22680e | |
|
636c19866c |
|
@ -16,7 +16,6 @@ from tractor import (
|
|||
ContextCancelled,
|
||||
MsgStream,
|
||||
_testing,
|
||||
trionics,
|
||||
)
|
||||
import trio
|
||||
import pytest
|
||||
|
@ -63,8 +62,9 @@ async def recv_and_spawn_net_killers(
|
|||
await ctx.started()
|
||||
async with (
|
||||
ctx.open_stream() as stream,
|
||||
trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn,
|
||||
):
|
||||
async for i in stream:
|
||||
print(f'child echoing {i}')
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
import trio
|
||||
import tractor
|
||||
|
||||
|
||||
async def main():
|
||||
async with tractor.open_root_actor(
|
||||
debug_mode=True,
|
||||
loglevel='cancel',
|
||||
) as _root:
|
||||
|
||||
# manually trigger self-cancellation and wait
|
||||
# for it to fully trigger.
|
||||
_root.cancel_soon()
|
||||
await _root._cancel_complete.wait()
|
||||
print('root cancelled')
|
||||
|
||||
# now ensure we can still use the REPL
|
||||
try:
|
||||
await tractor.pause()
|
||||
except trio.Cancelled as _taskc:
|
||||
assert (root_cs := _root._root_tn.cancel_scope).cancel_called
|
||||
# NOTE^^ above logic but inside `open_root_actor()` and
|
||||
# passed to the `shield=` expression is effectively what
|
||||
# we're testing here!
|
||||
await tractor.pause(shield=root_cs.cancel_called)
|
||||
|
||||
# XXX, if shield logic *is wrong* inside `open_root_actor()`'s
|
||||
# crash-handler block this should never be interacted,
|
||||
# instead `trio.Cancelled` would be bubbled up: the original
|
||||
# BUG.
|
||||
assert 0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
trio.run(main)
|
|
@ -23,8 +23,9 @@ async def main():
|
|||
modules=[__name__]
|
||||
) as portal_map,
|
||||
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn,
|
||||
):
|
||||
|
||||
for (name, portal) in portal_map.items():
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
"""
|
||||
That "native" debug mode better work!
|
||||
|
||||
All these tests can be understood (somewhat) by running the
|
||||
equivalent `examples/debugging/` scripts manually.
|
||||
All these tests can be understood (somewhat) by running the equivalent
|
||||
`examples/debugging/` scripts manually.
|
||||
|
||||
TODO:
|
||||
- none of these tests have been run successfully on windows yet but
|
||||
there's been manual testing that verified it works.
|
||||
- wonder if any of it'll work on OS X?
|
||||
- none of these tests have been run successfully on windows yet but
|
||||
there's been manual testing that verified it works.
|
||||
- wonder if any of it'll work on OS X?
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
@ -1156,54 +1156,6 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
|
|||
)
|
||||
|
||||
|
||||
def test_crash_handling_within_cancelled_root_actor(
|
||||
spawn: PexpectSpawner,
|
||||
):
|
||||
'''
|
||||
Ensure that when only a root-actor is started via `open_root_actor()`
|
||||
we can crash-handle in debug-mode despite self-cancellation.
|
||||
|
||||
More-or-less ensures we conditionally shield the pause in
|
||||
`._root.open_root_actor()`'s `await debug._maybe_enter_pm()`
|
||||
call.
|
||||
|
||||
'''
|
||||
child = spawn('root_self_cancelled_w_error')
|
||||
child.expect(PROMPT)
|
||||
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
"Actor.cancel_soon()` was called!",
|
||||
"root cancelled",
|
||||
_pause_msg,
|
||||
"('root'", # actor name
|
||||
]
|
||||
)
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(PROMPT)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
_crash_msg,
|
||||
"('root'", # actor name
|
||||
"AssertionError",
|
||||
"assert 0",
|
||||
]
|
||||
)
|
||||
|
||||
child.sendline('c')
|
||||
child.expect(EOF)
|
||||
assert_before(
|
||||
child,
|
||||
[
|
||||
"AssertionError",
|
||||
"assert 0",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
# TODO: better error for "non-ideal" usage from the root actor.
|
||||
# -[ ] if called from an async scope emit a message that suggests
|
||||
# using `await tractor.pause()` instead since it's less overhead
|
||||
|
|
|
@ -18,9 +18,8 @@ from tractor import (
|
|||
@pytest.fixture
|
||||
def bindspace_dir_str() -> str:
|
||||
|
||||
rt_dir: Path = tractor._state.get_rt_dir()
|
||||
bs_dir: Path = rt_dir / 'doggy'
|
||||
bs_dir_str: str = str(bs_dir)
|
||||
bs_dir_str: str = '/run/user/1000/doggy'
|
||||
bs_dir = Path(bs_dir_str)
|
||||
assert not bs_dir.is_dir()
|
||||
|
||||
yield bs_dir_str
|
||||
|
|
|
@ -313,8 +313,9 @@ async def inf_streamer(
|
|||
# `trio.EndOfChannel` doesn't propagate directly to the above
|
||||
# .open_stream() parent, resulting in it also raising instead
|
||||
# of gracefully absorbing as normal.. so how to handle?
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn,
|
||||
):
|
||||
async def close_stream_on_sentinel():
|
||||
async for msg in stream:
|
||||
|
|
|
@ -236,10 +236,7 @@ async def stream_forever():
|
|||
async def test_cancel_infinite_streamer(start_method):
|
||||
|
||||
# stream for at most 1 seconds
|
||||
with (
|
||||
trio.fail_after(4),
|
||||
trio.move_on_after(1) as cancel_scope
|
||||
):
|
||||
with trio.move_on_after(1) as cancel_scope:
|
||||
async with tractor.open_nursery() as n:
|
||||
portal = await n.start_actor(
|
||||
'donny',
|
||||
|
@ -535,15 +532,10 @@ def test_cancel_via_SIGINT_other_task(
|
|||
async def main():
|
||||
# should never timeout since SIGINT should cancel the current program
|
||||
with trio.fail_after(timeout):
|
||||
async with (
|
||||
|
||||
# XXX ?TODO? why no work!?
|
||||
# tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn,
|
||||
):
|
||||
await tn.start(spawn_and_sleep_forever)
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as n:
|
||||
await n.start(spawn_and_sleep_forever)
|
||||
if 'mp' in spawn_backend:
|
||||
time.sleep(0.1)
|
||||
os.kill(pid, signal.SIGINT)
|
||||
|
|
|
@ -117,10 +117,9 @@ async def open_actor_local_nursery(
|
|||
ctx: tractor.Context,
|
||||
):
|
||||
global _nursery
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn:
|
||||
_nursery = tn
|
||||
await ctx.started()
|
||||
await trio.sleep(10)
|
||||
|
|
|
@ -11,7 +11,6 @@ import psutil
|
|||
import pytest
|
||||
import subprocess
|
||||
import tractor
|
||||
from tractor.trionics import collapse_eg
|
||||
from tractor._testing import tractor_test
|
||||
import trio
|
||||
|
||||
|
@ -194,10 +193,10 @@ async def spawn_and_check_registry(
|
|||
|
||||
try:
|
||||
async with tractor.open_nursery() as an:
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as trion,
|
||||
):
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as trion:
|
||||
|
||||
portals = {}
|
||||
for i in range(3):
|
||||
name = f'a{i}'
|
||||
|
@ -339,12 +338,11 @@ async def close_chans_before_nursery(
|
|||
async with portal2.open_stream_from(
|
||||
stream_forever
|
||||
) as agen2:
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
tn.start_soon(streamer, agen1)
|
||||
tn.start_soon(cancel, use_signal, .5)
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as n:
|
||||
n.start_soon(streamer, agen1)
|
||||
n.start_soon(cancel, use_signal, .5)
|
||||
try:
|
||||
await streamer(agen2)
|
||||
finally:
|
||||
|
|
|
@ -234,8 +234,10 @@ async def trio_ctx(
|
|||
with trio.fail_after(1 + delay):
|
||||
try:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
trio.open_nursery(
|
||||
# TODO, for new `trio` / py3.13
|
||||
# strict_exception_groups=False,
|
||||
) as tn,
|
||||
tractor.to_asyncio.open_channel_from(
|
||||
sleep_and_err,
|
||||
) as (first, chan),
|
||||
|
|
|
@ -235,16 +235,10 @@ async def cancel_after(wait, reg_addr):
|
|||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def time_quad_ex(
|
||||
reg_addr: tuple,
|
||||
ci_env: bool,
|
||||
spawn_backend: str,
|
||||
):
|
||||
def time_quad_ex(reg_addr, ci_env, spawn_backend):
|
||||
if spawn_backend == 'mp':
|
||||
'''
|
||||
no idea but the mp *nix runs are flaking out here often...
|
||||
|
||||
'''
|
||||
"""no idea but the mp *nix runs are flaking out here often...
|
||||
"""
|
||||
pytest.skip("Test is too flaky on mp in CI")
|
||||
|
||||
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
|
||||
|
@ -255,24 +249,12 @@ def time_quad_ex(
|
|||
return results, diff
|
||||
|
||||
|
||||
def test_a_quadruple_example(
|
||||
time_quad_ex: tuple,
|
||||
ci_env: bool,
|
||||
spawn_backend: str,
|
||||
):
|
||||
'''
|
||||
This also serves as a kind of "we'd like to be this fast test".
|
||||
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
|
||||
"""This also serves as a kind of "we'd like to be this fast test"."""
|
||||
|
||||
'''
|
||||
results, diff = time_quad_ex
|
||||
assert results
|
||||
this_fast = (
|
||||
6 if platform.system() in (
|
||||
'Windows',
|
||||
'Darwin',
|
||||
)
|
||||
else 3
|
||||
)
|
||||
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
|
||||
assert diff < this_fast
|
||||
|
||||
|
||||
|
|
|
@ -8,7 +8,6 @@ from contextlib import (
|
|||
)
|
||||
|
||||
import pytest
|
||||
from tractor.trionics import collapse_eg
|
||||
import trio
|
||||
from trio import TaskStatus
|
||||
|
||||
|
@ -65,8 +64,9 @@ def test_stashed_child_nursery(use_start_soon):
|
|||
async def main():
|
||||
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as pn,
|
||||
trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as pn,
|
||||
):
|
||||
cn = await pn.start(mk_child_nursery)
|
||||
assert cn
|
||||
|
@ -117,11 +117,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
tractor.trionics.maybe_raise_from_masking_exc(
|
||||
tn=tn,
|
||||
unmask_from=(
|
||||
trio.Cancelled
|
||||
if unmask_from_canc
|
||||
else None
|
||||
(trio.Cancelled,) if unmask_from_canc
|
||||
else ()
|
||||
),
|
||||
)
|
||||
):
|
||||
|
@ -136,8 +134,7 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
with tractor.devx.maybe_open_crash_handler(
|
||||
pdb=debug_mode,
|
||||
) as bxerr:
|
||||
if bxerr:
|
||||
assert not bxerr.value
|
||||
assert not bxerr.value
|
||||
|
||||
async with (
|
||||
wraps_tn_that_always_cancels() as tn,
|
||||
|
@ -145,11 +142,12 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
assert not tn.cancel_scope.cancel_called
|
||||
assert 0
|
||||
|
||||
assert (
|
||||
(err := bxerr.value)
|
||||
and
|
||||
type(err) is AssertionError
|
||||
)
|
||||
if debug_mode:
|
||||
assert (
|
||||
(err := bxerr.value)
|
||||
and
|
||||
type(err) is AssertionError
|
||||
)
|
||||
|
||||
with pytest.raises(ExceptionGroup) as excinfo:
|
||||
trio.run(_main)
|
||||
|
@ -197,8 +195,10 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
|||
async with (
|
||||
# XXX should ensure ONLY the KBI
|
||||
# is relayed upward
|
||||
collapse_eg(),
|
||||
trio.open_nursery(), # as tn,
|
||||
trionics.collapse_eg(),
|
||||
trio.open_nursery(
|
||||
# strict_exception_groups=False,
|
||||
), # as tn,
|
||||
|
||||
trionics.gather_contexts([
|
||||
open_memchan(),
|
||||
|
|
|
@ -478,14 +478,16 @@ async def open_root_actor(
|
|||
|
||||
# start runtime in a bg sub-task, yield to caller.
|
||||
async with (
|
||||
collapse_eg(),
|
||||
collapse_eg(
|
||||
bp=True,
|
||||
hide_tb=False,
|
||||
),
|
||||
trio.open_nursery() as root_tn,
|
||||
|
||||
# ?TODO? finally-footgun below?
|
||||
# XXX, finally-footgun below?
|
||||
# -> see note on why shielding.
|
||||
# maybe_raise_from_masking_exc(),
|
||||
):
|
||||
actor._root_tn = root_tn
|
||||
# `_runtime.async_main()` creates an internal nursery
|
||||
# and blocks here until any underlying actor(-process)
|
||||
# tree has terminated thereby conducting so called
|
||||
|
@ -529,6 +531,7 @@ async def open_root_actor(
|
|||
# crashes under cancellation conditions; so
|
||||
# most of them!
|
||||
shield=root_tn.cancel_scope.cancel_called,
|
||||
# ^TODO? write a (debugger) test for this ya?
|
||||
)
|
||||
|
||||
if (
|
||||
|
@ -568,7 +571,6 @@ async def open_root_actor(
|
|||
f'{op_nested_actor_repr}'
|
||||
)
|
||||
# XXX, THIS IS A *finally-footgun*!
|
||||
# (also mentioned in with-block above)
|
||||
# -> though already shields iternally it can
|
||||
# taskc here and mask underlying errors raised in
|
||||
# the try-block above?
|
||||
|
|
|
@ -284,6 +284,10 @@ async def _errors_relayed_via_ipc(
|
|||
try:
|
||||
yield # run RPC invoke body
|
||||
|
||||
except TransportClosed:
|
||||
log.exception('Tpt disconnect during remote-exc relay?')
|
||||
raise
|
||||
|
||||
# box and ship RPC errors for wire-transit via
|
||||
# the task's requesting parent IPC-channel.
|
||||
except (
|
||||
|
@ -319,6 +323,9 @@ async def _errors_relayed_via_ipc(
|
|||
and debug_kbis
|
||||
)
|
||||
)
|
||||
# TODO? better then `debug_filter` below?
|
||||
# and
|
||||
# not isinstance(err, TransportClosed)
|
||||
):
|
||||
# XXX QUESTION XXX: is there any case where we'll
|
||||
# want to debug IPC disconnects as a default?
|
||||
|
@ -327,13 +334,25 @@ async def _errors_relayed_via_ipc(
|
|||
# recovery logic - the only case is some kind of
|
||||
# strange bug in our transport layer itself? Going
|
||||
# to keep this open ended for now.
|
||||
log.debug(
|
||||
'RPC task crashed, attempting to enter debugger\n'
|
||||
f'|_{ctx}'
|
||||
)
|
||||
|
||||
if _state.debug_mode():
|
||||
log.exception(
|
||||
f'RPC task crashed!\n'
|
||||
f'Attempting to enter debugger\n'
|
||||
f'\n'
|
||||
f'{ctx}'
|
||||
)
|
||||
|
||||
entered_debug = await debug._maybe_enter_pm(
|
||||
err,
|
||||
api_frame=inspect.currentframe(),
|
||||
|
||||
# don't REPL any psuedo-expected tpt-disconnect
|
||||
# debug_filter=lambda exc: (
|
||||
# type (exc) not in {
|
||||
# TransportClosed,
|
||||
# }
|
||||
# ),
|
||||
)
|
||||
if not entered_debug:
|
||||
# if we prolly should have entered the REPL but
|
||||
|
@ -384,7 +403,7 @@ async def _errors_relayed_via_ipc(
|
|||
|
||||
# RPC task bookeeping.
|
||||
# since RPC tasks are scheduled inside a flat
|
||||
# `Actor._service_tn`, we add "handles" to each such that
|
||||
# `Actor._service_n`, we add "handles" to each such that
|
||||
# they can be individually ccancelled.
|
||||
finally:
|
||||
|
||||
|
@ -450,7 +469,7 @@ async def _invoke(
|
|||
kwargs: dict[str, Any],
|
||||
|
||||
is_rpc: bool = True,
|
||||
hide_tb: bool = True,
|
||||
hide_tb: bool = False,
|
||||
return_msg_type: Return|CancelAck = Return,
|
||||
|
||||
task_status: TaskStatus[
|
||||
|
@ -462,7 +481,7 @@ async def _invoke(
|
|||
connected IPC channel.
|
||||
|
||||
This is the core "RPC" `trio.Task` scheduling machinery used to start every
|
||||
remotely invoked function, normally in `Actor._service_tn: Nursery`.
|
||||
remotely invoked function, normally in `Actor._service_n: Nursery`.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
@ -654,8 +673,7 @@ async def _invoke(
|
|||
# scope ensures unasking of the `await coro` below
|
||||
# *should* never be interfered with!!
|
||||
maybe_raise_from_masking_exc(
|
||||
tn=tn,
|
||||
unmask_from=Cancelled,
|
||||
unmask_from=(Cancelled,),
|
||||
) as _mbme, # maybe boxed masked exc
|
||||
):
|
||||
ctx._scope_nursery = tn
|
||||
|
@ -675,7 +693,20 @@ async def _invoke(
|
|||
f'\n'
|
||||
f'{pretty_struct.pformat(return_msg)}\n'
|
||||
)
|
||||
await chan.send(return_msg)
|
||||
try:
|
||||
await chan.send(return_msg)
|
||||
except TransportClosed:
|
||||
log.exception(
|
||||
f"Failed send final result to 'parent'-side of IPC-ctx!\n"
|
||||
f'\n'
|
||||
f'{chan}\n'
|
||||
f'Channel already disconnected ??\n'
|
||||
f'\n'
|
||||
f'{pretty_struct.pformat(return_msg)}'
|
||||
)
|
||||
# ?TODO? will this ever be true though?
|
||||
if chan.connected():
|
||||
raise
|
||||
|
||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||
# called by any of,
|
||||
|
@ -936,7 +967,7 @@ async def process_messages(
|
|||
|
||||
Receive (multiplexed) per-`Channel` RPC requests as msgs from
|
||||
remote processes; schedule target async funcs as local
|
||||
`trio.Task`s inside the `Actor._service_tn: Nursery`.
|
||||
`trio.Task`s inside the `Actor._service_n: Nursery`.
|
||||
|
||||
Depending on msg type, non-`cmd` (task spawning/starting)
|
||||
request payloads (eg. `started`, `yield`, `return`, `error`)
|
||||
|
@ -961,7 +992,7 @@ async def process_messages(
|
|||
|
||||
'''
|
||||
actor: Actor = _state.current_actor()
|
||||
assert actor._service_tn # runtime state sanity
|
||||
assert actor._service_n # runtime state sanity
|
||||
|
||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||
# should use it?
|
||||
|
@ -1172,7 +1203,7 @@ async def process_messages(
|
|||
start_status += '->( scheduling new task..\n'
|
||||
log.runtime(start_status)
|
||||
try:
|
||||
ctx: Context = await actor._service_tn.start(
|
||||
ctx: Context = await actor._service_n.start(
|
||||
partial(
|
||||
_invoke,
|
||||
actor,
|
||||
|
@ -1312,7 +1343,7 @@ async def process_messages(
|
|||
) as err:
|
||||
|
||||
if nursery_cancelled_before_task:
|
||||
sn: Nursery = actor._service_tn
|
||||
sn: Nursery = actor._service_n
|
||||
assert sn and sn.cancel_scope.cancel_called # sanity
|
||||
log.cancel(
|
||||
f'Service nursery cancelled before it handled {funcname}'
|
||||
|
|
|
@ -35,15 +35,6 @@ for running all lower level spawning, supervision and msging layers:
|
|||
SC-transitive RPC via scheduling of `trio` tasks.
|
||||
- registration of newly spawned actors with the discovery sys.
|
||||
|
||||
Glossary:
|
||||
--------
|
||||
- tn: a `trio.Nursery` or "task nursery".
|
||||
- an: an `ActorNursery` or "actor nursery".
|
||||
- root: top/parent-most scope/task/process/actor (or other runtime
|
||||
primitive) in a hierarchical tree.
|
||||
- parent-ish: "higher-up" in the runtime-primitive hierarchy.
|
||||
- child-ish: "lower-down" in the runtime-primitive hierarchy.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import (
|
||||
|
@ -85,7 +76,6 @@ from tractor.msg import (
|
|||
)
|
||||
from .trionics import (
|
||||
collapse_eg,
|
||||
maybe_open_nursery,
|
||||
)
|
||||
from .ipc import (
|
||||
Channel,
|
||||
|
@ -183,11 +173,9 @@ class Actor:
|
|||
|
||||
msg_buffer_size: int = 2**6
|
||||
|
||||
# nursery placeholders filled in by `async_main()`,
|
||||
# - after fork for subactors.
|
||||
# - during boot for the root actor.
|
||||
_root_tn: Nursery|None = None
|
||||
_service_tn: Nursery|None = None
|
||||
# nursery placeholders filled in by `async_main()` after fork
|
||||
_service_n: Nursery|None = None
|
||||
|
||||
_ipc_server: _server.IPCServer|None = None
|
||||
|
||||
@property
|
||||
|
@ -1021,48 +1009,12 @@ class Actor:
|
|||
the RPC service nursery.
|
||||
|
||||
'''
|
||||
actor_repr: str = _pformat.nest_from_op(
|
||||
input_op='>c(',
|
||||
text=self.pformat(),
|
||||
nest_indent=1,
|
||||
)
|
||||
log.cancel(
|
||||
'Actor.cancel_soon()` was called!\n'
|
||||
f'>> scheduling `Actor.cancel()`\n'
|
||||
f'{actor_repr}'
|
||||
)
|
||||
assert self._service_tn
|
||||
self._service_tn.start_soon(
|
||||
assert self._service_n
|
||||
self._service_n.start_soon(
|
||||
self.cancel,
|
||||
None, # self cancel all rpc tasks
|
||||
)
|
||||
|
||||
# schedule a "canceller task" in the `._root_tn` once the
|
||||
# `._service_tn` is fully shutdown; task waits for child-ish
|
||||
# scopes to fully exit then finally cancels its parent,
|
||||
# root-most, scope.
|
||||
async def cancel_root_tn_after_services():
|
||||
log.runtime(
|
||||
'Waiting on service-tn to cancel..\n'
|
||||
f'c>)\n'
|
||||
f'|_{self._service_tn.cancel_scope!r}\n'
|
||||
)
|
||||
await self._cancel_complete.wait()
|
||||
log.cancel(
|
||||
f'`._service_tn` cancelled\n'
|
||||
f'>c)\n'
|
||||
f'|_{self._service_tn.cancel_scope!r}\n'
|
||||
f'\n'
|
||||
f'>> cancelling `._root_tn`\n'
|
||||
f'c>(\n'
|
||||
f' |_{self._root_tn.cancel_scope!r}\n'
|
||||
)
|
||||
self._root_tn.cancel_scope.cancel()
|
||||
|
||||
self._root_tn.start_soon(
|
||||
cancel_root_tn_after_services
|
||||
)
|
||||
|
||||
@property
|
||||
def cancel_complete(self) -> bool:
|
||||
return self._cancel_complete.is_set()
|
||||
|
@ -1167,8 +1119,8 @@ class Actor:
|
|||
await ipc_server.wait_for_shutdown()
|
||||
|
||||
# cancel all rpc tasks permanently
|
||||
if self._service_tn:
|
||||
self._service_tn.cancel_scope.cancel()
|
||||
if self._service_n:
|
||||
self._service_n.cancel_scope.cancel()
|
||||
|
||||
log_meth(msg)
|
||||
self._cancel_complete.set()
|
||||
|
@ -1305,7 +1257,7 @@ class Actor:
|
|||
'''
|
||||
Cancel all ongoing RPC tasks owned/spawned for a given
|
||||
`parent_chan: Channel` or simply all tasks (inside
|
||||
`._service_tn`) when `parent_chan=None`.
|
||||
`._service_n`) when `parent_chan=None`.
|
||||
|
||||
'''
|
||||
tasks: dict = self._rpc_tasks
|
||||
|
@ -1517,55 +1469,46 @@ async def async_main(
|
|||
accept_addrs.append(addr.unwrap())
|
||||
|
||||
assert accept_addrs
|
||||
|
||||
ya_root_tn: bool = bool(actor._root_tn)
|
||||
ya_service_tn: bool = bool(actor._service_tn)
|
||||
|
||||
# NOTE, a top-most "root" nursery in each actor-process
|
||||
# enables a lifetime priority for the IPC-channel connection
|
||||
# with a sub-actor's immediate parent. I.e. this connection
|
||||
# is kept alive as a resilient service connection until all
|
||||
# other machinery has exited, cancellation of all
|
||||
# embedded/child scopes have completed. This helps ensure
|
||||
# a deterministic (and thus "graceful")
|
||||
# first-class-supervision style teardown where a parent actor
|
||||
# (vs. say peers) is always the last to be contacted before
|
||||
# disconnect.
|
||||
# The "root" nursery ensures the channel with the immediate
|
||||
# parent is kept alive as a resilient service until
|
||||
# cancellation steps have (mostly) occurred in
|
||||
# a deterministic way.
|
||||
root_tn: trio.Nursery
|
||||
async with (
|
||||
collapse_eg(),
|
||||
maybe_open_nursery(
|
||||
nursery=actor._root_tn,
|
||||
) as root_tn,
|
||||
trio.open_nursery() as root_tn,
|
||||
):
|
||||
if ya_root_tn:
|
||||
assert root_tn is actor._root_tn
|
||||
else:
|
||||
actor._root_tn = root_tn
|
||||
# actor._root_n = root_tn
|
||||
# assert actor._root_n
|
||||
|
||||
ipc_server: _server.IPCServer
|
||||
async with (
|
||||
collapse_eg(),
|
||||
maybe_open_nursery(
|
||||
nursery=actor._service_tn,
|
||||
) as service_tn,
|
||||
trio.open_nursery() as service_nursery,
|
||||
_server.open_ipc_server(
|
||||
parent_tn=service_tn, # ?TODO, why can't this be the root-tn
|
||||
stream_handler_tn=service_tn,
|
||||
parent_tn=service_nursery,
|
||||
stream_handler_tn=service_nursery,
|
||||
) as ipc_server,
|
||||
# ) as actor._ipc_server,
|
||||
# ^TODO? prettier?
|
||||
|
||||
):
|
||||
if ya_service_tn:
|
||||
assert service_tn is actor._service_tn
|
||||
else:
|
||||
# This nursery is used to handle all inbound
|
||||
# connections to us such that if the TCP server
|
||||
# is killed, connections can continue to process
|
||||
# in the background until this nursery is cancelled.
|
||||
actor._service_tn = service_tn
|
||||
|
||||
# set after allocate
|
||||
# This nursery is used to handle all inbound
|
||||
# connections to us such that if the TCP server
|
||||
# is killed, connections can continue to process
|
||||
# in the background until this nursery is cancelled.
|
||||
actor._service_n = service_nursery
|
||||
actor._ipc_server = ipc_server
|
||||
assert (
|
||||
actor._service_n
|
||||
and (
|
||||
actor._service_n
|
||||
is
|
||||
actor._ipc_server._parent_tn
|
||||
is
|
||||
ipc_server._stream_handler_tn
|
||||
)
|
||||
)
|
||||
|
||||
# load exposed/allowed RPC modules
|
||||
# XXX: do this **after** establishing a channel to the parent
|
||||
|
@ -1591,11 +1534,10 @@ async def async_main(
|
|||
# - root actor: the ``accept_addr`` passed to this method
|
||||
|
||||
# TODO: why is this not with the root nursery?
|
||||
# - see above that the `._service_tn` is what's used?
|
||||
try:
|
||||
eps: list = await ipc_server.listen_on(
|
||||
accept_addrs=accept_addrs,
|
||||
stream_handler_nursery=service_tn,
|
||||
stream_handler_nursery=service_nursery,
|
||||
)
|
||||
log.runtime(
|
||||
f'Booted IPC server\n'
|
||||
|
@ -1603,7 +1545,7 @@ async def async_main(
|
|||
)
|
||||
assert (
|
||||
(eps[0].listen_tn)
|
||||
is not service_tn
|
||||
is not service_nursery
|
||||
)
|
||||
|
||||
except OSError as oserr:
|
||||
|
@ -1765,7 +1707,7 @@ async def async_main(
|
|||
|
||||
# XXX TODO but hard XXX
|
||||
# we can't actually do this bc the debugger uses the
|
||||
# _service_tn to spawn the lock task, BUT, in theory if we had
|
||||
# _service_n to spawn the lock task, BUT, in theory if we had
|
||||
# the root nursery surround this finally block it might be
|
||||
# actually possible to debug THIS machinery in the same way
|
||||
# as user task code?
|
||||
|
|
|
@ -297,23 +297,6 @@ async def hard_kill(
|
|||
# zombies (as a feature) we ask the OS to do send in the
|
||||
# removal swad as the last resort.
|
||||
if cs.cancelled_caught:
|
||||
|
||||
# TODO? attempt at intermediary-rent-sub
|
||||
# with child in debug lock?
|
||||
# |_https://github.com/goodboy/tractor/issues/320
|
||||
#
|
||||
# if not is_root_process():
|
||||
# log.warning(
|
||||
# 'Attempting to acquire debug-REPL-lock before zombie reap!'
|
||||
# )
|
||||
# with trio.CancelScope(shield=True):
|
||||
# async with debug.acquire_debug_lock(
|
||||
# subactor_uid=current_actor().uid,
|
||||
# ) as _ctx:
|
||||
# log.warning(
|
||||
# 'Acquired debug lock, child ready to be killed ??\n'
|
||||
# )
|
||||
|
||||
# TODO: toss in the skynet-logo face as ascii art?
|
||||
log.critical(
|
||||
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
|
||||
|
|
|
@ -468,7 +468,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
|
||||
# errors from this daemon actor nursery bubble up to caller
|
||||
async with (
|
||||
collapse_eg(),
|
||||
# collapse_eg(),
|
||||
collapse_eg(
|
||||
bp=True,
|
||||
hide_tb=False,
|
||||
),
|
||||
trio.open_nursery() as da_nursery,
|
||||
):
|
||||
try:
|
||||
|
@ -481,7 +485,11 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
# As such if the strategy propagates any error(s) upwards
|
||||
# the above "daemon actor" nursery will be notified.
|
||||
async with (
|
||||
collapse_eg(),
|
||||
# collapse_eg(),
|
||||
collapse_eg(
|
||||
bp=True,
|
||||
hide_tb=False,
|
||||
),
|
||||
trio.open_nursery() as ria_nursery,
|
||||
):
|
||||
an = ActorNursery(
|
||||
|
@ -621,11 +629,17 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
|
||||
# use `BaseExceptionGroup` as needed
|
||||
if len(errors) > 1:
|
||||
raise BaseExceptionGroup(
|
||||
beg = BaseExceptionGroup(
|
||||
'tractor.ActorNursery errored with',
|
||||
tuple(errors.values()),
|
||||
)
|
||||
beg.add_note(
|
||||
'This beg was created from an actor-nursery!\n'
|
||||
)
|
||||
await debug.pause(shield=True)
|
||||
raise beg
|
||||
else:
|
||||
# await debug.pause(shield=True)
|
||||
raise list(errors.values())[0]
|
||||
|
||||
# show frame on any (likely) internal error
|
||||
|
@ -643,9 +657,8 @@ _shutdown_msg: str = (
|
|||
'Actor-runtime-shutdown'
|
||||
)
|
||||
|
||||
|
||||
@acm
|
||||
# @api_frame
|
||||
@acm
|
||||
async def open_nursery(
|
||||
*, # named params only!
|
||||
hide_tb: bool = True,
|
||||
|
@ -684,16 +697,30 @@ async def open_nursery(
|
|||
# mark us for teardown on exit
|
||||
implicit_runtime: bool = True
|
||||
|
||||
async with open_root_actor(
|
||||
hide_tb=hide_tb,
|
||||
**kwargs,
|
||||
) as actor:
|
||||
async with (
|
||||
# collapse_eg(),
|
||||
collapse_eg(
|
||||
bp=True,
|
||||
hide_tb=False,
|
||||
),
|
||||
open_root_actor(
|
||||
hide_tb=hide_tb,
|
||||
**kwargs,
|
||||
) as actor
|
||||
):
|
||||
assert actor is current_actor()
|
||||
|
||||
try:
|
||||
async with _open_and_supervise_one_cancels_all_nursery(
|
||||
actor
|
||||
) as an:
|
||||
async with (
|
||||
# collapse_eg(),
|
||||
collapse_eg(
|
||||
bp=True,
|
||||
hide_tb=False,
|
||||
),
|
||||
_open_and_supervise_one_cancels_all_nursery(
|
||||
actor
|
||||
) as an,
|
||||
):
|
||||
|
||||
# NOTE: mark this nursery as having
|
||||
# implicitly started the root actor so
|
||||
|
|
|
@ -481,12 +481,12 @@ async def _pause(
|
|||
# we have to figure out how to avoid having the service nursery
|
||||
# cancel on this task start? I *think* this works below:
|
||||
# ```python
|
||||
# actor._service_tn.cancel_scope.shield = shield
|
||||
# actor._service_n.cancel_scope.shield = shield
|
||||
# ```
|
||||
# but not entirely sure if that's a sane way to implement it?
|
||||
|
||||
# NOTE currently we spawn the lock request task inside this
|
||||
# subactor's global `Actor._service_tn` so that the
|
||||
# subactor's global `Actor._service_n` so that the
|
||||
# lifetime of the lock-request can outlive the current
|
||||
# `._pause()` scope while the user steps through their
|
||||
# application code and when they finally exit the
|
||||
|
@ -510,7 +510,7 @@ async def _pause(
|
|||
f'|_{task}\n'
|
||||
)
|
||||
with trio.CancelScope(shield=shield):
|
||||
req_ctx: Context = await actor._service_tn.start(
|
||||
req_ctx: Context = await actor._service_n.start(
|
||||
partial(
|
||||
request_root_stdio_lock,
|
||||
actor_uid=actor.uid,
|
||||
|
@ -544,7 +544,7 @@ async def _pause(
|
|||
_repl_fail_report = None
|
||||
|
||||
# when the actor is mid-runtime cancellation the
|
||||
# `Actor._service_tn` might get closed before we can spawn
|
||||
# `Actor._service_n` might get closed before we can spawn
|
||||
# the request task, so just ignore expected RTE.
|
||||
elif (
|
||||
isinstance(pause_err, RuntimeError)
|
||||
|
@ -561,6 +561,9 @@ async def _pause(
|
|||
return
|
||||
|
||||
elif isinstance(pause_err, trio.Cancelled):
|
||||
__tracebackhide__: bool = False
|
||||
# XXX, unmask to REPL it.
|
||||
# mk_pdb().set_trace(frame=inspect.currentframe())
|
||||
_repl_fail_report += (
|
||||
'You called `tractor.pause()` from an already cancelled scope!\n\n'
|
||||
'Consider `await tractor.pause(shield=True)` to make it work B)\n'
|
||||
|
@ -989,7 +992,7 @@ def pause_from_sync(
|
|||
# that output and assign the `repl` created above!
|
||||
bg_task, _ = trio.from_thread.run(
|
||||
afn=partial(
|
||||
actor._service_tn.start,
|
||||
actor._service_n.start,
|
||||
partial(
|
||||
_pause_from_bg_root_thread,
|
||||
behalf_of_thread=thread,
|
||||
|
|
|
@ -17,59 +17,36 @@
|
|||
Utils to tame mp non-SC madness
|
||||
|
||||
'''
|
||||
import platform
|
||||
|
||||
|
||||
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
|
||||
# a flag,
|
||||
# - [ ] soo if it works like this, drop this module entirely for
|
||||
# 3.13+ B)
|
||||
# |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
|
||||
#
|
||||
def disable_mantracker():
|
||||
'''
|
||||
Disable all `multiprocessing` "resource tracking" machinery since
|
||||
it's an absolute multi-threaded mess of non-SC madness.
|
||||
|
||||
'''
|
||||
from multiprocessing.shared_memory import SharedMemory
|
||||
from multiprocessing import resource_tracker as mantracker
|
||||
|
||||
# Tell the "resource tracker" thing to fuck off.
|
||||
class ManTracker(mantracker.ResourceTracker):
|
||||
def register(self, name, rtype):
|
||||
pass
|
||||
|
||||
# 3.13+ only.. can pass `track=False` to disable
|
||||
# all the resource tracker bs.
|
||||
# https://docs.python.org/3/library/multiprocessing.shared_memory.html
|
||||
if (_py_313 := (
|
||||
platform.python_version_tuple()[:-1]
|
||||
>=
|
||||
('3', '13')
|
||||
)
|
||||
):
|
||||
from functools import partial
|
||||
return partial(
|
||||
SharedMemory,
|
||||
track=False,
|
||||
)
|
||||
def unregister(self, name, rtype):
|
||||
pass
|
||||
|
||||
# !TODO, once we drop 3.12- we can obvi remove all this!
|
||||
else:
|
||||
from multiprocessing import (
|
||||
resource_tracker as mantracker,
|
||||
)
|
||||
def ensure_running(self):
|
||||
pass
|
||||
|
||||
# Tell the "resource tracker" thing to fuck off.
|
||||
class ManTracker(mantracker.ResourceTracker):
|
||||
def register(self, name, rtype):
|
||||
pass
|
||||
|
||||
def unregister(self, name, rtype):
|
||||
pass
|
||||
|
||||
def ensure_running(self):
|
||||
pass
|
||||
|
||||
# "know your land and know your prey"
|
||||
# https://www.dailymotion.com/video/x6ozzco
|
||||
mantracker._resource_tracker = ManTracker()
|
||||
mantracker.register = mantracker._resource_tracker.register
|
||||
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
|
||||
mantracker.unregister = mantracker._resource_tracker.unregister
|
||||
mantracker.getfd = mantracker._resource_tracker.getfd
|
||||
|
||||
# use std type verbatim
|
||||
shmT = SharedMemory
|
||||
|
||||
return shmT
|
||||
# "know your land and know your prey"
|
||||
# https://www.dailymotion.com/video/x6ozzco
|
||||
mantracker._resource_tracker = ManTracker()
|
||||
mantracker.register = mantracker._resource_tracker.register
|
||||
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
|
||||
mantracker.unregister = mantracker._resource_tracker.unregister
|
||||
mantracker.getfd = mantracker._resource_tracker.getfd
|
||||
|
|
|
@ -1001,11 +1001,7 @@ class Server(Struct):
|
|||
partial(
|
||||
_serve_ipc_eps,
|
||||
server=self,
|
||||
stream_handler_tn=(
|
||||
stream_handler_nursery
|
||||
or
|
||||
self._stream_handler_tn
|
||||
),
|
||||
stream_handler_tn=stream_handler_nursery,
|
||||
listen_addrs=accept_addrs,
|
||||
)
|
||||
)
|
||||
|
@ -1149,17 +1145,13 @@ async def open_ipc_server(
|
|||
|
||||
async with maybe_open_nursery(
|
||||
nursery=parent_tn,
|
||||
) as parent_tn:
|
||||
) as rent_tn:
|
||||
no_more_peers = trio.Event()
|
||||
no_more_peers.set()
|
||||
|
||||
ipc_server = IPCServer(
|
||||
_parent_tn=parent_tn,
|
||||
_stream_handler_tn=(
|
||||
stream_handler_tn
|
||||
or
|
||||
parent_tn
|
||||
),
|
||||
_parent_tn=rent_tn,
|
||||
_stream_handler_tn=stream_handler_tn or rent_tn,
|
||||
_no_more_peers=no_more_peers,
|
||||
)
|
||||
try:
|
||||
|
|
|
@ -23,15 +23,14 @@ considered optional within the context of this runtime-library.
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from multiprocessing import shared_memory as shm
|
||||
from multiprocessing.shared_memory import (
|
||||
# SharedMemory,
|
||||
ShareableList,
|
||||
)
|
||||
import platform
|
||||
from sys import byteorder
|
||||
import time
|
||||
from typing import Optional
|
||||
from multiprocessing import shared_memory as shm
|
||||
from multiprocessing.shared_memory import (
|
||||
SharedMemory,
|
||||
ShareableList,
|
||||
)
|
||||
|
||||
from msgspec import (
|
||||
Struct,
|
||||
|
@ -62,7 +61,7 @@ except ImportError:
|
|||
log = get_logger(__name__)
|
||||
|
||||
|
||||
SharedMemory = disable_mantracker()
|
||||
disable_mantracker()
|
||||
|
||||
|
||||
class SharedInt:
|
||||
|
@ -798,15 +797,8 @@ def open_shm_list(
|
|||
# "close" attached shm on actor teardown
|
||||
try:
|
||||
actor = tractor.current_actor()
|
||||
|
||||
actor.lifetime_stack.callback(shml.shm.close)
|
||||
|
||||
# XXX on 3.13+ we don't need to call this?
|
||||
# -> bc we pass `track=False` for `SharedMemory` orr?
|
||||
if (
|
||||
platform.python_version_tuple()[:-1] < ('3', '13')
|
||||
):
|
||||
actor.lifetime_stack.callback(shml.shm.unlink)
|
||||
actor.lifetime_stack.callback(shml.shm.unlink)
|
||||
except RuntimeError:
|
||||
log.warning('tractor runtime not active, skipping teardown steps')
|
||||
|
||||
|
|
|
@ -430,7 +430,6 @@ class MsgpackTransport(MsgTransport):
|
|||
return await self.stream.send_all(size + bytes_data)
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
) as _re:
|
||||
trans_err = _re
|
||||
tpt_name: str = f'{type(self).__name__!r}'
|
||||
|
@ -459,22 +458,6 @@ class MsgpackTransport(MsgTransport):
|
|||
)
|
||||
raise tpt_closed from trans_err
|
||||
|
||||
# case trio.ClosedResourceError() if (
|
||||
# 'this socket was already closed'
|
||||
# in
|
||||
# trans_err.args[0]
|
||||
# ):
|
||||
# tpt_closed = TransportClosed.from_src_exc(
|
||||
# message=(
|
||||
# f'{tpt_name} already closed by peer\n'
|
||||
# ),
|
||||
# body=f'{self}\n',
|
||||
# src_exc=trans_err,
|
||||
# raise_on_report=True,
|
||||
# loglevel='transport',
|
||||
# )
|
||||
# raise tpt_closed from trans_err
|
||||
|
||||
# unless the disconnect condition falls under "a
|
||||
# normal operation breakage" we usually console warn
|
||||
# about it.
|
||||
|
|
|
@ -215,7 +215,7 @@ class LinkedTaskChannel(
|
|||
val: Any = None,
|
||||
) -> None:
|
||||
'''
|
||||
Synchronize aio-side with its trio-parent.
|
||||
Synchronize aio-sde with its trio-parent.
|
||||
|
||||
'''
|
||||
self._aio_started_val = val
|
||||
|
@ -459,22 +459,14 @@ def _run_asyncio_task(
|
|||
f'Task exited with final result: {result!r}\n'
|
||||
)
|
||||
|
||||
# XXX ALWAYS close the child-`asyncio`-task-side's
|
||||
# `to_trio` handle which will in turn relay
|
||||
# a `trio.EndOfChannel` to the `trio`-parent.
|
||||
# Consequently the parent `trio` task MUST ALWAYS
|
||||
# check for any `chan._aio_err` to be raised when it
|
||||
# receives an EoC.
|
||||
#
|
||||
# NOTE, there are 2 EoC cases,
|
||||
# - normal/graceful EoC due to the aio-side actually
|
||||
# terminating its "streaming", but the task did not
|
||||
# error and is not yet complete.
|
||||
#
|
||||
# - the aio-task terminated and we specially mark the
|
||||
# closure as due to the `asyncio.Task`'s exit.
|
||||
# only close the aio (child) side which will relay
|
||||
# a `trio.EndOfChannel` to the trio (parent) side.
|
||||
#
|
||||
# XXX NOTE, that trio-side MUST then in such cases
|
||||
# check for a `chan._aio_err` and raise it!!
|
||||
to_trio.close()
|
||||
# specially mark the closure as due to the
|
||||
# asyncio.Task terminating!
|
||||
chan._closed_by_aio_task = True
|
||||
|
||||
aio_task_complete.set()
|
||||
|
@ -854,6 +846,8 @@ async def translate_aio_errors(
|
|||
chan._trio_to_raise = aio_err
|
||||
trio_err = chan._trio_err = eoc
|
||||
#
|
||||
# await tractor.pause(shield=True)
|
||||
#
|
||||
# ?TODO?, raise something like a,
|
||||
# chan._trio_to_raise = AsyncioErrored()
|
||||
# BUT, with the tb rewritten to reflect the underlying
|
||||
|
|
|
@ -204,7 +204,7 @@ class _Cache:
|
|||
a kept-alive-while-in-use async resource.
|
||||
|
||||
'''
|
||||
service_tn: Optional[trio.Nursery] = None
|
||||
service_n: Optional[trio.Nursery] = None
|
||||
locks: dict[Hashable, trio.Lock] = {}
|
||||
users: int = 0
|
||||
values: dict[Any, Any] = {}
|
||||
|
@ -223,16 +223,18 @@ class _Cache:
|
|||
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
async with mng as value:
|
||||
_, no_more_users = cls.resources[ctx_key]
|
||||
cls.values[ctx_key] = value
|
||||
task_status.started(value)
|
||||
try:
|
||||
await no_more_users.wait()
|
||||
finally:
|
||||
# discard nursery ref so it won't be re-used (an error)?
|
||||
value = cls.values.pop(ctx_key)
|
||||
cls.resources.pop(ctx_key)
|
||||
try:
|
||||
async with mng as value:
|
||||
_, no_more_users = cls.resources[ctx_key]
|
||||
try:
|
||||
cls.values[ctx_key] = value
|
||||
task_status.started(value)
|
||||
await no_more_users.wait()
|
||||
finally:
|
||||
value = cls.values.pop(ctx_key)
|
||||
finally:
|
||||
# discard nursery ref so it won't be re-used (an error)?
|
||||
cls.resources.pop(ctx_key)
|
||||
|
||||
|
||||
@acm
|
||||
|
@ -294,15 +296,15 @@ async def maybe_open_context(
|
|||
f'task: {task}\n'
|
||||
f'task_tn: {task_tn}\n'
|
||||
)
|
||||
service_tn = tn
|
||||
service_n = tn
|
||||
else:
|
||||
service_tn: trio.Nursery = current_actor()._service_tn
|
||||
service_n: trio.Nursery = current_actor()._service_n
|
||||
|
||||
# TODO: is there any way to allocate
|
||||
# a 'stays-open-till-last-task-finished' nursery?
|
||||
# service_tn: trio.Nursery
|
||||
# async with maybe_open_nursery(_Cache.service_tn) as service_tn:
|
||||
# _Cache.service_tn = service_tn
|
||||
# service_n: trio.Nursery
|
||||
# async with maybe_open_nursery(_Cache.service_n) as service_n:
|
||||
# _Cache.service_n = service_n
|
||||
|
||||
cache_miss_ke: KeyError|None = None
|
||||
maybe_taskc: trio.Cancelled|None = None
|
||||
|
@ -324,8 +326,8 @@ async def maybe_open_context(
|
|||
mngr = acm_func(**kwargs)
|
||||
resources = _Cache.resources
|
||||
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
|
||||
resources[ctx_key] = (service_tn, trio.Event())
|
||||
yielded: Any = await service_tn.start(
|
||||
resources[ctx_key] = (service_n, trio.Event())
|
||||
yielded: Any = await service_n.start(
|
||||
_Cache.run_ctx,
|
||||
mngr,
|
||||
ctx_key,
|
||||
|
|
|
@ -22,7 +22,10 @@ from __future__ import annotations
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import (
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import trio
|
||||
from tractor.log import get_logger
|
||||
|
@ -65,7 +68,6 @@ def find_masked_excs(
|
|||
#
|
||||
@acm
|
||||
async def maybe_raise_from_masking_exc(
|
||||
tn: trio.Nursery|None = None,
|
||||
unmask_from: (
|
||||
BaseException|
|
||||
tuple[BaseException]
|
||||
|
@ -74,15 +76,26 @@ async def maybe_raise_from_masking_exc(
|
|||
raise_unmasked: bool = True,
|
||||
extra_note: str = (
|
||||
'This can occurr when,\n'
|
||||
' - a `trio.Nursery` scope embeds a `finally:`-block '
|
||||
'which executes a checkpoint!'
|
||||
'\n'
|
||||
' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block '
|
||||
'which execs an un-shielded checkpoint!'
|
||||
#
|
||||
# ^TODO? other cases?
|
||||
),
|
||||
|
||||
always_warn_on: tuple[BaseException] = (
|
||||
always_warn_on: tuple[Type[BaseException]] = (
|
||||
trio.Cancelled,
|
||||
),
|
||||
|
||||
# don't ever unmask or warn on any masking pair,
|
||||
# {<masked-excT-key> -> <masking-excT-value>}
|
||||
never_warn_on: dict[
|
||||
Type[BaseException],
|
||||
Type[BaseException],
|
||||
] = {
|
||||
KeyboardInterrupt: trio.Cancelled,
|
||||
trio.Cancelled: trio.Cancelled,
|
||||
},
|
||||
# ^XXX, special case(s) where we warn-log bc likely
|
||||
# there will be no operational diff since the exc
|
||||
# is always expected to be consumed.
|
||||
|
@ -104,81 +117,91 @@ async def maybe_raise_from_masking_exc(
|
|||
individual sub-excs but maintain the eg-parent's form right?
|
||||
|
||||
'''
|
||||
if not isinstance(unmask_from, tuple):
|
||||
raise ValueError(
|
||||
f'Invalid unmask_from = {unmask_from!r}\n'
|
||||
f'Must be a `tuple[Type[BaseException]]`.\n'
|
||||
)
|
||||
|
||||
from tractor.devx.debug import (
|
||||
BoxedMaybeException,
|
||||
pause,
|
||||
)
|
||||
boxed_maybe_exc = BoxedMaybeException(
|
||||
raise_on_exit=raise_unmasked,
|
||||
)
|
||||
matching: list[BaseException]|None = None
|
||||
maybe_eg: ExceptionGroup|None
|
||||
|
||||
if tn:
|
||||
try: # handle egs
|
||||
yield boxed_maybe_exc
|
||||
return
|
||||
except* unmask_from as _maybe_eg:
|
||||
maybe_eg = _maybe_eg
|
||||
try:
|
||||
yield boxed_maybe_exc
|
||||
return
|
||||
except BaseException as _bexc:
|
||||
bexc = _bexc
|
||||
if isinstance(bexc, BaseExceptionGroup):
|
||||
matches: ExceptionGroup
|
||||
matches, _ = maybe_eg.split(
|
||||
unmask_from
|
||||
)
|
||||
if not matches:
|
||||
raise
|
||||
matches, _ = bexc.split(unmask_from)
|
||||
if matches:
|
||||
matching = matches.exceptions
|
||||
|
||||
matching: list[BaseException] = matches.exceptions
|
||||
else:
|
||||
try: # handle non-egs
|
||||
yield boxed_maybe_exc
|
||||
return
|
||||
except unmask_from as _maybe_exc:
|
||||
maybe_exc = _maybe_exc
|
||||
matching: list[BaseException] = [
|
||||
maybe_exc
|
||||
]
|
||||
|
||||
# XXX, only unmask-ed for debuggin!
|
||||
# TODO, remove eventually..
|
||||
except BaseException as _berr:
|
||||
berr = _berr
|
||||
await pause(shield=True)
|
||||
raise berr
|
||||
elif (
|
||||
unmask_from
|
||||
and
|
||||
type(bexc) in unmask_from
|
||||
):
|
||||
matching = [bexc]
|
||||
|
||||
if matching is None:
|
||||
raise
|
||||
|
||||
masked: list[tuple[BaseException, BaseException]] = []
|
||||
for exc_match in matching:
|
||||
|
||||
if exc_ctx := find_masked_excs(
|
||||
maybe_masker=exc_match,
|
||||
unmask_from={unmask_from},
|
||||
unmask_from=set(unmask_from),
|
||||
):
|
||||
masked.append((exc_ctx, exc_match))
|
||||
masked.append((
|
||||
exc_ctx,
|
||||
exc_match,
|
||||
))
|
||||
boxed_maybe_exc.value = exc_match
|
||||
note: str = (
|
||||
f'\n'
|
||||
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
|
||||
f'^^WARNING^^\n'
|
||||
f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n'
|
||||
)
|
||||
if extra_note:
|
||||
note += (
|
||||
f'\n'
|
||||
f'{extra_note}\n'
|
||||
)
|
||||
exc_ctx.add_note(note)
|
||||
|
||||
if type(exc_match) in always_warn_on:
|
||||
do_warn: bool = (
|
||||
never_warn_on.get(
|
||||
type(exc_ctx) # masking type
|
||||
)
|
||||
is not
|
||||
type(exc_match) # masked type
|
||||
)
|
||||
|
||||
if do_warn:
|
||||
exc_ctx.add_note(note)
|
||||
|
||||
if (
|
||||
do_warn
|
||||
and
|
||||
type(exc_match) in always_warn_on
|
||||
):
|
||||
log.warning(note)
|
||||
|
||||
# await tractor.pause(shield=True)
|
||||
if raise_unmasked:
|
||||
|
||||
if (
|
||||
do_warn
|
||||
and
|
||||
raise_unmasked
|
||||
):
|
||||
if len(masked) < 2:
|
||||
raise exc_ctx from exc_match
|
||||
else:
|
||||
# ?TODO, see above but, possibly unmasking sub-exc
|
||||
# entries if there are > 1
|
||||
await pause(shield=True)
|
||||
|
||||
# ??TODO, see above but, possibly unmasking sub-exc
|
||||
# entries if there are > 1
|
||||
# else:
|
||||
# await pause(shield=True)
|
||||
else:
|
||||
raise
|
||||
|
|
Loading…
Reference in New Issue