Compare commits


No commits in common. "main" and "strict_egs_everywhere" have entirely different histories.

31 changed files with 466 additions and 2253 deletions

View File

@ -1,35 +0,0 @@
import trio
import tractor


async def main():
    async with tractor.open_root_actor(
        debug_mode=True,
        loglevel='cancel',
    ) as _root:

        # manually trigger self-cancellation and wait
        # for it to fully trigger.
        _root.cancel_soon()
        await _root._cancel_complete.wait()
        print('root cancelled')

        # now ensure we can still use the REPL
        try:
            await tractor.pause()
        except trio.Cancelled as _taskc:
            assert (root_cs := _root._root_tn.cancel_scope).cancel_called
            # NOTE^^ above logic but inside `open_root_actor()` and
            # passed to the `shield=` expression is effectively what
            # we're testing here!
            await tractor.pause(shield=root_cs.cancel_called)

        # XXX, if shield logic *is wrong* inside `open_root_actor()`'s
        # crash-handler block this should never be interacted,
        # instead `trio.Cancelled` would be bubbled up: the original
        # BUG.
        assert 0


if __name__ == '__main__':
    trio.run(main)
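
For context, the conditional-shield pattern referenced by the NOTE above, if hoisted into a crash-handler, would look roughly like the following sketch. It is only an approximation of what `open_root_actor()` is expected to do internally (the `_root_tn` attribute name is taken from the script above), not the actual implementation:

import tractor

async def crash_handle_despite_self_cancel(root) -> None:
    # only shield the REPL checkpoint when the root's own scope has
    # already been cancelled; a live (non-cancelled) root keeps
    # normal cancellation semantics.
    root_cs = root._root_tn.cancel_scope
    await tractor.pause(shield=root_cs.cancel_called)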

View File

@ -1,85 +0,0 @@
from contextlib import (
    asynccontextmanager as acm,
)
from functools import partial

import tractor
import trio

log = tractor.log.get_logger(
    name=__name__
)

_lock: trio.Lock|None = None


@acm
async def acquire_singleton_lock(
) -> None:
    global _lock
    if _lock is None:
        log.info('Allocating LOCK')
        _lock = trio.Lock()

    log.info('TRYING TO LOCK ACQUIRE')
    async with _lock:
        log.info('ACQUIRED')
        yield _lock

    log.info('RELEASED')


async def hold_lock_forever(
    task_status=trio.TASK_STATUS_IGNORED
):
    async with (
        tractor.trionics.maybe_raise_from_masking_exc(),
        acquire_singleton_lock() as lock,
    ):
        task_status.started(lock)
        await trio.sleep_forever()


async def main(
    ignore_special_cases: bool,
    loglevel: str = 'info',
    debug_mode: bool = True,
):
    async with (
        trio.open_nursery() as tn,

        # tractor.trionics.maybe_raise_from_masking_exc()
        # ^^^ XXX NOTE, interestingly putting the unmasker
        # here does not exhibit the same behaviour ??
    ):
        if not ignore_special_cases:
            from tractor.trionics import _taskc
            _taskc._mask_cases.clear()

        _lock = await tn.start(
            hold_lock_forever,
        )
        with trio.move_on_after(0.2):
            await tn.start(
                hold_lock_forever,
            )

        tn.cancel_scope.cancel()


# XXX, manual test as script
if __name__ == '__main__':
    tractor.log.get_console_log(level='info')
    for case in [True, False]:
        log.info(
            f'\n'
            f'------ RUNNING SCRIPT TRIAL ------\n'
            f'ignore_special_cases: {case!r}\n'
        )
        trio.run(partial(
            main,
            ignore_special_cases=case,
            loglevel='info',
        ))
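
For a rough sense of the mechanism involved: when a `trio.Cancelled` is raised while another exception is already being handled, that prior exception survives only as `Cancelled.__context__`. An "unmasker" can re-surface it, but as the script above shows there are special cases (`_taskc._mask_cases`) where the chained context is just internal noise, plausibly the `trio.WouldBlock` that `Lock.acquire()` handles internally, and should be left alone. A simplified, illustrative-only sketch of the unmasking idea (not `tractor.trionics.maybe_raise_from_masking_exc()`'s actual implementation):

from contextlib import asynccontextmanager as acm

import trio


@acm
async def unmask_cancelled_context():
    '''
    Simplified sketch: re-surface any non-`Cancelled` exception that
    a `trio.Cancelled` would otherwise mask via implicit chaining.

    '''
    try:
        yield
    except trio.Cancelled as taskc:
        masked: BaseException|None = taskc.__context__
        if (
            masked is not None
            and not isinstance(masked, trio.Cancelled)
        ):
            raise masked from taskc
        raise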

View File

@ -1,195 +0,0 @@
from contextlib import (
    contextmanager as cm,
    # TODO, any diff in async case(s)??
    # asynccontextmanager as acm,
)
from functools import partial

import tractor
import trio

log = tractor.log.get_logger(
    name=__name__
)


@cm
def teardown_on_exc(
    raise_from_handler: bool = False,
):
    '''
    You could also have a teardown handler which catches any exc and
    does some required teardown. In this case the problem is
    compounded UNLESS you ensure the handler's scope is OUTSIDE the
    `tx.aclose()`.. that is in the caller's enclosing scope.

    '''
    try:
        yield
    except BaseException as _berr:
        berr = _berr
        log.exception(
            f'Handling termination teardown in child due to,\n'
            f'{berr!r}\n'
        )
        if raise_from_handler:
            # XXX teardown ops XXX
            # on termination these steps say need to be run to
            # ensure wider system consistency (like the state of
            # remote connections/services).
            #
            # HOWEVER, any bug in this teardown code is also
            # masked by the `tx.aclose()`!
            # this is also true if `_tn.cancel_scope` is
            # `.cancel_called` by the parent in a graceful
            # request case..

            # simulate a bug in teardown handler.
            raise RuntimeError(
                'woopsie teardown bug!'
            )

        raise  # no teardown bug.


async def finite_stream_to_rent(
    tx: trio.abc.SendChannel,
    child_errors_mid_stream: bool,
    raise_unmasked: bool,

    task_status: trio.TaskStatus[
        trio.CancelScope,
    ] = trio.TASK_STATUS_IGNORED,
):
    async with (
        # XXX without this unmasker the mid-streaming RTE is never
        # reported since it is masked by the `tx.aclose()`
        # call which in turn raises `Cancelled`!
        #
        # NOTE, this is WITHOUT doing any exception handling
        # inside the child task!
        #
        # TODO, uncomment next LoC to see the suppressed beg[RTE]!
        tractor.trionics.maybe_raise_from_masking_exc(
            raise_unmasked=raise_unmasked,
        ),

        tx as tx,  # .aclose() is the guilty masker chkpt!

        # XXX, this ONLY matters in the
        # `child_errors_mid_stream=False` case oddly!?
        # THAT IS, if no tn is opened in that case then the
        # test will not fail; it raises the RTE correctly?
        #
        # -> so it seems this new scope somehow affects the form of
        #    eventual in the parent EG?
        tractor.trionics.maybe_open_nursery(
            nursery=(
                None
                if not child_errors_mid_stream
                else True
            ),
        ) as _tn,
    ):
        # pass our scope back to parent for supervision\
        # control.
        cs: trio.CancelScope|None = (
            None
            if _tn is True
            else _tn.cancel_scope
        )
        task_status.started(cs)

        with teardown_on_exc(
            raise_from_handler=not child_errors_mid_stream,
        ):
            for i in range(100):
                log.debug(
                    f'Child tx {i!r}\n'
                )
                if (
                    child_errors_mid_stream
                    and
                    i == 66
                ):
                    # oh wait but WOOPS there's a bug
                    # in that teardown code!?
                    raise RuntimeError(
                        'woopsie, a mid-streaming bug!?'
                    )

                await tx.send(i)


async def main(
    # TODO! toggle this for the 2 cases!
    # 1. child errors mid-stream while parent is also requesting
    #    (graceful) cancel of that child streamer.
    #
    # 2. child contains a teardown handler which contains a
    #    bug and raises.
    #
    child_errors_mid_stream: bool,

    raise_unmasked: bool = False,
    loglevel: str = 'info',
):
    tractor.log.get_console_log(level=loglevel)

    # the `.aclose()` being checkpoints on these
    # is the source of the problem..
    tx, rx = trio.open_memory_channel(1)

    async with (
        tractor.trionics.collapse_eg(),
        trio.open_nursery() as tn,
        rx as rx,
    ):
        _child_cs = await tn.start(
            partial(
                finite_stream_to_rent,
                child_errors_mid_stream=child_errors_mid_stream,
                raise_unmasked=raise_unmasked,
                tx=tx,
            )
        )
        async for msg in rx:
            log.debug(
                f'Rent rx {msg!r}\n'
            )

            # simulate some external cancellation
            # request **JUST BEFORE** the child errors.
            if msg == 65:
                log.cancel(
                    f'Cancelling parent on,\n'
                    f'msg={msg}\n'
                    f'\n'
                    f'Simulates OOB cancel request!\n'
                )
                tn.cancel_scope.cancel()


# XXX, manual test as script
if __name__ == '__main__':
    tractor.log.get_console_log(level='info')
    for case in [True, False]:
        log.info(
            f'\n'
            f'------ RUNNING SCRIPT TRIAL ------\n'
            f'child_errors_midstream: {case!r}\n'
        )
        try:
            trio.run(partial(
                main,
                child_errors_mid_stream=case,
                # raise_unmasked=True,
                loglevel='info',
            ))
        except Exception as _exc:
            exc = _exc
            log.exception(
                'Should have raised an RTE or Cancelled?\n'
            )
            breakpoint()
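
The masking mechanism this script demonstrates can be reproduced with plain `trio` and no `tractor` at all. A minimal, illustrative-only sketch: a user error raised inside `async with tx:` is replaced by the `trio.Cancelled` from the `.aclose()` checkpoint and then silently absorbed by the already-cancelled scope:

import trio

async def main():
    tx, _rx = trio.open_memory_channel(1)
    with trio.CancelScope() as cs:
        cs.cancel()  # simulate a graceful, external cancel request
        try:
            async with tx:  # __aexit__ -> tx.aclose(), a checkpoint
                raise RuntimeError('woopsie, a user bug!')
        except trio.Cancelled as taskc:
            # the RTE is only reachable via implicit exc chaining..
            assert isinstance(taskc.__context__, RuntimeError)
            raise  # absorbed by the (already cancelled) scope

    # the scope exits "cleanly"; the RTE was never surfaced.
    print(f'cancelled_caught={cs.cancelled_caught} and the RTE is masked!')

trio.run(main)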

View File

@ -1,8 +1,8 @@
""" """
That "native" debug mode better work! That "native" debug mode better work!
All these tests can be understood (somewhat) by running the All these tests can be understood (somewhat) by running the equivalent
equivalent `examples/debugging/` scripts manually. `examples/debugging/` scripts manually.
TODO: TODO:
- none of these tests have been run successfully on windows yet but - none of these tests have been run successfully on windows yet but
@ -925,7 +925,6 @@ def test_post_mortem_api(
"<Task 'name_error'", "<Task 'name_error'",
"NameError", "NameError",
"('child'", "('child'",
'getattr(doggypants)', # exc-LoC
] ]
) )
if ctlc: if ctlc:
@ -942,8 +941,8 @@ def test_post_mortem_api(
"<Task '__main__.main'", "<Task '__main__.main'",
"('root'", "('root'",
"NameError", "NameError",
"tractor.post_mortem()",
"src_uid=('child'", "src_uid=('child'",
"tractor.post_mortem()", # in `main()`-LoC
] ]
) )
if ctlc: if ctlc:
@ -961,10 +960,6 @@ def test_post_mortem_api(
"('root'", "('root'",
"NameError", "NameError",
"src_uid=('child'", "src_uid=('child'",
# raising line in `main()` but from crash-handling
# in `tractor.open_nursery()`.
'async with p.open_context(name_error) as (ctx, first):',
] ]
) )
if ctlc: if ctlc:
@ -1156,54 +1151,6 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
) )
def test_crash_handling_within_cancelled_root_actor(
    spawn: PexpectSpawner,
):
    '''
    Ensure that when only a root-actor is started via `open_root_actor()`
    we can crash-handle in debug-mode despite self-cancellation.

    More-or-less ensures we conditionally shield the pause in
    `._root.open_root_actor()`'s `await debug._maybe_enter_pm()`
    call.

    '''
    child = spawn('root_self_cancelled_w_error')
    child.expect(PROMPT)
    assert_before(
        child,
        [
            "Actor.cancel_soon()` was called!",
            "root cancelled",
            _pause_msg,
            "('root'",  # actor name
        ]
    )

    child.sendline('c')
    child.expect(PROMPT)
    assert_before(
        child,
        [
            _crash_msg,
            "('root'",  # actor name
            "AssertionError",
            "assert 0",
        ]
    )

    child.sendline('c')
    child.expect(EOF)
    assert_before(
        child,
        [
            "AssertionError",
            "assert 0",
        ]
    )
# TODO: better error for "non-ideal" usage from the root actor. # TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests # -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead # using `await tractor.pause()` instead since it's less overhead

View File

@ -1,114 +0,0 @@
'''
Unit-ish tests for specific IPC transport protocol backends.

'''
from __future__ import annotations
from pathlib import Path

import pytest
import trio
import tractor
from tractor import (
    Actor,
    _state,
    _addr,
)


@pytest.fixture
def bindspace_dir_str() -> str:

    rt_dir: Path = tractor._state.get_rt_dir()
    bs_dir: Path = rt_dir / 'doggy'
    bs_dir_str: str = str(bs_dir)
    assert not bs_dir.is_dir()

    yield bs_dir_str

    # delete it on suite teardown.
    # ?TODO? should we support this internally
    # or is leaking it ok?
    if bs_dir.is_dir():
        bs_dir.rmdir()


def test_uds_bindspace_created_implicitly(
    debug_mode: bool,
    bindspace_dir_str: str,
):
    registry_addr: tuple = (
        f'{bindspace_dir_str}',
        'registry@doggy.sock',
    )
    bs_dir_str: str = registry_addr[0]

    # XXX, ensure bindspace-dir DNE beforehand!
    assert not Path(bs_dir_str).is_dir()

    async def main():
        async with tractor.open_nursery(
            enable_transports=['uds'],
            registry_addrs=[registry_addr],
            debug_mode=debug_mode,
        ) as _an:

            # XXX MUST be created implicitly by
            # `.ipc._uds.start_listener()`!
            assert Path(bs_dir_str).is_dir()

            root: Actor = tractor.current_actor()
            assert root.is_registrar
            assert registry_addr in root.reg_addrs
            assert (
                registry_addr
                in
                _state._runtime_vars['_registry_addrs']
            )
            assert (
                _addr.wrap_address(registry_addr)
                in
                root.registry_addrs
            )

    trio.run(main)


def test_uds_double_listen_raises_connerr(
    debug_mode: bool,
    bindspace_dir_str: str,
):
    registry_addr: tuple = (
        f'{bindspace_dir_str}',
        'registry@doggy.sock',
    )

    async def main():
        async with tractor.open_nursery(
            enable_transports=['uds'],
            registry_addrs=[registry_addr],
            debug_mode=debug_mode,
        ) as _an:

            # runtime up
            root: Actor = tractor.current_actor()

            from tractor.ipc._uds import (
                start_listener,
                UDSAddress,
            )
            ya_bound_addr: UDSAddress = root.registry_addrs[0]
            try:
                await start_listener(
                    addr=ya_bound_addr,
                )
            except ConnectionError as connerr:
                assert type(src_exc := connerr.__context__) is OSError
                assert 'Address already in use' in src_exc.args
                # complete, exit test.

            else:
                pytest.fail('It dint raise a connerr !?')

    trio.run(main)
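
The underlying failure mode exercised by `test_uds_double_listen_raises_connerr` can be reproduced with just the stdlib on a POSIX host; a minimal sketch using an illustrative socket path (no tractor involved):

import errno
import os
import socket
import tempfile

path = os.path.join(tempfile.mkdtemp(), 'registry@doggy.sock')

s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s1.bind(path)  # first bind creates the socket file

s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
    s2.bind(path)  # second bind on the same fs-addr must fail
except OSError as oserr:
    assert oserr.errno == errno.EADDRINUSE
    assert 'Address already in use' in oserr.strerror
finally:
    s1.close()
    s2.close()
    os.unlink(path)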

View File

@ -236,10 +236,7 @@ async def stream_forever():
async def test_cancel_infinite_streamer(start_method): async def test_cancel_infinite_streamer(start_method):
# stream for at most 1 seconds # stream for at most 1 seconds
with ( with trio.move_on_after(1) as cancel_scope:
trio.fail_after(4),
trio.move_on_after(1) as cancel_scope
):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.start_actor( portal = await n.start_actor(
'donny', 'donny',

View File

@ -95,7 +95,6 @@ def run_example_in_subproc(
and 'integration' not in p[0] and 'integration' not in p[0]
and 'advanced_faults' not in p[0] and 'advanced_faults' not in p[0]
and 'multihost' not in p[0] and 'multihost' not in p[0]
and 'trio' not in p[0]
) )
], ],
ids=lambda t: t[1], ids=lambda t: t[1],

View File

@ -571,8 +571,6 @@ def test_basic_interloop_channel_stream(
fan_out: bool, fan_out: bool,
): ):
async def main(): async def main():
# TODO, figure out min timeout here!
with trio.fail_after(6):
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
portal = await an.run_in_actor( portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
@ -1088,108 +1086,6 @@ def test_sigint_closes_lifetime_stack(
trio.run(main) trio.run(main)
# ?TODO asyncio.Task fn-deco?
# -[ ] do sig checkingat import time like @context?
# -[ ] maybe name it @aio_task ??
# -[ ] chan: to_asyncio.InterloopChannel ??
async def raise_before_started(
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
chan: to_asyncio.LinkedTaskChannel,
) -> None:
'''
`asyncio.Task` entry point which RTEs before calling
`to_trio.send_nowait()`.
'''
await asyncio.sleep(0.2)
raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
# to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
await asyncio.sleep(float('inf'))
@tractor.context
async def caching_ep(
ctx: tractor.Context,
):
log = tractor.log.get_logger('caching_ep')
log.info('syncing via `ctx.started()`')
await ctx.started()
# XXX, allocate the `open_channel_from()` inside
# a `.trionics.maybe_open_context()`.
chan: to_asyncio.LinkedTaskChannel
async with (
tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from,
kwargs={
'target': raise_before_started,
# ^XXX, kwarg to `open_channel_from()`
},
# lock around current actor task access
key=tractor.current_actor().uid,
) as (cache_hit, (clients, chan)),
):
if cache_hit:
log.error(
'Re-using cached `.open_from_channel()` call!\n'
)
else:
log.info(
'Allocating SHOULD-FAIL `.open_from_channel()`\n'
)
await trio.sleep_forever()
def test_aio_side_raises_before_started(
reg_addr: tuple[str, int],
debug_mode: bool,
loglevel: str,
):
'''
Simulates connection-err from `piker.brokers.ib.api`..
Ensure any error raised by child-`asyncio.Task` BEFORE
`chan.started()`
'''
# delay = 999 if debug_mode else 1
async def main():
with trio.fail_after(3):
an: tractor.ActorNursery
async with tractor.open_nursery(
debug_mode=debug_mode,
loglevel=loglevel,
) as an:
p: tractor.Portal = await an.start_actor(
'lchan_cacher_that_raises_fast',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
caching_ep,
) as (ctx, first):
assert not first
with pytest.raises(
expected_exception=(RemoteActorError),
) as excinfo:
trio.run(main)
# ensure `asyncio.Task` exception is bubbled
# allll the way erp!!
rae = excinfo.value
assert rae.boxed_type is RuntimeError
# TODO: debug_mode tests once we get support for `asyncio`! # TODO: debug_mode tests once we get support for `asyncio`!
# #
# -[ ] need tests to wrap both scripts: # -[ ] need tests to wrap both scripts:

View File

@ -24,10 +24,14 @@ from tractor._testing import (
) )
# XXX TODO cases: # XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - [x] WE cancelled the peer and thus should not see any raised # - [x] WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently? # `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()` # => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case? # already covers this case?
# - [x] INTER-PEER: some arbitrary remote peer cancels via # - [x] INTER-PEER: some arbitrary remote peer cancels via
# Portal.cancel_actor(). # Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's # => all other connected peers should get that cancel requesting peer's
@ -40,6 +44,16 @@ from tractor._testing import (
# that also spawned a remote task task in that same peer-parent. # that also spawned a remote task task in that same peer-parent.
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
# '''
# ...
@tractor.context @tractor.context
async def open_stream_then_sleep_forever( async def open_stream_then_sleep_forever(
ctx: Context, ctx: Context,
@ -792,7 +806,7 @@ async def basic_echo_server(
ctx: Context, ctx: Context,
peer_name: str = 'wittle_bruv', peer_name: str = 'wittle_bruv',
err_after_imsg: int|None = None, err_after: int|None = None,
) -> None: ) -> None:
''' '''
@ -821,9 +835,8 @@ async def basic_echo_server(
await ipc.send(resp) await ipc.send(resp)
if ( if (
err_after_imsg err_after
and and i > err_after
i > err_after_imsg
): ):
raise RuntimeError( raise RuntimeError(
f'Simulated error in `{peer_name}`' f'Simulated error in `{peer_name}`'
@ -965,8 +978,7 @@ async def tell_little_bro(
actor_name: str, actor_name: str,
caller: str = '', caller: str = '',
err_after: float|None = None, err_after: int|None = None,
rng_seed: int = 50,
): ):
# contact target actor, do a stream dialog. # contact target actor, do a stream dialog.
async with ( async with (
@ -977,18 +989,14 @@ async def tell_little_bro(
basic_echo_server, basic_echo_server,
# XXX proxy any delayed err condition # XXX proxy any delayed err condition
err_after_imsg=( err_after=err_after,
err_after * rng_seed
if err_after is not None
else None
),
) as (sub_ctx, first), ) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc, sub_ctx.open_stream() as echo_ipc,
): ):
actor: Actor = current_actor() actor: Actor = current_actor()
uid: tuple = actor.uid uid: tuple = actor.uid
for i in range(rng_seed): for i in range(100):
msg: tuple = ( msg: tuple = (
uid, uid,
i, i,
@ -1013,13 +1021,13 @@ async def tell_little_bro(
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'raise_sub_spawn_error_after', 'raise_sub_spawn_error_after',
[None, 0.5], [None, 50],
) )
def test_peer_spawns_and_cancels_service_subactor( def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool, debug_mode: bool,
raise_client_error: str, raise_client_error: str,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
raise_sub_spawn_error_after: float|None, raise_sub_spawn_error_after: int|None,
): ):
# NOTE: this tests for the modden `mod wks open piker` bug # NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx # discovered as part of implementing workspace ctx
@ -1033,7 +1041,6 @@ def test_peer_spawns_and_cancels_service_subactor(
# and the server's spawned child should cancel and terminate! # and the server's spawned child should cancel and terminate!
peer_name: str = 'little_bro' peer_name: str = 'little_bro'
def check_inner_rte(rae: RemoteActorError): def check_inner_rte(rae: RemoteActorError):
''' '''
Validate the little_bro's relayed inception! Validate the little_bro's relayed inception!
@ -1127,7 +1134,8 @@ def test_peer_spawns_and_cancels_service_subactor(
) )
try: try:
res = await client_ctx.wait_for_result(hide_tb=False) res = await client_ctx.result(hide_tb=False)
# in remote (relayed inception) error # in remote (relayed inception) error
# case, we should error on the line above! # case, we should error on the line above!
if raise_sub_spawn_error_after: if raise_sub_spawn_error_after:
@ -1138,23 +1146,6 @@ def test_peer_spawns_and_cancels_service_subactor(
assert isinstance(res, ContextCancelled) assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked assert client_ctx.cancel_acked
assert res.canceller == root.uid assert res.canceller == root.uid
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# XXX, only for tracing
# except BaseException as _berr:
# berr = _berr
# await tractor.pause(shield=True)
# raise berr
except RemoteActorError as rae: except RemoteActorError as rae:
_err = rae _err = rae
@ -1183,8 +1174,19 @@ def test_peer_spawns_and_cancels_service_subactor(
raise raise
# await tractor.pause() # await tractor.pause()
else:
assert not raise_sub_spawn_error_after
# cancelling the spawner sub should
# transitively cancel it's sub, the little
# bruv.
print('root cancelling server/client sub-actors')
await spawn_ctx.cancel()
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
# await tractor.pause()
# await server.cancel_actor() # await server.cancel_actor()
except RemoteActorError as rae: except RemoteActorError as rae:
@ -1197,7 +1199,7 @@ def test_peer_spawns_and_cancels_service_subactor(
# since we called `.cancel_actor()`, `.cancel_ack` # since we called `.cancel_actor()`, `.cancel_ack`
# will not be set on the ctx bc `ctx.cancel()` was not # will not be set on the ctx bc `ctx.cancel()` was not
# called directly for this confext. # called directly fot this confext.
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
_ctxc = ctxc _ctxc = ctxc
print( print(
@ -1237,19 +1239,12 @@ def test_peer_spawns_and_cancels_service_subactor(
# assert spawn_ctx.cancelled_caught # assert spawn_ctx.cancelled_caught
async def _main():
with trio.fail_after(
3 if not debug_mode
else 999
):
await main()
if raise_sub_spawn_error_after: if raise_sub_spawn_error_after:
with pytest.raises(RemoteActorError) as excinfo: with pytest.raises(RemoteActorError) as excinfo:
trio.run(_main) trio.run(main)
rae: RemoteActorError = excinfo.value rae: RemoteActorError = excinfo.value
check_inner_rte(rae) check_inner_rte(rae)
else: else:
trio.run(_main) trio.run(main)

View File

@ -1,239 +0,0 @@
'''
Define the details of inter-actor "out-of-band" (OoB) cancel
semantics, that is how cancellation works when a cancel request comes
from the different concurrency (primitive's) "layer" then where the
eventual `trio.Task` actually raises a signal.

'''
from functools import partial
# from contextlib import asynccontextmanager as acm
# import itertools

import pytest
import trio
import tractor
from tractor import (  # typing
    ActorNursery,
    Portal,
    Context,
    # ContextCancelled,
    # RemoteActorError,
)
# from tractor._testing import (
#     tractor_test,
#     expect_ctxc,
# )

# XXX TODO cases:
# - [ ] peer cancelled itself - so other peers should
#   get errors reflecting that the peer was itself the .canceller?

# def test_self_cancel():
#     '''
#     2 cases:
#     - calls `Actor.cancel()` locally in some task
#     - calls LocalPortal.cancel_actor()` ?
#
#     things to ensure!
#     -[ ] the ctxc raised in a child should ideally show the tb of the
#       underlying `Cancelled` checkpoint, i.e.
#       `raise scope_error from ctxc`?
#
#     -[ ] a self-cancelled context, if not allowed to block on
#       `ctx.result()` at some point will hang since the `ctx._scope`
#       is never `.cancel_called`; cases for this include,
#       - an `open_ctx()` which never starteds before being OoB actor
#         cancelled.
#         |_ parent task will be blocked in `.open_context()` for the
#            `Started` msg, and when the OoB ctxc arrives `ctx._scope`
#            will never have been signalled..
#     '''
#     ...


# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
# but with the `Lock.acquire()` from a `@context` to ensure the
# implicit ignore-case-non-unmasking.
#
# @tractor.context
# async def acquire_actor_global_lock(
#     ctx: tractor.Context,
#     ignore_special_cases: bool,
# ):
#     async with maybe_unmask_excs(
#         ignore_special_cases=ignore_special_cases,
#     ):
#         await ctx.started('locked')
#
#     # block til cancelled
#     await trio.sleep_forever()


@tractor.context
async def sleep_forever(
    ctx: tractor.Context,
    # ignore_special_cases: bool,
    do_started: bool,
):
    # async with maybe_unmask_excs(
    #     ignore_special_cases=ignore_special_cases,
    # ):
    #     await ctx.started('locked')
    if do_started:
        await ctx.started()

    # block til cancelled
    print('sleepin on child-side..')
    await trio.sleep_forever()


@pytest.mark.parametrize(
    'cancel_ctx',
    [True, False],
)
def test_cancel_ctx_with_parent_side_entered_in_bg_task(
    debug_mode: bool,
    loglevel: str,
    cancel_ctx: bool,
):
    '''
    The most "basic" out-of-band-task self-cancellation case where
    `Portal.open_context()` is entered in a bg task and the
    parent-task (of the containing nursery) calls `Context.cancel()`
    without the child knowing; the `Context._scope` should be
    `.cancel_called` when the IPC ctx's child-side relays
    a `ContextCancelled` with a `.canceller` set to the parent
    actor('s task).

    '''
    async def main():
        with trio.fail_after(
            2 if not debug_mode else 999,
        ):
            an: ActorNursery
            async with (
                tractor.open_nursery(
                    debug_mode=debug_mode,
                    loglevel='devx',
                    enable_stack_on_sig=True,
                ) as an,
                trio.open_nursery() as tn,
            ):
                ptl: Portal = await an.start_actor(
                    'sub',
                    enable_modules=[__name__],
                )

                async def _open_ctx_async(
                    do_started: bool = True,
                    task_status=trio.TASK_STATUS_IGNORED,
                ):
                    # do we expect to never enter the
                    # `.open_context()` below.
                    if not do_started:
                        task_status.started()

                    async with ptl.open_context(
                        sleep_forever,
                        do_started=do_started,
                    ) as (ctx, first):
                        task_status.started(ctx)
                        await trio.sleep_forever()

                # XXX, this is the key OoB part!
                #
                # - start the `.open_context()` in a bg task which
                #   blocks inside the embedded scope-body,
                #
                # - when we call `Context.cancel()` it **is
                #   not** from the same task which eventually runs
                #   `.__aexit__()`,
                #
                # - since the bg "opener" task will be in
                #   a `trio.sleep_forever()`, it must be interrupted
                #   by the `ContextCancelled` delivered from the
                #   child-side; `Context._scope: CancelScope` MUST
                #   be `.cancel_called`!
                #
                print('ASYNC opening IPC context in subtask..')
                maybe_ctx: Context|None = await tn.start(partial(
                    _open_ctx_async,
                ))

                if (
                    maybe_ctx
                    and
                    cancel_ctx
                ):
                    print('cancelling first IPC ctx!')
                    await maybe_ctx.cancel()

                # XXX, note that despite `maybe_context.cancel()`
                # being called above, it's the parent (bg) task
                # which was originally never interrupted in
                # the `ctx._scope` body due to missing case logic in
                # `ctx._maybe_cancel_and_set_remote_error()`.
                #
                # It didn't matter that the subactor process was
                # already terminated and reaped, nothing was
                # cancelling the ctx-parent task's scope!
                #
                print('cancelling subactor!')
                await ptl.cancel_actor()

                if maybe_ctx:
                    try:
                        await maybe_ctx.wait_for_result()
                    except tractor.ContextCancelled as ctxc:
                        assert not cancel_ctx
                        assert (
                            ctxc.canceller
                            ==
                            tractor.current_actor().aid.uid
                        )
                        # don't re-raise since it'll trigger
                        # an EG from the above tn.

    if cancel_ctx:
        # graceful self-cancel
        trio.run(main)

    else:
        # ctx parent task should see OoB ctxc due to
        # `ptl.cancel_actor()`.
        with pytest.raises(tractor.ContextCancelled) as excinfo:
            trio.run(main)

        assert 'root' in excinfo.value.canceller[0]


# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
#     debug_mode: bool,
#     loglevel: str,
# ):
#     '''
#     Demos OoB cancellation from the perspective of a ctx opened with
#     a child subactor where the parent cancels the child at the "actor
#     layer" using `Portal.cancel_actor()` and thus the
#     `ContextCancelled.canceller` received by the ctx's parent-side
#     task will appear to be a "self cancellation" even though that
#     specific task itself was not cancelled and thus
#     `Context.cancel_called ==False`.
#     '''
#     TODO, do we have an existing implied ctx
#     cancel test like this?
#     with trio.move_on_after(0.5):# as cs:
#         await _open_ctx_async(
#             do_started=False,
#         )

#     in-line ctx scope should definitely raise
#     a ctxc with `.canceller = 'root'`
#     async with ptl.open_context(
#         sleep_forever,
#         do_started=True,
#     ) as pair:

View File

@ -1,6 +1,5 @@
''' '''
Suites for our `.trionics.maybe_open_context()` multi-task Async context manager cache api testing: ``trionics.maybe_open_context():``
shared-cached `@acm` API.
''' '''
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
@ -10,15 +9,6 @@ from typing import Awaitable
import pytest import pytest
import trio import trio
import tractor import tractor
from tractor.trionics import (
maybe_open_context,
)
from tractor.log import (
get_console_log,
get_logger,
)
log = get_logger(__name__)
_resource: int = 0 _resource: int = 0
@ -62,7 +52,7 @@ def test_resource_only_entered_once(key_on):
# different task names per task will be used # different task names per task will be used
kwargs = {'task_name': name} kwargs = {'task_name': name}
async with maybe_open_context( async with tractor.trionics.maybe_open_context(
maybe_increment_counter, maybe_increment_counter,
kwargs=kwargs, kwargs=kwargs,
key=key, key=key,
@ -82,13 +72,11 @@ def test_resource_only_entered_once(key_on):
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
async with ( async with (
tractor.open_root_actor(), tractor.open_root_actor(),
trio.open_nursery() as tn, trio.open_nursery() as n,
): ):
for i in range(10): for i in range(10):
tn.start_soon( n.start_soon(enter_cached_mngr, f'task_{i}')
enter_cached_mngr,
f'task_{i}',
)
await trio.sleep(0.001) await trio.sleep(0.001)
trio.run(main) trio.run(main)
@ -110,32 +98,21 @@ async def streamer(
@acm @acm
async def open_stream() -> Awaitable[ async def open_stream() -> Awaitable[tractor.MsgStream]:
tuple[
tractor.ActorNursery,
tractor.MsgStream,
]
]:
try: try:
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
portal = await an.start_actor( portal = await an.start_actor(
'streamer', 'streamer',
enable_modules=[__name__], enable_modules=[__name__],
) )
try:
async with ( async with (
portal.open_context(streamer) as (ctx, first), portal.open_context(streamer) as (ctx, first),
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
print('Entered open_stream() caller') yield stream
yield an, stream
print('Exited open_stream() caller')
finally: print('Cancelling streamer')
print(
'Cancelling streamer with,\n'
'=> `Portal.cancel_actor()`'
)
await portal.cancel_actor() await portal.cancel_actor()
print('Cancelled streamer') print('Cancelled streamer')
@ -150,15 +127,11 @@ async def open_stream() -> Awaitable[
@acm @acm
async def maybe_open_stream(taskname: str): async def maybe_open_stream(taskname: str):
async with maybe_open_context( async with tractor.trionics.maybe_open_context(
# NOTE: all secondary tasks should cache hit on the same key # NOTE: all secondary tasks should cache hit on the same key
acm_func=open_stream, acm_func=open_stream,
) as ( ) as (cache_hit, stream):
cache_hit,
(an, stream)
):
# when the actor + portal + ctx + stream has already been
# allocated we want to just bcast to this task.
if cache_hit: if cache_hit:
print(f'{taskname} loaded from cache') print(f'{taskname} loaded from cache')
@ -166,43 +139,10 @@ async def maybe_open_stream(taskname: str):
# if this feed is already allocated by the first # if this feed is already allocated by the first
# task that entereed # task that entereed
async with stream.subscribe() as bstream: async with stream.subscribe() as bstream:
yield an, bstream yield bstream
print(
f'cached task exited\n'
f')>\n'
f' |_{taskname}\n'
)
# we should always unreg the "cloned" bcrc for this
# consumer-task
assert id(bstream) not in bstream._state.subs
else: else:
# yield the actual stream # yield the actual stream
try: yield stream
yield an, stream
finally:
print(
f'NON-cached task exited\n'
f')>\n'
f' |_{taskname}\n'
)
first_bstream = stream._broadcaster
bcrx_state = first_bstream._state
subs: dict[int, int] = bcrx_state.subs
if len(subs) == 1:
assert id(first_bstream) in subs
# ^^TODO! the bcrx should always de-allocate all subs,
# including the implicit first one allocated on entry
# by the first subscribing peer task, no?
#
# -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
# |_ allows reverting `MsgStream.receive()` to the
# non-bcaster method.
# |_ we can decide whether to reset `._broadcaster`?
#
# await tractor.pause(shield=True)
def test_open_local_sub_to_stream( def test_open_local_sub_to_stream(
@ -219,24 +159,16 @@ def test_open_local_sub_to_stream(
if debug_mode: if debug_mode:
timeout = 999 timeout = 999
print(f'IN debug_mode, setting large timeout={timeout!r}..')
async def main(): async def main():
full = list(range(1000)) full = list(range(1000))
an: tractor.ActorNursery|None = None
num_tasks: int = 10
async def get_sub_and_pull(taskname: str): async def get_sub_and_pull(taskname: str):
nonlocal an
stream: tractor.MsgStream stream: tractor.MsgStream
async with ( async with (
maybe_open_stream(taskname) as ( maybe_open_stream(taskname) as stream,
an,
stream,
),
): ):
if '0' in taskname: if '0' in taskname:
assert isinstance(stream, tractor.MsgStream) assert isinstance(stream, tractor.MsgStream)
@ -248,159 +180,34 @@ def test_open_local_sub_to_stream(
first = await stream.receive() first = await stream.receive()
print(f'{taskname} started with value {first}') print(f'{taskname} started with value {first}')
seq: list[int] = [] seq = []
async for msg in stream: async for msg in stream:
seq.append(msg) seq.append(msg)
assert set(seq).issubset(set(full)) assert set(seq).issubset(set(full))
# end of @acm block
print(f'{taskname} finished') print(f'{taskname} finished')
root: tractor.Actor
with trio.fail_after(timeout) as cs: with trio.fail_after(timeout) as cs:
# TODO: turns out this isn't multi-task entrant XD # TODO: turns out this isn't multi-task entrant XD
# We probably need an indepotent entry semantic? # We probably need an indepotent entry semantic?
async with tractor.open_root_actor( async with tractor.open_root_actor(
debug_mode=debug_mode, debug_mode=debug_mode,
# maybe_enable_greenback=True, ):
#
# ^TODO? doesn't seem to mk breakpoint() usage work
# bc each bg task needs to open a portal??
# - [ ] we should consider making this part of
# our taskman defaults?
# |_see https://github.com/goodboy/tractor/pull/363
#
) as root:
assert root.is_registrar
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
for i in range(num_tasks): for i in range(10):
tn.start_soon( tn.start_soon(
get_sub_and_pull, get_sub_and_pull,
f'task_{i}', f'task_{i}',
) )
await trio.sleep(0.001) await trio.sleep(0.001)
print('all consumer tasks finished!') print('all consumer tasks finished')
# ?XXX, ensure actor-nursery is shutdown or we might
# hang here due to a minor task deadlock/race-condition?
#
# - seems that all we need is a checkpoint to ensure
# the last suspended task, which is inside
# `.maybe_open_context()`, can do the
# `Portal.cancel_actor()` call?
#
# - if that bg task isn't resumed, then this blocks
# timeout might hit before that?
#
if root.ipc_server.has_peers():
await trio.lowlevel.checkpoint()
# alt approach, cancel the entire `an`
# await tractor.pause()
# await an.cancel()
# end of runtime scope
print('root actor terminated.')
if cs.cancelled_caught: if cs.cancelled_caught:
pytest.fail( pytest.fail(
'Should NOT time out in `open_root_actor()` ?' 'Should NOT time out in `open_root_actor()` ?'
) )
print('exiting main.')
trio.run(main)
@acm
async def cancel_outer_cs(
cs: trio.CancelScope|None = None,
delay: float = 0,
):
# on first task delay this enough to block
# the 2nd task but then cancel it mid sleep
# so that the tn.start() inside the key-err handler block
# is cancelled and would previously corrupt the
# mutext state.
log.info(f'task entering sleep({delay})')
await trio.sleep(delay)
if cs:
log.info('task calling cs.cancel()')
cs.cancel()
trio.lowlevel.checkpoint()
yield
await trio.sleep_forever()
def test_lock_not_corrupted_on_fast_cancel(
debug_mode: bool,
loglevel: str,
):
'''
Verify that if the caching-task (the first to enter
`maybe_open_context()`) is cancelled mid-cache-miss, the embedded
mutex can never be left in a corrupted state.
That is, the lock is always eventually released ensuring a peer
(cache-hitting) task will never,
- be left to inf-block/hang on the `lock.acquire()`.
- try to release the lock when still owned by the caching-task
due to it having erronously exited without calling
`lock.release()`.
'''
delay: float = 1.
async def use_moc(
cs: trio.CancelScope|None,
delay: float,
):
log.info('task entering moc')
async with maybe_open_context(
cancel_outer_cs,
kwargs={
'cs': cs,
'delay': delay,
},
) as (cache_hit, _null):
if cache_hit:
log.info('2nd task entered')
else:
log.info('1st task entered')
await trio.sleep_forever()
async def main():
with trio.fail_after(delay + 2):
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
trio.open_nursery() as tn,
):
get_console_log('info')
log.info('yo starting')
cs = tn.cancel_scope
tn.start_soon(
use_moc,
cs,
delay,
name='child',
)
with trio.CancelScope() as rent_cs:
await use_moc(
cs=rent_cs,
delay=delay,
)
trio.run(main) trio.run(main)
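
The invariant described in the removed `test_lock_not_corrupted_on_fast_cancel` (its docstring is shown above) reduces to keeping acquire/release strictly paired even when the owning task is cancelled mid-allocation. A generic sketch of that shape, purely illustrative and not `maybe_open_context()`'s actual code:

import trio

_cache_lock = trio.Lock()

async def enter_cached(allocate) -> object:
    # even if this task is cancelled mid-cache-miss (while running
    # `allocate()`), the finally block guarantees a peer task can
    # never hang on `.acquire()` nor release a lock it doesn't own.
    await _cache_lock.acquire()
    try:
        return await allocate()
    finally:
        _cache_lock.release()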

View File

@ -6,18 +6,11 @@ want to see changed.
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from types import ModuleType
from functools import partial
import pytest import pytest
from _pytest import pathlib
from tractor.trionics import collapse_eg from tractor.trionics import collapse_eg
import trio import trio
from trio import TaskStatus from trio import TaskStatus
from tractor._testing import (
examples_dir,
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -113,9 +106,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
debug_mode: bool, debug_mode: bool,
): ):
''' '''
Demo how a masking `trio.Cancelled` could be handled by unmasking Demo how a masking `trio.Cancelled` could be handled by unmasking from the
from the `.__context__` field when a user (by accident) re-raises `.__context__` field when a user (by accident) re-raises from a `finally:`.
from a `finally:`.
''' '''
import tractor import tractor
@ -125,9 +117,11 @@ def test_acm_embedded_nursery_propagates_enter_err(
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc( tractor.trionics.maybe_raise_from_masking_exc(
tn=tn,
unmask_from=( unmask_from=(
(trio.Cancelled,) if unmask_from_canc trio.Cancelled
else () if unmask_from_canc
else None
), ),
) )
): ):
@ -142,6 +136,7 @@ def test_acm_embedded_nursery_propagates_enter_err(
with tractor.devx.maybe_open_crash_handler( with tractor.devx.maybe_open_crash_handler(
pdb=debug_mode, pdb=debug_mode,
) as bxerr: ) as bxerr:
if bxerr:
assert not bxerr.value assert not bxerr.value
async with ( async with (
@ -150,7 +145,6 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert not tn.cancel_scope.cancel_called assert not tn.cancel_scope.cancel_called
assert 0 assert 0
if debug_mode:
assert ( assert (
(err := bxerr.value) (err := bxerr.value)
and and
@ -166,13 +160,13 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert len(assert_eg.exceptions) == 1 assert len(assert_eg.exceptions) == 1
def test_gatherctxs_with_memchan_breaks_multicancelled( def test_gatherctxs_with_memchan_breaks_multicancelled(
debug_mode: bool, debug_mode: bool,
): ):
''' '''
Demo how a using an `async with sndchan` inside Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
a `.trionics.gather_contexts()` task will break a strict-eg-tn's will break a strict-eg-tn's multi-cancelled absorption..
multi-cancelled absorption..
''' '''
from tractor import ( from tractor import (
@ -198,6 +192,7 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
f'Closed {task!r}\n' f'Closed {task!r}\n'
) )
async def main(): async def main():
async with ( async with (
# XXX should ensure ONLY the KBI # XXX should ensure ONLY the KBI
@ -218,85 +213,3 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
trio.run(main) trio.run(main)
@pytest.mark.parametrize(
'raise_unmasked', [
True,
pytest.param(
False,
marks=pytest.mark.xfail(
reason="see examples/trio/send_chan_aclose_masks.py"
)
),
]
)
@pytest.mark.parametrize(
'child_errors_mid_stream',
[True, False],
)
def test_unmask_aclose_as_checkpoint_on_aexit(
raise_unmasked: bool,
child_errors_mid_stream: bool,
debug_mode: bool,
):
'''
Verify that our unmasker util works over the common case where
a mem-chan's `.aclose()` is included in an `@acm` stack
and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded
exception from user code resulting in a silent failure which
appears like graceful cancellation.
This test suite is mostly implemented as an example script so it
could more easily be shared with `trio`-core peeps as `tractor`-less
minimum reproducing example.
'''
mod: ModuleType = pathlib.import_path(
examples_dir()
/ 'trio'
/ 'send_chan_aclose_masks_beg.py',
root=examples_dir(),
consider_namespace_packages=False,
)
with pytest.raises(RuntimeError):
trio.run(partial(
mod.main,
raise_unmasked=raise_unmasked,
child_errors_mid_stream=child_errors_mid_stream,
))
@pytest.mark.parametrize(
'ignore_special_cases', [
True,
pytest.param(
False,
marks=pytest.mark.xfail(
reason="see examples/trio/lockacquire_not_umasked.py"
)
),
]
)
def test_cancelled_lockacquire_in_ipctx_not_unmasked(
ignore_special_cases: bool,
loglevel: str,
debug_mode: bool,
):
mod: ModuleType = pathlib.import_path(
examples_dir()
/ 'trio'
/ 'lockacquire_not_unmasked.py',
root=examples_dir(),
consider_namespace_packages=False,
)
async def _main():
with trio.fail_after(2):
await mod.main(
ignore_special_cases=ignore_special_cases,
loglevel=loglevel,
debug_mode=debug_mode,
)
trio.run(_main)

View File

@ -154,7 +154,7 @@ class Context:
2 cancel-scope-linked, communicating and parallel executing 2 cancel-scope-linked, communicating and parallel executing
`Task`s. Contexts are allocated on each side of any task `Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "child" side a context is actor from a `Portal`. On the "callee" side a context is
always allocated inside `._rpc._invoke()`. always allocated inside `._rpc._invoke()`.
TODO: more detailed writeup on cancellation, error and TODO: more detailed writeup on cancellation, error and
@ -222,8 +222,8 @@ class Context:
# `._runtime.invoke()`. # `._runtime.invoke()`.
_remote_func_type: str | None = None _remote_func_type: str | None = None
# NOTE: (for now) only set (a portal) on the parent side since # NOTE: (for now) only set (a portal) on the caller side since
# the child doesn't generally need a ref to one and should # the callee doesn't generally need a ref to one and should
# normally need to explicitly ask for handle to its peer if # normally need to explicitly ask for handle to its peer if
# more the the `Context` is needed? # more the the `Context` is needed?
_portal: Portal | None = None _portal: Portal | None = None
@ -252,12 +252,12 @@ class Context:
_outcome_msg: Return|Error|ContextCancelled = Unresolved _outcome_msg: Return|Error|ContextCancelled = Unresolved
# on a clean exit there should be a final value # on a clean exit there should be a final value
# delivered from the far end "child" task, so # delivered from the far end "callee" task, so
# this value is only set on one side. # this value is only set on one side.
# _result: Any | int = None # _result: Any | int = None
_result: PayloadT|Unresolved = Unresolved _result: PayloadT|Unresolved = Unresolved
# if the local "parent" task errors this value is always set # if the local "caller" task errors this value is always set
# to the error that was captured in the # to the error that was captured in the
# `Portal.open_context().__aexit__()` teardown block OR, in # `Portal.open_context().__aexit__()` teardown block OR, in
# 2 special cases when an (maybe) expected remote error # 2 special cases when an (maybe) expected remote error
@ -293,7 +293,7 @@ class Context:
# a `ContextCancelled` due to a call to `.cancel()` triggering # a `ContextCancelled` due to a call to `.cancel()` triggering
# "graceful closure" on either side: # "graceful closure" on either side:
# - `._runtime._invoke()` will check this flag before engaging # - `._runtime._invoke()` will check this flag before engaging
# the crash handler REPL in such cases where the "child" # the crash handler REPL in such cases where the "callee"
# raises the cancellation, # raises the cancellation,
# - `.devx.debug.lock_stdio_for_peer()` will set it to `False` if # - `.devx.debug.lock_stdio_for_peer()` will set it to `False` if
# the global tty-lock has been configured to filter out some # the global tty-lock has been configured to filter out some
@ -307,8 +307,8 @@ class Context:
_stream_opened: bool = False _stream_opened: bool = False
_stream: MsgStream|None = None _stream: MsgStream|None = None
# the parent-task's calling-fn's frame-info, the frame above # caller of `Portal.open_context()` for
# `Portal.open_context()`, for introspection/logging. # logging purposes mostly
_caller_info: CallerInfo|None = None _caller_info: CallerInfo|None = None
# overrun handling machinery # overrun handling machinery
@ -442,25 +442,25 @@ class Context:
''' '''
Records whether cancellation has been requested for this context Records whether cancellation has been requested for this context
by a call to `.cancel()` either due to, by a call to `.cancel()` either due to,
- an explicit call by some local task, - either an explicit call by some local task,
- or an implicit call due to an error caught inside - or an implicit call due to an error caught inside
the `Portal.open_context()` block. the ``Portal.open_context()`` block.
''' '''
return self._cancel_called return self._cancel_called
# XXX, to debug who frickin sets it.. @cancel_called.setter
# @cancel_called.setter def cancel_called(self, val: bool) -> None:
# def cancel_called(self, val: bool) -> None: '''
# ''' Set the self-cancelled request `bool` value.
# Set the self-cancelled request `bool` value.
# ''' '''
# to debug who frickin sets it..
# if val: # if val:
# from .devx import pause_from_sync # from .devx import pause_from_sync
# pause_from_sync() # pause_from_sync()
# self._cancel_called = val self._cancel_called = val
@property @property
def canceller(self) -> tuple[str, str]|None: def canceller(self) -> tuple[str, str]|None:
@ -529,11 +529,11 @@ class Context:
''' '''
Exactly the value of `self._scope.cancelled_caught` Exactly the value of `self._scope.cancelled_caught`
(delegation) and should only be (able to be read as) (delegation) and should only be (able to be read as)
`True` for a `.side == "parent"` ctx wherein the `True` for a `.side == "caller"` ctx wherein the
`Portal.open_context()` block was exited due to a call to `Portal.open_context()` block was exited due to a call to
`._scope.cancel()` - which should only ocurr in 2 cases: `._scope.cancel()` - which should only ocurr in 2 cases:
- a parent side calls `.cancel()`, the far side cancels - a caller side calls `.cancel()`, the far side cancels
and delivers back a `ContextCancelled` (making and delivers back a `ContextCancelled` (making
`.cancel_acked == True`) and `._scope.cancel()` is `.cancel_acked == True`) and `._scope.cancel()` is
called by `._maybe_cancel_and_set_remote_error()` which called by `._maybe_cancel_and_set_remote_error()` which
@ -542,20 +542,20 @@ class Context:
=> `._scope.cancelled_caught == True` by normal `trio` => `._scope.cancelled_caught == True` by normal `trio`
cs semantics. cs semantics.
- a parent side is delivered a `._remote_error: - a caller side is delivered a `._remote_error:
RemoteActorError` via `._deliver_msg()` and a transitive RemoteActorError` via `._deliver_msg()` and a transitive
call to `_maybe_cancel_and_set_remote_error()` calls call to `_maybe_cancel_and_set_remote_error()` calls
`._scope.cancel()` and that cancellation eventually `._scope.cancel()` and that cancellation eventually
results in `trio.Cancelled`(s) caught in the results in `trio.Cancelled`(s) caught in the
`.open_context()` handling around the @acm's `yield`. `.open_context()` handling around the @acm's `yield`.
Only as an FYI, in the "child" side case it can also be Only as an FYI, in the "callee" side case it can also be
set but never is readable by any task outside the RPC set but never is readable by any task outside the RPC
machinery in `._invoke()` since,: machinery in `._invoke()` since,:
- when a child side calls `.cancel()`, `._scope.cancel()` - when a callee side calls `.cancel()`, `._scope.cancel()`
is called immediately and handled specially inside is called immediately and handled specially inside
`._invoke()` to raise a `ContextCancelled` which is then `._invoke()` to raise a `ContextCancelled` which is then
sent to the parent side. sent to the caller side.
However, `._scope.cancelled_caught` can NEVER be However, `._scope.cancelled_caught` can NEVER be
accessed/read as `True` by any RPC invoked task since it accessed/read as `True` by any RPC invoked task since it
@ -635,71 +635,6 @@ class Context:
''' '''
await self.chan.send(Stop(cid=self.cid)) await self.chan.send(Stop(cid=self.cid))
@property
def parent_task(self) -> trio.Task:
'''
This IPC context's "owning task" which is a `trio.Task`
on one of the "sides" of the IPC.
Note that the "parent_" prefix here refers to the local
`trio` task tree using the same interface as
`trio.Nursery.parent_task` whereas for IPC contexts,
a different cross-actor task hierarchy exists:
- a "parent"-side which originally entered
`Portal.open_context()`,
- the "child"-side which was spawned and scheduled to invoke
a function decorated with `@tractor.context`.
This task is thus a handle to mem-domain-distinct/per-process
`Nursery.parent_task` depending on in which of the above
"sides" this context exists.
'''
return self._task
def _is_blocked_on_rx_chan(self) -> bool:
'''
Predicate to indicate whether the owner `._task: trio.Task` is
currently blocked (by `.receive()`-ing) on its underlying RPC
feeder `._rx_chan`.
This knowledge is highly useful when handling so called
"out-of-band" (OoB) cancellation conditions where a peer
actor's task transmitted some remote error/cancel-msg and we
must know whether to signal-via-cancel currently executing
"user-code" (user defined code embedded in `ctx._scope`) or
simply to forward the IPC-msg-as-error **without calling**
`._scope.cancel()`.
In the latter case it is presumed that if the owner task is
blocking for the next IPC msg, it will eventually receive,
process and raise the equivalent local error **without**
requiring `._scope.cancel()` to be explicitly called by the
*delivering OoB RPC-task* (via `_deliver_msg()`).
'''
# NOTE, see the mem-chan meth-impls for *why* this
# logic works,
# `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
#
# XXX realize that this is NOT an
# official/will-be-loudly-deprecated API:
# - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
#
# orig repo intro in the mem-chan change over patch:
# - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
# |_https://github.com/python-trio/trio/pull/616
# |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
#
return (
self._task.custom_sleep_data
is
self._rx_chan
)
def _maybe_cancel_and_set_remote_error( def _maybe_cancel_and_set_remote_error(
self, self,
error: BaseException, error: BaseException,
@ -731,7 +666,7 @@ class Context:
when called/closed by actor local task(s). when called/closed by actor local task(s).
NOTEs: NOTEs:
- It is expected that the parent has previously unwrapped - It is expected that the caller has previously unwrapped
the remote error using a call to `unpack_error()` and the remote error using a call to `unpack_error()` and
provides that output exception value as the input provides that output exception value as the input
`error` argument *here*. `error` argument *here*.
@ -741,7 +676,7 @@ class Context:
`Portal.open_context()` (ideally) we want to interrupt `Portal.open_context()` (ideally) we want to interrupt
any ongoing local tasks operating within that any ongoing local tasks operating within that
`Context`'s cancel-scope so as to be notified ASAP of `Context`'s cancel-scope so as to be notified ASAP of
the remote error and engage any parent handling (eg. the remote error and engage any caller handling (eg.
for cross-process task supervision). for cross-process task supervision).
- In some cases we may want to raise the remote error - In some cases we may want to raise the remote error
@ -852,27 +787,13 @@ class Context:
if self._canceller is None: if self._canceller is None:
log.error('Ctx has no canceller set!?') log.error('Ctx has no canceller set!?')
cs: trio.CancelScope = self._scope
# ?TODO? see comment @ .start_remote_task()`
#
# if not cs:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise RuntimeError(
# f'IPC ctx was not be opened prior to remote error delivery !?\n'
# f'{self}\n'
# f'\n'
# f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
# )
# Cancel the local `._scope`, catch that # Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error # `._scope.cancelled_caught` and re-raise any remote error
# once exiting (or manually calling `.wait_for_result()`) the # once exiting (or manually calling `.wait_for_result()`) the
# `.open_context()` block. # `.open_context()` block.
cs: trio.CancelScope = self._scope
if ( if (
cs cs
and not cs.cancel_called
# XXX this is an expected cancel request response # XXX this is an expected cancel request response
# message and we **don't need to raise it** in the # message and we **don't need to raise it** in the
@ -881,7 +802,8 @@ class Context:
# if `._cancel_called` then `.cancel_acked and .cancel_called` # if `._cancel_called` then `.cancel_acked and .cancel_called`
# always should be set. # always should be set.
and not self._is_self_cancelled() and not self._is_self_cancelled()
# and not cs.cancelled_caught and not cs.cancel_called
and not cs.cancelled_caught
): ):
if ( if (
msgerr msgerr
@ -892,7 +814,7 @@ class Context:
not self._cancel_on_msgerr not self._cancel_on_msgerr
): ):
message: str = ( message: str = (
f'NOT Cancelling `Context._scope` since,\n' 'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n' f'AND we got a msg-type-error!\n'
f'{error}\n' f'{error}\n'
@ -902,43 +824,13 @@ class Context:
# `trio.Cancelled` subtype here ;) # `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n' message: str = 'Cancelling `Context._scope` !\n\n'
cs.cancel() # from .devx import pause_from_sync
# pause_from_sync()
# TODO, explicit condition for OoB (self-)cancellation? self._scope.cancel()
# - we called `Portal.cancel_actor()` from this actor else:
# and the peer ctx task delivered ctxc due to it. message: str = 'NOT cancelling `Context._scope` !\n\n'
# - currently `self._is_self_cancelled()` will be true
# since the ctxc.canceller check will match us even though it
# wasn't from this ctx specifically!
elif (
cs
and self._is_self_cancelled()
and not cs.cancel_called
):
message: str = (
'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
'\n'
)
# from .devx import mk_pdb # from .devx import mk_pdb
# mk_pdb().set_trace() # mk_pdb().set_trace()
# TODO XXX, required to fix timeout failure in
# `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
#
# XXX NOTE XXX, this is SUPER SUBTLE!
# we only want to cancel our embedded `._scope`
# if the ctx's current/using task is NOT blocked
# on `._rx_chan.receive()` and on some other
# `trio`-checkpoint since in the former case
# any `._remote_error` will be relayed through
# the rx-chan and appropriately raised by the owning
# `._task` directly. IF the owner task is however
# blocking elsewhere we need to interrupt it **now**.
if not self._is_blocked_on_rx_chan():
cs.cancel()
else:
# rx_stats = self._rx_chan.statistics()
message: str = 'NOT cancelling `Context._scope` !\n\n'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
if ( if (
@ -962,7 +854,6 @@ class Context:
+ +
cs_fmt cs_fmt
) )
log.cancel( log.cancel(
message message
+ +
@ -995,11 +886,6 @@ class Context:
@property @property
def repr_caller(self) -> str: def repr_caller(self) -> str:
'''
Render a "namespace-path" style representation of the calling
task-fn.
'''
ci: CallerInfo|None = self._caller_info ci: CallerInfo|None = self._caller_info
if ci: if ci:
return ( return (
@ -1013,7 +899,7 @@ class Context:
def repr_api(self) -> str: def repr_api(self) -> str:
return 'Portal.open_context()' return 'Portal.open_context()'
# TODO: use `.dev._frame_stack` scanning to find caller fn! # TODO: use `.dev._frame_stack` scanning to find caller!
# ci: CallerInfo|None = self._caller_info # ci: CallerInfo|None = self._caller_info
# if ci: # if ci:
# return ( # return (
@ -1048,16 +934,15 @@ class Context:
=> That is, an IPC `Context` (this) **does not** => That is, an IPC `Context` (this) **does not**
have the same semantics as a `trio.CancelScope`. have the same semantics as a `trio.CancelScope`.
If the parent (who entered the `Portal.open_context()`) If the caller (who entered the `Portal.open_context()`)
desires that the internal block's cancel-scope be desires that the internal block's cancel-scope be
cancelled it should open its own `trio.CancelScope` and cancelled it should open its own `trio.CancelScope` and
manage it as needed. manage it as needed.
''' '''
side: str = self.side side: str = self.side
self._cancel_called = True # XXX for debug via the `@.setter`
# ^ XXX for debug via the `@.setter` self.cancel_called = True
# self.cancel_called = True
header: str = ( header: str = (
f'Cancelling ctx from {side!r}-side\n' f'Cancelling ctx from {side!r}-side\n'
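As the docstring above notes, an IPC `Context` does not behave like a `trio.CancelScope`; a rough sketch of the suggested pattern where the caller owns its own scope around the `Portal.open_context()` block (the `portal`/`child_fn` names are assumed for illustration):

import trio
import tractor


async def run_with_deadline(
    portal: tractor.Portal,
    child_fn,  # some @tractor.context endpoint
):
    # caller-owned scope: cancelling it unwinds this block locally which
    # in turn issues the ctx's remote cancel request on exit.
    with trio.move_on_after(3) as cs:
        async with portal.open_context(child_fn) as (ctx, first):
            await ctx.wait_for_result()

    if cs.cancelled_caught:
        print('caller-side deadline hit; child ctx was requested to cancel')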
@ -1121,6 +1006,7 @@ class Context:
else: else:
log.cancel( log.cancel(
f'Timed out on cancel request of remote task?\n' f'Timed out on cancel request of remote task?\n'
f'\n'
f'{reminfo}' f'{reminfo}'
) )
@ -1131,7 +1017,7 @@ class Context:
# `_invoke()` RPC task. # `_invoke()` RPC task.
# #
# NOTE: on this side we ALWAYS cancel the local scope # NOTE: on this side we ALWAYS cancel the local scope
# since the parent expects a `ContextCancelled` to be sent # since the caller expects a `ContextCancelled` to be sent
# from `._runtime._invoke()` back to the other side. The # from `._runtime._invoke()` back to the other side. The
# logic for catching the result of the below # logic for catching the result of the below
# `._scope.cancel()` is inside the `._runtime._invoke()` # `._scope.cancel()` is inside the `._runtime._invoke()`
@ -1304,8 +1190,8 @@ class Context:
) -> Any|Exception: ) -> Any|Exception:
''' '''
From some (parent) side task, wait for and return the final From some (caller) side task, wait for and return the final
result from the remote (child) side's task. result from the remote (callee) side's task.
This provides a mechanism for one task running in some actor to wait This provides a mechanism for one task running in some actor to wait
on another task at the other side, in some other actor, to terminate. on another task at the other side, in some other actor, to terminate.
@ -1601,12 +1487,6 @@ class Context:
): ):
status = 'peer-cancelled' status = 'peer-cancelled'
case (
Unresolved,
trio.Cancelled(), # any error-type
) if self.canceller:
status = 'actor-cancelled'
# (remote) error condition # (remote) error condition
case ( case (
Unresolved, Unresolved,
@ -1720,7 +1600,7 @@ class Context:
raise err raise err
# TODO: maybe a flag to by-pass encode op if already done # TODO: maybe a flag to by-pass encode op if already done
# here in parent? # here in caller?
await self.chan.send(started_msg) await self.chan.send(started_msg)
# set msg-related internal runtime-state # set msg-related internal runtime-state
@ -1796,7 +1676,7 @@ class Context:
XXX RULES XXX XXX RULES XXX
------ - ------ ------ - ------
- NEVER raise remote errors from this method; a calling runtime-task. - NEVER raise remote errors from this method; a runtime task caller.
An error "delivered" to a ctx should always be raised by An error "delivered" to a ctx should always be raised by
the corresponding local task operating on the the corresponding local task operating on the
`Portal`/`Context` APIs. `Portal`/`Context` APIs.
@ -1872,7 +1752,7 @@ class Context:
else: else:
report = ( report = (
'Queueing OVERRUN msg on parent task:\n\n' 'Queueing OVERRUN msg on caller task:\n\n'
+ report + report
) )
log.debug(report) log.debug(report)
@ -2068,12 +1948,12 @@ async def open_context_from_portal(
IPC protocol. IPC protocol.
The yielded `tuple` is a pair delivering a `tractor.Context` The yielded `tuple` is a pair delivering a `tractor.Context`
and any first value "sent" by the "child" task via a call and any first value "sent" by the "callee" task via a call
to `Context.started(<value: Any>)`; this side of the to `Context.started(<value: Any>)`; this side of the
context does not unblock until the "child" task calls context does not unblock until the "callee" task calls
`.started()` in similar style to `trio.Nursery.start()`. `.started()` in similar style to `trio.Nursery.start()`.
When the "child" (side that is "called"/started by a call When the "callee" (side that is "called"/started by a call
to *this* method) returns, the parent side (this) unblocks to *this* method) returns, the caller side (this) unblocks
and any final value delivered from the other end can be and any final value delivered from the other end can be
retrieved using the `Context.wait_for_result()` api. retrieved using the `Context.wait_for_result()` api.
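A minimal end-to-end sketch of the handshake described above (actor and function names are made up for illustration):

import trio
import tractor


@tractor.context
async def child(
    ctx: tractor.Context,
    value: int,
) -> int:
    # unblocks the parent's `open_context()` enter
    await ctx.started(value)
    return value + 1


async def parent():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'child_actor',
            enable_modules=[__name__],
        )
        async with portal.open_context(child, value=10) as (ctx, first):
            assert first == 10
            assert await ctx.wait_for_result() == 11

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(parent)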
@ -2086,7 +1966,7 @@ async def open_context_from_portal(
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# denote this frame as a "runtime frame" for stack # denote this frame as a "runtime frame" for stack
# introspection where we report the parent code in logging # introspection where we report the caller code in logging
# and error message content. # and error message content.
# NOTE: 2 bc of the wrapping `@acm` # NOTE: 2 bc of the wrapping `@acm`
__runtimeframe__: int = 2 # noqa __runtimeframe__: int = 2 # noqa
@ -2121,9 +2001,6 @@ async def open_context_from_portal(
f'|_{portal.actor}\n' f'|_{portal.actor}\n'
) )
# ?TODO? could we move this to inside the `tn` block?
# -> would allow doing `ctx.parent_task = tn.parent_task` ?
# -> would allow a `if not ._scope: => raise RTE` ?
ctx: Context = await portal.actor.start_remote_task( ctx: Context = await portal.actor.start_remote_task(
portal.channel, portal.channel,
nsf=nsf, nsf=nsf,
@ -2148,9 +2025,8 @@ async def open_context_from_portal(
# placeholder for any exception raised in the runtime # placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure. # or by user tasks which cause this context's closure.
scope_err: BaseException|None = None scope_err: BaseException|None = None
ctxc_from_child: ContextCancelled|None = None ctxc_from_callee: ContextCancelled|None = None
try: try:
# from .devx import pause
async with ( async with (
collapse_eg(), collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
@ -2173,10 +2049,6 @@ async def open_context_from_portal(
# the dialog, the `Error` msg should be raised from the `msg` # the dialog, the `Error` msg should be raised from the `msg`
# handling block below. # handling block below.
try: try:
log.runtime(
f'IPC ctx parent waiting on Started msg..\n'
f'ctx.cid: {ctx.cid!r}\n'
)
started_msg, first = await ctx._pld_rx.recv_msg( started_msg, first = await ctx._pld_rx.recv_msg(
ipc=ctx, ipc=ctx,
expect_msg=Started, expect_msg=Started,
@ -2185,16 +2057,16 @@ async def open_context_from_portal(
) )
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
ctx_cs: trio.CancelScope = ctx._scope ctx_cs: trio.CancelScope = ctx._scope
log.cancel(
f'IPC ctx was cancelled during "child" task sync due to\n\n'
f'.cid: {ctx.cid!r}\n'
f'.maybe_error: {ctx.maybe_error!r}\n'
)
# await pause(shield=True)
if not ctx_cs.cancel_called: if not ctx_cs.cancel_called:
raise raise
# from .devx import pause
# await pause(shield=True)
log.cancel(
'IPC ctx was cancelled during "child" task sync due to\n\n'
f'{ctx.maybe_error}\n'
)
# OW if the ctx's scope was cancelled manually, # OW if the ctx's scope was cancelled manually,
# likely the `Context` was cancelled via a call to # likely the `Context` was cancelled via a call to
# `._maybe_cancel_and_set_remote_error()` so ensure # `._maybe_cancel_and_set_remote_error()` so ensure
@ -2232,7 +2104,7 @@ async def open_context_from_portal(
# that we can re-use it around the `yield` ^ here # that we can re-use it around the `yield` ^ here
# or vice versa? # or vice versa?
# #
# maybe TODO NOTE: between the parent exiting and # maybe TODO NOTE: between the caller exiting and
# arriving here the far end may have sent a ctxc-msg or # arriving here the far end may have sent a ctxc-msg or
# other error, so the question is whether we should check # other error, so the question is whether we should check
# for it here immediately and maybe raise so as to engage # for it here immediately and maybe raise so as to engage
@ -2298,16 +2170,16 @@ async def open_context_from_portal(
# request in which case we DO let the error bubble to the # request in which case we DO let the error bubble to the
# opener. # opener.
# #
# 2-THIS "parent" task somewhere invoked `Context.cancel()` # 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCancelled` from the "child" # and received a `ContextCancelled` from the "callee"
# task, in which case we mask the `ContextCancelled` from # task, in which case we mask the `ContextCancelled` from
# bubbling to this "parent" (much like how `trio.Nursery` # bubbling to this "caller" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to # swallows any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`) # `Nursery.cancel_scope.cancel()`)
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
scope_err = ctxc scope_err = ctxc
ctx._local_error: BaseException = scope_err ctx._local_error: BaseException = scope_err
ctxc_from_child = ctxc ctxc_from_callee = ctxc
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!! # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
# using this code and then resuming the REPL will # using this code and then resuming the REPL will
@ -2344,11 +2216,11 @@ async def open_context_from_portal(
# the above `._scope` can be cancelled due to: # the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or # 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`, # `Actor.cancel()`,
# 2. any "child"-side remote error, possibly also a cancellation # 2. any "callee"-side remote error, possibly also a cancellation
# request by some peer, # request by some peer,
# 3. any "parent" (aka THIS scope's) local error raised in the above `yield` # 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
except ( except (
# CASE 3: standard local error in this parent/yieldee # CASE 3: standard local error in this caller/yieldee
Exception, Exception,
# CASES 1 & 2: can manifest as a `ctx._scope_nursery` # CASES 1 & 2: can manifest as a `ctx._scope_nursery`
@ -2362,9 +2234,9 @@ async def open_context_from_portal(
# any `Context._maybe_raise_remote_err()` call. # any `Context._maybe_raise_remote_err()` call.
# #
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]` # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error delivered from the "child" side # from any error delivered from the "callee" side
# AND a group-exc is only raised if there was > 1 # AND a group-exc is only raised if there was > 1
# tasks started *here* in the "parent" / opener # tasks started *here* in the "caller" / opener
# block. If any one of those tasks calls # block. If any one of those tasks calls
# `.wait_for_result()` or `MsgStream.receive()` # `.wait_for_result()` or `MsgStream.receive()`
# `._maybe_raise_remote_err()` will be transitively # `._maybe_raise_remote_err()` will be transitively
@ -2377,8 +2249,8 @@ async def open_context_from_portal(
trio.Cancelled, # NOTE: NOT from inside the ctx._scope trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt, KeyboardInterrupt,
) as rent_err: ) as caller_err:
scope_err = rent_err scope_err = caller_err
ctx._local_error: BaseException = scope_err ctx._local_error: BaseException = scope_err
# XXX: ALWAYS request the context to CANCEL ON any ERROR. # XXX: ALWAYS request the context to CANCEL ON any ERROR.
@ -2390,16 +2262,13 @@ async def open_context_from_portal(
match scope_err: match scope_err:
case trio.Cancelled(): case trio.Cancelled():
logmeth = log.cancel logmeth = log.cancel
cause: str = 'cancelled'
# XXX explicitly report on any non-graceful-taskc cases # XXX explicitly report on any non-graceful-taskc cases
case _: case _:
cause: str = 'errored'
logmeth = log.exception logmeth = log.exception
logmeth( logmeth(
f'ctx {ctx.side!r}-side {cause!r} with,\n' f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
f'{ctx.repr_outcome()!r}\n'
) )
if debug_mode(): if debug_mode():
@ -2420,11 +2289,10 @@ async def open_context_from_portal(
'Calling `ctx.cancel()`!\n' 'Calling `ctx.cancel()`!\n'
) )
# we don't need to cancel the child if it already # we don't need to cancel the callee if it already
# told us it's cancelled ;p # told us it's cancelled ;p
if ctxc_from_child is None: if ctxc_from_callee is None:
try: try:
# await pause(shield=True)
await ctx.cancel() await ctx.cancel()
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
@ -2454,8 +2322,8 @@ async def open_context_from_portal(
# via a call to # via a call to
# `Context._maybe_cancel_and_set_remote_error()`. # `Context._maybe_cancel_and_set_remote_error()`.
# As per `Context._deliver_msg()`, that error IS # As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "child" side fails and causes # ALWAYS SET any time "callee" side fails and causes "caller
# "parent side" cancellation via a `ContextCancelled` here. # side" cancellation via a `ContextCancelled` here.
try: try:
result_or_err: Exception|Any = await ctx.wait_for_result() result_or_err: Exception|Any = await ctx.wait_for_result()
except BaseException as berr: except BaseException as berr:
@ -2491,7 +2359,7 @@ async def open_context_from_portal(
) )
case (None, _): case (None, _):
log.runtime( log.runtime(
'Context returned final result from child task:\n' 'Context returned final result from callee task:\n'
f'<= peer: {uid}\n' f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n' f' |_ {nsf}()\n\n'
@ -2581,14 +2449,12 @@ async def open_context_from_portal(
log.cancel( log.cancel(
f'Context cancelled by local {ctx.side!r}-side task\n' f'Context cancelled by local {ctx.side!r}-side task\n'
f'c)>\n' f'c)>\n'
f' |_{ctx.parent_task}\n' f' |_{ctx._task}\n\n'
f' .cid={ctx.cid!r}\n' f'{repr(scope_err)}\n'
f'\n'
f'{scope_err!r}\n'
) )
# TODO: should we add a `._cancel_req_received` # TODO: should we add a `._cancel_req_received`
# flag to determine if the child manually called # flag to determine if the callee manually called
# `ctx.cancel()`? # `ctx.cancel()`?
# -[ ] going to need a cid check no? # -[ ] going to need a cid check no?
@ -2644,7 +2510,7 @@ def mk_context(
recv_chan: trio.MemoryReceiveChannel recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan parent-info if log level so high! # TODO: only scan caller-info if log level so high!
from .devx._frame_stack import find_caller_info from .devx._frame_stack import find_caller_info
caller_info: CallerInfo|None = find_caller_info() caller_info: CallerInfo|None = find_caller_info()

View File

@ -300,7 +300,7 @@ class Portal:
) )
# XXX the one spot we set it? # XXX the one spot we set it?
chan._cancel_called: bool = True self.channel._cancel_called: bool = True
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
# XXX: sure would be nice to make this work with # XXX: sure would be nice to make this work with

View File

@ -481,11 +481,10 @@ async def open_root_actor(
collapse_eg(), collapse_eg(),
trio.open_nursery() as root_tn, trio.open_nursery() as root_tn,
# ?TODO? finally-footgun below? # XXX, finally-footgun below?
# -> see note on why shielding. # -> see note on why shielding.
# maybe_raise_from_masking_exc(), # maybe_raise_from_masking_exc(),
): ):
actor._root_tn = root_tn
# `_runtime.async_main()` creates an internal nursery # `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process) # and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called # tree has terminated thereby conducting so called
@ -524,11 +523,6 @@ async def open_root_actor(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
debug_filter=debug_filter, debug_filter=debug_filter,
# XXX NOTE, required to debug root-actor
# crashes under cancellation conditions; so
# most of them!
shield=root_tn.cancel_scope.cancel_called,
) )
if ( if (
@ -568,7 +562,6 @@ async def open_root_actor(
f'{op_nested_actor_repr}' f'{op_nested_actor_repr}'
) )
# XXX, THIS IS A *finally-footgun*! # XXX, THIS IS A *finally-footgun*!
# (also mentioned in with-block above)
# -> though already shields internally it can # -> though already shields internally it can
# taskc here and mask underlying errors raised in # taskc here and mask underlying errors raised in
# the try-block above? # the try-block above?

View File

@ -384,7 +384,7 @@ async def _errors_relayed_via_ipc(
# RPC task bookkeeping. # RPC task bookkeeping.
# since RPC tasks are scheduled inside a flat # since RPC tasks are scheduled inside a flat
# `Actor._service_tn`, we add "handles" to each such that # `Actor._service_n`, we add "handles" to each such that
# they can be individually cancelled. # they can be individually cancelled.
finally: finally:
@ -462,7 +462,7 @@ async def _invoke(
connected IPC channel. connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_tn: Nursery`. remotely invoked function, normally in `Actor._service_n: Nursery`.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -642,7 +642,7 @@ async def _invoke(
tn: Nursery tn: Nursery
rpc_ctx_cs: CancelScope rpc_ctx_cs: CancelScope
async with ( async with (
collapse_eg(hide_tb=False), collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
msgops.maybe_limit_plds( msgops.maybe_limit_plds(
ctx=ctx, ctx=ctx,
@ -654,7 +654,8 @@ async def _invoke(
# scope ensures unmasking of the `await coro` below # scope ensures unmasking of the `await coro` below
# *should* never be interfered with!! # *should* never be interfered with!!
maybe_raise_from_masking_exc( maybe_raise_from_masking_exc(
unmask_from=(Cancelled,), tn=tn,
unmask_from=Cancelled,
) as _mbme, # maybe boxed masked exc ) as _mbme, # maybe boxed masked exc
): ):
ctx._scope_nursery = tn ctx._scope_nursery = tn
@ -822,44 +823,24 @@ async def _invoke(
f'after having {ctx.repr_state!r}\n' f'after having {ctx.repr_state!r}\n'
) )
if merr: if merr:
logmeth: Callable = log.error logmeth: Callable = log.error
if ( if isinstance(merr, ContextCancelled):
# ctxc: by `Context.cancel()` logmeth: Callable = log.runtime
isinstance(merr, ContextCancelled)
# out-of-layer cancellation, one of: if not isinstance(merr, RemoteActorError):
# - actorc: by `Portal.cancel_actor()` tb_str: str = ''.join(traceback.format_exception(merr))
# - OSc: by SIGINT or `Process.signal()`
or (
isinstance(merr, trio.Cancelled)
and
ctx.canceller
)
):
logmeth: Callable = log.cancel
descr_str += (
f' with {merr!r}\n'
)
elif (
not isinstance(merr, RemoteActorError)
):
tb_str: str = ''.join(
traceback.format_exception(merr)
)
descr_str += ( descr_str += (
f'\n{merr!r}\n' # needed? f'\n{merr!r}\n' # needed?
f'{tb_str}\n' f'{tb_str}\n'
)
else:
descr_str += (
f'{merr!r}\n'
)
else:
descr_str += (
f'\n' f'\n'
f'with final result {ctx.outcome!r}\n' f'scope_error:\n'
f'{scope_err!r}\n'
) )
else:
descr_str += f'\n{merr!r}\n'
else:
descr_str += f'\nwith final result {ctx.outcome!r}\n'
logmeth( logmeth(
f'{message}\n' f'{message}\n'
@ -935,7 +916,7 @@ async def process_messages(
Receive (multiplexed) per-`Channel` RPC requests as msgs from Receive (multiplexed) per-`Channel` RPC requests as msgs from
remote processes; schedule target async funcs as local remote processes; schedule target async funcs as local
`trio.Task`s inside the `Actor._service_tn: Nursery`. `trio.Task`s inside the `Actor._service_n: Nursery`.
Depending on msg type, non-`cmd` (task spawning/starting) Depending on msg type, non-`cmd` (task spawning/starting)
request payloads (eg. `started`, `yield`, `return`, `error`) request payloads (eg. `started`, `yield`, `return`, `error`)
@ -960,7 +941,7 @@ async def process_messages(
''' '''
actor: Actor = _state.current_actor() actor: Actor = _state.current_actor()
assert actor._service_tn # runtime state sanity assert actor._service_n # runtime state sanity
# TODO: once `trio` gets an "obvious way" for req/resp we # TODO: once `trio` gets an "obvious way" for req/resp we
# should use it? # should use it?
@ -1171,7 +1152,7 @@ async def process_messages(
start_status += '->( scheduling new task..\n' start_status += '->( scheduling new task..\n'
log.runtime(start_status) log.runtime(start_status)
try: try:
ctx: Context = await actor._service_tn.start( ctx: Context = await actor._service_n.start(
partial( partial(
_invoke, _invoke,
actor, actor,
@ -1311,7 +1292,7 @@ async def process_messages(
) as err: ) as err:
if nursery_cancelled_before_task: if nursery_cancelled_before_task:
sn: Nursery = actor._service_tn sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel( log.cancel(
f'Service nursery cancelled before it handled {funcname}' f'Service nursery cancelled before it handled {funcname}'

View File

@ -35,15 +35,6 @@ for running all lower level spawning, supervision and msging layers:
SC-transitive RPC via scheduling of `trio` tasks. SC-transitive RPC via scheduling of `trio` tasks.
- registration of newly spawned actors with the discovery sys. - registration of newly spawned actors with the discovery sys.
Glossary:
--------
- tn: a `trio.Nursery` or "task nursery".
- an: an `ActorNursery` or "actor nursery".
- root: top/parent-most scope/task/process/actor (or other runtime
primitive) in a hierarchical tree.
- parent-ish: "higher-up" in the runtime-primitive hierarchy.
- child-ish: "lower-down" in the runtime-primitive hierarchy.
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import ( from contextlib import (
@ -85,7 +76,6 @@ from tractor.msg import (
) )
from .trionics import ( from .trionics import (
collapse_eg, collapse_eg,
maybe_open_nursery,
) )
from .ipc import ( from .ipc import (
Channel, Channel,
@ -183,11 +173,10 @@ class Actor:
msg_buffer_size: int = 2**6 msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`, # nursery placeholders filled in by `async_main()` after fork
# - after fork for subactors. _root_n: Nursery|None = None
# - during boot for the root actor. _service_n: Nursery|None = None
_root_tn: Nursery|None = None
_service_tn: Nursery|None = None
_ipc_server: _server.IPCServer|None = None _ipc_server: _server.IPCServer|None = None
@property @property
@ -1021,48 +1010,12 @@ class Actor:
the RPC service nursery. the RPC service nursery.
''' '''
actor_repr: str = _pformat.nest_from_op( assert self._service_n
input_op='>c(', self._service_n.start_soon(
text=self.pformat(),
nest_indent=1,
)
log.cancel(
'Actor.cancel_soon()` was called!\n'
f'>> scheduling `Actor.cancel()`\n'
f'{actor_repr}'
)
assert self._service_tn
self._service_tn.start_soon(
self.cancel, self.cancel,
None, # self cancel all rpc tasks None, # self cancel all rpc tasks
) )
# schedule a "canceller task" in the `._root_tn` once the
# `._service_tn` is fully shutdown; task waits for child-ish
# scopes to fully exit then finally cancels its parent,
# root-most, scope.
async def cancel_root_tn_after_services():
log.runtime(
'Waiting on service-tn to cancel..\n'
f'c>)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
)
await self._cancel_complete.wait()
log.cancel(
f'`._service_tn` cancelled\n'
f'>c)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
f'\n'
f'>> cancelling `._root_tn`\n'
f'c>(\n'
f' |_{self._root_tn.cancel_scope!r}\n'
)
self._root_tn.cancel_scope.cancel()
self._root_tn.start_soon(
cancel_root_tn_after_services
)
@property @property
def cancel_complete(self) -> bool: def cancel_complete(self) -> bool:
return self._cancel_complete.is_set() return self._cancel_complete.is_set()
@ -1167,8 +1120,8 @@ class Actor:
await ipc_server.wait_for_shutdown() await ipc_server.wait_for_shutdown()
# cancel all rpc tasks permanently # cancel all rpc tasks permanently
if self._service_tn: if self._service_n:
self._service_tn.cancel_scope.cancel() self._service_n.cancel_scope.cancel()
log_meth(msg) log_meth(msg)
self._cancel_complete.set() self._cancel_complete.set()
@ -1305,7 +1258,7 @@ class Actor:
''' '''
Cancel all ongoing RPC tasks owned/spawned for a given Cancel all ongoing RPC tasks owned/spawned for a given
`parent_chan: Channel` or simply all tasks (inside `parent_chan: Channel` or simply all tasks (inside
`._service_tn`) when `parent_chan=None`. `._service_n`) when `parent_chan=None`.
''' '''
tasks: dict = self._rpc_tasks tasks: dict = self._rpc_tasks
@ -1517,55 +1470,46 @@ async def async_main(
accept_addrs.append(addr.unwrap()) accept_addrs.append(addr.unwrap())
assert accept_addrs assert accept_addrs
# The "root" nursery ensures the channel with the immediate
ya_root_tn: bool = bool(actor._root_tn) # parent is kept alive as a resilient service until
ya_service_tn: bool = bool(actor._service_tn) # cancellation steps have (mostly) occurred in
# a deterministic way.
# NOTE, a top-most "root" nursery in each actor-process
# enables a lifetime priority for the IPC-channel connection
# with a sub-actor's immediate parent. I.e. this connection
# is kept alive as a resilient service connection until all
# other machinery has exited, cancellation of all
# embedded/child scopes have completed. This helps ensure
# a deterministic (and thus "graceful")
# first-class-supervision style teardown where a parent actor
# (vs. say peers) is always the last to be contacted before
# disconnect.
root_tn: trio.Nursery root_tn: trio.Nursery
async with ( async with (
collapse_eg(), collapse_eg(),
maybe_open_nursery( trio.open_nursery() as root_tn,
nursery=actor._root_tn,
) as root_tn,
): ):
if ya_root_tn: actor._root_n = root_tn
assert root_tn is actor._root_tn assert actor._root_n
else:
actor._root_tn = root_tn
ipc_server: _server.IPCServer ipc_server: _server.IPCServer
async with ( async with (
collapse_eg(), collapse_eg(),
maybe_open_nursery( trio.open_nursery() as service_nursery,
nursery=actor._service_tn,
) as service_tn,
_server.open_ipc_server( _server.open_ipc_server(
parent_tn=service_tn, # ?TODO, why can't this be the root-tn parent_tn=service_nursery,
stream_handler_tn=service_tn, stream_handler_tn=service_nursery,
) as ipc_server, ) as ipc_server,
# ) as actor._ipc_server,
# ^TODO? prettier?
): ):
if ya_service_tn:
assert service_tn is actor._service_tn
else:
# This nursery is used to handle all inbound # This nursery is used to handle all inbound
# connections to us such that if the TCP server # connections to us such that if the TCP server
# is killed, connections can continue to process # is killed, connections can continue to process
# in the background until this nursery is cancelled. # in the background until this nursery is cancelled.
actor._service_tn = service_tn actor._service_n = service_nursery
# set after allocate
actor._ipc_server = ipc_server actor._ipc_server = ipc_server
assert (
actor._service_n
and (
actor._service_n
is
actor._ipc_server._parent_tn
is
ipc_server._stream_handler_tn
)
)
# load exposed/allowed RPC modules # load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent # XXX: do this **after** establishing a channel to the parent
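The nursery ordering described in the comment block above, i.e. the parent-channel ("root") nursery outliving the service nursery, reduces to this plain `trio` pattern (a simplified sketch, not the runtime's actual code):

import trio


async def keep_parent_chan_alive():
    await trio.sleep_forever()  # stand-in for the parent IPC-channel task


async def serve_rpc():
    await trio.sleep_forever()  # stand-in for RPC service tasks


async def main():
    async with trio.open_nursery() as root_tn:          # outer-most, longest lived
        root_tn.start_soon(keep_parent_chan_alive)
        async with trio.open_nursery() as service_tn:   # torn down first
            service_tn.start_soon(serve_rpc)
            await trio.sleep(0.1)
            service_tn.cancel_scope.cancel()            # services exit first..
        root_tn.cancel_scope.cancel()                   # ..then the parent link


if __name__ == '__main__':
    trio.run(main)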
@ -1591,11 +1535,10 @@ async def async_main(
# - root actor: the ``accept_addr`` passed to this method # - root actor: the ``accept_addr`` passed to this method
# TODO: why is this not with the root nursery? # TODO: why is this not with the root nursery?
# - see above that the `._service_tn` is what's used?
try: try:
eps: list = await ipc_server.listen_on( eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs, accept_addrs=accept_addrs,
stream_handler_nursery=service_tn, stream_handler_nursery=service_nursery,
) )
log.runtime( log.runtime(
f'Booted IPC server\n' f'Booted IPC server\n'
@ -1603,7 +1546,7 @@ async def async_main(
) )
assert ( assert (
(eps[0].listen_tn) (eps[0].listen_tn)
is not service_tn is not service_nursery
) )
except OSError as oserr: except OSError as oserr:
@ -1765,7 +1708,7 @@ async def async_main(
# XXX TODO but hard XXX # XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the # we can't actually do this bc the debugger uses the
# _service_tn to spawn the lock task, BUT, in theory if we had # _service_n to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be # the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way # actually possible to debug THIS machinery in the same way
# as user task code? # as user task code?

View File

@ -297,23 +297,6 @@ async def hard_kill(
# zombies (as a feature) we ask the OS to do send in the # zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort. # removal swad as the last resort.
if cs.cancelled_caught: if cs.cancelled_caught:
# TODO? attempt at intermediary-rent-sub
# with child in debug lock?
# |_https://github.com/goodboy/tractor/issues/320
#
# if not is_root_process():
# log.warning(
# 'Attempting to acquire debug-REPL-lock before zombie reap!'
# )
# with trio.CancelScope(shield=True):
# async with debug.acquire_debug_lock(
# subactor_uid=current_actor().uid,
# ) as _ctx:
# log.warning(
# 'Acquired debug lock, child ready to be killed ??\n'
# )
# TODO: toss in the skynet-logo face as ascii art? # TODO: toss in the skynet-logo face as ascii art?
log.critical( log.critical(
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n' # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'

View File

@ -117,6 +117,7 @@ class ActorNursery:
] ]
] = {} ] = {}
self.cancelled: bool = False
self._join_procs = trio.Event() self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False self._at_least_one_child_in_debug: bool = False
self.errors = errors self.errors = errors
@ -134,53 +135,10 @@ class ActorNursery:
# TODO: remove the `.run_in_actor()` API and thus this 2ndary # TODO: remove the `.run_in_actor()` API and thus this 2ndary
# nursery when that API get's moved outside this primitive! # nursery when that API get's moved outside this primitive!
self._ria_nursery = ria_nursery self._ria_nursery = ria_nursery
# TODO, factor this into a .hilevel api!
#
# portals spawned with ``run_in_actor()`` are # portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives # cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set() self._cancel_after_result_on_exit: set = set()
# trio.Nursery-like cancel (request) statuses
self._cancelled_caught: bool = False
self._cancel_called: bool = False
@property
def cancel_called(self) -> bool:
'''
Records whether cancellation has been requested for this
actor-nursery by a call to `.cancel()` either due to,
- an explicit call by some actor-local-task,
- an implicit call due to an error/cancel emitted inside
the `tractor.open_nursery()` block.
'''
return self._cancel_called
@property
def cancelled_caught(self) -> bool:
'''
Set when this nursery was able to cancel all spawned subactors
gracefully via an (implicit) call to `.cancel()`.
'''
return self._cancelled_caught
# TODO! remove internal/test-suite usage!
@property
def cancelled(self) -> bool:
warnings.warn(
"`ActorNursery.cancelled` is now deprecated, use "
" `.cancel_called` instead.",
DeprecationWarning,
stacklevel=2,
)
return (
self._cancel_called
# and
# self._cancelled_caught
)
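A rough sketch of how these (main-branch) cancel-status flags might be read by user code; the actor name is made up and the exact post-`cancel()` state of `.cancelled_caught` is implementation dependent:

import trio
import tractor


async def main():
    async with tractor.open_nursery() as an:
        await an.start_actor('sleeper')
        await an.cancel()         # explicit, actor-local cancel request
        assert an.cancel_called   # latched as soon as `.cancel()` is invoked
        # `.cancelled_caught` additionally reports whether the graceful
        # (non-hard-kill) teardown path completed.


if __name__ == '__main__':
    trio.run(main)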
async def start_actor( async def start_actor(
self, self,
name: str, name: str,
@ -358,7 +316,7 @@ class ActorNursery:
''' '''
__runtimeframe__: int = 1 # noqa __runtimeframe__: int = 1 # noqa
self._cancel_called = True self.cancelled = True
# TODO: impl a repr for spawn more compact # TODO: impl a repr for spawn more compact
# then `._children`.. # then `._children`..
@ -436,8 +394,6 @@ class ActorNursery:
) in children.values(): ) in children.values():
log.warning(f"Hard killing process {proc}") log.warning(f"Hard killing process {proc}")
proc.terminate() proc.terminate()
else:
self._cancelled_caught
# mark ourselves as having (tried to have) cancelled all subactors # mark ourselves as having (tried to have) cancelled all subactors
self._join_procs.set() self._join_procs.set()
@ -446,12 +402,12 @@ class ActorNursery:
@acm @acm
async def _open_and_supervise_one_cancels_all_nursery( async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor, actor: Actor,
hide_tb: bool = True, tb_hide: bool = False,
) -> typing.AsyncGenerator[ActorNursery, None]: ) -> typing.AsyncGenerator[ActorNursery, None]:
# normally don't need to show user by default # normally don't need to show user by default
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = tb_hide
outer_err: BaseException|None = None outer_err: BaseException|None = None
inner_err: BaseException|None = None inner_err: BaseException|None = None
@ -647,7 +603,6 @@ _shutdown_msg: str = (
@acm @acm
# @api_frame # @api_frame
async def open_nursery( async def open_nursery(
*, # named params only!
hide_tb: bool = True, hide_tb: bool = True,
**kwargs, **kwargs,
# ^TODO, paramspec for `open_root_actor()` # ^TODO, paramspec for `open_root_actor()`

View File

@ -250,7 +250,7 @@ async def _maybe_enter_pm(
*, *,
tb: TracebackType|None = None, tb: TracebackType|None = None,
api_frame: FrameType|None = None, api_frame: FrameType|None = None,
hide_tb: bool = True, hide_tb: bool = False,
# only enter debugger REPL when returns `True` # only enter debugger REPL when returns `True`
debug_filter: Callable[ debug_filter: Callable[

View File

@ -58,7 +58,6 @@ from tractor._context import Context
from tractor import _state from tractor import _state
from tractor._exceptions import ( from tractor._exceptions import (
NoRuntime, NoRuntime,
InternalError,
) )
from tractor._state import ( from tractor._state import (
current_actor, current_actor,
@ -80,9 +79,6 @@ from ._sigint import (
sigint_shield as sigint_shield, sigint_shield as sigint_shield,
_ctlc_ignore_header as _ctlc_ignore_header _ctlc_ignore_header as _ctlc_ignore_header
) )
from ..pformat import (
ppfmt,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from trio.lowlevel import Task from trio.lowlevel import Task
@ -481,12 +477,12 @@ async def _pause(
# we have to figure out how to avoid having the service nursery # we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below: # cancel on this task start? I *think* this works below:
# ```python # ```python
# actor._service_tn.cancel_scope.shield = shield # actor._service_n.cancel_scope.shield = shield
# ``` # ```
# but not entirely sure if that's a sane way to implement it? # but not entirely sure if that's a sane way to implement it?
# NOTE currently we spawn the lock request task inside this # NOTE currently we spawn the lock request task inside this
# subactor's global `Actor._service_tn` so that the # subactor's global `Actor._service_n` so that the
# lifetime of the lock-request can outlive the current # lifetime of the lock-request can outlive the current
# `._pause()` scope while the user steps through their # `._pause()` scope while the user steps through their
# application code and when they finally exit the # application code and when they finally exit the
@ -510,7 +506,7 @@ async def _pause(
f'|_{task}\n' f'|_{task}\n'
) )
with trio.CancelScope(shield=shield): with trio.CancelScope(shield=shield):
req_ctx: Context = await actor._service_tn.start( req_ctx: Context = await actor._service_n.start(
partial( partial(
request_root_stdio_lock, request_root_stdio_lock,
actor_uid=actor.uid, actor_uid=actor.uid,
@ -544,7 +540,7 @@ async def _pause(
_repl_fail_report = None _repl_fail_report = None
# when the actor is mid-runtime cancellation the # when the actor is mid-runtime cancellation the
# `Actor._service_tn` might get closed before we can spawn # `Actor._service_n` might get closed before we can spawn
# the request task, so just ignore expected RTE. # the request task, so just ignore expected RTE.
elif ( elif (
isinstance(pause_err, RuntimeError) isinstance(pause_err, RuntimeError)
@ -989,7 +985,7 @@ def pause_from_sync(
# that output and assign the `repl` created above! # that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run( bg_task, _ = trio.from_thread.run(
afn=partial( afn=partial(
actor._service_tn.start, actor._service_n.start,
partial( partial(
_pause_from_bg_root_thread, _pause_from_bg_root_thread,
behalf_of_thread=thread, behalf_of_thread=thread,
@ -1157,10 +1153,9 @@ def pause_from_sync(
'use_greenback', 'use_greenback',
False, False,
): ):
raise InternalError( raise RuntimeError(
f'`greenback` was never initialized in this actor?\n' '`greenback` was never initialized in this actor!?\n\n'
f'\n' f'{_state._runtime_vars}\n'
f'{ppfmt(_state._runtime_vars)}\n'
) from rte ) from rte
raise raise

View File

@ -101,27 +101,11 @@ class Channel:
# ^XXX! ONLY set if a remote actor sends an `Error`-msg # ^XXX! ONLY set if a remote actor sends an `Error`-msg
self._closed: bool = False self._closed: bool = False
# flag set by `Portal.cancel_actor()` indicating remote # flag set by ``Portal.cancel_actor()`` indicating remote
# (possibly peer) cancellation of the far end actor runtime. # (possibly peer) cancellation of the far end actor
# runtime.
self._cancel_called: bool = False self._cancel_called: bool = False
@property
def closed(self) -> bool:
'''
Was `.aclose()` successfully called?
'''
return self._closed
@property
def cancel_called(self) -> bool:
'''
Set when `Portal.cancel_actor()` is called on a portal which
wraps this IPC channel.
'''
return self._cancel_called
@property @property
def uid(self) -> tuple[str, str]: def uid(self) -> tuple[str, str]:
''' '''
@ -185,9 +169,7 @@ class Channel:
addr, addr,
**kwargs, **kwargs,
) )
# XXX, for UDS *no!* since we recv the peer-pid and build out assert transport.raddr == addr
# a new addr..
# assert transport.raddr == addr
chan = Channel(transport=transport) chan = Channel(transport=transport)
# ?TODO, compact this into adapter level-methods? # ?TODO, compact this into adapter level-methods?
@ -303,7 +285,7 @@ class Channel:
self, self,
payload: Any, payload: Any,
hide_tb: bool = False, hide_tb: bool = True,
) -> None: ) -> None:
''' '''

View File

@ -17,38 +17,13 @@
Utils to tame mp non-SC madness Utils to tame mp non-SC madness
''' '''
import platform
def disable_mantracker(): def disable_mantracker():
''' '''
Disable all `multiprocessing` "resource tracking" machinery since Disable all ``multiprocessing``` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness. it's an absolute multi-threaded mess of non-SC madness.
''' '''
from multiprocessing.shared_memory import SharedMemory from multiprocessing import resource_tracker as mantracker
# 3.13+ only.. can pass `track=False` to disable
# all the resource tracker bs.
# https://docs.python.org/3/library/multiprocessing.shared_memory.html
if (_py_313 := (
platform.python_version_tuple()[:-1]
>=
('3', '13')
)
):
from functools import partial
return partial(
SharedMemory,
track=False,
)
# !TODO, once we drop 3.12- we can obvi remove all this!
else:
from multiprocessing import (
resource_tracker as mantracker,
)
# Tell the "resource tracker" thing to fuck off. # Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker): class ManTracker(mantracker.ResourceTracker):
@ -68,8 +43,3 @@ def disable_mantracker():
mantracker.ensure_running = mantracker._resource_tracker.ensure_running mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd mantracker.getfd = mantracker._resource_tracker.getfd
# use std type verbatim
shmT = SharedMemory
return shmT
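For reference, the 3.13+ branch above relies on the `track=False` parameter added to `SharedMemory` in Python 3.13, which skips resource-tracker registration entirely:

from multiprocessing.shared_memory import SharedMemory

# 3.13+ only: the tracker never registers this segment so no spurious
# unlink warnings occur at interpreter exit; cleanup is fully on the caller.
shm = SharedMemory(create=True, size=4096, track=False)
try:
    shm.buf[:5] = b'hello'
finally:
    shm.close()
    shm.unlink()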

View File

@ -1001,11 +1001,7 @@ class Server(Struct):
partial( partial(
_serve_ipc_eps, _serve_ipc_eps,
server=self, server=self,
stream_handler_tn=( stream_handler_tn=stream_handler_nursery,
stream_handler_nursery
or
self._stream_handler_tn
),
listen_addrs=accept_addrs, listen_addrs=accept_addrs,
) )
) )
@ -1149,17 +1145,13 @@ async def open_ipc_server(
async with maybe_open_nursery( async with maybe_open_nursery(
nursery=parent_tn, nursery=parent_tn,
) as parent_tn: ) as rent_tn:
no_more_peers = trio.Event() no_more_peers = trio.Event()
no_more_peers.set() no_more_peers.set()
ipc_server = IPCServer( ipc_server = IPCServer(
_parent_tn=parent_tn, _parent_tn=rent_tn,
_stream_handler_tn=( _stream_handler_tn=stream_handler_tn or rent_tn,
stream_handler_tn
or
parent_tn
),
_no_more_peers=no_more_peers, _no_more_peers=no_more_peers,
) )
try: try:
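The `maybe_open_nursery()` helper used above follows the common "re-use the caller's nursery or open a fresh one" pattern; a simplified sketch (not the actual tractor implementation):

from contextlib import asynccontextmanager as acm

import trio


@acm
async def maybe_open_nursery(
    nursery: trio.Nursery|None = None,
):
    if nursery is not None:
        # caller provided one; just pass it through
        yield nursery
    else:
        async with trio.open_nursery() as tn:
            yield tn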

View File

@ -23,15 +23,14 @@ considered optional within the context of this runtime-library.
""" """
from __future__ import annotations from __future__ import annotations
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
# SharedMemory,
ShareableList,
)
import platform
from sys import byteorder from sys import byteorder
import time import time
from typing import Optional from typing import Optional
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
SharedMemory,
ShareableList,
)
from msgspec import ( from msgspec import (
Struct, Struct,
@ -62,7 +61,7 @@ except ImportError:
log = get_logger(__name__) log = get_logger(__name__)
SharedMemory = disable_mantracker() disable_mantracker()
class SharedInt: class SharedInt:
@ -790,22 +789,10 @@ def open_shm_list(
readonly=readonly, readonly=readonly,
) )
# TODO, factor into a @actor_fixture acm-API?
# -[ ] also `@maybe_actor_fixture()` which includes
# the .current_actor() convenience check?
# |_ orr can that just be in the sin-maybe-version?
#
# "close" attached shm on actor teardown # "close" attached shm on actor teardown
try: try:
actor = tractor.current_actor() actor = tractor.current_actor()
actor.lifetime_stack.callback(shml.shm.close) actor.lifetime_stack.callback(shml.shm.close)
# XXX on 3.13+ we don't need to call this?
# -> bc we pass `track=False` for `SharedMemeory` orr?
if (
platform.python_version_tuple()[:-1] < ('3', '13')
):
actor.lifetime_stack.callback(shml.shm.unlink) actor.lifetime_stack.callback(shml.shm.unlink)
except RuntimeError: except RuntimeError:
log.warning('tractor runtime not active, skipping teardown steps') log.warning('tractor runtime not active, skipping teardown steps')

View File

@ -430,25 +430,20 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError, ) as bre:
) as _re: trans_err = bre
trans_err = _re
tpt_name: str = f'{type(self).__name__!r}' tpt_name: str = f'{type(self).__name__!r}'
match trans_err: match trans_err:
case trio.BrokenResourceError() if (
# XXX, specific to UDS transport and its, '[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specific to UDS transport and its,
# well, "speediness".. XD # well, "speediness".. XD
# |_ likely to do with races related to how fast # |_ likely to do with races related to how fast
# the socket is setup/torn-down on linux # the socket is setup/torn-down on linux
# as it pertains to rando pings from the # as it pertains to rando pings from the
# `.discovery` subsys and protos. # `.discovery` subsys and protos.
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe'
in
trans_err.args[0]
): ):
tpt_closed = TransportClosed.from_src_exc( raise TransportClosed.from_src_exc(
message=( message=(
f'{tpt_name} already closed by peer\n' f'{tpt_name} already closed by peer\n'
), ),
@ -456,31 +451,14 @@ class MsgpackTransport(MsgTransport):
src_exc=trans_err, src_exc=trans_err,
raise_on_report=True, raise_on_report=True,
loglevel='transport', loglevel='transport',
) ) from bre
raise tpt_closed from trans_err
# case trio.ClosedResourceError() if (
# 'this socket was already closed'
# in
# trans_err.args[0]
# ):
# tpt_closed = TransportClosed.from_src_exc(
# message=(
# f'{tpt_name} already closed by peer\n'
# ),
# body=f'{self}\n',
# src_exc=trans_err,
# raise_on_report=True,
# loglevel='transport',
# )
# raise tpt_closed from trans_err
# unless the disconnect condition falls under "a # normal operation breakage" we usually console warn
# normal operation breakage" we usually console warn # about it.
# about it. # about it.
case _: case _:
log.exception( log.exception(
f'{tpt_name} layer failed pre-send ??\n' '{tpt_name} layer failed pre-send ??\n'
) )
raise trans_err raise trans_err
@ -525,7 +503,7 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str: def pformat(self) -> str:
return ( return (
f'<{type(self).__name__}(\n' f'<{type(self).__name__}(\n'
f' |_peers: 1\n' f' |_peers: 2\n'
f' laddr: {self._laddr}\n' f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n' f' raddr: {self._raddr}\n'
# f'\n' # f'\n'

View File

@ -18,9 +18,6 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import (
contextmanager as cm,
)
from pathlib import Path from pathlib import Path
import os import os
from socket import ( from socket import (
@ -32,7 +29,6 @@ from socket import (
) )
import struct import struct
from typing import ( from typing import (
Type,
TYPE_CHECKING, TYPE_CHECKING,
ClassVar, ClassVar,
) )
@ -103,6 +99,8 @@ class UDSAddress(
self.filedir self.filedir
or or
self.def_bindspace self.def_bindspace
# or
# get_rt_dir()
) )
@property @property
@ -207,35 +205,12 @@ class UDSAddress(
f']' f']'
) )
@cm
def _reraise_as_connerr(
src_excs: tuple[Type[Exception]],
addr: UDSAddress,
):
try:
yield
except src_excs as src_exc:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {addr.sockpath}\n'
f'\n'
f'from src: {src_exc!r}\n'
) from src_exc
async def start_listener( async def start_listener(
addr: UDSAddress, addr: UDSAddress,
**kwargs, **kwargs,
) -> SocketListener: ) -> SocketListener:
''' # sock = addr._sock = socket.socket(
Start listening for inbound connections via
a `trio.SocketListener` (task) which `socket.bind()`s on `addr`.
Note, if the `UDSAddress.bindspace: Path` directory does not exist it is
implicitly created.
'''
sock = socket.socket( sock = socket.socket(
socket.AF_UNIX, socket.AF_UNIX,
socket.SOCK_STREAM socket.SOCK_STREAM
@ -246,25 +221,17 @@ async def start_listener(
f'|_{addr}\n' f'|_{addr}\n'
) )
# ?TODO? should we use the `actor.lifetime_stack`
# to rm on shutdown?
bindpath: Path = addr.sockpath bindpath: Path = addr.sockpath
if not (bs := addr.bindspace).is_dir(): try:
log.info(
'Creating bindspace dir in file-sys\n'
f'>{{\n'
f'|_{bs!r}\n'
)
bs.mkdir()
with _reraise_as_connerr(
src_excs=(
FileNotFoundError,
OSError,
),
addr=addr
):
await sock.bind(str(bindpath)) await sock.bind(str(bindpath))
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {addr.sockpath}\n'
) from fdne
sock.listen(1) sock.listen(1)
log.info( log.info(
@ -389,30 +356,27 @@ class MsgpackUDSStream(MsgpackTransport):
# `.setsockopt()` call tells the OS provide it; the client # `.setsockopt()` call tells the OS provide it; the client
# pid can then be read on server/listen() side via # pid can then be read on server/listen() side via
# `get_peer_info()` above. # `get_peer_info()` above.
try:
with _reraise_as_connerr(
src_excs=(
FileNotFoundError,
),
addr=addr
):
stream = await open_unix_socket_w_passcred( stream = await open_unix_socket_w_passcred(
str(sockpath), str(sockpath),
**kwargs **kwargs
) )
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {sockpath}\n'
) from fdne
tpt_stream = MsgpackUDSStream( stream = MsgpackUDSStream(
stream, stream,
prefix_size=prefix_size, prefix_size=prefix_size,
codec=codec codec=codec
) )
# XXX assign from new addrs after peer-PID extract! stream._raddr = addr
( return stream
tpt_stream._laddr,
tpt_stream._raddr,
) = cls.get_stream_addrs(stream)
return tpt_stream
@classmethod @classmethod
def get_stream_addrs( def get_stream_addrs(

View File

@ -613,9 +613,10 @@ async def drain_to_final_msg(
# msg: dict = await ctx._rx_chan.receive() # msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught: # if res_cs.cancelled_caught:
# #
# -[x] make sure pause points work here for REPLing # -[ ] make sure pause points work here for REPLing
# the runtime itself; i.e. ensure there's no hangs! # the runtime itself; i.e. ensure there's no hangs!
# |_see masked code below in .cancel_called path # |_from tractor.devx.debug import pause
# await pause()
# NOTE: we get here if the far end was # NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases: # `ContextCancelled` in 2 cases:
@ -651,10 +652,6 @@ async def drain_to_final_msg(
f'IPC ctx cancelled externally during result drain ?\n' f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}' f'{ctx}'
) )
# XXX, for tracing `Cancelled`..
# from tractor.devx.debug import pause
# await pause(shield=True)
# CASE 2: mask the local cancelled-error(s) # CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's

View File

@ -130,7 +130,6 @@ class LinkedTaskChannel(
_trio_task: trio.Task _trio_task: trio.Task
_aio_task_complete: trio.Event _aio_task_complete: trio.Event
_closed_by_aio_task: bool = False
_suppress_graceful_exits: bool = True _suppress_graceful_exits: bool = True
_trio_err: BaseException|None = None _trio_err: BaseException|None = None
@ -209,15 +208,10 @@ class LinkedTaskChannel(
async def aclose(self) -> None: async def aclose(self) -> None:
await self._from_aio.aclose() await self._from_aio.aclose()
# ?TODO? async version of this? def started(
def started_nowait(
self, self,
val: Any = None, val: Any = None,
) -> None: ) -> None:
'''
Synchronize aio-side with its trio-parent.
'''
self._aio_started_val = val self._aio_started_val = val
return self._to_trio.send_nowait(val) return self._to_trio.send_nowait(val)
@ -248,7 +242,6 @@ class LinkedTaskChannel(
# cycle on the trio side? # cycle on the trio side?
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
return await self._from_aio.receive() return await self._from_aio.receive()
except BaseException as err: except BaseException as err:
async with translate_aio_errors( async with translate_aio_errors(
chan=self, chan=self,
@ -326,7 +319,7 @@ def _run_asyncio_task(
qsize: int = 1, qsize: int = 1,
provide_channels: bool = False, provide_channels: bool = False,
suppress_graceful_exits: bool = True, suppress_graceful_exits: bool = True,
hide_tb: bool = True, hide_tb: bool = False,
**kwargs, **kwargs,
) -> LinkedTaskChannel: ) -> LinkedTaskChannel:
@ -354,6 +347,18 @@ def _run_asyncio_task(
# value otherwise it would just return ;P # value otherwise it would just return ;P
assert qsize > 1 assert qsize > 1
if provide_channels:
assert 'to_trio' in args
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
coro = func(**kwargs)
trio_task: trio.Task = trio.lowlevel.current_task() trio_task: trio.Task = trio.lowlevel.current_task()
trio_cs = trio.CancelScope() trio_cs = trio.CancelScope()
aio_task_complete = trio.Event() aio_task_complete = trio.Event()
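A rough usage sketch of the channel-passing mode wired up above, where the `asyncio` target accepts `to_trio`/`from_trio` by name (function names are made up; assumes the surrounding actor was spawned with `infect_asyncio=True`):

import asyncio

from tractor import to_asyncio


async def aio_echo(from_trio, to_trio):
    # asyncio side: the first value sent becomes `first` on the trio side
    to_trio.send_nowait('started')
    msg = await from_trio.get()  # `from_trio` is an asyncio.Queue
    to_trio.send_nowait(msg * 2)


async def trio_main():
    async with to_asyncio.open_channel_from(aio_echo) as (first, chan):
        assert first == 'started'
        await chan.send(21)
        assert await chan.receive() == 42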
@ -368,25 +373,6 @@ def _run_asyncio_task(
_suppress_graceful_exits=suppress_graceful_exits, _suppress_graceful_exits=suppress_graceful_exits,
) )
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
if 'chan' in args:
kwargs['chan'] = chan
if provide_channels:
assert (
'to_trio' in args
or
'chan' in args
)
coro = func(**kwargs)
async def wait_on_coro_final_result( async def wait_on_coro_final_result(
to_trio: trio.MemorySendChannel, to_trio: trio.MemorySendChannel,
coro: Awaitable, coro: Awaitable,
@ -459,23 +445,9 @@ def _run_asyncio_task(
f'Task exited with final result: {result!r}\n' f'Task exited with final result: {result!r}\n'
) )
# XXX ALWAYS close the child-`asyncio`-task-side's # only close the sender side which will relay
# `to_trio` handle which will in turn relay # a `trio.EndOfChannel` to the trio (consumer) side.
# a `trio.EndOfChannel` to the `trio`-parent.
# Consequently the parent `trio` task MUST ALWAYS
# check for any `chan._aio_err` to be raised when it
# receives an EoC.
#
# NOTE, there are 2 EoC cases,
# - normal/graceful EoC due to the aio-side actually
# terminating its "streaming", but the task did not
# error and is not yet complete.
#
# - the aio-task terminated and we specially mark the
# closure as due to the `asyncio.Task`'s exit.
#
to_trio.close() to_trio.close()
chan._closed_by_aio_task = True
aio_task_complete.set() aio_task_complete.set()
log.runtime( log.runtime(
@ -673,9 +645,8 @@ def _run_asyncio_task(
not trio_cs.cancel_called not trio_cs.cancel_called
): ):
log.cancel( log.cancel(
f'Cancelling trio-side due to aio-side src exc\n' f'Cancelling `trio` side due to aio-side src exc\n'
f'\n' f'{curr_aio_err}\n'
f'{curr_aio_err!r}\n'
f'\n' f'\n'
f'(c>\n' f'(c>\n'
f' |_{trio_task}\n' f' |_{trio_task}\n'
@ -787,7 +758,6 @@ async def translate_aio_errors(
aio_done_before_trio: bool = aio_task.done() aio_done_before_trio: bool = aio_task.done()
assert aio_task assert aio_task
trio_err: BaseException|None = None trio_err: BaseException|None = None
eoc: trio.EndOfChannel|None = None
try: try:
yield # back to one of the cross-loop apis yield # back to one of the cross-loop apis
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
@ -819,48 +789,12 @@ async def translate_aio_errors(
# ) # )
# raise # raise
# XXX EoC is a special SIGNAL from the aio-side here! # XXX always pass through EoC since this translator is often
# There are 2 cases to handle: # called from `LinkedTaskChannel.receive()` which we want
# 1. the "EoC passthrough" case. # passthrough and further we have no special meaning for it in
# - the aio-task actually closed the channel "gracefully" and # terms of relaying errors or signals from the aio side!
# the trio-task should unwind any ongoing channel except trio.EndOfChannel as eoc:
# iteration/receiving,
# |_this exc-translator wraps calls to `LinkedTaskChannel.receive()`
# in which case we want to relay the actual "end-of-chan" for
# iteration purposes.
#
# 2. relaying the "asyncio.Task termination" case.
# - if the aio-task terminates, maybe with an error, AND the
# `open_channel_from()` API was used, it will always signal
# that termination.
# |_`wait_on_coro_final_result()` always calls
# `to_trio.close()` when `provide_channels=True` so we need to
# always check if there is an aio-side exc which needs to be
# relayed to the parent trio side!
# |_in this case the special `chan._closed_by_aio_task` is
# ALWAYS set.
#
except trio.EndOfChannel as _eoc:
eoc = _eoc
if (
chan._closed_by_aio_task
and
aio_err
):
log.cancel(
f'The asyncio-child task terminated due to error\n'
f'{aio_err!r}\n'
)
chan._trio_to_raise = aio_err
trio_err = chan._trio_err = eoc trio_err = chan._trio_err = eoc
#
# ?TODO?, raise something like a,
# chan._trio_to_raise = AsyncioErrored()
# BUT, with the tb rewritten to reflect the underlying
# call stack?
else:
trio_err = chan._trio_err = eoc
raise eoc raise eoc
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
@ -1113,7 +1047,7 @@ async def translate_aio_errors(
#
if wait_on_aio_task:
await chan._aio_task_complete.wait()
- log.debug(
+ log.info(
'asyncio-task is done and unblocked trio-side!\n'
)
@ -1130,17 +1064,11 @@ async def translate_aio_errors(
trio_to_raise: (
AsyncioCancelled|
AsyncioTaskExited|
- Exception| # relayed from aio-task
None
) = chan._trio_to_raise
- raise_from: Exception = (
- trio_err if (aio_err is trio_to_raise)
- else aio_err
- )
if not suppress_graceful_exits:
- raise trio_to_raise from raise_from
+ raise trio_to_raise from (aio_err or trio_err)
if trio_to_raise:
match (
@ -1173,7 +1101,7 @@ async def translate_aio_errors(
)
return
case _:
- raise trio_to_raise from raise_from
+ raise trio_to_raise from (aio_err or trio_err)
# Check if the asyncio-side is the cause of the trio-side
# error.
@ -1239,6 +1167,7 @@ async def run_task(
@acm
async def open_channel_from(
target: Callable[..., Any],
suppress_graceful_exits: bool = True,
**target_kwargs,
@ -1272,6 +1201,7 @@ async def open_channel_from(
# deliver stream handle upward
yield first, chan
except trio.Cancelled as taskc:
+ # await tractor.pause(shield=True) # ya it worx ;)
if cs.cancel_called:
if isinstance(chan._trio_to_raise, AsyncioCancelled):
log.cancel(
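For orientation, a hedged usage sketch of `open_channel_from()` as touched above. It assumes the surrounding task already runs inside an asyncio-infected actor, and both the child-task parameter order (`to_trio`, `from_trio`) and the "first sent value is returned as `first`" behaviour are assumptions for illustration, not guarantees from this diff:

import asyncio
import trio
from tractor import to_asyncio

async def aio_child(
    to_trio: trio.MemorySendChannel,
    from_trio: asyncio.Queue,
) -> None:
    # assumed convention: the first value sent is delivered to the
    # trio parent as `first` below.
    to_trio.send_nowait('started')
    await asyncio.sleep(0.1)

async def trio_parent() -> None:
    # intended to run inside an actor spawned to host an asyncio loop.
    async with to_asyncio.open_channel_from(
        aio_child,
        suppress_graceful_exits=True,  # kwarg shown in this changeset
    ) as (first, chan):
        assert first == 'started'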

View File

@ -31,6 +31,7 @@ from typing import (
AsyncIterator,
Callable,
Hashable,
+ Optional,
Sequence,
TypeVar,
TYPE_CHECKING,
@ -40,9 +41,6 @@ import trio
from tractor._state import current_actor
from tractor.log import get_logger
# from ._beg import collapse_eg
- # from ._taskc import (
- # maybe_raise_from_masking_exc,
- # )
if TYPE_CHECKING:
@ -108,9 +106,6 @@ async def _enter_and_wait(
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
- # caller can provide their own scope
- tn: trio.Nursery|None = None,
) -> AsyncGenerator[
tuple[
T | None,
@ -153,22 +148,17 @@ async def gather_contexts(
'`.trionics.gather_contexts()` input mngrs is empty?\n'
'\n'
'Did try to use inline generator syntax?\n'
- 'Check that list({mngrs}) works!\n'
- # 'or sequence-type intead!\n'
- # 'Use a non-lazy iterator or sequence-type intead!\n'
+ 'Use a non-lazy iterator or sequence-type intead!\n'
)
- try:
async with (
- #
- # ?TODO, does including these (eg-collapsing,
- # taskc-unmasking) improve tb noise-reduction/legibility?
- #
# collapse_eg(),
- maybe_open_nursery(
- nursery=tn,
+ trio.open_nursery(
+ strict_exception_groups=False,
+ # ^XXX^ TODO? soo roll our own then ??
+ # -> since we kinda want the "if only one `.exception` then
+ # just raise that" interface?
) as tn,
- # maybe_raise_from_masking_exc(),
):
for mngr in mngrs:
tn.start_soon(
@ -180,12 +170,11 @@ async def gather_contexts(
seed,
)
- # deliver control to caller once all ctx-managers have
- # started (yielded back to us).
+ # deliver control once all managers have started up
await all_entered.wait()
- yield tuple(unwrapped.values())
- parent_exit.set()
+ try:
+ yield tuple(unwrapped.values())
finally:
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
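A minimal call-site sketch for `gather_contexts()` (not from this changeset); the `tn=` kwarg only exists on the `main` side of the diff above, so it is omitted here:

from contextlib import asynccontextmanager as acm

import trio
from tractor.trionics import gather_contexts

@acm
async def res(label: str):
    # stand-in async context manager
    await trio.sleep(0)
    yield label

async def main():
    # pass a non-lazy sequence (see the error note above), not a genexpr
    mngrs = [res('a'), res('b'), res('c')]
    async with gather_contexts(mngrs) as values:
        # all managers have been entered concurrently by this point
        print(values)

trio.run(main)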
@ -203,7 +192,7 @@ class _Cache:
a kept-alive-while-in-use async resource.
'''
- service_tn: trio.Nursery|None = None
+ service_n: Optional[trio.Nursery] = None
locks: dict[Hashable, trio.Lock] = {}
users: int = 0
values: dict[Any, Any] = {}
@ -212,7 +201,7 @@ class _Cache:
tuple[trio.Nursery, trio.Event]
] = {}
# nurseries: dict[int, trio.Nursery] = {}
- no_more_users: trio.Event|None = None
+ no_more_users: Optional[trio.Event] = None
@classmethod
async def run_ctx(
@ -222,17 +211,15 @@ class _Cache:
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
- try:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
+ try:
cls.values[ctx_key] = value
task_status.started(value)
- try:
await no_more_users.wait()
- finally:
- value = cls.values.pop(ctx_key)
finally:
# discard nursery ref so it won't be re-used (an error)?
+ value = cls.values.pop(ctx_key)
cls.resources.pop(ctx_key)
@ -246,9 +233,6 @@ async def maybe_open_context(
kwargs: dict = {},
key: Hashable | Callable[..., Hashable] = None,
- # caller can provide their own scope
- tn: trio.Nursery|None = None,
) -> AsyncIterator[tuple[bool, T]]:
'''
Maybe open an async-context-manager (acm) if there is not already
@ -281,94 +265,40 @@ async def maybe_open_context(
# have it not be closed until all consumers have exited (which is
# currently difficult to implement any other way besides using our
# pre-allocated runtime instance..)
- if tn:
- # TODO, assert tn is eventual parent of this task!
- task: trio.Task = trio.lowlevel.current_task()
- task_tn: trio.Nursery = task.parent_nursery
- if not tn._cancel_status.encloses(
- task_tn._cancel_status
- ):
- raise RuntimeError(
- f'Mis-nesting of task under provided {tn} !?\n'
- f'Current task is NOT a child(-ish)!!\n'
- f'\n'
- f'task: {task}\n'
- f'task_tn: {task_tn}\n'
- )
- service_tn = tn
- else:
- service_tn: trio.Nursery = current_actor()._service_tn
+ service_n: trio.Nursery = current_actor()._service_n
# TODO: is there any way to allocate
# a 'stays-open-till-last-task-finshed nursery?
- # service_tn: trio.Nursery
- # async with maybe_open_nursery(_Cache.service_tn) as service_tn:
- # _Cache.service_tn = service_tn
+ # service_n: trio.Nursery
+ # async with maybe_open_nursery(_Cache.service_n) as service_n:
+ # _Cache.service_n = service_n
- cache_miss_ke: KeyError|None = None
- maybe_taskc: trio.Cancelled|None = None
try:
# **critical section** that should prevent other tasks from
# checking the _Cache until complete otherwise the scheduler
# may switch and by accident we create more then one resource.
yielded = _Cache.values[ctx_key]
- except KeyError as _ke:
- # XXX, stay mutexed up to cache-miss yield
- try:
- cache_miss_ke = _ke
- log.debug(
- f'Allocating new @acm-func entry\n'
- f'ctx_key={ctx_key}\n'
- f'acm_func={acm_func}\n'
- )
+ except KeyError:
+ log.debug(f'Allocating new {acm_func} for {ctx_key}')
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
- resources[ctx_key] = (service_tn, trio.Event())
- yielded: Any = await service_tn.start(
+ resources[ctx_key] = (service_n, trio.Event())
+ # sync up to the mngr's yielded value
+ yielded = await service_n.start(
_Cache.run_ctx,
mngr,
ctx_key,
)
_Cache.users += 1
- finally:
- # XXX, since this runs from an `except` it's a checkpoint
- # whih can be `trio.Cancelled`-masked.
- #
- # NOTE, in that case the mutex is never released by the
- # (first and) caching task and **we can't** simply shield
- # bc that will inf-block on the `await
- # no_more_users.wait()`.
- #
- # SO just always unlock!
lock.release()
+ yield False, yielded
- try:
- yield (
- False, # cache_hit = "no"
- yielded,
- )
- except trio.Cancelled as taskc:
- maybe_taskc = taskc
- log.cancel(
- f'Cancelled from cache-miss entry\n'
- f'\n'
- f'ctx_key: {ctx_key!r}\n'
- f'mngr: {mngr!r}\n'
- )
- # XXX, always unset ke from cancelled context
- # since we never consider it a masked exc case!
- # - bc this can be called directly ty `._rpc._invoke()`?
- #
- if maybe_taskc.__context__ is cache_miss_ke:
- maybe_taskc.__context__ = None
- raise taskc
else:
_Cache.users += 1
- log.debug(
+ log.runtime(
f'Re-using cached resource for user {_Cache.users}\n\n'
f'{ctx_key!r} -> {type(yielded)}\n'
@ -378,19 +308,9 @@ async def maybe_open_context(
# f'{ctx_key!r} -> {yielded!r}\n'
)
lock.release()
- yield (
- True, # cache_hit = "yes"
- yielded,
- )
+ yield True, yielded
finally:
- if lock.locked():
- stats: trio.LockStatistics = lock.statistics()
- log.error(
- f'Lock left locked by last owner !?\n'
- f'{stats}\n'
- )
_Cache.users -= 1
if yielded is not None:
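A hedged usage sketch for `maybe_open_context()` showing the `(cache_hit, value)` tuple yielded above; it assumes running inside an open (root) actor since the cached entry is started on the actor's service nursery, per the diff:

from contextlib import asynccontextmanager as acm

import trio
import tractor
from tractor.trionics import maybe_open_context

@acm
async def connect(addr: str):
    # stand-in for an expensive, shareable resource
    yield f'conn-to-{addr}'

async def user(name: str) -> None:
    async with maybe_open_context(
        connect,
        kwargs={'addr': 'example:1234'},
    ) as (cache_hit, conn):
        # first entrant allocates; later entrants re-use the cached value
        print(f'{name}: cache_hit={cache_hit} conn={conn}')
        await trio.sleep(0.1)

async def main():
    async with tractor.open_root_actor():
        async with trio.open_nursery() as tn:
            for i in range(3):
                tn.start_soon(user, f'task{i}')

trio.run(main)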

View File

@ -22,14 +22,7 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
- import inspect
- from types import (
- TracebackType,
- )
- from typing import (
- Type,
- TYPE_CHECKING,
- )
+ from typing import TYPE_CHECKING
import trio
from tractor.log import get_logger
@ -67,71 +60,12 @@ def find_masked_excs(
return None
- _mask_cases: dict[
- Type[Exception], # masked exc type
- dict[
- int, # inner-frame index into `inspect.getinnerframes()`
- # `FrameInfo.function/filename: str`s to match
- dict[str, str],
- ],
- ] = {
- trio.WouldBlock: {
- # `trio.Lock.acquire()` has a checkpoint inside the
- # `WouldBlock`-no_wait path's handler..
- -5: { # "5th frame up" from checkpoint
- 'filename': 'trio/_sync.py',
- 'function': 'acquire',
- # 'lineno': 605, # matters?
- },
- }
- }
- def is_expected_masking_case(
- cases: dict,
- exc_ctx: Exception,
- exc_match: BaseException,
- ) -> bool|inspect.FrameInfo:
- '''
- Determine whether the provided masked exception is from a known
- bug/special/unintentional-`trio`-impl case which we do not wish
- to unmask.
- Return any guilty `inspect.FrameInfo` ow `False`.
- '''
- exc_tb: TracebackType = exc_match.__traceback__
- if cases := _mask_cases.get(type(exc_ctx)):
- inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb)
- # from tractor.devx.debug import mk_pdb
- # mk_pdb().set_trace()
- for iframe, matchon in cases.items():
- try:
- masker_frame: inspect.FrameInfo = inner[iframe]
- except IndexError:
- continue
- for field, in_field in matchon.items():
- val = getattr(
- masker_frame,
- field,
- )
- if in_field not in val:
- break
- else:
- return masker_frame
- return False
# XXX, relevant discussion @ `trio`-core,
# https://github.com/python-trio/trio/issues/455
#
@acm
async def maybe_raise_from_masking_exc(
- tn: trio.Nursery|None = None,
unmask_from: (
BaseException|
tuple[BaseException]
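For context on the removed main-side helper above, here is a tiny standalone sketch of the same frame-matching idea using `inspect.getinnerframes()` (illustrative only; the names are hypothetical and this is not the helper itself):

import inspect
from types import TracebackType

def frame_matches(
    exc: BaseException,
    index: int,
    expect: dict[str, str],
) -> bool:
    # return True if the traceback frame at `index` matches every
    # `field -> substring` pair, similar in spirit to how the removed
    # `is_expected_masking_case()` filters known masking call-sites.
    tb: TracebackType | None = exc.__traceback__
    if tb is None:
        return False
    frames = inspect.getinnerframes(tb)
    try:
        finfo = frames[index]
    except IndexError:
        return False
    return all(
        want in str(getattr(finfo, field, ''))
        for field, want in expect.items()
    )

try:
    raise RuntimeError('boom')
except RuntimeError as err:
    # the innermost (and only) frame here is this module's top level
    print(frame_matches(err, -1, {'function': '<module>'}))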
@ -140,30 +74,18 @@ async def maybe_raise_from_masking_exc(
raise_unmasked: bool = True,
extra_note: str = (
'This can occurr when,\n'
- '\n'
- ' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block '
- 'which execs an un-shielded checkpoint!'
+ ' - a `trio.Nursery` scope embeds a `finally:`-block '
+ 'which executes a checkpoint!'
#
# ^TODO? other cases?
),
- always_warn_on: tuple[Type[BaseException]] = (
+ always_warn_on: tuple[BaseException] = (
trio.Cancelled,
),
- # don't ever unmask or warn on any masking pair,
- # {<masked-excT-key> -> <masking-excT-value>}
- never_warn_on: dict[
- Type[BaseException],
- Type[BaseException],
- ] = {
- KeyboardInterrupt: trio.Cancelled,
- trio.Cancelled: trio.Cancelled,
- },
# ^XXX, special case(s) where we warn-log bc likely
# there will be no operational diff since the exc
# is always expected to be consumed.
) -> BoxedMaybeException:
'''
Maybe un-mask and re-raise exception(s) suppressed by a known
@ -182,112 +104,81 @@ async def maybe_raise_from_masking_exc(
individual sub-excs but maintain the eg-parent's form right?
'''
- if not isinstance(unmask_from, tuple):
- raise ValueError(
- f'Invalid unmask_from = {unmask_from!r}\n'
- f'Must be a `tuple[Type[BaseException]]`.\n'
- )
from tractor.devx.debug import (
BoxedMaybeException,
- pause,
)
boxed_maybe_exc = BoxedMaybeException(
raise_on_exit=raise_unmasked,
)
matching: list[BaseException]|None = None
- try:
- if tn:
+ maybe_eg: ExceptionGroup|None
try: # handle egs
yield boxed_maybe_exc
return
- except BaseException as _bexc:
- bexc = _bexc
- if isinstance(bexc, BaseExceptionGroup):
+ except* unmask_from as _maybe_eg:
+ maybe_eg = _maybe_eg
matches: ExceptionGroup
- matches, _ = bexc.split(unmask_from)
- if matches:
- matching = matches.exceptions
- elif (
- unmask_from
- and
- type(bexc) in unmask_from
- ):
- matching = [bexc]
+ matches, _ = maybe_eg.split(
+ unmask_from
+ )
+ if not matches:
+ raise
+ matching: list[BaseException] = matches.exceptions
- else:
- try: # handle non-egs
- yield boxed_maybe_exc
- return
- except unmask_from as _maybe_exc:
- maybe_exc = _maybe_exc
- matching: list[BaseException] = [
- maybe_exc
- ]
- # XXX, only unmask-ed for debuggin!
- # TODO, remove eventually..
- except BaseException as _berr:
- berr = _berr
- await pause(shield=True)
- raise berr
if matching is None:
raise
masked: list[tuple[BaseException, BaseException]] = []
for exc_match in matching:
if exc_ctx := find_masked_excs(
maybe_masker=exc_match,
- unmask_from=set(unmask_from),
+ unmask_from={unmask_from},
):
- masked.append((
- exc_ctx,
- exc_match,
- ))
+ masked.append((exc_ctx, exc_match))
boxed_maybe_exc.value = exc_match
note: str = (
f'\n'
- f'^^WARNING^^\n'
- f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n'
+ f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
)
if extra_note:
note += (
f'\n'
f'{extra_note}\n'
)
- do_warn: bool = (
- never_warn_on.get(
- type(exc_ctx) # masking type
- )
- is not
- type(exc_match) # masked type
- )
- if do_warn:
exc_ctx.add_note(note)
- if (
- do_warn
- and
- type(exc_match) in always_warn_on
- ):
+ if type(exc_match) in always_warn_on:
log.warning(note)
- if (
- do_warn
- and
- raise_unmasked
- ):
+ # await tractor.pause(shield=True)
+ if raise_unmasked:
if len(masked) < 2:
- # don't unmask already known "special" cases..
- if (
- _mask_cases
- and
- (cases := _mask_cases.get(type(exc_ctx)))
- and
- (masker_frame := is_expected_masking_case(
- cases,
- exc_ctx,
- exc_match,
- ))
- ):
- log.warning(
- f'Ignoring already-known, non-ideal-but-valid '
- f'masker code @\n'
- f'{masker_frame}\n'
- f'\n'
- f'NOT raising {exc_ctx} from masker {exc_match!r}\n'
- )
- raise exc_match
raise exc_ctx from exc_match
- else:
- # ??TODO, see above but, possibly unmasking sub-exc
- # entries if there are > 1
- # else:
- # await pause(shield=True)
+ # ?TODO, see above but, possibly unmasking sub-exc
+ # entries if there are > 1
+ await pause(shield=True)
else:
raise
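Finally, a minimal self-contained sketch (not from this changeset) of the masking scenario this acm targets: a checkpoint inside a `finally:` raises `trio.Cancelled` which hides the in-flight error, and the unmasker re-raises the hidden exception. `unmask_from` is passed explicitly since the default may differ between the two branches:

import trio
from tractor.trionics import maybe_raise_from_masking_exc

async def main():
    with trio.CancelScope() as cs:
        async with maybe_raise_from_masking_exc(
            unmask_from=(trio.Cancelled,),
        ):
            try:
                raise ValueError('the real error')
            finally:
                # request cancellation then hit a checkpoint: the
                # resulting `trio.Cancelled` masks the `ValueError`
                # (which becomes its `__context__`).
                cs.cancel()
                await trio.lowlevel.checkpoint()

    # without the unmasker `cs` would absorb its own `Cancelled` and the
    # `ValueError` would be silently lost; with it the `ValueError` is
    # re-raised (from the `Cancelled`) and escapes `main()`.

if __name__ == '__main__':
    trio.run(main)  # expected to raise ValueError('the real error')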