Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 23240c31e3 Stackscope import fail msg dun need braces.. 2025-07-29 15:18:13 -04:00
Tyler Goodlet 6a82bab627 Always pop `._Cache.resources` AFTER `mng.__aexit__()`
The correct ordering is to de-alloc the surrounding `service_n`
+ `trio.Event` **after** the `mng` teardown, ensuring
`mng.__aexit__()` can never hit a ref-error if it touches either (like
when a `tn` is passed to `maybe_open_context()`!).
2025-07-29 15:17:37 -04:00
Tyler Goodlet b485297411 Multi-line-style up the UDS fast-connect handler
Shift around comments and expressions for better reading, assign
`tpt_closed` for easier introspection from the REPL during debug, oh and
fix `MsgpackTransport.pformat()` to render '|_peers: 1' .. XD
2025-07-29 15:07:43 -04:00
Tyler Goodlet dd23ef1d95 Drop duplicated (masked) debugging-`terminate_after`, prolly a rebase slip.. 2025-07-29 15:05:38 -04:00
Tyler Goodlet 2ec3ff46cd Log "out-of-layer" cancellation in `._rpc._invoke()`
Similar to what was just changed for `Context.repr_state`: when the
child task is cancelled but by a different "layer" of the runtime (i.e.
a `Portal.cancel_actor()` / `SIGINT`-to-process canceller) we don't
dump a traceback, we just emit via `log.cancel()`.
2025-07-29 15:01:47 -04:00
Tyler Goodlet 967d0e4836 Handle "out-of-layer" remote `Context` cancellation
Such that if the local task hasn't resolved but is `trio.Cancelled` and
a `.canceller` was set, we report an `'actor-cancelled'` status from
`.repr_state: str`. Bit of formatting to avoid needless newlines too!
2025-07-29 14:58:18 -04:00
Tyler Goodlet 5ccb36af57 Mk `pause_from_sync()` raise `InternalError` on no `greenback` init 2025-07-29 14:57:16 -04:00
Tyler Goodlet 28f8546ac5 Hide `_maybe_enter_pm()` frame (again?) 2025-07-29 14:55:18 -04:00
Tyler Goodlet 0ff0971aca Adjust `test_trio_prestarted_task_bubbles()` suite to expect non-eg raises 2025-07-29 14:54:10 -04:00
Tyler Goodlet dc1091016b Bit of multi-line styling / name tweaks in cancellation suites 2025-07-29 14:51:44 -04:00
10 changed files with 120 additions and 80 deletions


@@ -284,20 +284,32 @@ async def test_cancel_infinite_streamer(start_method):
     ],
 )
 @tractor_test
-async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
-    """Verify a subset of failed subactors causes all others in
+async def test_some_cancels_all(
+    num_actors_and_errs: tuple,
+    start_method: str,
+    loglevel: str,
+):
+    '''
+    Verify a subset of failed subactors causes all others in
     the nursery to be cancelled just like the strategy in trio.
 
     This is the first and only supervisory strategy at the moment.
-    """
-    num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
+    '''
+    (
+        num_actors,
+        first_err,
+        err_type,
+        ria_func,
+        da_func,
+    ) = num_actors_and_errs
 
     try:
-        async with tractor.open_nursery() as n:
+        async with tractor.open_nursery() as an:
             # spawn the same number of deamon actors which should be cancelled
             dactor_portals = []
             for i in range(num_actors):
-                dactor_portals.append(await n.start_actor(
+                dactor_portals.append(await an.start_actor(
                     f'deamon_{i}',
                     enable_modules=[__name__],
                 ))

@@ -307,7 +319,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
             for i in range(num_actors):
                 # start actor(s) that will fail immediately
                 riactor_portals.append(
-                    await n.run_in_actor(
+                    await an.run_in_actor(
                         func,
                         name=f'actor_{i}',
                         **kwargs

@@ -337,7 +349,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
 
     # should error here with a ``RemoteActorError`` or ``MultiError``
-    except first_err as err:
+    except first_err as _err:
+        err = _err
         if isinstance(err, BaseExceptionGroup):
             assert len(err.exceptions) == num_actors
             for exc in err.exceptions:

@@ -348,8 +361,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
         elif isinstance(err, tractor.RemoteActorError):
             assert err.boxed_type == err_type
 
-        assert n.cancelled is True
-        assert not n._children
+        assert an.cancelled is True
+        assert not an._children
 
     else:
         pytest.fail("Should have gotten a remote assertion error?")

@@ -559,8 +572,10 @@ def test_cancel_while_childs_child_in_sync_sleep(
 
     async def main():
         with trio.fail_after(2):
-            async with tractor.open_nursery() as tn:
-                await tn.run_in_actor(
+            async with (
+                tractor.open_nursery() as an
+            ):
+                await an.run_in_actor(
                     spawn,
                     name='spawn',
                 )
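
The refactor above is purely stylistic; the behaviour under test is tractor's trio-style "one fails, all cancel" supervision strategy. Below is a minimal standalone sketch of that strategy — the function and actor names (`fail_soon`, 'daemon_0', 'failer') are invented for illustration; only the `open_nursery()`, `start_actor()`, `run_in_actor()` and `.boxed_type` APIs are taken from the diff above.

import trio
import tractor


async def fail_soon() -> None:
    # a subactor task which errors immediately
    raise AssertionError('boom from a subactor')


async def main() -> None:
    try:
        async with tractor.open_nursery() as an:
            # a long-lived daemon actor which should get cancelled..
            await an.start_actor(
                'daemon_0',
                enable_modules=[__name__],
            )
            # ..when this sibling errors right away.
            await an.run_in_actor(
                fail_soon,
                name='failer',
            )
    except (
        tractor.RemoteActorError,
        BaseExceptionGroup,  # may arrive wrapped depending on version
    ) as err:
        # the failing subactor's error type is reported on the
        # remote-error wrapper
        if isinstance(err, tractor.RemoteActorError):
            assert err.boxed_type == AssertionError


if __name__ == '__main__':
    trio.run(main)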


@@ -147,8 +147,7 @@ def test_trio_prestarted_task_bubbles(
         await trio.sleep_forever()
 
     async def _trio_main():
-        # with trio.fail_after(2):
-        with trio.fail_after(999):
+        with trio.fail_after(2 if not debug_mode else 999):
             first: str
             chan: to_asyncio.LinkedTaskChannel
             aio_ev = asyncio.Event()

@@ -217,32 +216,25 @@ def test_trio_prestarted_task_bubbles(
         ):
             aio_ev.set()
 
-    with pytest.raises(
-        expected_exception=ExceptionGroup,
-    ) as excinfo:
-        tractor.to_asyncio.run_as_asyncio_guest(
-            trio_main=_trio_main,
-        )
-    eg = excinfo.value
-    rte_eg, rest_eg = eg.split(RuntimeError)
     # ensure the trio-task's error bubbled despite the aio-side
     # having (maybe) errored first.
     if aio_err_trigger in (
         'after_trio_task_starts',
         'after_start_point',
     ):
-        assert len(errs := rest_eg.exceptions) == 1
-        typerr = errs[0]
-        assert (
-            type(typerr) is TypeError
-            and
-            'trio-side' in typerr.args
-        )
+        patt: str = 'trio-side'
+        expect_exc = TypeError
 
     # when aio errors BEFORE (last) trio task is scheduled, we should
    # never see anythinb but the aio-side.
     else:
-        assert len(rtes := rte_eg.exceptions) == 1
-        assert 'asyncio-side' in rtes[0].args[0]
+        patt: str = 'asyncio-side'
+        expect_exc = RuntimeError
+
+    with pytest.raises(expect_exc) as excinfo:
+        tractor.to_asyncio.run_as_asyncio_guest(
+            trio_main=_trio_main,
+        )
+
+    caught_exc = excinfo.value
+    assert patt in caught_exc.args
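
The switch above, from splitting an `ExceptionGroup` to a plain `pytest.raises(expect_exc)`, matches the commit "Adjust `test_trio_prestarted_task_bubbles()` suite to expect non-eg raises". Here is a standalone contrast of the two assertion styles using toy functions (Python 3.11+ for `ExceptionGroup.split()`); nothing here is tractor-specific.

import pytest


def boom() -> None:
    raise RuntimeError('asyncio-side')


def boom_grouped() -> None:
    raise ExceptionGroup('wrapper', [RuntimeError('asyncio-side')])


def test_plain_raise() -> None:
    # new style: the error is expected to bubble un-grouped
    with pytest.raises(RuntimeError) as excinfo:
        boom()
    assert 'asyncio-side' in excinfo.value.args


def test_grouped_raise() -> None:
    # old style: catch the group, then `.split()` out the subtype
    with pytest.raises(ExceptionGroup) as excinfo:
        boom_grouped()
    rte_eg, rest_eg = excinfo.value.split(RuntimeError)
    assert len(rte_eg.exceptions) == 1
    assert rest_eg is None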


@@ -1011,7 +1011,6 @@ class Context:
         else:
             log.cancel(
                 f'Timed out on cancel request of remote task?\n'
-                f'\n'
                 f'{reminfo}'
             )

@@ -1492,6 +1491,12 @@
             ):
                 status = 'peer-cancelled'
 
+            case (
+                Unresolved,
+                trio.Cancelled(),  # any error-type
+            ) if self.canceller:
+                status = 'actor-cancelled'
+
             # (remote) error condition
             case (
                 Unresolved,

@@ -2273,7 +2278,7 @@ async def open_context_from_portal(
            logmeth = log.exception
 
        logmeth(
-            f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
+            f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
        )
 
        if debug_mode():


@@ -661,7 +661,7 @@ async def _invoke(
     tn: Nursery
     rpc_ctx_cs: CancelScope
     async with (
-        collapse_eg(),
+        collapse_eg(hide_tb=False),
         trio.open_nursery() as tn,
         msgops.maybe_limit_plds(
             ctx=ctx,

@@ -854,24 +854,44 @@ async def _invoke(
             f'after having {ctx.repr_state!r}\n'
         )
         if merr:
             logmeth: Callable = log.error
-            if isinstance(merr, ContextCancelled):
-                logmeth: Callable = log.runtime
-            if not isinstance(merr, RemoteActorError):
-                tb_str: str = ''.join(traceback.format_exception(merr))
+            if (
+                # ctxc: by `Context.cancel()`
+                isinstance(merr, ContextCancelled)
+
+                # out-of-layer cancellation, one of:
+                # - actorc: by `Portal.cancel_actor()`
+                # - OSc: by SIGINT or `Process.signal()`
+                or (
+                    isinstance(merr, trio.Cancelled)
+                    and
+                    ctx.canceller
+                )
+            ):
+                logmeth: Callable = log.cancel
+                descr_str += (
+                    f' with {merr!r}\n'
+                )
+            elif (
+                not isinstance(merr, RemoteActorError)
+            ):
+                tb_str: str = ''.join(
+                    traceback.format_exception(merr)
+                )
                 descr_str += (
                     f'\n{merr!r}\n'  # needed?
                     f'{tb_str}\n'
-                    f'\n'
-                    f'scope_error:\n'
-                    f'{scope_err!r}\n'
                 )
             else:
-                descr_str += f'\n{merr!r}\n'
+                descr_str += (
+                    f'{merr!r}\n'
+                )
         else:
-            descr_str += f'\nwith final result {ctx.outcome!r}\n'
+            descr_str += (
+                f'\n'
+                f'with final result {ctx.outcome!r}\n'
+            )
         logmeth(
             f'{message}\n'
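
Stripped of runtime details, the gist of the new branching above is "pick a quieter log method when the error is an expected cancellation". A self-contained toy version of that selection logic follows; `report_outcome` and the local `ContextCancelled` class are stand-ins (tractor's real code uses its custom `log.cancel()` level and its own error types), so treat this purely as an illustration of the decision shape, not as the library's implementation.

import logging
import trio

log = logging.getLogger('rpc-toy')


class ContextCancelled(Exception):
    # stand-in for tractor's ctxc error type
    pass


def report_outcome(
    merr: BaseException|None,
    canceller: str|None,
) -> None:
    # default to a loud error-level report..
    logmeth = log.error
    if (
        # graceful ctx-cancel requested via `Context.cancel()`
        isinstance(merr, ContextCancelled)
        # ..or an "out-of-layer" cancel: the task was cancelled by
        # some other runtime layer which recorded a `canceller`.
        or (
            isinstance(merr, trio.Cancelled)
            and canceller
        )
    ):
        # ..but expected cancellations only warrant a quiet emit.
        logmeth = log.info

    logmeth('task exited with %r', merr)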


@@ -236,10 +236,6 @@ async def hard_kill(
     # whilst also hacking on it XD
     # terminate_after: int = 99999,
 
-    # NOTE: for mucking with `.pause()`-ing inside the runtime
-    # whilst also hacking on it XD
-    # terminate_after: int = 99999,
-
 ) -> None:
     '''
     Un-gracefully terminate an OS level `trio.Process` after timeout.


@@ -237,9 +237,9 @@ def enable_stack_on_sig(
     try:
         import stackscope
     except ImportError:
-        log.error(
-            '`stackscope` not installed for use in debug mode!\n'
-            '`Ignoring {enable_stack_on_sig!r} call!\n'
+        log.warning(
+            'The `stackscope` lib is not installed!\n'
+            '`Ignoring enable_stack_on_sig() call!\n'
         )
         return None


@@ -250,7 +250,7 @@ async def _maybe_enter_pm(
     *,
     tb: TracebackType|None = None,
     api_frame: FrameType|None = None,
-    hide_tb: bool = False,
+    hide_tb: bool = True,
 
     # only enter debugger REPL when returns `True`
     debug_filter: Callable[


@@ -58,6 +58,7 @@ from tractor._context import Context
 from tractor import _state
 from tractor._exceptions import (
     NoRuntime,
+    InternalError,
 )
 from tractor._state import (
     current_actor,

@@ -79,6 +80,9 @@ from ._sigint import (
     sigint_shield as sigint_shield,
     _ctlc_ignore_header as _ctlc_ignore_header
 )
+from ..pformat import (
+    ppfmt,
+)
 
 if TYPE_CHECKING:
     from trio.lowlevel import Task

@@ -1153,9 +1157,10 @@ def pause_from_sync(
             'use_greenback',
             False,
         ):
-            raise RuntimeError(
-                '`greenback` was never initialized in this actor!?\n\n'
-                f'{_state._runtime_vars}\n'
+            raise InternalError(
+                f'`greenback` was never initialized in this actor?\n'
+                f'\n'
+                f'{ppfmt(_state._runtime_vars)}\n'
             ) from rte
 
         raise


@@ -430,20 +430,24 @@ class MsgpackTransport(MsgTransport):
             return await self.stream.send_all(size + bytes_data)
         except (
             trio.BrokenResourceError,
-        ) as bre:
-            trans_err = bre
+        ) as _re:
+            trans_err = _re
+
             tpt_name: str = f'{type(self).__name__!r}'
             match trans_err:
+
+                # XXX, specifc to UDS transport and its,
+                # well, "speediness".. XD
+                # |_ likely todo with races related to how fast
+                # the socket is setup/torn-down on linux
+                # as it pertains to rando pings from the
+                # `.discovery` subsys and protos.
-                case trio.BrokenResourceError() if (
-                    '[Errno 32] Broken pipe' in trans_err.args[0]
-                    # ^XXX, specifc to UDS transport and its,
-                    # well, "speediness".. XD
-                    # |_ likely todo with races related to how fast
-                    # the socket is setup/torn-down on linux
-                    # as it pertains to rando pings from the
-                    # `.discovery` subsys and protos.
+                case trio.BrokenResourceError() if (
+                    '[Errno 32] Broken pipe'
+                    in
+                    trans_err.args[0]
                 ):
-                    raise TransportClosed.from_src_exc(
+                    tpt_closed = TransportClosed.from_src_exc(
                         message=(
                             f'{tpt_name} already closed by peer\n'
                         ),

@@ -451,14 +455,15 @@ class MsgpackTransport(MsgTransport):
                         src_exc=trans_err,
                         raise_on_report=True,
                         loglevel='transport',
-                    ) from bre
+                    )
+                    raise tpt_closed from trans_err
 
                 # unless the disconnect condition falls under "a
                 # normal operation breakage" we usualy console warn
                 # about it.
                 case _:
                     log.exception(
-                        '{tpt_name} layer failed pre-send ??\n'
+                        f'{tpt_name} layer failed pre-send ??\n'
                    )
                    raise trans_err

@@ -503,7 +508,7 @@
     def pformat(self) -> str:
         return (
             f'<{type(self).__name__}(\n'
-            f' |_peers: 2\n'
+            f' |_peers: 1\n'
             f' laddr: {self._laddr}\n'
             f' raddr: {self._raddr}\n'
             # f'\n'
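
A side note on the `tpt_closed = ...` then `raise tpt_closed from trans_err` split above: binding the wrapper exception to a local first keeps it reachable from a REPL or debugger stopped at (or just before) the raise, which is what the commit message means by "easier introspection". A generic illustration with stand-in types follows — `TransportClosed` here is just a local class, not tractor's, and `send()` is an invented toy function.

class TransportClosed(Exception):
    # stand-in for tractor's transport-closed error
    pass


def send(data: bytes) -> None:
    try:
        # simulate the UDS "fast-connect" race the comments above describe
        raise BrokenPipeError('[Errno 32] Broken pipe')
    except BrokenPipeError as trans_err:
        tpt_closed = TransportClosed(
            'transport already closed by peer'
        )
        # a breakpoint on the next line can poke at `tpt_closed` (and at
        # `trans_err` via its eventual `__cause__`) before it propagates,
        # unlike an anonymous `raise X() from Y`.
        raise tpt_closed from trans_err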


@@ -223,15 +223,17 @@ class _Cache:
         task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
 
     ) -> None:
-        async with mng as value:
-            _, no_more_users = cls.resources[ctx_key]
-            try:
-                cls.values[ctx_key] = value
-                task_status.started(value)
-                await no_more_users.wait()
-            finally:
-                # discard nursery ref so it won't be re-used (an error)?
-                value = cls.values.pop(ctx_key)
-                cls.resources.pop(ctx_key)
+        try:
+            async with mng as value:
+                _, no_more_users = cls.resources[ctx_key]
+                cls.values[ctx_key] = value
+                task_status.started(value)
+                try:
+                    await no_more_users.wait()
+                finally:
+                    value = cls.values.pop(ctx_key)
+        finally:
+            # discard nursery ref so it won't be re-used (an error)?
+            cls.resources.pop(ctx_key)
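
The ordering rule in the hunk above is easier to see in a toy version: the bookkeeping entry is popped only in the *outer* finally, i.e. strictly after the wrapped manager's `__aexit__()` has finished, so teardown code can still look the entry up. A standalone sketch (not tractor's code; `resources` here is just a plain dict and `mng` a dummy manager):

from contextlib import asynccontextmanager
import trio

resources: dict[str, trio.Event] = {}


@asynccontextmanager
async def mng(key: str):
    try:
        yield 'value'
    finally:
        # teardown may still need the bookkeeping entry,
        # so it must not have been popped yet!
        assert key in resources


async def main() -> None:
    key = 'ctx-key'
    resources[key] = trio.Event()
    try:
        async with mng(key):
            pass
    finally:
        # de-alloc only AFTER `mng.__aexit__()` has completed
        resources.pop(key)


trio.run(main)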