Compare commits
No commits in common. "23240c31e39c7825c9d19631aed3159eb84a76d0" and "69bba3055772ad5358d9f0d77a75ef991bec9b0c" have entirely different histories.
23240c31e3
...
69bba30557
|
@ -284,32 +284,20 @@ async def test_cancel_infinite_streamer(start_method):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_some_cancels_all(
|
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
|
||||||
num_actors_and_errs: tuple,
|
"""Verify a subset of failed subactors causes all others in
|
||||||
start_method: str,
|
|
||||||
loglevel: str,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Verify a subset of failed subactors causes all others in
|
|
||||||
the nursery to be cancelled just like the strategy in trio.
|
the nursery to be cancelled just like the strategy in trio.
|
||||||
|
|
||||||
This is the first and only supervisory strategy at the moment.
|
This is the first and only supervisory strategy at the moment.
|
||||||
|
"""
|
||||||
'''
|
num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
|
||||||
(
|
|
||||||
num_actors,
|
|
||||||
first_err,
|
|
||||||
err_type,
|
|
||||||
ria_func,
|
|
||||||
da_func,
|
|
||||||
) = num_actors_and_errs
|
|
||||||
try:
|
try:
|
||||||
async with tractor.open_nursery() as an:
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
# spawn the same number of deamon actors which should be cancelled
|
# spawn the same number of deamon actors which should be cancelled
|
||||||
dactor_portals = []
|
dactor_portals = []
|
||||||
for i in range(num_actors):
|
for i in range(num_actors):
|
||||||
dactor_portals.append(await an.start_actor(
|
dactor_portals.append(await n.start_actor(
|
||||||
f'deamon_{i}',
|
f'deamon_{i}',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
))
|
))
|
||||||
|
@ -319,7 +307,7 @@ async def test_some_cancels_all(
|
||||||
for i in range(num_actors):
|
for i in range(num_actors):
|
||||||
# start actor(s) that will fail immediately
|
# start actor(s) that will fail immediately
|
||||||
riactor_portals.append(
|
riactor_portals.append(
|
||||||
await an.run_in_actor(
|
await n.run_in_actor(
|
||||||
func,
|
func,
|
||||||
name=f'actor_{i}',
|
name=f'actor_{i}',
|
||||||
**kwargs
|
**kwargs
|
||||||
|
@ -349,8 +337,7 @@ async def test_some_cancels_all(
|
||||||
|
|
||||||
# should error here with a ``RemoteActorError`` or ``MultiError``
|
# should error here with a ``RemoteActorError`` or ``MultiError``
|
||||||
|
|
||||||
except first_err as _err:
|
except first_err as err:
|
||||||
err = _err
|
|
||||||
if isinstance(err, BaseExceptionGroup):
|
if isinstance(err, BaseExceptionGroup):
|
||||||
assert len(err.exceptions) == num_actors
|
assert len(err.exceptions) == num_actors
|
||||||
for exc in err.exceptions:
|
for exc in err.exceptions:
|
||||||
|
@ -361,8 +348,8 @@ async def test_some_cancels_all(
|
||||||
elif isinstance(err, tractor.RemoteActorError):
|
elif isinstance(err, tractor.RemoteActorError):
|
||||||
assert err.boxed_type == err_type
|
assert err.boxed_type == err_type
|
||||||
|
|
||||||
assert an.cancelled is True
|
assert n.cancelled is True
|
||||||
assert not an._children
|
assert not n._children
|
||||||
else:
|
else:
|
||||||
pytest.fail("Should have gotten a remote assertion error?")
|
pytest.fail("Should have gotten a remote assertion error?")
|
||||||
|
|
||||||
|
@ -572,10 +559,8 @@ def test_cancel_while_childs_child_in_sync_sleep(
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(2):
|
||||||
async with (
|
async with tractor.open_nursery() as tn:
|
||||||
tractor.open_nursery() as an
|
await tn.run_in_actor(
|
||||||
):
|
|
||||||
await an.run_in_actor(
|
|
||||||
spawn,
|
spawn,
|
||||||
name='spawn',
|
name='spawn',
|
||||||
)
|
)
|
||||||
|
|
|
@ -147,7 +147,8 @@ def test_trio_prestarted_task_bubbles(
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
async def _trio_main():
|
async def _trio_main():
|
||||||
with trio.fail_after(2 if not debug_mode else 999):
|
# with trio.fail_after(2):
|
||||||
|
with trio.fail_after(999):
|
||||||
first: str
|
first: str
|
||||||
chan: to_asyncio.LinkedTaskChannel
|
chan: to_asyncio.LinkedTaskChannel
|
||||||
aio_ev = asyncio.Event()
|
aio_ev = asyncio.Event()
|
||||||
|
@ -216,25 +217,32 @@ def test_trio_prestarted_task_bubbles(
|
||||||
):
|
):
|
||||||
aio_ev.set()
|
aio_ev.set()
|
||||||
|
|
||||||
|
with pytest.raises(
|
||||||
|
expected_exception=ExceptionGroup,
|
||||||
|
) as excinfo:
|
||||||
|
tractor.to_asyncio.run_as_asyncio_guest(
|
||||||
|
trio_main=_trio_main,
|
||||||
|
)
|
||||||
|
|
||||||
|
eg = excinfo.value
|
||||||
|
rte_eg, rest_eg = eg.split(RuntimeError)
|
||||||
|
|
||||||
# ensure the trio-task's error bubbled despite the aio-side
|
# ensure the trio-task's error bubbled despite the aio-side
|
||||||
# having (maybe) errored first.
|
# having (maybe) errored first.
|
||||||
if aio_err_trigger in (
|
if aio_err_trigger in (
|
||||||
'after_trio_task_starts',
|
'after_trio_task_starts',
|
||||||
'after_start_point',
|
'after_start_point',
|
||||||
):
|
):
|
||||||
patt: str = 'trio-side'
|
assert len(errs := rest_eg.exceptions) == 1
|
||||||
expect_exc = TypeError
|
typerr = errs[0]
|
||||||
|
assert (
|
||||||
|
type(typerr) is TypeError
|
||||||
|
and
|
||||||
|
'trio-side' in typerr.args
|
||||||
|
)
|
||||||
|
|
||||||
# when aio errors BEFORE (last) trio task is scheduled, we should
|
# when aio errors BEFORE (last) trio task is scheduled, we should
|
||||||
# never see anythinb but the aio-side.
|
# never see anythinb but the aio-side.
|
||||||
else:
|
else:
|
||||||
patt: str = 'asyncio-side'
|
assert len(rtes := rte_eg.exceptions) == 1
|
||||||
expect_exc = RuntimeError
|
assert 'asyncio-side' in rtes[0].args[0]
|
||||||
|
|
||||||
with pytest.raises(expect_exc) as excinfo:
|
|
||||||
tractor.to_asyncio.run_as_asyncio_guest(
|
|
||||||
trio_main=_trio_main,
|
|
||||||
)
|
|
||||||
|
|
||||||
caught_exc = excinfo.value
|
|
||||||
assert patt in caught_exc.args
|
|
||||||
|
|
|
@ -1011,6 +1011,7 @@ class Context:
|
||||||
else:
|
else:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Timed out on cancel request of remote task?\n'
|
f'Timed out on cancel request of remote task?\n'
|
||||||
|
f'\n'
|
||||||
f'{reminfo}'
|
f'{reminfo}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1491,12 +1492,6 @@ class Context:
|
||||||
):
|
):
|
||||||
status = 'peer-cancelled'
|
status = 'peer-cancelled'
|
||||||
|
|
||||||
case (
|
|
||||||
Unresolved,
|
|
||||||
trio.Cancelled(), # any error-type
|
|
||||||
) if self.canceller:
|
|
||||||
status = 'actor-cancelled'
|
|
||||||
|
|
||||||
# (remote) error condition
|
# (remote) error condition
|
||||||
case (
|
case (
|
||||||
Unresolved,
|
Unresolved,
|
||||||
|
@ -2278,7 +2273,7 @@ async def open_context_from_portal(
|
||||||
logmeth = log.exception
|
logmeth = log.exception
|
||||||
|
|
||||||
logmeth(
|
logmeth(
|
||||||
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
|
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
if debug_mode():
|
if debug_mode():
|
||||||
|
|
|
@ -661,7 +661,7 @@ async def _invoke(
|
||||||
tn: Nursery
|
tn: Nursery
|
||||||
rpc_ctx_cs: CancelScope
|
rpc_ctx_cs: CancelScope
|
||||||
async with (
|
async with (
|
||||||
collapse_eg(hide_tb=False),
|
collapse_eg(),
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
msgops.maybe_limit_plds(
|
msgops.maybe_limit_plds(
|
||||||
ctx=ctx,
|
ctx=ctx,
|
||||||
|
@ -854,44 +854,24 @@ async def _invoke(
|
||||||
f'after having {ctx.repr_state!r}\n'
|
f'after having {ctx.repr_state!r}\n'
|
||||||
)
|
)
|
||||||
if merr:
|
if merr:
|
||||||
|
|
||||||
logmeth: Callable = log.error
|
logmeth: Callable = log.error
|
||||||
if (
|
if isinstance(merr, ContextCancelled):
|
||||||
# ctxc: by `Context.cancel()`
|
logmeth: Callable = log.runtime
|
||||||
isinstance(merr, ContextCancelled)
|
|
||||||
|
|
||||||
# out-of-layer cancellation, one of:
|
if not isinstance(merr, RemoteActorError):
|
||||||
# - actorc: by `Portal.cancel_actor()`
|
tb_str: str = ''.join(traceback.format_exception(merr))
|
||||||
# - OSc: by SIGINT or `Process.signal()`
|
|
||||||
or (
|
|
||||||
isinstance(merr, trio.Cancelled)
|
|
||||||
and
|
|
||||||
ctx.canceller
|
|
||||||
)
|
|
||||||
):
|
|
||||||
logmeth: Callable = log.cancel
|
|
||||||
descr_str += (
|
|
||||||
f' with {merr!r}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
elif (
|
|
||||||
not isinstance(merr, RemoteActorError)
|
|
||||||
):
|
|
||||||
tb_str: str = ''.join(
|
|
||||||
traceback.format_exception(merr)
|
|
||||||
)
|
|
||||||
descr_str += (
|
descr_str += (
|
||||||
f'\n{merr!r}\n' # needed?
|
f'\n{merr!r}\n' # needed?
|
||||||
f'{tb_str}\n'
|
f'{tb_str}\n'
|
||||||
|
f'\n'
|
||||||
|
f'scope_error:\n'
|
||||||
|
f'{scope_err!r}\n'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
descr_str += (
|
descr_str += f'\n{merr!r}\n'
|
||||||
f'{merr!r}\n'
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
descr_str += (
|
descr_str += f'\nwith final result {ctx.outcome!r}\n'
|
||||||
f'\n'
|
|
||||||
f'with final result {ctx.outcome!r}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
logmeth(
|
logmeth(
|
||||||
f'{message}\n'
|
f'{message}\n'
|
||||||
|
|
|
@ -236,6 +236,10 @@ async def hard_kill(
|
||||||
# whilst also hacking on it XD
|
# whilst also hacking on it XD
|
||||||
# terminate_after: int = 99999,
|
# terminate_after: int = 99999,
|
||||||
|
|
||||||
|
# NOTE: for mucking with `.pause()`-ing inside the runtime
|
||||||
|
# whilst also hacking on it XD
|
||||||
|
# terminate_after: int = 99999,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Un-gracefully terminate an OS level `trio.Process` after timeout.
|
Un-gracefully terminate an OS level `trio.Process` after timeout.
|
||||||
|
|
|
@ -237,9 +237,9 @@ def enable_stack_on_sig(
|
||||||
try:
|
try:
|
||||||
import stackscope
|
import stackscope
|
||||||
except ImportError:
|
except ImportError:
|
||||||
log.warning(
|
log.error(
|
||||||
'The `stackscope` lib is not installed!\n'
|
'`stackscope` not installed for use in debug mode!\n'
|
||||||
'`Ignoring enable_stack_on_sig() call!\n'
|
'`Ignoring {enable_stack_on_sig!r} call!\n'
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
|
@ -250,7 +250,7 @@ async def _maybe_enter_pm(
|
||||||
*,
|
*,
|
||||||
tb: TracebackType|None = None,
|
tb: TracebackType|None = None,
|
||||||
api_frame: FrameType|None = None,
|
api_frame: FrameType|None = None,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = False,
|
||||||
|
|
||||||
# only enter debugger REPL when returns `True`
|
# only enter debugger REPL when returns `True`
|
||||||
debug_filter: Callable[
|
debug_filter: Callable[
|
||||||
|
|
|
@ -58,7 +58,6 @@ from tractor._context import Context
|
||||||
from tractor import _state
|
from tractor import _state
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
NoRuntime,
|
NoRuntime,
|
||||||
InternalError,
|
|
||||||
)
|
)
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
current_actor,
|
current_actor,
|
||||||
|
@ -80,9 +79,6 @@ from ._sigint import (
|
||||||
sigint_shield as sigint_shield,
|
sigint_shield as sigint_shield,
|
||||||
_ctlc_ignore_header as _ctlc_ignore_header
|
_ctlc_ignore_header as _ctlc_ignore_header
|
||||||
)
|
)
|
||||||
from ..pformat import (
|
|
||||||
ppfmt,
|
|
||||||
)
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from trio.lowlevel import Task
|
from trio.lowlevel import Task
|
||||||
|
@ -1157,10 +1153,9 @@ def pause_from_sync(
|
||||||
'use_greenback',
|
'use_greenback',
|
||||||
False,
|
False,
|
||||||
):
|
):
|
||||||
raise InternalError(
|
raise RuntimeError(
|
||||||
f'`greenback` was never initialized in this actor?\n'
|
'`greenback` was never initialized in this actor!?\n\n'
|
||||||
f'\n'
|
f'{_state._runtime_vars}\n'
|
||||||
f'{ppfmt(_state._runtime_vars)}\n'
|
|
||||||
) from rte
|
) from rte
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -430,24 +430,20 @@ class MsgpackTransport(MsgTransport):
|
||||||
return await self.stream.send_all(size + bytes_data)
|
return await self.stream.send_all(size + bytes_data)
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
) as _re:
|
) as bre:
|
||||||
trans_err = _re
|
trans_err = bre
|
||||||
tpt_name: str = f'{type(self).__name__!r}'
|
tpt_name: str = f'{type(self).__name__!r}'
|
||||||
|
|
||||||
match trans_err:
|
match trans_err:
|
||||||
|
|
||||||
# XXX, specifc to UDS transport and its,
|
|
||||||
# well, "speediness".. XD
|
|
||||||
# |_ likely todo with races related to how fast
|
|
||||||
# the socket is setup/torn-down on linux
|
|
||||||
# as it pertains to rando pings from the
|
|
||||||
# `.discovery` subsys and protos.
|
|
||||||
case trio.BrokenResourceError() if (
|
case trio.BrokenResourceError() if (
|
||||||
'[Errno 32] Broken pipe'
|
'[Errno 32] Broken pipe' in trans_err.args[0]
|
||||||
in
|
# ^XXX, specifc to UDS transport and its,
|
||||||
trans_err.args[0]
|
# well, "speediness".. XD
|
||||||
|
# |_ likely todo with races related to how fast
|
||||||
|
# the socket is setup/torn-down on linux
|
||||||
|
# as it pertains to rando pings from the
|
||||||
|
# `.discovery` subsys and protos.
|
||||||
):
|
):
|
||||||
tpt_closed = TransportClosed.from_src_exc(
|
raise TransportClosed.from_src_exc(
|
||||||
message=(
|
message=(
|
||||||
f'{tpt_name} already closed by peer\n'
|
f'{tpt_name} already closed by peer\n'
|
||||||
),
|
),
|
||||||
|
@ -455,15 +451,14 @@ class MsgpackTransport(MsgTransport):
|
||||||
src_exc=trans_err,
|
src_exc=trans_err,
|
||||||
raise_on_report=True,
|
raise_on_report=True,
|
||||||
loglevel='transport',
|
loglevel='transport',
|
||||||
)
|
) from bre
|
||||||
raise tpt_closed from trans_err
|
|
||||||
|
|
||||||
# unless the disconnect condition falls under "a
|
# unless the disconnect condition falls under "a
|
||||||
# normal operation breakage" we usualy console warn
|
# normal operation breakage" we usualy console warn
|
||||||
# about it.
|
# about it.
|
||||||
case _:
|
case _:
|
||||||
log.exception(
|
log.exception(
|
||||||
f'{tpt_name} layer failed pre-send ??\n'
|
'{tpt_name} layer failed pre-send ??\n'
|
||||||
)
|
)
|
||||||
raise trans_err
|
raise trans_err
|
||||||
|
|
||||||
|
@ -508,7 +503,7 @@ class MsgpackTransport(MsgTransport):
|
||||||
def pformat(self) -> str:
|
def pformat(self) -> str:
|
||||||
return (
|
return (
|
||||||
f'<{type(self).__name__}(\n'
|
f'<{type(self).__name__}(\n'
|
||||||
f' |_peers: 1\n'
|
f' |_peers: 2\n'
|
||||||
f' laddr: {self._laddr}\n'
|
f' laddr: {self._laddr}\n'
|
||||||
f' raddr: {self._raddr}\n'
|
f' raddr: {self._raddr}\n'
|
||||||
# f'\n'
|
# f'\n'
|
||||||
|
|
|
@ -223,18 +223,16 @@ class _Cache:
|
||||||
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
try:
|
async with mng as value:
|
||||||
async with mng as value:
|
_, no_more_users = cls.resources[ctx_key]
|
||||||
_, no_more_users = cls.resources[ctx_key]
|
cls.values[ctx_key] = value
|
||||||
try:
|
task_status.started(value)
|
||||||
cls.values[ctx_key] = value
|
try:
|
||||||
task_status.started(value)
|
await no_more_users.wait()
|
||||||
await no_more_users.wait()
|
finally:
|
||||||
finally:
|
# discard nursery ref so it won't be re-used (an error)?
|
||||||
value = cls.values.pop(ctx_key)
|
value = cls.values.pop(ctx_key)
|
||||||
finally:
|
cls.resources.pop(ctx_key)
|
||||||
# discard nursery ref so it won't be re-used (an error)?
|
|
||||||
cls.resources.pop(ctx_key)
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
|
Loading…
Reference in New Issue