Compare commits

...

12 Commits

Author SHA1 Message Date
Tyler Goodlet 9e24a01d55 Toss in masked `.set_trace()` for unshielded `.pause()` debug 2025-08-20 13:02:51 -04:00
Tyler Goodlet 122c855db7 Mask tpt-closed handling of `chan.send(return_msg)`
A partial revert of commit c05d08e426 since it seems we already
suppress tpt-closed errors lower down in `.ipc.Channel.send()`; given
that i'm pretty sure this new handler code should basically never run?

Left in a todo to remove the masked content once i'm done more
thoroughly testing under `piker`.
2025-08-20 12:45:54 -04:00
Tyler Goodlet 8802756216 Always pop `._Cache.resources` AFTER `mng.__aexit__()`
The correct ordering is to de-alloc the surrounding `service_n`
+ `trio.Event` **after** the `mng` teardown ensuring the
`mng.__aexit__()` never can hit a ref-error if it touches either (like
if a `tn` is passed to `maybe_open_context()`)!
2025-08-20 12:45:54 -04:00
Tyler Goodlet 25d6738d03 More `TransportClosed`-handling around IPC-IO
For IPC-disconnects-during-teardown edge cases, augment some `._rpc`
machinery,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying `Channel` already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports-n-reraises the exc (same as prior behaviour).
  * originally i thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * hence the also-added-but-masked-out `debug_filter` / guard expression
    around the `await debug._maybe_enter_pm()` line.
- show the `._invoke()` frame for the moment.
2025-08-20 12:45:54 -04:00
Tyler Goodlet 5ab642bdf0 Drop more `typing.Optional` usage 2025-08-20 12:45:49 -04:00
Tyler Goodlet ed18ecd064 Drop `tn` arg to `maybe_raise_from_masking_exc()` in `._rpc` 2025-08-20 12:45:49 -04:00
Tyler Goodlet cec0282953 Add `never_warn_on: dict` support to unmasker
Such that key->value pairs can be defined which should *never be*
unmasked, where
- the keys are exc-types which might be masked, and
- the values are the exc-types which mask the equivalent key.

For example, the default includes:
- KBI->taskc: a kbi should never be unmasked from its masking
  `trio.Cancelled`.

For the impl, a new `do_warn: bool` in the fn-body determines the
primary guard for whether a warning or re-raising is necessary.
2025-08-20 12:45:49 -04:00
Tyler Goodlet 25c5847f2e Drop `tn` input from `maybe_raise_from_masking_exc()`
Including all caller usage throughout. Moving to a non-`except*` impl
means it's never needed as a signal from the caller - we can just catch
the beg outright (like we should have always been doing)..
2025-08-20 12:45:49 -04:00
Tyler Goodlet ba793fadd9 Pass `tuple` from `._invoke()` unmasker usage
To match the `maybe_raise_from_masking_exc()` sig change.
2025-08-20 12:45:49 -04:00
Tyler Goodlet d17864a432 Adjust test suites to new `maybe_raise_from_masking_exc()` changes 2025-08-20 12:45:49 -04:00
Tyler Goodlet 6c361a9564 Drop `except*` usage from `._taskc` unmasker
That is from `maybe_raise_from_masking_exc()` thus minimizing us to
a single `except BaseException` block with logic branching for the beg
vs. `unmask_from` exc cases.

Also,
- raise val-err when `unmask_from` is not a `tuple`.
- tweak the exc-note warning format.
- drop all pausing from dev work.
2025-08-20 12:45:49 -04:00
Tyler Goodlet 34ca7429c7 Add a "real-world" example of cancelled-masking with `.aclose()` 2025-08-20 12:45:49 -04:00
6 changed files with 284 additions and 80 deletions

View File

