forked from goodboy/tractor
1
0
Fork 0

Compare commits

...

20 Commits

Author SHA1 Message Date
Tyler Goodlet daf4b4ee85 Break loop after result retreival 2022-10-13 17:11:35 -04:00
Tyler Goodlet 3f09843951 Log context cancellation using `.cancel()` loglevel 2022-10-13 17:11:21 -04:00
Tyler Goodlet 3483151aa8 Use `MsgStream.subscribe()` in `Context.result()`
The case exists where there is multiple tasks consuming from an open
2-way stream created via `Context.open_stream()` where a sibling task is
pulling from the stream while some other task also calls `.result()`.
Previously the `.result()` call would consume (drain) stream messages
directly from the underlying mem chan which would mean any sibling task
would not receive those same messages. Instead, make `.result()` check
if a stream is open and instead consume (and discard) stream msgs using
a `BroadcastReceiver` (via `MsgStream.subscribe()`) such that all
interested tasks get copies of the same packets.
2022-10-13 17:07:49 -04:00
Tyler Goodlet 4a5f041211 Expect egs in tests which retreive portal results 2022-10-13 17:00:24 -04:00
Tyler Goodlet 7d0186aab9 Drop full tb flag again... 2022-10-13 15:45:17 -04:00
Tyler Goodlet f9b548e4e7 Fix errors table type annot 2022-10-13 15:42:33 -04:00
Tyler Goodlet afbe90bcfa TOSQUASH cancel on no peers 2022-10-13 15:42:01 -04:00
Tyler Goodlet 44538c44b1 Fix handler type annot 2022-10-13 15:41:38 -04:00
Tyler Goodlet 62fc462580 Never double add parent task's error to `ActorNursery` 2022-10-13 15:27:04 -04:00
Tyler Goodlet c5091afa38 Always restore the `trio` SIGINT handler
Pretty sure this is the final touch to alleviate all our debug lock
headaches! Instead of trying to revert to the "last" handler (as `pdb`
does internally in the stdlib) we always just revert to the handler
`trio` registers during startup. Further this seems to allow cancelling
the root-side locking task if it's detected as stale IFF we only do this
when the root actor is in a "no more IPC peers" state.

Deatz:
- always `._debug.Lock._trio_handler` as the `trio` version, not some
  last used handler to make sure we're getting the ctrl-c handling we
  want when not in debug mode.
- assign the trio handler in `open_root_actor()`
  `._runtime._async_main()` to be sure it's applied in subactors as well
  as the root.
- only do debug lock blocking and root-side-locking-task cancels when
  a "no peers" condition is detected in the root actor: i.e. no IPC
  channels are detected by the root meaning it's impossible any actor
  has a sane lock-state ongoing for debug mode.
2022-10-13 15:17:26 -04:00
Tyler Goodlet f6ac0c2eb7 Always restore at least `trio`'s sigint handler
We can get it during runtime startup and stash on a new
`Lock._trio_handler`. Always at least revert to this handler to
guarantee graceful kbi handling despite mucking about with our own
handler in debug mode.
2022-10-13 13:12:17 -04:00
Tyler Goodlet 8727c1e4c2 TOSQUASH: dun need the var... 2022-10-12 17:53:39 -04:00
Tyler Goodlet 42cae56823 Adjust root-errors debug tests for blocking and egs 2022-10-12 17:43:55 -04:00
Tyler Goodlet 35550dd2a2 Hide some stack layers the user doesn't really need to see 2022-10-12 17:41:01 -04:00
Tyler Goodlet c437196d9b Pack errors from the parent task into the actor nursery 2022-10-12 17:40:08 -04:00
Tyler Goodlet 882c33ff06 Change cancel test over the exception group 2022-10-12 12:46:25 -04:00
Tyler Goodlet cd79fd79b9 First pass, swap `MultiError` for `BaseExceptionGroup` 2022-10-12 12:46:20 -04:00
Tyler Goodlet 53d5b59b7b Add `exceptiongroup` (3.11 backport lib) as dep 2022-10-12 12:46:20 -04:00
Tyler Goodlet e224b8a994 Pin to latest `trio` version 2022-10-12 12:46:20 -04:00
Tyler Goodlet 5db2ebf8d0 Add back `pytest` full trace flag to debug CI hangs 2022-10-12 12:46:20 -04:00
14 changed files with 321 additions and 177 deletions

View File

