Compare commits

..

No commits in common. "daf4b4ee857d704d16bcf10df89028d09969fdee" and "f6ac0c2eb79157f7dab2b852ea4eb9fb73baf384" have entirely different histories.

8 changed files with 65 additions and 136 deletions

View File

@ -83,7 +83,7 @@ jobs:
run: pip list run: pip list
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx --full-trace
# We skip 3.10 on windows for now due to not having any collabs to # We skip 3.10 on windows for now due to not having any collabs to
# debug the CI failures. Anyone wanting to hack and solve them is very # debug the CI failures. Anyone wanting to hack and solve them is very

View File

@ -60,42 +60,24 @@ def test_remote_error(arb_addr, args_err):
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
) as nursery: ) as nursery:
# on a remote type error caused by bad input args
# this should raise directly which means we **don't** get
# an exception group outside the nursery since the error
# here and the far end task error are one in the same?
portal = await nursery.run_in_actor( portal = await nursery.run_in_actor(
assert_err, name='errorer', **args assert_err, name='errorer', **args
) )
# get result(s) from main task # get result(s) from main task
try: try:
# this means the root actor will also raise a local
# parent task error and thus an eg will propagate out
# of this actor nursery.
await portal.result() await portal.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == errtype assert err.type == errtype
print("Look Maa that actor failed hard, hehh") print("Look Maa that actor failed hard, hehh")
raise raise
# ensure boxed errors
if args:
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == errtype assert excinfo.value.type == errtype
else:
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == errtype
def test_multierror(arb_addr): def test_multierror(arb_addr):
''' '''

View File