@ -0,0 +1,145 @@
from contextlib import (
contextmanager as cm,
# TODO, any diff in async case(s)??
# asynccontextmanager as acm,
)
from functools import partial
import tractor
import trio
# Module-level logger used by every task in this example.
log = tractor.log.get_logger(__name__)
# NOTE(review): presumably enables console log output at the 'info'
# level so the example's emissions are visible — confirm against
# `tractor.log.get_console_log()`.
tractor.log.get_console_log('info')
@cm
def teardown_on_exc(
    raise_from_handler: bool = False,
):
    '''
    Sync ctx-mngr simulating a teardown handler wrapped around
    a task's body.

    Any `BaseException` raised by the wrapped body is logged via
    `log.exception()` and then,
    - when `raise_from_handler` is set, a `RuntimeError` is raised
      in its place to simulate a bug in the teardown code itself,
    - otherwise the caught exc is re-raised as is.

    As per the example's premise the problem is compounded UNLESS
    this handler's scope is OUTSIDE the `tx.aclose()`.. that is in
    the caller's enclosing scope.

    '''
    try:
        yield
    except BaseException as berr:
        log.exception(
            f'Handling termination teardown in child due to,\n'
            f'{berr!r}\n'
        )
        if not raise_from_handler:
            raise  # no teardown bug.

        # XXX teardown ops XXX
        # on termination some steps may need to be run to ensure
        # wider system consistency (like the state of remote
        # connections/services).
        #
        # HOWEVER, any bug in this teardown code is also masked by
        # the `tx.aclose()`! The same holds if `_tn.cancel_scope`
        # is `.cancel_called` by the parent in a graceful request
        # case..

        # simulate a bug in teardown handler.
        raise RuntimeError(
            'woopsie teardown bug!'
        )
async def finite_stream_to_rent(
    tx: trio.abc.SendChannel,
    child_errors_mid_stream: bool,

    task_status: trio.TaskStatus[
        trio.CancelScope,
    ] = trio.TASK_STATUS_IGNORED,
):
    '''
    Child task: stream the ints `0..99` to the parent ("rent")
    over `tx`.

    Enters `tx` and a nursery together, hands that nursery's
    cancel-scope back through `task_status.started()` and then
    sends from inside a `teardown_on_exc()` block.

    When `child_errors_mid_stream` is set a `RuntimeError` is
    raised on iteration 66 (before that value is sent); otherwise
    the teardown handler is instead configured to raise
    (`raise_from_handler=True`) on whatever exc it catches.

    '''
    # iteration on which to inject the mid-stream error, if ever.
    err_at: int|None = (
        66 if child_errors_mid_stream
        else None
    )
    async with (
        # XXX without this unmasker the mid-streaming RTE is never
        # reported since it is masked by the `tx.aclose()`
        # call which in turn raises `Cancelled`!
        #
        # NOTE, this is WITHOUT doing any exception handling
        # inside the child task!
        #
        # TODO, uncomment next LoC to see the suppressed beg[RTE]!
        # tractor.trionics.maybe_raise_from_masking_exc(),

        tx as tx,  # .aclose() is the guilty masker chkpt!
        trio.open_nursery() as _tn,
    ):
        # hand our scope back to the parent so it can
        # supervise/cancel us.
        task_status.started(_tn.cancel_scope)

        with teardown_on_exc(
            raise_from_handler=not child_errors_mid_stream,
        ):
            for i in range(100):
                log.info(
                    f'Child tx {i!r}\n'
                )
                if i == err_at:
                    # WOOPS, the child hits a bug mid-stream!
                    raise RuntimeError(
                        'woopsie, a mid-streaming bug!?'
                    )

                await tx.send(i)
async def main(
    child_errors_mid_stream: bool,
):
    '''
    Parent ("rent") task: start the child streamer, consume its
    msgs and cancel the local nursery once msg `65` arrives.

    Toggle `child_errors_mid_stream` for the 2 cases,

    1. `True`: child errors mid-stream while parent is also
       requesting (graceful) cancel of that child streamer.

    2. `False`: child contains a teardown handler which contains
       a bug and raises.

    '''
    tx, rx = trio.open_memory_channel(1)

    async with (
        trio.open_nursery() as tn,
        rx as rx,
    ):
        # child reports back its nursery's cancel-scope on startup.
        _child_cs = await tn.start(
            partial(
                finite_stream_to_rent,
                tx=tx,
                child_errors_mid_stream=child_errors_mid_stream,
            )
        )
        async for msg in rx:
            log.info(
                f'Rent rx {msg!r}\n'
            )
            if msg != 65:
                continue

            # simulate some external cancellation
            # request **JUST BEFORE** the child errors.
            log.cancel(
                f'Cancelling parent on,\n'
                f'msg={msg}\n'
                f'\n'
                f'Simulates OOB cancel request!\n'
            )
            tn.cancel_scope.cancel()
