Compare commits

...

20 Commits

Author SHA1 Message Date
Tyler Goodlet daf4b4ee85 Break loop after result retreival 2022-10-13 17:11:35 -04:00
Tyler Goodlet 3f09843951 Log context cancellation using `.cancel()` loglevel 2022-10-13 17:11:21 -04:00
Tyler Goodlet 3483151aa8 Use `MsgStream.subscribe()` in `Context.result()`
The case exists where there is multiple tasks consuming from an open
2-way stream created via `Context.open_stream()` where a sibling task is
pulling from the stream while some other task also calls `.result()`.
Previously the `.result()` call would consume (drain) stream messages
directly from the underlying mem chan which would mean any sibling task
would not receive those same messages. Instead, make `.result()` check
if a stream is open and instead consume (and discard) stream msgs using
a `BroadcastReceiver` (via `MsgStream.subscribe()`) such that all
interested tasks get copies of the same packets.
2022-10-13 17:07:49 -04:00
Tyler Goodlet 4a5f041211 Expect egs in tests which retreive portal results 2022-10-13 17:00:24 -04:00
Tyler Goodlet 7d0186aab9 Drop full tb flag again... 2022-10-13 15:45:17 -04:00
Tyler Goodlet f9b548e4e7 Fix errors table type annot 2022-10-13 15:42:33 -04:00
Tyler Goodlet afbe90bcfa TOSQUASH cancel on no peers 2022-10-13 15:42:01 -04:00
Tyler Goodlet 44538c44b1 Fix handler type annot 2022-10-13 15:41:38 -04:00
Tyler Goodlet 62fc462580 Never double add parent task's error to `ActorNursery` 2022-10-13 15:27:04 -04:00
Tyler Goodlet c5091afa38 Always restore the `trio` SIGINT handler
Pretty sure this is the final touch to alleviate all our debug lock
headaches! Instead of trying to revert to the "last" handler (as `pdb`
does internally in the stdlib) we always just revert to the handler
`trio` registers during startup. Further this seems to allow cancelling
the root-side locking task if it's detected as stale IFF we only do this
when the root actor is in a "no more IPC peers" state.

Deatz:
- always `._debug.Lock._trio_handler` as the `trio` version, not some
  last used handler to make sure we're getting the ctrl-c handling we
  want when not in debug mode.
- assign the trio handler in `open_root_actor()`
  `._runtime._async_main()` to be sure it's applied in subactors as well
  as the root.
- only do debug lock blocking and root-side-locking-task cancels when
  a "no peers" condition is detected in the root actor: i.e. no IPC
  channels are detected by the root meaning it's impossible any actor
  has a sane lock-state ongoing for debug mode.
2022-10-13 15:17:26 -04:00
Tyler Goodlet f6ac0c2eb7 Always restore at least `trio`'s sigint handler
We can get it during runtime startup and stash on a new
`Lock._trio_handler`. Always at least revert to this handler to
guarantee graceful kbi handling despite mucking about with our own
handler in debug mode.
2022-10-13 13:12:17 -04:00
Tyler Goodlet 8727c1e4c2 TOSQUASH: dun need the var... 2022-10-12 17:53:39 -04:00
Tyler Goodlet 42cae56823 Adjust root-errors debug tests for blocking and egs 2022-10-12 17:43:55 -04:00
Tyler Goodlet 35550dd2a2 Hide some stack layers the user doesn't really need to see 2022-10-12 17:41:01 -04:00
Tyler Goodlet c437196d9b Pack errors from the parent task into the actor nursery 2022-10-12 17:40:08 -04:00
Tyler Goodlet 882c33ff06 Change cancel test over the exception group 2022-10-12 12:46:25 -04:00
Tyler Goodlet cd79fd79b9 First pass, swap `MultiError` for `BaseExceptionGroup` 2022-10-12 12:46:20 -04:00
Tyler Goodlet 53d5b59b7b Add `exceptiongroup` (3.11 backport lib) as dep 2022-10-12 12:46:20 -04:00
Tyler Goodlet e224b8a994 Pin to latest `trio` version 2022-10-12 12:46:20 -04:00
Tyler Goodlet 5db2ebf8d0 Add back `pytest` full trace flag to debug CI hangs 2022-10-12 12:46:20 -04:00
14 changed files with 321 additions and 177 deletions

View File