@ -8,7 +8,6 @@ import builtins
import itertools import itertools
import importlib import importlib
from exceptiongroup import BaseExceptionGroup
import pytest import pytest
import trio import trio
import tractor import tractor
@ -410,12 +409,11 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
# should trigger remote actor error # should trigger remote actor error
await portal.result() await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo: with pytest.raises(RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed errors # ensure boxed error is correct
for exc in excinfo.value.exceptions: assert excinfo.value.type == Exception
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr): def test_trio_closes_early_and_channel_exits(arb_addr):
@ -444,12 +442,11 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
# should trigger remote actor error # should trigger remote actor error
await portal.result() await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo: with pytest.raises(RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed errors # ensure boxed error is correct
for exc in excinfo.value.exceptions: assert excinfo.value.type == Exception
assert exc.type == Exception
@tractor.context @tractor.context

View File

@ -25,7 +25,6 @@ import signal
from functools import partial from functools import partial
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from typing import ( from typing import (
Any,
Optional, Optional,
Callable, Callable,
AsyncIterator, AsyncIterator,
@ -76,9 +75,7 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit # placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None # pdb_release_hook: Optional[Callable] = None
_trio_handler: Callable[ _trio_handler: Callable | None = None
[int, Optional[FrameType]], Any
] | int | None = None
# actor-wide variable pointing to current task name using debugger # actor-wide variable pointing to current task name using debugger
local_task_in_debug: str | None = None local_task_in_debug: str | None = None
@ -116,10 +113,10 @@ class Lock:
@classmethod @classmethod
def unshield_sigint(cls): def unshield_sigint(cls):
# always restore ``trio``'s sigint handler. see notes below in # always restore (some) sigint handler, either
# the pdb factory about the nightmare that is that code swapping # the prior or at least ``trio``'s.
# out the handler when the repl activates... orig = cls._orig_sigint_handler or cls._trio_handler
signal.signal(signal.SIGINT, cls._trio_handler) signal.signal(signal.SIGINT, orig)
cls._orig_sigint_handler = None cls._orig_sigint_handler = None
@classmethod @classmethod

View File

@ -34,11 +34,7 @@ import warnings
from exceptiongroup import BaseExceptionGroup from exceptiongroup import BaseExceptionGroup
import trio import trio
from ._runtime import ( from ._runtime import Actor, Arbiter, async_main
Actor,
Arbiter,
async_main,
)
from . import _debug from . import _debug
from . import _spawn from . import _spawn
from . import _state from . import _state

View File

@ -25,15 +25,14 @@ from itertools import chain
import importlib import importlib
import importlib.util import importlib.util
import inspect import inspect
import signal import uuid
import sys
from typing import ( from typing import (
Any, Optional, Any, Optional,
Union, TYPE_CHECKING, Union, TYPE_CHECKING,
Callable, Callable,
) )
import uuid
from types import ModuleType from types import ModuleType
import sys
import os import os
from contextlib import ExitStack from contextlib import ExitStack
import warnings import warnings
@ -710,14 +709,6 @@ class Actor:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(uid, None) self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# NOTE: block this actor from acquiring the # NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we # debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the # cancelled it and further there is no way to ensure the
@ -731,16 +722,23 @@ class Actor:
# if a now stale local task has the TTY lock still # if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for # we cancel it to allow servicing other requests for
# the lock. # the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
if ( if (
db_cs pdb_lock._root_local_task_cs_in_debug
and not db_cs.cancel_called and not pdb_lock._root_local_task_cs_in_debug.cancel_called
): ):
log.warning( log.warning(
f'STALE DEBUG LOCK DETECTED FOR {uid}' f'STALE DEBUG LOCK DETECTED FOR {uid}'
) )
# TODO: figure out why this breaks tests.. # TODO: figure out why this breaks tests..
db_cs.cancel() # pdb_lock._root_local_task_cs_in_debug.cancel()
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# XXX: is this necessary (GC should do it)? # XXX: is this necessary (GC should do it)?
if chan.connected(): if chan.connected():
@ -1231,10 +1229,6 @@ async def async_main(
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
''' '''
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False registered_with_arbiter = False
try: try:

View File

@ -27,8 +27,7 @@ from typing import (
Optional, Optional,
Callable, Callable,
AsyncGenerator, AsyncGenerator,
AsyncIterator, AsyncIterator
TYPE_CHECKING,
) )
import warnings import warnings
@ -42,10 +41,6 @@ from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver from .trionics import broadcast_receiver, BroadcastReceiver
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__) log = get_logger(__name__)
@ -274,9 +269,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
@asynccontextmanager @asynccontextmanager
async def subscribe( async def subscribe(
self, self,
) -> AsyncIterator[BroadcastReceiver]: ) -> AsyncIterator[BroadcastReceiver]:
''' '''Allocate and return a ``BroadcastReceiver`` which delegates
Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream. to this message stream.
This allows multiple local tasks to receive each their own copy This allows multiple local tasks to receive each their own copy
@ -370,8 +365,7 @@ class Context:
_remote_func_type: Optional[str] = None _remote_func_type: Optional[str] = None
# only set on the caller side # only set on the caller side
_portal: Optional[Portal] = None # type: ignore # noqa _portal: Optional['Portal'] = None # type: ignore # noqa
_stream: Optional[MsgStream] = None
_result: Optional[Any] = False _result: Optional[Any] = False
_error: Optional[BaseException] = None _error: Optional[BaseException] = None
@ -431,24 +425,19 @@ class Context:
# (currently) that other portal APIs (``Portal.run()``, # (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point # ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing. # of the call and result processing.
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled)
):
log.cancel(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
if self._cancel_called:
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
else:
log.error( log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n' f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}' f'{msg["error"]["tb_str"]}'
) )
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled) and
self._cancel_called
):
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
self._error = error self._error = error
@ -484,7 +473,6 @@ class Context:
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True self._cancel_called = True
ipc_broken: bool = False
if side == 'caller': if side == 'caller':
if not self._portal: if not self._portal:
@ -502,14 +490,7 @@ class Context:
# NOTE: we're telling the far end actor to cancel a task # NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel # corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly. # instance is passed to `Actor._cancel_task()` implicitly.
try: await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
except trio.BrokenResourceError:
ipc_broken = True
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed # XXX: there's no way to know if the remote task was indeed
@ -525,10 +506,7 @@ class Context:
"Timed out on cancelling remote task " "Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}") f"{cid} for {self._portal.channel.uid}")
elif ipc_broken: # callee side remote task
log.cancel(
"Transport layer was broken before cancel request "
f"{cid} for {self._portal.channel.uid}")
else: else:
self._cancel_msg = msg self._cancel_msg = msg
@ -615,11 +593,10 @@ class Context:
async with MsgStream( async with MsgStream(
ctx=self, ctx=self,
rx_chan=ctx._recv_chan, rx_chan=ctx._recv_chan,
) as stream: ) as rchan:
self._stream = stream
if self._portal: if self._portal:
self._portal._streams.add(stream) self._portal._streams.add(rchan)
try: try:
self._stream_opened = True self._stream_opened = True
@ -627,7 +604,7 @@ class Context:
# ensure we aren't cancelled before delivering # ensure we aren't cancelled before delivering
# the stream # the stream
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield stream yield rchan
# XXX: Make the stream "one-shot use". On exit, signal # XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
@ -658,22 +635,25 @@ class Context:
if not self._recv_chan._closed: # type: ignore if not self._recv_chan._closed: # type: ignore
def consume( # wait for a final context result consuming
msg: dict, # and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
) -> Optional[dict]: msg = await self._recv_chan.receive()
try: try:
return msg['return'] self._result = msg['return']
break
except KeyError as msgerr: except KeyError as msgerr:
if 'yield' in msg: if 'yield' in msg:
# far end task is still streaming to us so discard # far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}') log.warning(f'Discarding stream delivered {msg}')
return continue
elif 'stop' in msg: elif 'stop' in msg:
log.debug('Remote stream terminated') log.debug('Remote stream terminated')
return continue
# internal error should never get here # internal error should never get here
assert msg.get('cid'), ( assert msg.get('cid'), (
@ -683,25 +663,6 @@ class Context:
msg, self._portal.channel msg, self._portal.channel
) from msgerr ) from msgerr
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
if self._stream:
async with self._stream.subscribe() as bstream:
async for msg in bstream:
result = consume(msg)
if result:
self._result = result
break
if not self._result:
while True:
msg = await self._recv_chan.receive()
result = consume(msg)
if result:
self._result = result
break
return self._result return self._result
async def started( async def started(

View File

@ -83,7 +83,7 @@ class ActorNursery:
actor: Actor, actor: Actor,
ria_nursery: trio.Nursery, ria_nursery: trio.Nursery,
da_nursery: trio.Nursery, da_nursery: trio.Nursery,
errors: dict[tuple[str, str], BaseException], errors: dict[tuple[str, str], Exception],
) -> None: ) -> None:
# self.supervisor = supervisor # TODO # self.supervisor = supervisor # TODO
self._actor: Actor = actor self._actor: Actor = actor
@ -347,6 +347,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
anursery._join_procs.set() anursery._join_procs.set()
except BaseException as err: except BaseException as err:
errors[actor.uid] = err
# If we error in the root but the debugger is # If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and # engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it # thus clobber access to) the local tty since it