@ -44,9 +44,10 @@ setup(
# trio related # trio related
# proper range spec: # proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5 # https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
'trio >= 0.20, < 0.22', 'trio >= 0.22',
'async_generator', 'async_generator',
'trio_typing', 'trio_typing',
'exceptiongroup',
# tooling # tooling
'tricycle', 'tricycle',

View File

@ -8,6 +8,10 @@ import platform
import time import time
from itertools import repeat from itertools import repeat
from exceptiongroup import (
BaseExceptionGroup,
ExceptionGroup,
)
import pytest import pytest
import trio import trio
import tractor import tractor
@ -56,29 +60,49 @@ def test_remote_error(arb_addr, args_err):
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
) as nursery: ) as nursery:
# on a remote type error caused by bad input args
# this should raise directly which means we **don't** get
# an exception group outside the nursery since the error
# here and the far end task error are one in the same?
portal = await nursery.run_in_actor( portal = await nursery.run_in_actor(
assert_err, name='errorer', **args assert_err, name='errorer', **args
) )
# get result(s) from main task # get result(s) from main task
try: try:
# this means the root actor will also raise a local
# parent task error and thus an eg will propagate out
# of this actor nursery.
await portal.result() await portal.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == errtype assert err.type == errtype
print("Look Maa that actor failed hard, hehh") print("Look Maa that actor failed hard, hehh")
raise raise
with pytest.raises(tractor.RemoteActorError) as excinfo: # ensure boxed errors
trio.run(main) if args:
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed error is correct assert excinfo.value.type == errtype
assert excinfo.value.type == errtype
else:
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == errtype
def test_multierror(arb_addr): def test_multierror(arb_addr):
"""Verify we raise a ``trio.MultiError`` out of a nursery where '''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors. more then one actor errors.
"""
'''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
@ -95,10 +119,10 @@ def test_multierror(arb_addr):
print("Look Maa that first actor failed hard, hehh") print("Look Maa that first actor failed hard, hehh")
raise raise
# here we should get a `trio.MultiError` containing exceptions # here we should get a ``BaseExceptionGroup`` containing exceptions
# from both subactors # from both subactors
with pytest.raises(trio.MultiError): with pytest.raises(BaseExceptionGroup):
trio.run(main) trio.run(main)
@ -107,7 +131,7 @@ def test_multierror(arb_addr):
'num_subactors', range(25, 26), 'num_subactors', range(25, 26),
) )
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
"""Verify we raise a ``trio.MultiError`` out of a nursery where """Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors and also with a delay before failure more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning. to test failure during an ongoing spawning.
""" """
@ -123,10 +147,11 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
delay=delay delay=delay
) )
with pytest.raises(trio.MultiError) as exc_info: # with pytest.raises(trio.MultiError) as exc_info:
with pytest.raises(BaseExceptionGroup) as exc_info:
trio.run(main) trio.run(main)
assert exc_info.type == tractor.MultiError assert exc_info.type == ExceptionGroup
err = exc_info.value err = exc_info.value
exceptions = err.exceptions exceptions = err.exceptions
@ -214,8 +239,8 @@ async def test_cancel_infinite_streamer(start_method):
[ [
# daemon actors sit idle while single task actors error out # daemon actors sit idle while single task actors error out
(1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None), (1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None),
(2, tractor.MultiError, AssertionError, (assert_err, {}), None), (2, BaseExceptionGroup, AssertionError, (assert_err, {}), None),
(3, tractor.MultiError, AssertionError, (assert_err, {}), None), (3, BaseExceptionGroup, AssertionError, (assert_err, {}), None),
# 1 daemon actor errors out while single task actors sleep forever # 1 daemon actor errors out while single task actors sleep forever
(3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}), (3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}),
@ -226,7 +251,7 @@ async def test_cancel_infinite_streamer(start_method):
(do_nuthin, {}), (assert_err, {'delay': 1}, True)), (do_nuthin, {}), (assert_err, {'delay': 1}, True)),
# daemon complete quickly delay while single task # daemon complete quickly delay while single task
# actors error after brief delay # actors error after brief delay
(3, tractor.MultiError, AssertionError, (3, BaseExceptionGroup, AssertionError,
(assert_err, {'delay': 1}), (do_nuthin, {}, False)), (assert_err, {'delay': 1}), (do_nuthin, {}, False)),
], ],
ids=[ ids=[
@ -293,7 +318,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
# should error here with a ``RemoteActorError`` or ``MultiError`` # should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as err: except first_err as err:
if isinstance(err, tractor.MultiError): if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors assert len(err.exceptions) == num_actors
for exc in err.exceptions: for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError): if isinstance(exc, tractor.RemoteActorError):
@ -337,7 +362,7 @@ async def spawn_and_error(breadth, depth) -> None:
@tractor_test @tractor_test
async def test_nested_multierrors(loglevel, start_method): async def test_nested_multierrors(loglevel, start_method):
''' '''
Test that failed actor sets are wrapped in `trio.MultiError`s. This Test that failed actor sets are wrapped in `BaseExceptionGroup`s. This
test goes only 2 nurseries deep but we should eventually have tests test goes only 2 nurseries deep but we should eventually have tests
for arbitrary n-depth actor trees. for arbitrary n-depth actor trees.
@ -365,7 +390,7 @@ async def test_nested_multierrors(loglevel, start_method):
breadth=subactor_breadth, breadth=subactor_breadth,
depth=depth, depth=depth,
) )
except trio.MultiError as err: except BaseExceptionGroup as err:
assert len(err.exceptions) == subactor_breadth assert len(err.exceptions) == subactor_breadth
for subexc in err.exceptions: for subexc in err.exceptions:
@ -383,10 +408,10 @@ async def test_nested_multierrors(loglevel, start_method):
assert subexc.type in ( assert subexc.type in (
tractor.RemoteActorError, tractor.RemoteActorError,
trio.Cancelled, trio.Cancelled,
trio.MultiError BaseExceptionGroup,
) )
elif isinstance(subexc, trio.MultiError): elif isinstance(subexc, BaseExceptionGroup):
for subsub in subexc.exceptions: for subsub in subexc.exceptions:
if subsub in (tractor.RemoteActorError,): if subsub in (tractor.RemoteActorError,):
@ -394,7 +419,7 @@ async def test_nested_multierrors(loglevel, start_method):
assert type(subsub) in ( assert type(subsub) in (
trio.Cancelled, trio.Cancelled,
trio.MultiError, BaseExceptionGroup,
) )
else: else:
assert isinstance(subexc, tractor.RemoteActorError) assert isinstance(subexc, tractor.RemoteActorError)
@ -406,13 +431,13 @@ async def test_nested_multierrors(loglevel, start_method):
if is_win(): if is_win():
if isinstance(subexc, tractor.RemoteActorError): if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in ( assert subexc.type in (
trio.MultiError, BaseExceptionGroup,
tractor.RemoteActorError tractor.RemoteActorError
) )
else: else:
assert isinstance(subexc, trio.MultiError) assert isinstance(subexc, BaseExceptionGroup)
else: else:
assert subexc.type is trio.MultiError assert subexc.type is ExceptionGroup
else: else:
assert subexc.type in ( assert subexc.type in (
tractor.RemoteActorError, tractor.RemoteActorError,

View File

@ -485,10 +485,12 @@ def test_multi_subactors(
# 2nd name_error failure # 2nd name_error failure
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
assert_before(child, [ # TODO: will we ever get the race where this crash will show up?
"Attaching to pdb in crashed actor: ('name_error_1'", # blocklist strat now prevents this crash
"NameError", # assert_before(child, [
]) # "Attaching to pdb in crashed actor: ('name_error_1'",
# "NameError",
# ])
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -683,49 +685,64 @@ def test_multi_subactors_root_errors(
# continue again to catch 2nd name error from # continue again to catch 2nd name error from
# actor 'name_error_1' (which is 2nd depth). # actor 'name_error_1' (which is 2nd depth).
child.sendline('c') child.sendline('c')
# due to block list strat from #337, this will no longer
# propagate before the root errors and cancels the spawner sub-tree.
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
# only if the blocking condition doesn't kick in fast enough
before = str(child.before.decode())
if "Debug lock blocked for ['name_error_1'" not in before:
assert_before(child, [
"Attaching to pdb in crashed actor: ('name_error_1'",
"NameError",
])
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
# check if the spawner crashed or was blocked from debug
# and if this intermediary attached check the boxed error
before = str(child.before.decode())
if "Attaching to pdb in crashed actor: ('spawn_error'" in before:
assert_before(child, [
# boxed error from spawner's child
"RemoteActorError: ('name_error_1'",
"NameError",
])
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
# expect a root actor crash
assert_before(child, [ assert_before(child, [
"Attaching to pdb in crashed actor: ('name_error_1'",
"NameError",
])
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
assert_before(child, [
"Attaching to pdb in crashed actor: ('spawn_error'",
# boxed error from previous step
"RemoteActorError: ('name_error_1'",
"NameError",
])
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
assert_before(child, [
"Attaching to pdb in crashed actor: ('root'",
# boxed error from previous step
"RemoteActorError: ('name_error'", "RemoteActorError: ('name_error'",
"NameError", "NameError",
# error from root actor and root task that created top level nursery
"Attaching to pdb in crashed actor: ('root'",
"AssertionError",
]) ])
# warnings assert we probably don't need
# assert "Cancelling nursery in ('spawn_error'," in before
if ctlc:
do_ctlc(child)
# continue again
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
before = str(child.before.decode()) assert_before(child, [
# error from root actor and root task that created top level nursery # "Attaching to pdb in crashed actor: ('root'",
assert "AssertionError" in before # boxed error from previous step
"RemoteActorError: ('name_error'",
"NameError",
"AssertionError",
'assert 0',
])
@has_nested_actors @has_nested_actors

View File

@ -8,6 +8,7 @@ import builtins
import itertools import itertools
import importlib import importlib
from exceptiongroup import BaseExceptionGroup
import pytest import pytest
import trio import trio
import tractor import tractor
@ -409,11 +410,12 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
# should trigger remote actor error # should trigger remote actor error
await portal.result() await portal.result()
with pytest.raises(RemoteActorError) as excinfo: with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed error is correct # ensure boxed errors
assert excinfo.value.type == Exception for exc in excinfo.value.exceptions:
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr): def test_trio_closes_early_and_channel_exits(arb_addr):
@ -442,11 +444,12 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
# should trigger remote actor error # should trigger remote actor error
await portal.result() await portal.result()
with pytest.raises(RemoteActorError) as excinfo: with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed error is correct # ensure boxed errors
assert excinfo.value.type == Exception for exc in excinfo.value.exceptions:
assert exc.type == Exception
@tractor.context @tractor.context

View File

@ -18,7 +18,7 @@
tractor: structured concurrent "actors". tractor: structured concurrent "actors".
""" """
from trio import MultiError from exceptiongroup import BaseExceptionGroup
from ._clustering import open_actor_cluster from ._clustering import open_actor_cluster
from ._ipc import Channel from ._ipc import Channel
@ -62,7 +62,7 @@ __all__ = [
'ContextCancelled', 'ContextCancelled',
'ModuleNotExposed', 'ModuleNotExposed',
'MsgStream', 'MsgStream',
'MultiError', 'BaseExceptionGroup',
'Portal', 'Portal',
'ReceiveMsgStream', 'ReceiveMsgStream',
'RemoteActorError', 'RemoteActorError',

View File

@ -25,6 +25,7 @@ import signal
from functools import partial from functools import partial
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from typing import ( from typing import (
Any,
Optional, Optional,
Callable, Callable,
AsyncIterator, AsyncIterator,
@ -75,8 +76,12 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit # placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None # pdb_release_hook: Optional[Callable] = None
_trio_handler: Callable[
[int, Optional[FrameType]], Any
] | int | None = None
# actor-wide variable pointing to current task name using debugger # actor-wide variable pointing to current task name using debugger
local_task_in_debug: Optional[str] = None local_task_in_debug: str | None = None
# NOTE: set by the current task waiting on the root tty lock from # NOTE: set by the current task waiting on the root tty lock from
# the CALLER side of the `lock_tty_for_child()` context entry-call # the CALLER side of the `lock_tty_for_child()` context entry-call
@ -105,19 +110,16 @@ class Lock:
@classmethod @classmethod
def shield_sigint(cls): def shield_sigint(cls):
cls._orig_sigint_handler = signal.signal( cls._orig_sigint_handler = signal.signal(
signal.SIGINT, signal.SIGINT,
shield_sigint, shield_sigint,
) )
@classmethod @classmethod
def unshield_sigint(cls): def unshield_sigint(cls):
if cls._orig_sigint_handler is not None: # always restore ``trio``'s sigint handler. see notes below in
# restore original sigint handler # the pdb factory about the nightmare that is that code swapping
signal.signal( # out the handler when the repl activates...
signal.SIGINT, signal.signal(signal.SIGINT, cls._trio_handler)
cls._orig_sigint_handler
)
cls._orig_sigint_handler = None cls._orig_sigint_handler = None
@classmethod @classmethod
@ -544,7 +546,7 @@ def shield_sigint(
) -> None: ) -> None:
''' '''
Specialized debugger compatible SIGINT handler. Specialized, debugger-aware SIGINT handler.
In childred we always ignore to avoid deadlocks since cancellation In childred we always ignore to avoid deadlocks since cancellation
should always be managed by the parent supervising actor. The root should always be managed by the parent supervising actor. The root
@ -601,6 +603,8 @@ def shield_sigint(
# which has already terminated to unlock. # which has already terminated to unlock.
and any_connected and any_connected
): ):
# we are root and some actor is in debug mode
# if uid_in_debug is not None:
name = uid_in_debug[0] name = uid_in_debug[0]
if name != 'root': if name != 'root':
log.pdb( log.pdb(
@ -611,6 +615,22 @@ def shield_sigint(
log.pdb( log.pdb(
"Ignoring SIGINT while in debug mode" "Ignoring SIGINT while in debug mode"
) )
elif (
is_root_process()
):
log.pdb(
"Ignoring SIGINT since debug mode is enabled"
)
# revert back to ``trio`` handler asap!
Lock.unshield_sigint()
if (
Lock._root_local_task_cs_in_debug
and not Lock._root_local_task_cs_in_debug.cancel_called
):
Lock._root_local_task_cs_in_debug.cancel()
# raise KeyboardInterrupt
# child actor that has locked the debugger # child actor that has locked the debugger
elif not is_root_process(): elif not is_root_process():
@ -636,10 +656,9 @@ def shield_sigint(
# https://github.com/goodboy/tractor/issues/320 # https://github.com/goodboy/tractor/issues/320
# elif debug_mode(): # elif debug_mode():
else: else: # XXX: shouldn't ever get here?
log.pdb( print("WTFWTFWTF")
"Ignoring SIGINT since debug mode is enabled" raise KeyboardInterrupt
)
# NOTE: currently (at least on ``fancycompleter`` 0.9.2) # NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it lookks to be that the last command that was run (eg. ll) # it lookks to be that the last command that was run (eg. ll)

View File

@ -27,6 +27,7 @@ import importlib
import builtins import builtins
import traceback import traceback
import exceptiongroup as eg
import trio import trio
@ -52,9 +53,6 @@ class RemoteActorError(Exception):
self.type = suberror_type self.type = suberror_type
self.msgdata = msgdata self.msgdata = msgdata
# TODO: a trio.MultiError.catch like context manager
# for catching underlying remote errors of a particular type
class InternalActorError(RemoteActorError): class InternalActorError(RemoteActorError):
"""Remote internal ``tractor`` error indicating """Remote internal ``tractor`` error indicating
@ -123,10 +121,12 @@ def unpack_error(
err_type=RemoteActorError err_type=RemoteActorError
) -> Exception: ) -> Exception:
"""Unpack an 'error' message from the wire '''
Unpack an 'error' message from the wire
into a local ``RemoteActorError``. into a local ``RemoteActorError``.
""" '''
__tracebackhide__ = True
error = msg['error'] error = msg['error']
tb_str = error.get('tb_str', '') tb_str = error.get('tb_str', '')
@ -139,7 +139,12 @@ def unpack_error(
suberror_type = trio.Cancelled suberror_type = trio.Cancelled
else: # try to lookup a suitable local error type else: # try to lookup a suitable local error type
for ns in [builtins, _this_mod, trio]: for ns in [
builtins,
_this_mod,
eg,
trio,
]:
try: try:
suberror_type = getattr(ns, type_name) suberror_type = getattr(ns, type_name)
break break
@ -158,12 +163,15 @@ def unpack_error(
def is_multi_cancelled(exc: BaseException) -> bool: def is_multi_cancelled(exc: BaseException) -> bool:
"""Predicate to determine if a ``trio.MultiError`` contains only '''
``trio.Cancelled`` sub-exceptions (and is likely the result of Predicate to determine if a possible ``eg.BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks. cancelling a collection of subtasks.
""" '''
return not trio.MultiError.filter( if isinstance(exc, eg.BaseExceptionGroup):
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None, return exc.subgroup(
exc, lambda exc: isinstance(exc, trio.Cancelled)
) ) is not None
return False