@ -44,9 +44,10 @@ setup(
# trio related
# proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
'trio >= 0.20, < 0.22',
'trio >= 0.22',
'async_generator',
'trio_typing',
'exceptiongroup',
# tooling
'tricycle',

View File

@ -8,6 +8,10 @@ import platform
import time
from itertools import repeat
from exceptiongroup import (
BaseExceptionGroup,
ExceptionGroup,
)
import pytest
import trio
import tractor
@ -56,29 +60,49 @@ def test_remote_error(arb_addr, args_err):
arbiter_addr=arb_addr,
) as nursery:
# on a remote type error caused by bad input args
# this should raise directly which means we **don't** get
# an exception group outside the nursery since the error
# here and the far end task error are one in the same?
portal = await nursery.run_in_actor(
assert_err, name='errorer', **args
)
# get result(s) from main task
try:
# this means the root actor will also raise a local
# parent task error and thus an eg will propagate out
# of this actor nursery.
await portal.result()
except tractor.RemoteActorError as err:
assert err.type == errtype
print("Look Maa that actor failed hard, hehh")
raise
# ensure boxed errors
if args:
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == errtype
else:
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == errtype
def test_multierror(arb_addr):
"""Verify we raise a ``trio.MultiError`` out of a nursery where
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors.
"""
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
@ -95,10 +119,10 @@ def test_multierror(arb_addr):
print("Look Maa that first actor failed hard, hehh")
raise
# here we should get a `trio.MultiError` containing exceptions
# here we should get a ``BaseExceptionGroup`` containing exceptions
# from both subactors
with pytest.raises(trio.MultiError):
with pytest.raises(BaseExceptionGroup):
trio.run(main)
@ -107,7 +131,7 @@ def test_multierror(arb_addr):
'num_subactors', range(25, 26),
)
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
"""Verify we raise a ``trio.MultiError`` out of a nursery where
"""Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
"""
@ -123,10 +147,11 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
delay=delay
)
with pytest.raises(trio.MultiError) as exc_info:
# with pytest.raises(trio.MultiError) as exc_info:
with pytest.raises(BaseExceptionGroup) as exc_info:
trio.run(main)
assert exc_info.type == tractor.MultiError
assert exc_info.type == ExceptionGroup
err = exc_info.value
exceptions = err.exceptions
@ -214,8 +239,8 @@ async def test_cancel_infinite_streamer(start_method):
[
# daemon actors sit idle while single task actors error out
(1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None),
(2, tractor.MultiError, AssertionError, (assert_err, {}), None),
(3, tractor.MultiError, AssertionError, (assert_err, {}), None),
(2, BaseExceptionGroup, AssertionError, (assert_err, {}), None),
(3, BaseExceptionGroup, AssertionError, (assert_err, {}), None),
# 1 daemon actor errors out while single task actors sleep forever
(3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}),
@ -226,7 +251,7 @@ async def test_cancel_infinite_streamer(start_method):
(do_nuthin, {}), (assert_err, {'delay': 1}, True)),
# daemon complete quickly delay while single task
# actors error after brief delay
(3, tractor.MultiError, AssertionError,
(3, BaseExceptionGroup, AssertionError,
(assert_err, {'delay': 1}), (do_nuthin, {}, False)),
],
ids=[
@ -293,7 +318,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
# should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as err:
if isinstance(err, tractor.MultiError):
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError):
@ -337,7 +362,7 @@ async def spawn_and_error(breadth, depth) -> None:
@tractor_test
async def test_nested_multierrors(loglevel, start_method):
'''
Test that failed actor sets are wrapped in `trio.MultiError`s. This
Test that failed actor sets are wrapped in `BaseExceptionGroup`s. This
test goes only 2 nurseries deep but we should eventually have tests
for arbitrary n-depth actor trees.
@ -365,7 +390,7 @@ async def test_nested_multierrors(loglevel, start_method):
breadth=subactor_breadth,
depth=depth,
)
except trio.MultiError as err:
except BaseExceptionGroup as err:
assert len(err.exceptions) == subactor_breadth
for subexc in err.exceptions:
@ -383,10 +408,10 @@ async def test_nested_multierrors(loglevel, start_method):
assert subexc.type in (
tractor.RemoteActorError,
trio.Cancelled,
trio.MultiError
BaseExceptionGroup,
)
elif isinstance(subexc, trio.MultiError):
elif isinstance(subexc, BaseExceptionGroup):
for subsub in subexc.exceptions:
if subsub in (tractor.RemoteActorError,):
@ -394,7 +419,7 @@ async def test_nested_multierrors(loglevel, start_method):
assert type(subsub) in (
trio.Cancelled,
trio.MultiError,
BaseExceptionGroup,
)
else:
assert isinstance(subexc, tractor.RemoteActorError)
@ -406,13 +431,13 @@ async def test_nested_multierrors(loglevel, start_method):
if is_win():
if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in (
trio.MultiError,
BaseExceptionGroup,
tractor.RemoteActorError
)
else:
assert isinstance(subexc, trio.MultiError)
assert isinstance(subexc, BaseExceptionGroup)
else:
assert subexc.type is trio.MultiError
assert subexc.type is ExceptionGroup
else:
assert subexc.type in (
tractor.RemoteActorError,

View File

@ -485,10 +485,12 @@ def test_multi_subactors(
# 2nd name_error failure
child.expect(r"\(Pdb\+\+\)")
assert_before(child, [
"Attaching to pdb in crashed actor: ('name_error_1'",
"NameError",
])
# TODO: will we ever get the race where this crash will show up?
# blocklist strat now prevents this crash
# assert_before(child, [
# "Attaching to pdb in crashed actor: ('name_error_1'",
# "NameError",
# ])
if ctlc:
do_ctlc(child)
@ -683,7 +685,15 @@ def test_multi_subactors_root_errors(
# continue again to catch 2nd name error from
# actor 'name_error_1' (which is 2nd depth).
child.sendline('c')
# due to block list strat from #337, this will no longer
# propagate before the root errors and cancels the spawner sub-tree.
child.expect(r"\(Pdb\+\+\)")
# only if the blocking condition doesn't kick in fast enough
before = str(child.before.decode())
if "Debug lock blocked for ['name_error_1'" not in before:
assert_before(child, [
"Attaching to pdb in crashed actor: ('name_error_1'",
"NameError",
@ -694,9 +704,14 @@ def test_multi_subactors_root_errors(
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
# check if the spawner crashed or was blocked from debug
# and if this intermediary attached check the boxed error
before = str(child.before.decode())
if "Attaching to pdb in crashed actor: ('spawn_error'" in before:
assert_before(child, [
"Attaching to pdb in crashed actor: ('spawn_error'",
# boxed error from previous step
# boxed error from spawner's child
"RemoteActorError: ('name_error_1'",
"NameError",
])
@ -706,26 +721,28 @@ def test_multi_subactors_root_errors(
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
# expect a root actor crash
assert_before(child, [
"Attaching to pdb in crashed actor: ('root'",
# boxed error from previous step
"RemoteActorError: ('name_error'",
"NameError",
# error from root actor and root task that created top level nursery
"Attaching to pdb in crashed actor: ('root'",
"AssertionError",
])
# warnings assert we probably don't need
# assert "Cancelling nursery in ('spawn_error'," in before
if ctlc:
do_ctlc(child)
# continue again
child.sendline('c')
child.expect(pexpect.EOF)
before = str(child.before.decode())
# error from root actor and root task that created top level nursery
assert "AssertionError" in before
assert_before(child, [
# "Attaching to pdb in crashed actor: ('root'",
# boxed error from previous step
"RemoteActorError: ('name_error'",
"NameError",
"AssertionError",
'assert 0',
])
@has_nested_actors

View File

@ -8,6 +8,7 @@ import builtins
import itertools
import importlib
from exceptiongroup import BaseExceptionGroup
import pytest
import trio
import tractor
@ -409,11 +410,12 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(RemoteActorError) as excinfo:
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == Exception
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr):
@ -442,11 +444,12 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(RemoteActorError) as excinfo:
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == Exception
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
@tractor.context

View File

@ -18,7 +18,7 @@
tractor: structured concurrent "actors".
"""
from trio import MultiError
from exceptiongroup import BaseExceptionGroup
from ._clustering import open_actor_cluster
from ._ipc import Channel
@ -62,7 +62,7 @@ __all__ = [
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'MultiError',
'BaseExceptionGroup',
'Portal',
'ReceiveMsgStream',
'RemoteActorError',

View File

@ -25,6 +25,7 @@ import signal
from functools import partial
from contextlib import asynccontextmanager as acm
from typing import (
Any,
Optional,
Callable,
AsyncIterator,
@ -75,8 +76,12 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None
_trio_handler: Callable[
[int, Optional[FrameType]], Any
] | int | None = None
# actor-wide variable pointing to current task name using debugger
local_task_in_debug: Optional[str] = None
local_task_in_debug: str | None = None
# NOTE: set by the current task waiting on the root tty lock from
# the CALLER side of the `lock_tty_for_child()` context entry-call
@ -111,13 +116,10 @@ class Lock:
@classmethod
def unshield_sigint(cls):
if cls._orig_sigint_handler is not None:
# restore original sigint handler
signal.signal(
signal.SIGINT,
cls._orig_sigint_handler
)
# always restore ``trio``'s sigint handler. see notes below in
# the pdb factory about the nightmare that is that code swapping
# out the handler when the repl activates...
signal.signal(signal.SIGINT, cls._trio_handler)
cls._orig_sigint_handler = None
@classmethod
@ -544,7 +546,7 @@ def shield_sigint(
) -> None:
'''
Specialized debugger compatible SIGINT handler.
Specialized, debugger-aware SIGINT handler.
In childred we always ignore to avoid deadlocks since cancellation
should always be managed by the parent supervising actor. The root
@ -601,6 +603,8 @@ def shield_sigint(
# which has already terminated to unlock.
and any_connected
):
# we are root and some actor is in debug mode
# if uid_in_debug is not None:
name = uid_in_debug[0]
if name != 'root':
log.pdb(
@ -611,6 +615,22 @@ def shield_sigint(
log.pdb(
"Ignoring SIGINT while in debug mode"
)
elif (
is_root_process()
):
log.pdb(
"Ignoring SIGINT since debug mode is enabled"
)
# revert back to ``trio`` handler asap!
Lock.unshield_sigint()
if (
Lock._root_local_task_cs_in_debug
and not Lock._root_local_task_cs_in_debug.cancel_called
):
Lock._root_local_task_cs_in_debug.cancel()
# raise KeyboardInterrupt
# child actor that has locked the debugger
elif not is_root_process():
@ -636,10 +656,9 @@ def shield_sigint(
# https://github.com/goodboy/tractor/issues/320
# elif debug_mode():
else:
log.pdb(
"Ignoring SIGINT since debug mode is enabled"
)
else: # XXX: shouldn't ever get here?
print("WTFWTFWTF")
raise KeyboardInterrupt
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it lookks to be that the last command that was run (eg. ll)

View File

@ -27,6 +27,7 @@ import importlib
import builtins
import traceback
import exceptiongroup as eg
import trio
@ -52,9 +53,6 @@ class RemoteActorError(Exception):
self.type = suberror_type
self.msgdata = msgdata
# TODO: a trio.MultiError.catch like context manager
# for catching underlying remote errors of a particular type
class InternalActorError(RemoteActorError):
"""Remote internal ``tractor`` error indicating
@ -123,10 +121,12 @@ def unpack_error(
err_type=RemoteActorError
) -> Exception:
"""Unpack an 'error' message from the wire
'''
Unpack an 'error' message from the wire
into a local ``RemoteActorError``.
"""
'''
__tracebackhide__ = True
error = msg['error']
tb_str = error.get('tb_str', '')
@ -139,7 +139,12 @@ def unpack_error(
suberror_type = trio.Cancelled
else: # try to lookup a suitable local error type
for ns in [builtins, _this_mod, trio]:
for ns in [
builtins,
_this_mod,
eg,
trio,
]:
try:
suberror_type = getattr(ns, type_name)
break
@ -158,12 +163,15 @@ def unpack_error(
def is_multi_cancelled(exc: BaseException) -> bool:
"""Predicate to determine if a ``trio.MultiError`` contains only
``trio.Cancelled`` sub-exceptions (and is likely the result of
'''
Predicate to determine if a possible ``eg.BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks.
"""
return not trio.MultiError.filter(
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None,
exc,
)
'''
if isinstance(exc, eg.BaseExceptionGroup):
return exc.subgroup(
lambda exc: isinstance(exc, trio.Cancelled)
) is not None
return False

View File

@ -52,17 +52,17 @@ log = get_logger(__name__)
def _unwrap_msg(
msg: dict[str, Any],
channel: Channel
) -> Any:
__tracebackhide__ = True
try:
return msg['return']
except KeyError:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel)
raise unpack_error(msg, channel) from None
class MessagingError(Exception):
@ -136,6 +136,7 @@ class Portal:
Return the result(s) from the remote actor's "main" task.
'''
# __tracebackhide__ = True
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc = self.channel._exc
@ -460,7 +461,6 @@ class Portal:
# sure it's worth being pedantic:
# Exception,
# trio.Cancelled,
# trio.MultiError,
# KeyboardInterrupt,
) as err:

View File

@ -23,15 +23,22 @@ from functools import partial
import importlib
import logging
import os
import signal
from typing import (
Optional,
)
import typing
import warnings
from exceptiongroup import BaseExceptionGroup
import trio
from ._runtime import Actor, Arbiter, async_main
from ._runtime import (
Actor,
Arbiter,
async_main,
)
from . import _debug
from . import _spawn
from . import _state
@ -74,14 +81,19 @@ async def open_root_actor(
rpc_module_paths: Optional[list] = None,
) -> typing.Any:
"""Async entry point for ``tractor``.
'''
Runtime init entry point for ``tractor``.
"""
'''
# Override the global debugger hook to make it play nice with
# ``trio``, see:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
@ -205,7 +217,10 @@ async def open_root_actor(
try:
yield actor
except (Exception, trio.MultiError) as err:
except (
Exception,
BaseExceptionGroup,
) as err:
entered = await _debug._maybe_enter_pm(err)

View File

@ -25,21 +25,23 @@ from itertools import chain
import importlib
import importlib.util
import inspect
import uuid
import signal
import sys
from typing import (
Any, Optional,
Union, TYPE_CHECKING,
Callable,
)
import uuid
from types import ModuleType
import sys
import os
from contextlib import ExitStack
import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio # type: ignore
from trio_typing import TaskStatus
from async_generator import aclosing
from ._ipc import Channel
from ._streaming import Context
@ -194,7 +196,7 @@ async def _invoke(
res = await coro
await chan.send({'return': res, 'cid': cid})
except trio.MultiError:
except BaseExceptionGroup:
# if a context error was set then likely
# thei multierror was raised due to that
if ctx._error is not None:
@ -266,7 +268,7 @@ async def _invoke(
except (
Exception,
trio.MultiError
BaseExceptionGroup,
) as err:
if not is_multi_cancelled(err):
@ -349,7 +351,7 @@ def _get_mod_abspath(module):
async def try_ship_error_to_parent(
channel: Channel,
err: Union[Exception, trio.MultiError],
err: Union[Exception, BaseExceptionGroup],
) -> None:
with trio.CancelScope(shield=True):
@ -708,6 +710,14 @@ class Actor:
log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the
@ -721,23 +731,16 @@ class Actor:
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
if (
pdb_lock._root_local_task_cs_in_debug
and not pdb_lock._root_local_task_cs_in_debug.cancel_called
db_cs
and not db_cs.cancel_called
):
log.warning(
f'STALE DEBUG LOCK DETECTED FOR {uid}'
)
# TODO: figure out why this breaks tests..
# pdb_lock._root_local_task_cs_in_debug.cancel()
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
db_cs.cancel()
# XXX: is this necessary (GC should do it)?
if chan.connected():
@ -1228,6 +1231,10 @@ async def async_main(
and when cancelled effectively cancels the actor.
'''
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False
try:
@ -1549,7 +1556,10 @@ async def process_messages(
partial(_invoke, actor, cid, chan, func, kwargs),
name=funcname,
)
except (RuntimeError, trio.MultiError):
except (
RuntimeError,
BaseExceptionGroup,
):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task = True
@ -1594,7 +1604,10 @@ async def process_messages(
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err:
except (
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task:
sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called

View File

@ -31,6 +31,7 @@ from typing import (
)
from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup
import trio
from trio_typing import TaskStatus
@ -139,6 +140,7 @@ async def exhaust_portal(
If the main task is an async generator do our best to consume
what's left of it.
'''
__tracebackhide__ = True
try:
log.debug(f"Waiting on final result from {actor.uid}")
@ -146,8 +148,11 @@ async def exhaust_portal(
# always be established and shutdown using a context manager api
final = await portal.result()
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
except (
Exception,
BaseExceptionGroup,
) as err:
# we reraise in the parent task via a ``BaseExceptionGroup``
return err
except trio.Cancelled as err:
# lol, of course we need this too ;P
@ -175,7 +180,7 @@ async def cancel_on_completion(
'''
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
# an exception group and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result

View File

@ -22,7 +22,6 @@ from typing import (
Optional,
Any,
)
from collections.abc import Mapping
import trio
@ -46,12 +45,6 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore #
return _current_actor
_conc_name_getters = {
'task': trio.lowlevel.current_task,
'actor': current_actor
}
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""

View File

@ -27,7 +27,8 @@ from typing import (
Optional,
Callable,
AsyncGenerator,
AsyncIterator
AsyncIterator,
TYPE_CHECKING,
)
import warnings
@ -41,6 +42,10 @@ from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__)
@ -269,9 +274,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
@asynccontextmanager
async def subscribe(
self,
) -> AsyncIterator[BroadcastReceiver]:
'''Allocate and return a ``BroadcastReceiver`` which delegates
'''
Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream.
This allows multiple local tasks to receive each their own copy
@ -365,7 +370,8 @@ class Context:
_remote_func_type: Optional[str] = None
# only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa
_portal: Optional[Portal] = None # type: ignore # noqa
_stream: Optional[MsgStream] = None
_result: Optional[Any] = False
_error: Optional[BaseException] = None
@ -425,19 +431,24 @@ class Context:
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
log.error(
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled)
):
log.cancel(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled) and
self._cancel_called
):
if self._cancel_called:
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
self._error = error
@ -473,6 +484,7 @@ class Context:
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
ipc_broken: bool = False
if side == 'caller':
if not self._portal:
@ -490,7 +502,14 @@ class Context:
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
try:
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
except trio.BrokenResourceError:
ipc_broken = True
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
@ -506,7 +525,10 @@ class Context:
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
# callee side remote task
elif ipc_broken:
log.cancel(
"Transport layer was broken before cancel request "
f"{cid} for {self._portal.channel.uid}")
else:
self._cancel_msg = msg
@ -593,10 +615,11 @@ class Context:
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as rchan:
) as stream:
self._stream = stream
if self._portal:
self._portal._streams.add(rchan)
self._portal._streams.add(stream)
try:
self._stream_opened = True
@ -604,7 +627,7 @@ class Context:
# ensure we aren't cancelled before delivering
# the stream
# await trio.lowlevel.checkpoint()
yield rchan
yield stream
# XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
@ -635,25 +658,22 @@ class Context:
if not self._recv_chan._closed: # type: ignore
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
def consume(
msg: dict,
msg = await self._recv_chan.receive()
) -> Optional[dict]:
try:
self._result = msg['return']
break
return msg['return']
except KeyError as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
continue
return
elif 'stop' in msg:
log.debug('Remote stream terminated')
continue
return
# internal error should never get here
assert msg.get('cid'), (
@ -663,6 +683,25 @@ class Context:
msg, self._portal.channel
) from msgerr
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
if self._stream:
async with self._stream.subscribe() as bstream:
async for msg in bstream:
result = consume(msg)
if result:
self._result = result
break
if not self._result:
while True:
msg = await self._recv_chan.receive()
result = consume(msg)
if result:
self._result = result
break
return self._result
async def started(

View File

@ -18,6 +18,7 @@
``trio`` inspired apis and helpers
"""
from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from typing import (
@ -27,8 +28,8 @@ from typing import (
import typing
import warnings
from exceptiongroup import BaseExceptionGroup
import trio
from async_generator import asynccontextmanager
from ._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process
@ -82,7 +83,7 @@ class ActorNursery:
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], Exception],
errors: dict[tuple[str, str], BaseException],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
@ -294,13 +295,13 @@ class ActorNursery:
self._join_procs.set()
@asynccontextmanager
@acm
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:
# the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], Exception] = {}
errors: dict[tuple[str, str], BaseException] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
@ -346,7 +347,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
anursery._join_procs.set()
except BaseException as err:
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
@ -382,18 +382,21 @@ async def _open_and_supervise_one_cancels_all_nursery(
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
f"errored with")
# cancel all subactors
await anursery.cancel()
except trio.MultiError as merr:
except BaseExceptionGroup as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
list(merr.exceptions) + [err],
)
else:
raise
@ -402,12 +405,12 @@ async def _open_and_supervise_one_cancels_all_nursery(
# XXX: do we need a `trio.Cancelled` catch here as well?
# this is the catch around the ``.run_in_actor()`` nursery
except (
Exception,
trio.MultiError,
BaseExceptionGroup,
trio.Cancelled
) as err:
) as err: # noqa
errors[actor.uid] = err
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
@ -436,9 +439,12 @@ async def _open_and_supervise_one_cancels_all_nursery(
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed
# use `BaseExceptionGroup` as needed
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
tuple(errors.values()),
)
else:
raise list(errors.values())[0]
@ -447,7 +453,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# after nursery exit
@asynccontextmanager
@acm
async def open_nursery(
**kwargs,