Compare commits


No commits in common. "daf4b4ee857d704d16bcf10df89028d09969fdee" and "f6ac0c2eb79157f7dab2b852ea4eb9fb73baf384" have entirely different histories.

8 changed files with 65 additions and 136 deletions


@@ -83,7 +83,7 @@ jobs:
run: pip list
- name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx --full-trace
# We skip 3.10 on windows for now due to not having any collabs to
# debug the CI failures. Anyone wanting to hack and solve them is very


@@ -60,41 +60,23 @@ def test_remote_error(arb_addr, args_err):
arbiter_addr=arb_addr,
) as nursery:
# on a remote type error caused by bad input args
# this should raise directly which means we **don't** get
# an exception group outside the nursery since the error
# here and the far end task error are one in the same?
portal = await nursery.run_in_actor(
assert_err, name='errorer', **args
)
# get result(s) from main task
try:
# this means the root actor will also raise a local
# parent task error and thus an eg will propagate out
# of this actor nursery.
await portal.result()
except tractor.RemoteActorError as err:
assert err.type == errtype
print("Look Maa that actor failed hard, hehh")
raise
# ensure boxed errors
if args:
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.type == errtype
else:
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == errtype
# ensure boxed error is correct
assert excinfo.value.type == errtype
def test_multierror(arb_addr):
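
For context on the assertion changes above: when only the child actor fails, the boxed error is expected to surface directly as a ``tractor.RemoteActorError``, whereas when the parent task also errors on ``.result()`` an exception group propagates and each boxed error is checked individually. A minimal, self-contained sketch of the two ``pytest.raises`` shapes (the ``MyError`` / ``run_child`` names are hypothetical stand-ins, not part of this diff):

import pytest
# ``BaseExceptionGroup`` is a builtin on Python 3.11+; the backport package
# provides it for older interpreters (as in the test modules above).
from exceptiongroup import BaseExceptionGroup


class MyError(Exception):
    '''Stand-in for a remotely raised error type.'''


def run_child(parent_also_fails: bool) -> None:
    '''Stand-in for the ``trio.run(main)`` calls in the tests above.'''
    if not parent_also_fails:
        # only the child fails: the error propagates directly
        raise MyError('child failed')
    # both parent and child fail: the errors arrive as a group
    raise BaseExceptionGroup(
        'nursery errors',
        [MyError('child failed'), MyError('parent failed')],
    )


def test_single_error():
    with pytest.raises(MyError):
        run_child(parent_also_fails=False)


def test_error_group():
    with pytest.raises(BaseExceptionGroup) as excinfo:
        run_child(parent_also_fails=True)
    # ensure every boxed error is of the expected type
    for exc in excinfo.value.exceptions:
        assert type(exc) is MyError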


@@ -8,7 +8,6 @@ import builtins
import itertools
import importlib
from exceptiongroup import BaseExceptionGroup
import pytest
import trio
import tractor
@@ -410,12 +409,11 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
# ensure boxed error is correct
assert excinfo.value.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr):
@@ -444,12 +442,11 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
# ensure boxed error is correct
assert excinfo.value.type == Exception
@tractor.context


@@ -25,7 +25,6 @@ import signal
from functools import partial
from contextlib import asynccontextmanager as acm
from typing import (
Any,
Optional,
Callable,
AsyncIterator,
@@ -76,9 +75,7 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None
_trio_handler: Callable[
[int, Optional[FrameType]], Any
] | int | None = None
_trio_handler: Callable | None = None
# actor-wide variable pointing to current task name using debugger
local_task_in_debug: str | None = None
@@ -110,16 +107,16 @@ class Lock:
@classmethod
def shield_sigint(cls):
cls._orig_sigint_handler = signal.signal(
signal.SIGINT,
shield_sigint,
)
signal.SIGINT,
shield_sigint,
)
@classmethod
def unshield_sigint(cls):
# always restore ``trio``'s sigint handler. see notes below in
# the pdb factory about the nightmare that is that code swapping
# out the handler when the repl activates...
signal.signal(signal.SIGINT, cls._trio_handler)
# always restore (some) sigint handler, either
# the prior or at least ``trio``'s.
orig = cls._orig_sigint_handler or cls._trio_handler
signal.signal(signal.SIGINT, orig)
cls._orig_sigint_handler = None
@classmethod
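
The ``shield_sigint()`` / ``unshield_sigint()`` pair above follows the usual save-and-restore pattern for signal handlers: stash whatever ``signal.signal()`` returns when installing the shield, and reinstall that (or a sane fallback) on exit. A rough standalone sketch of the pattern, outside of tractor (hypothetical names):

import signal

_orig_handler = None


def _noop_handler(signum, frame):
    # swallow SIGINT while a critical section (e.g. a REPL) is active
    print('SIGINT shielded')


def shield_sigint():
    global _orig_handler
    # ``signal.signal()`` returns the previously installed handler
    _orig_handler = signal.signal(signal.SIGINT, _noop_handler)


def unshield_sigint():
    global _orig_handler
    # restore whatever was installed before shielding, falling back to
    # the interpreter's default SIGINT handler if nothing was stashed
    signal.signal(signal.SIGINT, _orig_handler or signal.default_int_handler)
    _orig_handler = None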


@@ -34,11 +34,7 @@ import warnings
from exceptiongroup import BaseExceptionGroup
import trio
from ._runtime import (
Actor,
Arbiter,
async_main,
)
from ._runtime import Actor, Arbiter, async_main
from . import _debug
from . import _spawn
from . import _state
@@ -92,7 +88,7 @@ async def open_root_actor(
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True


@@ -25,15 +25,14 @@ from itertools import chain
import importlib
import importlib.util
import inspect
import signal
import sys
import uuid
from typing import (
Any, Optional,
Union, TYPE_CHECKING,
Callable,
)
import uuid
from types import ModuleType
import sys
import os
from contextlib import ExitStack
import warnings
@@ -710,14 +709,6 @@ class Actor:
log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the
@@ -731,16 +722,23 @@
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
if (
db_cs
and not db_cs.cancel_called
pdb_lock._root_local_task_cs_in_debug
and not pdb_lock._root_local_task_cs_in_debug.cancel_called
):
log.warning(
f'STALE DEBUG LOCK DETECTED FOR {uid}'
)
# TODO: figure out why this breaks tests..
db_cs.cancel()
# pdb_lock._root_local_task_cs_in_debug.cancel()
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# XXX: is this necessary (GC should do it)?
if chan.connected():
@@ -1231,10 +1229,6 @@ async def async_main(
and when cancelled effectively cancels the actor.
'''
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False
try:
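
The reordering above also keeps the "no more peers" signalling in one place: it is just a ``trio.Event`` that is set once the last peer channel has been dropped, so other tasks can block until every connection is gone. A simplified sketch of that bookkeeping (the ``PeerRegistry`` class is a hypothetical stand-in, not tractor's actual ``Actor`` implementation):

import trio


class PeerRegistry:
    '''Track connected peers and signal when the last one disconnects.'''

    def __init__(self) -> None:
        self._peers: dict[str, object] = {}
        self._no_more_peers = trio.Event()
        # starts "set" since there are no connections yet
        self._no_more_peers.set()

    def add(self, uid: str, chan: object) -> None:
        self._peers[uid] = chan
        # a live connection means waiters must block again
        self._no_more_peers = trio.Event()

    def remove(self, uid: str) -> None:
        self._peers.pop(uid, None)
        if not self._peers:
            # wake up anyone waiting for all peers to disconnect
            self._no_more_peers.set()

    async def wait_for_no_peers(self) -> None:
        await self._no_more_peers.wait()


async def _demo() -> None:
    reg = PeerRegistry()
    reg.add('actor-1', object())
    reg.remove('actor-1')
    await reg.wait_for_no_peers()  # returns immediately


if __name__ == '__main__':
    trio.run(_demo)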


@@ -27,8 +27,7 @@ from typing import (
Optional,
Callable,
AsyncGenerator,
AsyncIterator,
TYPE_CHECKING,
AsyncIterator
)
import warnings
@@ -42,10 +41,6 @@ from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__)
@@ -274,9 +269,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
@asynccontextmanager
async def subscribe(
self,
) -> AsyncIterator[BroadcastReceiver]:
'''
Allocate and return a ``BroadcastReceiver`` which delegates
'''Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream.
This allows multiple local tasks to receive each their own copy
@@ -370,8 +365,7 @@ class Context:
_remote_func_type: Optional[str] = None
# only set on the caller side
_portal: Optional[Portal] = None # type: ignore # noqa
_stream: Optional[MsgStream] = None
_portal: Optional['Portal'] = None # type: ignore # noqa
_result: Optional[Any] = False
_error: Optional[BaseException] = None
@@ -431,24 +425,19 @@
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled)
isinstance(error, ContextCancelled) and
self._cancel_called
):
log.cancel(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
if self._cancel_called:
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
self._error = error
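
The change above collapses the log-then-return logic: the error is unpacked once, and a ``ContextCancelled`` is only swallowed when this side actually requested the cancel, so it cannot mask a real failure. A rough sketch of that control flow with stand-in names (not tractor's real API):

class ContextCancelled(Exception):
    '''Stand-in for tractor's remote-cancel error type.'''


def filter_remote_error(
    error: BaseException,
    cancel_called: bool,
) -> BaseException | None:
    if isinstance(error, ContextCancelled) and cancel_called:
        # this is the expected response to our own cancel request and
        # should not be raised in scope, where it could override a real error
        return None
    # anything else is a genuine remote failure to surface
    return error


assert filter_remote_error(ContextCancelled(), cancel_called=True) is None
assert filter_remote_error(ValueError('boom'), cancel_called=True) is not None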
@@ -484,7 +473,6 @@
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
ipc_broken: bool = False
if side == 'caller':
if not self._portal:
@@ -502,14 +490,7 @@
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
try:
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
except trio.BrokenResourceError:
ipc_broken = True
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
@@ -525,10 +506,7 @@
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
elif ipc_broken:
log.cancel(
"Transport layer was broken before cancel request "
f"{cid} for {self._portal.channel.uid}")
# callee side remote task
else:
self._cancel_msg = msg
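
The ``cs.cancelled_caught`` check above is the standard ``trio`` deadline idiom: wrap the remote cancel request in a ``trio.move_on_after()`` scope and inspect the scope afterwards to learn whether the far end acknowledged in time. A minimal generic sketch (not the actual ``Portal`` API):

import trio


async def request_cancel_with_timeout(timeout: float = 0.5) -> bool:
    '''Return True if the (simulated) remote ack arrives before the deadline.'''
    with trio.move_on_after(timeout) as cs:
        # stand-in for awaiting the far end's cancel acknowledgement
        await trio.sleep(1.0)
    if cs.cancelled_caught:
        # the deadline hit first: there is no way to know whether the
        # remote task was actually cancelled
        return False
    return True


async def _demo() -> None:
    assert await request_cancel_with_timeout(0.1) is False


if __name__ == '__main__':
    trio.run(_demo)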
@@ -615,11 +593,10 @@
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as stream:
self._stream = stream
) as rchan:
if self._portal:
self._portal._streams.add(stream)
self._portal._streams.add(rchan)
try:
self._stream_opened = True
@@ -627,7 +604,7 @@
# ensure we aren't cancelled before delivering
# the stream
# await trio.lowlevel.checkpoint()
yield stream
yield rchan
# XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
@@ -658,22 +635,25 @@
if not self._recv_chan._closed: # type: ignore
def consume(
msg: dict,
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
) -> Optional[dict]:
msg = await self._recv_chan.receive()
try:
return msg['return']
self._result = msg['return']
break
except KeyError as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
return
continue
elif 'stop' in msg:
log.debug('Remote stream terminated')
return
continue
# internal error should never get here
assert msg.get('cid'), (
@@ -683,25 +663,6 @@
msg, self._portal.channel
) from msgerr
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
if self._stream:
async with self._stream.subscribe() as bstream:
async for msg in bstream:
result = consume(msg)
if result:
self._result = result
break
if not self._result:
while True:
msg = await self._recv_chan.receive()
result = consume(msg)
if result:
self._result = result
break
return self._result
async def started(
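
The rewritten result logic above boils down to a drain loop: keep receiving until a ``'return'`` message shows up, discarding any in-flight ``'yield'`` and ``'stop'`` messages from the stream. A simplified, self-contained sketch of that loop over a plain ``trio`` memory channel (the message shapes here are illustrative, not tractor's exact wire format):

import trio


async def drain_final_result(rx: trio.MemoryReceiveChannel) -> object:
    '''Discard stream traffic until the final result message arrives.'''
    while True:
        msg = await rx.receive()
        if 'return' in msg:
            return msg['return']
        elif 'yield' in msg:
            # the far end is still streaming; drop the payload
            continue
        elif 'stop' in msg:
            # the stream terminated; keep waiting for the result
            continue
        else:
            raise RuntimeError(f'unexpected message: {msg!r}')


async def _demo() -> None:
    tx, rx = trio.open_memory_channel(10)
    for msg in ({'yield': 1}, {'yield': 2}, {'stop': True}, {'return': 42}):
        await tx.send(msg)
    assert await drain_final_result(rx) == 42


if __name__ == '__main__':
    trio.run(_demo)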


@@ -83,7 +83,7 @@ class ActorNursery:
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], BaseException],
errors: dict[tuple[str, str], Exception],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
@@ -347,6 +347,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
anursery._join_procs.set()
except BaseException as err:
errors[actor.uid] = err
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it