Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 23240c31e3 Stackscope import fail msg dun need braces.. 2025-07-29 15:18:13 -04:00
Tyler Goodlet 6a82bab627 Always pop `._Cache.resources` AFTER `mng.__aexit__()`
The correct ordering is to de-alloc the surrounding `service_n`
+ `trio.Event` **after** the `mng` teardown ensuring the
`mng.__aexit__()` never can hit a ref-error if it touches either (like
if a `tn` is passed to `maybe_open_context()`!
2025-07-29 15:17:37 -04:00
Tyler Goodlet b485297411 Multi-line-style up the UDS fast-connect handler
Shift around comments and expressions for better reading, assign
`tpt_closed` for easier introspection from REPL during debug oh and fix
the `MsgpackTransport.pformat()` to render '|_peers: 1' .. XD
2025-07-29 15:07:43 -04:00
Tyler Goodlet dd23ef1d95 Drop duplicated (masked) debugging-`terminate_after`, prolly a rebase slip.. 2025-07-29 15:05:38 -04:00
Tyler Goodlet 2ec3ff46cd Log "out-of-layer" cancellation in `._rpc._invoke()`
Similar to what was just changed for `Context.repr_state`, when the
child task is cancelled but by a different "layer" of the runtime (i.e.
a `Portal.cancel_actor()` / `SIGINT`-to-process canceller) we don't
dump a traceback instead just `log.cancel()` emit.
2025-07-29 15:01:47 -04:00
Tyler Goodlet 967d0e4836 Handle "out-of-layer" remote `Context` cancellation
Such that if the local task hasn't resolved but is `trio.Cancelled` and
a `.canceller` was set, we report a `'actor-cancelled'` from
`.repr_state: str`. Bit of formatting to avoid needless newlines too!
2025-07-29 14:58:18 -04:00
Tyler Goodlet 5ccb36af57 Mk `pause_from_sync()` raise `InternalError` on no `greenback` init 2025-07-29 14:57:16 -04:00
Tyler Goodlet 28f8546ac5 Hide `_maybe_enter_pm()` frame (again?) 2025-07-29 14:55:18 -04:00
Tyler Goodlet 0ff0971aca Adjust `test_trio_prestarted_task_bubbles()` suite to expect non-eg raises 2025-07-29 14:54:10 -04:00
Tyler Goodlet dc1091016b Bit of multi-line styling / name tweaks in cancellation suites 2025-07-29 14:51:44 -04:00
10 changed files with 120 additions and 80 deletions

View File

@ -284,20 +284,32 @@ async def test_cancel_infinite_streamer(start_method):
],
)
@tractor_test
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
"""Verify a subset of failed subactors causes all others in
async def test_some_cancels_all(
num_actors_and_errs: tuple,
start_method: str,
loglevel: str,
):
'''
Verify a subset of failed subactors causes all others in
the nursery to be cancelled just like the strategy in trio.
This is the first and only supervisory strategy at the moment.
"""
num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
'''
(
num_actors,
first_err,
err_type,
ria_func,
da_func,
) = num_actors_and_errs
try:
async with tractor.open_nursery() as n:
async with tractor.open_nursery() as an:
# spawn the same number of deamon actors which should be cancelled
dactor_portals = []
for i in range(num_actors):
dactor_portals.append(await n.start_actor(
dactor_portals.append(await an.start_actor(
f'deamon_{i}',
enable_modules=[__name__],
))
@ -307,7 +319,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
for i in range(num_actors):
# start actor(s) that will fail immediately
riactor_portals.append(
await n.run_in_actor(
await an.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
@ -337,7 +349,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
# should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as err:
except first_err as _err:
err = _err
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
@ -348,8 +361,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type
assert n.cancelled is True
assert not n._children
assert an.cancelled is True
assert not an._children
else:
pytest.fail("Should have gotten a remote assertion error?")
@ -559,8 +572,10 @@ def test_cancel_while_childs_child_in_sync_sleep(
async def main():
with trio.fail_after(2):
async with tractor.open_nursery() as tn:
await tn.run_in_actor(
async with (
tractor.open_nursery() as an
):
await an.run_in_actor(
spawn,
name='spawn',
)

View File

@ -147,8 +147,7 @@ def test_trio_prestarted_task_bubbles(
await trio.sleep_forever()
async def _trio_main():
# with trio.fail_after(2):
with trio.fail_after(999):
with trio.fail_after(2 if not debug_mode else 999):
first: str
chan: to_asyncio.LinkedTaskChannel
aio_ev = asyncio.Event()
@ -217,32 +216,25 @@ def test_trio_prestarted_task_bubbles(
):
aio_ev.set()
with pytest.raises(
expected_exception=ExceptionGroup,
) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
eg = excinfo.value
rte_eg, rest_eg = eg.split(RuntimeError)
# ensure the trio-task's error bubbled despite the aio-side
# having (maybe) errored first.
if aio_err_trigger in (
'after_trio_task_starts',
'after_start_point',
):
assert len(errs := rest_eg.exceptions) == 1
typerr = errs[0]
assert (
type(typerr) is TypeError
and
'trio-side' in typerr.args
)
patt: str = 'trio-side'
expect_exc = TypeError
# when aio errors BEFORE (last) trio task is scheduled, we should
# never see anythinb but the aio-side.
else:
assert len(rtes := rte_eg.exceptions) == 1
assert 'asyncio-side' in rtes[0].args[0]
patt: str = 'asyncio-side'
expect_exc = RuntimeError
with pytest.raises(expect_exc) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
caught_exc = excinfo.value
assert patt in caught_exc.args

View File

@ -1011,7 +1011,6 @@ class Context:
else:
log.cancel(
f'Timed out on cancel request of remote task?\n'
f'\n'
f'{reminfo}'
)
@ -1492,6 +1491,12 @@ class Context:
):
status = 'peer-cancelled'
case (
Unresolved,
trio.Cancelled(), # any error-type
) if self.canceller:
status = 'actor-cancelled'
# (remote) error condition
case (
Unresolved,
@ -2273,7 +2278,7 @@ async def open_context_from_portal(
logmeth = log.exception
logmeth(
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
)
if debug_mode():

View File

@ -661,7 +661,7 @@ async def _invoke(
tn: Nursery
rpc_ctx_cs: CancelScope
async with (
collapse_eg(),
collapse_eg(hide_tb=False),
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
@ -854,24 +854,44 @@ async def _invoke(
f'after having {ctx.repr_state!r}\n'
)
if merr:
logmeth: Callable = log.error
if isinstance(merr, ContextCancelled):
logmeth: Callable = log.runtime
if (
# ctxc: by `Context.cancel()`
isinstance(merr, ContextCancelled)
if not isinstance(merr, RemoteActorError):
tb_str: str = ''.join(traceback.format_exception(merr))
# out-of-layer cancellation, one of:
# - actorc: by `Portal.cancel_actor()`
# - OSc: by SIGINT or `Process.signal()`
or (
isinstance(merr, trio.Cancelled)
and
ctx.canceller
)
):
logmeth: Callable = log.cancel
descr_str += (
f' with {merr!r}\n'
)
elif (
not isinstance(merr, RemoteActorError)
):
tb_str: str = ''.join(
traceback.format_exception(merr)
)
descr_str += (
f'\n{merr!r}\n' # needed?
f'{tb_str}\n'
f'\n'
f'scope_error:\n'
f'{scope_err!r}\n'
)
else:
descr_str += f'\n{merr!r}\n'
descr_str += (
f'{merr!r}\n'
)
else:
descr_str += f'\nwith final result {ctx.outcome!r}\n'
descr_str += (
f'\n'
f'with final result {ctx.outcome!r}\n'
)
logmeth(
f'{message}\n'

View File

@ -236,10 +236,6 @@ async def hard_kill(
# whilst also hacking on it XD
# terminate_after: int = 99999,
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
# terminate_after: int = 99999,
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.

View File

@ -237,9 +237,9 @@ def enable_stack_on_sig(
try:
import stackscope
except ImportError:
log.error(
'`stackscope` not installed for use in debug mode!\n'
'`Ignoring {enable_stack_on_sig!r} call!\n'
log.warning(
'The `stackscope` lib is not installed!\n'
'`Ignoring enable_stack_on_sig() call!\n'
)
return None

View File

@ -250,7 +250,7 @@ async def _maybe_enter_pm(
*,
tb: TracebackType|None = None,
api_frame: FrameType|None = None,
hide_tb: bool = False,
hide_tb: bool = True,
# only enter debugger REPL when returns `True`
debug_filter: Callable[

View File

@ -58,6 +58,7 @@ from tractor._context import Context
from tractor import _state
from tractor._exceptions import (
NoRuntime,
InternalError,
)
from tractor._state import (
current_actor,
@ -79,6 +80,9 @@ from ._sigint import (
sigint_shield as sigint_shield,
_ctlc_ignore_header as _ctlc_ignore_header
)
from ..pformat import (
ppfmt,
)
if TYPE_CHECKING:
from trio.lowlevel import Task
@ -1153,9 +1157,10 @@ def pause_from_sync(
'use_greenback',
False,
):
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
raise InternalError(
f'`greenback` was never initialized in this actor?\n'
f'\n'
f'{ppfmt(_state._runtime_vars)}\n'
) from rte
raise

View File

@ -430,20 +430,24 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
) as bre:
trans_err = bre
) as _re:
trans_err = _re
tpt_name: str = f'{type(self).__name__!r}'
match trans_err:
# XXX, specifc to UDS transport and its,
# well, "speediness".. XD
# |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specifc to UDS transport and its,
# well, "speediness".. XD
# |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
'[Errno 32] Broken pipe'
in
trans_err.args[0]
):
raise TransportClosed.from_src_exc(
tpt_closed = TransportClosed.from_src_exc(
message=(
f'{tpt_name} already closed by peer\n'
),
@ -451,14 +455,15 @@ class MsgpackTransport(MsgTransport):
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
) from bre
)
raise tpt_closed from trans_err
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
log.exception(
'{tpt_name} layer failed pre-send ??\n'
f'{tpt_name} layer failed pre-send ??\n'
)
raise trans_err
@ -503,7 +508,7 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' |_peers: 2\n'
f' |_peers: 1\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
# f'\n'

View File

@ -223,16 +223,18 @@ class _Cache:
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
cls.values[ctx_key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
# discard nursery ref so it won't be re-used (an error)?
value = cls.values.pop(ctx_key)
cls.resources.pop(ctx_key)
try:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
try:
cls.values[ctx_key] = value
task_status.started(value)
await no_more_users.wait()
finally:
value = cls.values.pop(ctx_key)
finally:
# discard nursery ref so it won't be re-used (an error)?
cls.resources.pop(ctx_key)
@acm