if __name__ == '__main__':
    # run both scenarios back-to-back: the mid-stream child error
    # first, then the buggy-teardown-handler case.
    for case in (True, False):
        trio.run(main, case)

View File

@ -117,11 +117,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
async with (
trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc(
tn=tn,
unmask_from=(
trio.Cancelled
if unmask_from_canc
else None
(trio.Cancelled,) if unmask_from_canc
else ()
),
)
):
@ -136,8 +134,7 @@ def test_acm_embedded_nursery_propagates_enter_err(
with tractor.devx.maybe_open_crash_handler(
pdb=debug_mode,
) as bxerr:
if bxerr:
assert not bxerr.value
assert not bxerr.value
async with (
wraps_tn_that_always_cancels() as tn,
@ -145,11 +142,12 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert not tn.cancel_scope.cancel_called
assert 0
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
if debug_mode:
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
with pytest.raises(ExceptionGroup) as excinfo:
trio.run(_main)

View File

@ -284,6 +284,10 @@ async def _errors_relayed_via_ipc(
try:
yield # run RPC invoke body
except TransportClosed:
log.exception('Tpt disconnect during remote-exc relay?')
raise
# box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel.
except (
@ -319,6 +323,9 @@ async def _errors_relayed_via_ipc(
and debug_kbis
)
)
# TODO? better then `debug_filter` below?
and
not isinstance(err, TransportClosed)
):
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
@ -327,13 +334,25 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n'
f'|_{ctx}'
)
if _state.debug_mode():
log.exception(
f'RPC task crashed!\n'
f'Attempting to enter debugger\n'
f'\n'
f'{ctx}'
)
entered_debug = await debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
# don't REPL any psuedo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
)
if not entered_debug:
# if we prolly should have entered the REPL but
@ -450,7 +469,7 @@ async def _invoke(
kwargs: dict[str, Any],
is_rpc: bool = True,
hide_tb: bool = True,
hide_tb: bool = False,
return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[
@ -654,8 +673,7 @@ async def _invoke(
# scope ensures unasking of the `await coro` below
# *should* never be interfered with!!
maybe_raise_from_masking_exc(
tn=tn,
unmask_from=Cancelled,
unmask_from=(Cancelled,),
) as _mbme, # maybe boxed masked exc
):
ctx._scope_nursery = tn
@ -676,6 +694,22 @@ async def _invoke(
f'{pretty_struct.pformat(return_msg)}\n'
)
await chan.send(return_msg)
# ?TODO, remove the below since .send() already
# doesn't raise on tpt-closed?
# try:
# await chan.send(return_msg)
# except TransportClosed:
# log.exception(
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
# f'\n'
# f'{chan}\n'
# f'Channel already disconnected ??\n'
# f'\n'
# f'{pretty_struct.pformat(return_msg)}'
# )
# # ?TODO? will this ever be true though?
# if chan.connected():
# raise
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,

View File

@ -561,6 +561,9 @@ async def _pause(
return
elif isinstance(pause_err, trio.Cancelled):
__tracebackhide__: bool = False
# XXX, unmask to REPL it.
# mk_pdb().set_trace(frame=inspect.currentframe())
_repl_fail_report += (
'You called `tractor.pause()` from an already cancelled scope!\n\n'
'Consider `await tractor.pause(shield=True)` to make it work B)\n'

View File

@ -31,7 +31,6 @@ from typing import (
AsyncIterator,
Callable,
Hashable,
Optional,
Sequence,
TypeVar,
TYPE_CHECKING,
@ -204,7 +203,7 @@ class _Cache:
a kept-alive-while-in-use async resource.
'''
service_tn: Optional[trio.Nursery] = None
service_tn: trio.Nursery|None = None
locks: dict[Hashable, trio.Lock] = {}
users: int = 0
values: dict[Any, Any] = {}
@ -213,7 +212,7 @@ class _Cache:
tuple[trio.Nursery, trio.Event]
] = {}
# nurseries: dict[int, trio.Nursery] = {}
no_more_users: Optional[trio.Event] = None
no_more_users: trio.Event|None = None
@classmethod
async def run_ctx(
@ -223,16 +222,18 @@ class _Cache:
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
cls.values[ctx_key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
# discard nursery ref so it won't be re-used (an error)?
value = cls.values.pop(ctx_key)
cls.resources.pop(ctx_key)
try:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
try:
cls.values[ctx_key] = value
task_status.started(value)
await no_more_users.wait()
finally:
value = cls.values.pop(ctx_key)
finally:
# discard nursery ref so it won't be re-used (an error)?
cls.resources.pop(ctx_key)
@acm

View File

@ -22,7 +22,10 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from typing import TYPE_CHECKING
from typing import (
Type,
TYPE_CHECKING,
)
import trio
from tractor.log import get_logger
@ -65,7 +68,6 @@ def find_masked_excs(
#
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery|None = None,
unmask_from: (
BaseException|
tuple[BaseException]
@ -74,15 +76,26 @@ async def maybe_raise_from_masking_exc(
raise_unmasked: bool = True,
extra_note: str = (
'This can occurr when,\n'
' - a `trio.Nursery` scope embeds a `finally:`-block '
'which executes a checkpoint!'
'\n'
' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block '
'which execs an un-shielded checkpoint!'
#
# ^TODO? other cases?
),
always_warn_on: tuple[BaseException] = (
always_warn_on: tuple[Type[BaseException]] = (
trio.Cancelled,
),
# don't ever unmask or warn on any masking pair,
# {<masked-excT-key> -> <masking-excT-value>}
never_warn_on: dict[
Type[BaseException],
Type[BaseException],
] = {
KeyboardInterrupt: trio.Cancelled,
trio.Cancelled: trio.Cancelled,
},
# ^XXX, special case(s) where we warn-log bc likely
# there will be no operational diff since the exc
# is always expected to be consumed.
@ -104,81 +117,91 @@ async def maybe_raise_from_masking_exc(
individual sub-excs but maintain the eg-parent's form right?
'''
if not isinstance(unmask_from, tuple):
raise ValueError(
f'Invalid unmask_from = {unmask_from!r}\n'
f'Must be a `tuple[Type[BaseException]]`.\n'
)
from tractor.devx.debug import (
BoxedMaybeException,
pause,
)
boxed_maybe_exc = BoxedMaybeException(
raise_on_exit=raise_unmasked,
)
matching: list[BaseException]|None = None
maybe_eg: ExceptionGroup|None
if tn:
try: # handle egs
yield boxed_maybe_exc
return
except* unmask_from as _maybe_eg:
maybe_eg = _maybe_eg
try:
yield boxed_maybe_exc
return
except BaseException as _bexc:
bexc = _bexc
if isinstance(bexc, BaseExceptionGroup):
matches: ExceptionGroup
matches, _ = maybe_eg.split(
unmask_from
)
if not matches:
raise
matches, _ = bexc.split(unmask_from)
if matches:
matching = matches.exceptions
matching: list[BaseException] = matches.exceptions
else:
try: # handle non-egs
yield boxed_maybe_exc
return
except unmask_from as _maybe_exc:
maybe_exc = _maybe_exc
matching: list[BaseException] = [
maybe_exc
]
# XXX, only unmask-ed for debuggin!
# TODO, remove eventually..
except BaseException as _berr:
berr = _berr
await pause(shield=True)
raise berr
elif (
unmask_from
and
type(bexc) in unmask_from
):
matching = [bexc]
if matching is None:
raise
masked: list[tuple[BaseException, BaseException]] = []
for exc_match in matching:
if exc_ctx := find_masked_excs(
maybe_masker=exc_match,
unmask_from={unmask_from},
unmask_from=set(unmask_from),
):
masked.append((exc_ctx, exc_match))
masked.append((
exc_ctx,
exc_match,
))
boxed_maybe_exc.value = exc_match
note: str = (
f'\n'
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
f'^^WARNING^^\n'
f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n'
)
if extra_note:
note += (
f'\n'
f'{extra_note}\n'
)
exc_ctx.add_note(note)
if type(exc_match) in always_warn_on:
do_warn: bool = (
never_warn_on.get(
type(exc_ctx) # masking type
)
is not
type(exc_match) # masked type
)
if do_warn:
exc_ctx.add_note(note)
if (
do_warn
and
type(exc_match) in always_warn_on
):
log.warning(note)
# await tractor.pause(shield=True)
if raise_unmasked:
if (
do_warn
and
raise_unmasked
):
if len(masked) < 2:
raise exc_ctx from exc_match
else:
# ?TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
await pause(shield=True)
# ??TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
# else:
# await pause(shield=True)
else:
raise