Compare commits

..

No commits in common. "23240c31e39c7825c9d19631aed3159eb84a76d0" and "69bba3055772ad5358d9f0d77a75ef991bec9b0c" have entirely different histories.

10 changed files with 80 additions and 120 deletions

View File

@ -284,32 +284,20 @@ async def test_cancel_infinite_streamer(start_method):
],
)
@tractor_test
async def test_some_cancels_all(
num_actors_and_errs: tuple,
start_method: str,
loglevel: str,
):
'''
Verify a subset of failed subactors causes all others in
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
"""Verify a subset of failed subactors causes all others in
the nursery to be cancelled just like the strategy in trio.
This is the first and only supervisory strategy at the moment.
'''
(
num_actors,
first_err,
err_type,
ria_func,
da_func,
) = num_actors_and_errs
"""
num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
try:
async with tractor.open_nursery() as an:
async with tractor.open_nursery() as n:
# spawn the same number of deamon actors which should be cancelled
dactor_portals = []
for i in range(num_actors):
dactor_portals.append(await an.start_actor(
dactor_portals.append(await n.start_actor(
f'deamon_{i}',
enable_modules=[__name__],
))
@ -319,7 +307,7 @@ async def test_some_cancels_all(
for i in range(num_actors):
# start actor(s) that will fail immediately
riactor_portals.append(
await an.run_in_actor(
await n.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
@ -349,8 +337,7 @@ async def test_some_cancels_all(
# should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as _err:
err = _err
except first_err as err:
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
@ -361,8 +348,8 @@ async def test_some_cancels_all(
elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type
assert an.cancelled is True
assert not an._children
assert n.cancelled is True
assert not n._children
else:
pytest.fail("Should have gotten a remote assertion error?")
@ -572,10 +559,8 @@ def test_cancel_while_childs_child_in_sync_sleep(
async def main():
with trio.fail_after(2):
async with (
tractor.open_nursery() as an
):
await an.run_in_actor(
async with tractor.open_nursery() as tn:
await tn.run_in_actor(
spawn,
name='spawn',
)

View File

@ -147,7 +147,8 @@ def test_trio_prestarted_task_bubbles(
await trio.sleep_forever()
async def _trio_main():
with trio.fail_after(2 if not debug_mode else 999):
# with trio.fail_after(2):
with trio.fail_after(999):
first: str
chan: to_asyncio.LinkedTaskChannel
aio_ev = asyncio.Event()
@ -216,25 +217,32 @@ def test_trio_prestarted_task_bubbles(
):
aio_ev.set()
with pytest.raises(
expected_exception=ExceptionGroup,
) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
eg = excinfo.value
rte_eg, rest_eg = eg.split(RuntimeError)
# ensure the trio-task's error bubbled despite the aio-side
# having (maybe) errored first.
if aio_err_trigger in (
'after_trio_task_starts',
'after_start_point',
):
patt: str = 'trio-side'
expect_exc = TypeError
assert len(errs := rest_eg.exceptions) == 1
typerr = errs[0]
assert (
type(typerr) is TypeError
and
'trio-side' in typerr.args
)
# when aio errors BEFORE (last) trio task is scheduled, we should
# never see anythinb but the aio-side.
else:
patt: str = 'asyncio-side'
expect_exc = RuntimeError
with pytest.raises(expect_exc) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
caught_exc = excinfo.value
assert patt in caught_exc.args
assert len(rtes := rte_eg.exceptions) == 1
assert 'asyncio-side' in rtes[0].args[0]

View File

@ -1011,6 +1011,7 @@ class Context:
else:
log.cancel(
f'Timed out on cancel request of remote task?\n'
f'\n'
f'{reminfo}'
)
@ -1491,12 +1492,6 @@ class Context:
):
status = 'peer-cancelled'
case (
Unresolved,
trio.Cancelled(), # any error-type
) if self.canceller:
status = 'actor-cancelled'
# (remote) error condition
case (
Unresolved,
@ -2278,7 +2273,7 @@ async def open_context_from_portal(
logmeth = log.exception
logmeth(
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
)
if debug_mode():

View File

@ -661,7 +661,7 @@ async def _invoke(
tn: Nursery
rpc_ctx_cs: CancelScope
async with (
collapse_eg(hide_tb=False),
collapse_eg(),
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
@ -854,44 +854,24 @@ async def _invoke(
f'after having {ctx.repr_state!r}\n'
)
if merr:
logmeth: Callable = log.error
if (
# ctxc: by `Context.cancel()`
isinstance(merr, ContextCancelled)
if isinstance(merr, ContextCancelled):
logmeth: Callable = log.runtime
# out-of-layer cancellation, one of:
# - actorc: by `Portal.cancel_actor()`
# - OSc: by SIGINT or `Process.signal()`
or (
isinstance(merr, trio.Cancelled)
and
ctx.canceller
)
):
logmeth: Callable = log.cancel
descr_str += (
f' with {merr!r}\n'
)
elif (
not isinstance(merr, RemoteActorError)
):
tb_str: str = ''.join(
traceback.format_exception(merr)
)
if not isinstance(merr, RemoteActorError):
tb_str: str = ''.join(traceback.format_exception(merr))
descr_str += (
f'\n{merr!r}\n' # needed?
f'{tb_str}\n'
)
else:
descr_str += (
f'{merr!r}\n'
)
else:
descr_str += (
f'\n'
f'with final result {ctx.outcome!r}\n'
f'scope_error:\n'
f'{scope_err!r}\n'
)
else:
descr_str += f'\n{merr!r}\n'
else:
descr_str += f'\nwith final result {ctx.outcome!r}\n'
logmeth(
f'{message}\n'

View File

@ -236,6 +236,10 @@ async def hard_kill(
# whilst also hacking on it XD
# terminate_after: int = 99999,
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
# terminate_after: int = 99999,
) -> None:
'''
Un-gracefully terminate an OS level `trio.Process` after timeout.

View File

@ -237,9 +237,9 @@ def enable_stack_on_sig(
try:
import stackscope
except ImportError:
log.warning(
'The `stackscope` lib is not installed!\n'
'`Ignoring enable_stack_on_sig() call!\n'
log.error(
'`stackscope` not installed for use in debug mode!\n'
'`Ignoring {enable_stack_on_sig!r} call!\n'
)
return None

View File

@ -250,7 +250,7 @@ async def _maybe_enter_pm(
*,
tb: TracebackType|None = None,
api_frame: FrameType|None = None,
hide_tb: bool = True,
hide_tb: bool = False,
# only enter debugger REPL when returns `True`
debug_filter: Callable[

View File

@ -58,7 +58,6 @@ from tractor._context import Context
from tractor import _state
from tractor._exceptions import (
NoRuntime,
InternalError,
)
from tractor._state import (
current_actor,
@ -80,9 +79,6 @@ from ._sigint import (
sigint_shield as sigint_shield,
_ctlc_ignore_header as _ctlc_ignore_header
)
from ..pformat import (
ppfmt,
)
if TYPE_CHECKING:
from trio.lowlevel import Task
@ -1157,10 +1153,9 @@ def pause_from_sync(
'use_greenback',
False,
):
raise InternalError(
f'`greenback` was never initialized in this actor?\n'
f'\n'
f'{ppfmt(_state._runtime_vars)}\n'
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
) from rte
raise

View File

@ -430,24 +430,20 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
) as _re:
trans_err = _re
) as bre:
trans_err = bre
tpt_name: str = f'{type(self).__name__!r}'
match trans_err:
# XXX, specifc to UDS transport and its,
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specifc to UDS transport and its,
# well, "speediness".. XD
# |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe'
in
trans_err.args[0]
):
tpt_closed = TransportClosed.from_src_exc(
raise TransportClosed.from_src_exc(
message=(
f'{tpt_name} already closed by peer\n'
),
@ -455,15 +451,14 @@ class MsgpackTransport(MsgTransport):
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
)
raise tpt_closed from trans_err
) from bre
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
log.exception(
f'{tpt_name} layer failed pre-send ??\n'
'{tpt_name} layer failed pre-send ??\n'
)
raise trans_err
@ -508,7 +503,7 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' |_peers: 1\n'
f' |_peers: 2\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
# f'\n'

View File

@ -223,17 +223,15 @@ class _Cache:
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
try:
async with mng as value:
_, no_more_users = cls.resources[ctx_key]
try:
cls.values[ctx_key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(ctx_key)
finally:
# discard nursery ref so it won't be re-used (an error)?
value = cls.values.pop(ctx_key)
cls.resources.pop(ctx_key)