View File

@ -52,17 +52,17 @@ log = get_logger(__name__)
def _unwrap_msg( def _unwrap_msg(
msg: dict[str, Any], msg: dict[str, Any],
channel: Channel channel: Channel
) -> Any: ) -> Any:
__tracebackhide__ = True
try: try:
return msg['return'] return msg['return']
except KeyError: except KeyError:
# internal error should never get here # internal error should never get here
assert msg.get('cid'), "Received internal error at portal?" assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel) raise unpack_error(msg, channel) from None
class MessagingError(Exception): class MessagingError(Exception):
@ -136,6 +136,7 @@ class Portal:
Return the result(s) from the remote actor's "main" task. Return the result(s) from the remote actor's "main" task.
''' '''
# __tracebackhide__ = True
# Check for non-rpc errors slapped on the # Check for non-rpc errors slapped on the
# channel for which we always raise # channel for which we always raise
exc = self.channel._exc exc = self.channel._exc
@ -460,7 +461,6 @@ class Portal:
# sure it's worth being pedantic: # sure it's worth being pedantic:
# Exception, # Exception,
# trio.Cancelled, # trio.Cancelled,
# trio.MultiError,
# KeyboardInterrupt, # KeyboardInterrupt,
) as err: ) as err:

View File

@ -23,15 +23,22 @@ from functools import partial
import importlib import importlib
import logging import logging
import os import os
import signal
from typing import ( from typing import (
Optional, Optional,
) )
import typing import typing
import warnings import warnings
from exceptiongroup import BaseExceptionGroup
import trio import trio
from ._runtime import Actor, Arbiter, async_main from ._runtime import (
Actor,
Arbiter,
async_main,
)
from . import _debug from . import _debug
from . import _spawn from . import _spawn
from . import _state from . import _state
@ -74,14 +81,19 @@ async def open_root_actor(
rpc_module_paths: Optional[list] = None, rpc_module_paths: Optional[list] = None,
) -> typing.Any: ) -> typing.Any:
"""Async entry point for ``tractor``. '''
Runtime init entry point for ``tractor``.
""" '''
# Override the global debugger hook to make it play nice with # Override the global debugger hook to make it play nice with
# ``trio``, see: # ``trio``, see:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor # mark top most level process as root actor
_state._runtime_vars['_is_root'] = True _state._runtime_vars['_is_root'] = True
@ -205,7 +217,10 @@ async def open_root_actor(
try: try:
yield actor yield actor
except (Exception, trio.MultiError) as err: except (
Exception,
BaseExceptionGroup,
) as err:
entered = await _debug._maybe_enter_pm(err) entered = await _debug._maybe_enter_pm(err)

View File

@ -25,21 +25,23 @@ from itertools import chain
import importlib import importlib
import importlib.util import importlib.util
import inspect import inspect
import uuid import signal
import sys
from typing import ( from typing import (
Any, Optional, Any, Optional,
Union, TYPE_CHECKING, Union, TYPE_CHECKING,
Callable, Callable,
) )
import uuid
from types import ModuleType from types import ModuleType
import sys
import os import os
from contextlib import ExitStack from contextlib import ExitStack
import warnings import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio # type: ignore import trio # type: ignore
from trio_typing import TaskStatus from trio_typing import TaskStatus
from async_generator import aclosing
from ._ipc import Channel from ._ipc import Channel
from ._streaming import Context from ._streaming import Context
@ -194,7 +196,7 @@ async def _invoke(
res = await coro res = await coro
await chan.send({'return': res, 'cid': cid}) await chan.send({'return': res, 'cid': cid})
except trio.MultiError: except BaseExceptionGroup:
# if a context error was set then likely # if a context error was set then likely
# thei multierror was raised due to that # thei multierror was raised due to that
if ctx._error is not None: if ctx._error is not None:
@ -266,7 +268,7 @@ async def _invoke(
except ( except (
Exception, Exception,
trio.MultiError BaseExceptionGroup,
) as err: ) as err:
if not is_multi_cancelled(err): if not is_multi_cancelled(err):
@ -349,7 +351,7 @@ def _get_mod_abspath(module):
async def try_ship_error_to_parent( async def try_ship_error_to_parent(
channel: Channel, channel: Channel,
err: Union[Exception, trio.MultiError], err: Union[Exception, BaseExceptionGroup],
) -> None: ) -> None:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
@ -708,6 +710,14 @@ class Actor:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(uid, None) self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# NOTE: block this actor from acquiring the # NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we # debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the # cancelled it and further there is no way to ensure the
@ -721,23 +731,16 @@ class Actor:
# if a now stale local task has the TTY lock still # if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for # we cancel it to allow servicing other requests for
# the lock. # the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
if ( if (
pdb_lock._root_local_task_cs_in_debug db_cs
and not pdb_lock._root_local_task_cs_in_debug.cancel_called and not db_cs.cancel_called
): ):
log.warning( log.warning(
f'STALE DEBUG LOCK DETECTED FOR {uid}' f'STALE DEBUG LOCK DETECTED FOR {uid}'
) )
# TODO: figure out why this breaks tests.. # TODO: figure out why this breaks tests..
# pdb_lock._root_local_task_cs_in_debug.cancel() db_cs.cancel()
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# XXX: is this necessary (GC should do it)? # XXX: is this necessary (GC should do it)?
if chan.connected(): if chan.connected():
@ -1228,6 +1231,10 @@ async def async_main(
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
''' '''
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False registered_with_arbiter = False
try: try:
@ -1549,7 +1556,10 @@ async def process_messages(
partial(_invoke, actor, cid, chan, func, kwargs), partial(_invoke, actor, cid, chan, func, kwargs),
name=funcname, name=funcname,
) )
except (RuntimeError, trio.MultiError): except (
RuntimeError,
BaseExceptionGroup,
):
# avoid reporting a benign race condition # avoid reporting a benign race condition
# during actor runtime teardown. # during actor runtime teardown.
nursery_cancelled_before_task = True nursery_cancelled_before_task = True
@ -1594,7 +1604,10 @@ async def process_messages(
# transport **was** disconnected # transport **was** disconnected
return True return True
except (Exception, trio.MultiError) as err: except (
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task: if nursery_cancelled_before_task:
sn = actor._service_n sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called assert sn and sn.cancel_scope.cancel_called

View File

@ -31,6 +31,7 @@ from typing import (
) )
from collections.abc import Awaitable from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -139,6 +140,7 @@ async def exhaust_portal(
If the main task is an async generator do our best to consume If the main task is an async generator do our best to consume
what's left of it. what's left of it.
''' '''
__tracebackhide__ = True
try: try:
log.debug(f"Waiting on final result from {actor.uid}") log.debug(f"Waiting on final result from {actor.uid}")
@ -146,8 +148,11 @@ async def exhaust_portal(
# always be established and shutdown using a context manager api # always be established and shutdown using a context manager api
final = await portal.result() final = await portal.result()
except (Exception, trio.MultiError) as err: except (
# we reraise in the parent task via a ``trio.MultiError`` Exception,
BaseExceptionGroup,
) as err:
# we reraise in the parent task via a ``BaseExceptionGroup``
return err return err
except trio.Cancelled as err: except trio.Cancelled as err:
# lol, of course we need this too ;P # lol, of course we need this too ;P
@ -175,7 +180,7 @@ async def cancel_on_completion(
''' '''
# if this call errors we store the exception for later # if this call errors we store the exception for later
# in ``errors`` which will be reraised inside # in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request # an exception group and we still send out a cancel request
result = await exhaust_portal(portal, actor) result = await exhaust_portal(portal, actor)
if isinstance(result, Exception): if isinstance(result, Exception):
errors[actor.uid] = result errors[actor.uid] = result

View File

@ -22,7 +22,6 @@ from typing import (
Optional, Optional,
Any, Any,
) )
from collections.abc import Mapping
import trio import trio
@ -46,12 +45,6 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore #
return _current_actor return _current_actor
_conc_name_getters = {
'task': trio.lowlevel.current_task,
'actor': current_actor
}
def is_main_process() -> bool: def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process. """Bool determining if this actor is running in the top-most process.
""" """

View File

@ -27,7 +27,8 @@ from typing import (
Optional, Optional,
Callable, Callable,
AsyncGenerator, AsyncGenerator,
AsyncIterator AsyncIterator,
TYPE_CHECKING,
) )
import warnings import warnings
@ -41,6 +42,10 @@ from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver from .trionics import broadcast_receiver, BroadcastReceiver
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__) log = get_logger(__name__)
@ -269,9 +274,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
@asynccontextmanager @asynccontextmanager
async def subscribe( async def subscribe(
self, self,
) -> AsyncIterator[BroadcastReceiver]: ) -> AsyncIterator[BroadcastReceiver]:
'''Allocate and return a ``BroadcastReceiver`` which delegates '''
Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream. to this message stream.
This allows multiple local tasks to receive each their own copy This allows multiple local tasks to receive each their own copy
@ -365,7 +370,8 @@ class Context:
_remote_func_type: Optional[str] = None _remote_func_type: Optional[str] = None
# only set on the caller side # only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa _portal: Optional[Portal] = None # type: ignore # noqa
_stream: Optional[MsgStream] = None
_result: Optional[Any] = False _result: Optional[Any] = False
_error: Optional[BaseException] = None _error: Optional[BaseException] = None
@ -425,19 +431,24 @@ class Context:
# (currently) that other portal APIs (``Portal.run()``, # (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point # ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing. # of the call and result processing.
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
error = unpack_error(msg, self.chan) error = unpack_error(msg, self.chan)
if ( if (
isinstance(error, ContextCancelled) and isinstance(error, ContextCancelled)
self._cancel_called
): ):
# this is an expected cancel request response message log.cancel(
# and we don't need to raise it in scope since it will f'Remote context error for {self.chan.uid}:{self.cid}:\n'
# potentially override a real error f'{msg["error"]["tb_str"]}'
return )
if self._cancel_called:
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
self._error = error self._error = error
@ -473,6 +484,7 @@ class Context:
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True self._cancel_called = True
ipc_broken: bool = False
if side == 'caller': if side == 'caller':
if not self._portal: if not self._portal:
@ -490,7 +502,14 @@ class Context:
# NOTE: we're telling the far end actor to cancel a task # NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel # corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly. # instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid) try:
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
except trio.BrokenResourceError:
ipc_broken = True
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed # XXX: there's no way to know if the remote task was indeed
@ -506,7 +525,10 @@ class Context:
"Timed out on cancelling remote task " "Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}") f"{cid} for {self._portal.channel.uid}")
# callee side remote task elif ipc_broken:
log.cancel(
"Transport layer was broken before cancel request "
f"{cid} for {self._portal.channel.uid}")
else: else:
self._cancel_msg = msg self._cancel_msg = msg
@ -593,10 +615,11 @@ class Context:
async with MsgStream( async with MsgStream(
ctx=self, ctx=self,
rx_chan=ctx._recv_chan, rx_chan=ctx._recv_chan,
) as rchan: ) as stream:
self._stream = stream
if self._portal: if self._portal:
self._portal._streams.add(rchan) self._portal._streams.add(stream)
try: try:
self._stream_opened = True self._stream_opened = True
@ -604,7 +627,7 @@ class Context:
# ensure we aren't cancelled before delivering # ensure we aren't cancelled before delivering
# the stream # the stream
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield rchan yield stream
# XXX: Make the stream "one-shot use". On exit, signal # XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
@ -635,25 +658,22 @@ class Context:
if not self._recv_chan._closed: # type: ignore if not self._recv_chan._closed: # type: ignore
# wait for a final context result consuming def consume(
# and discarding any bi dir stream msgs still msg: dict,
# in transit from the far end.
while True:
msg = await self._recv_chan.receive() ) -> Optional[dict]:
try: try:
self._result = msg['return'] return msg['return']
break
except KeyError as msgerr: except KeyError as msgerr:
if 'yield' in msg: if 'yield' in msg:
# far end task is still streaming to us so discard # far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}') log.warning(f'Discarding stream delivered {msg}')
continue return
elif 'stop' in msg: elif 'stop' in msg:
log.debug('Remote stream terminated') log.debug('Remote stream terminated')
continue return
# internal error should never get here # internal error should never get here
assert msg.get('cid'), ( assert msg.get('cid'), (
@ -663,6 +683,25 @@ class Context:
msg, self._portal.channel msg, self._portal.channel
) from msgerr ) from msgerr
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
if self._stream:
async with self._stream.subscribe() as bstream:
async for msg in bstream:
result = consume(msg)
if result:
self._result = result
break
if not self._result:
while True:
msg = await self._recv_chan.receive()
result = consume(msg)
if result:
self._result = result
break
return self._result return self._result
async def started( async def started(

View File

@ -18,6 +18,7 @@
``trio`` inspired apis and helpers ``trio`` inspired apis and helpers
""" """
from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import inspect import inspect
from typing import ( from typing import (
@ -27,8 +28,8 @@ from typing import (
import typing import typing
import warnings import warnings
from exceptiongroup import BaseExceptionGroup
import trio import trio
from async_generator import asynccontextmanager
from ._debug import maybe_wait_for_debugger from ._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process from ._state import current_actor, is_main_process
@ -82,7 +83,7 @@ class ActorNursery:
actor: Actor, actor: Actor,
ria_nursery: trio.Nursery, ria_nursery: trio.Nursery,
da_nursery: trio.Nursery, da_nursery: trio.Nursery,
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], BaseException],
) -> None: ) -> None:
# self.supervisor = supervisor # TODO # self.supervisor = supervisor # TODO
self._actor: Actor = actor self._actor: Actor = actor
@ -294,13 +295,13 @@ class ActorNursery:
self._join_procs.set() self._join_procs.set()
@asynccontextmanager @acm
async def _open_and_supervise_one_cancels_all_nursery( async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor, actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]: ) -> typing.AsyncGenerator[ActorNursery, None]:
# the collection of errors retreived from spawned sub-actors # the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], Exception] = {} errors: dict[tuple[str, str], BaseException] = {}
# This is the outermost level "deamon actor" nursery. It is awaited # This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for # **after** the below inner "run in actor nursery". This allows for
@ -346,7 +347,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
anursery._join_procs.set() anursery._join_procs.set()
except BaseException as err: except BaseException as err:
# If we error in the root but the debugger is # If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and # engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it # thus clobber access to) the local tty since it
@ -382,18 +382,21 @@ async def _open_and_supervise_one_cancels_all_nursery(
else: else:
log.exception( log.exception(
f"Nursery for {current_actor().uid} " f"Nursery for {current_actor().uid} "
f"errored with {err}, ") f"errored with")
# cancel all subactors # cancel all subactors
await anursery.cancel() await anursery.cancel()
except trio.MultiError as merr: except BaseExceptionGroup as merr:
# If we receive additional errors while waiting on # If we receive additional errors while waiting on
# remaining subactors that were cancelled, # remaining subactors that were cancelled,
# aggregate those errors with the original error # aggregate those errors with the original error
# that triggered this teardown. # that triggered this teardown.
if err not in merr.exceptions: if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err]) raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
list(merr.exceptions) + [err],
)
else: else:
raise raise
@ -402,12 +405,12 @@ async def _open_and_supervise_one_cancels_all_nursery(
# XXX: do we need a `trio.Cancelled` catch here as well? # XXX: do we need a `trio.Cancelled` catch here as well?
# this is the catch around the ``.run_in_actor()`` nursery # this is the catch around the ``.run_in_actor()`` nursery
except ( except (
Exception, Exception,
trio.MultiError, BaseExceptionGroup,
trio.Cancelled trio.Cancelled
) as err: ) as err: # noqa
errors[actor.uid] = err
# XXX: yet another guard before allowing the cancel # XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug. # sequence in case a (single) child is in debug.
@ -436,9 +439,12 @@ async def _open_and_supervise_one_cancels_all_nursery(
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await anursery.cancel() await anursery.cancel()
# use `MultiError` as needed # use `BaseExceptionGroup` as needed
if len(errors) > 1: if len(errors) > 1:
raise trio.MultiError(tuple(errors.values())) raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
tuple(errors.values()),
)
else: else:
raise list(errors.values())[0] raise list(errors.values())[0]
@ -447,7 +453,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# after nursery exit # after nursery exit
@asynccontextmanager @acm
async def open_nursery( async def open_nursery(
**kwargs, **kwargs,