Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet daf4b4ee85 Break loop after result retrieval 2022-10-13 17:11:35 -04:00
Tyler Goodlet 3f09843951 Log context cancellation using `.cancel()` loglevel 2022-10-13 17:11:21 -04:00
Tyler Goodlet 3483151aa8 Use `MsgStream.subscribe()` in `Context.result()`
A case exists where there are multiple tasks consuming from an open
2-way stream created via `Context.open_stream()`: a sibling task is
pulling from the stream while some other task also calls `.result()`.
Previously the `.result()` call would consume (drain) stream messages
directly from the underlying mem chan which would mean any sibling task
would not receive those same messages. Instead, make `.result()` check
if a stream is open and, if so, consume (and discard) stream msgs using
a `BroadcastReceiver` (via `MsgStream.subscribe()`) such that all
interested tasks get copies of the same packets.
2022-10-13 17:07:49 -04:00
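A minimal sketch (not part of this changeset) of the consumer pattern the commit above enables: one task iterates the stream while a sibling waits on `Context.result()`, and both get their own copy of every packet since `.result()` now drains via `MsgStream.subscribe()` internally. The `streamer` target function and its actor wiring below are illustrative assumptions, not code from this repo.

import trio
import tractor


@tractor.context
async def streamer(ctx: tractor.Context) -> str:
    # hypothetical child-side task: stream a few msgs then return a result
    await ctx.started()
    async with ctx.open_stream() as stream:
        for i in range(5):
            await stream.send(i)
    return 'final result'


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor('streamer', enable_modules=[__name__])
        async with (
            portal.open_context(streamer) as (ctx, _first),
            ctx.open_stream() as stream,
        ):
            async with trio.open_nursery() as tn:

                async def pull():
                    # sibling task consuming the stream..
                    async for msg in stream:
                        print('stream msg:', msg)

                tn.start_soon(pull)
                # ..while this task waits on the final result; any stream
                # msgs drained here are re-broadcast so `pull()` still
                # receives its own copies.
                print('result:', await ctx.result())

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)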
Tyler Goodlet 4a5f041211 Expect egs in tests which retrieve portal results 2022-10-13 17:00:24 -04:00
Tyler Goodlet 7d0186aab9 Drop full tb flag again... 2022-10-13 15:45:17 -04:00
Tyler Goodlet f9b548e4e7 Fix errors table type annot 2022-10-13 15:42:33 -04:00
Tyler Goodlet afbe90bcfa TOSQUASH cancel on no peers 2022-10-13 15:42:01 -04:00
Tyler Goodlet 44538c44b1 Fix handler type annot 2022-10-13 15:41:38 -04:00
Tyler Goodlet 62fc462580 Never double add parent task's error to `ActorNursery` 2022-10-13 15:27:04 -04:00
Tyler Goodlet c5091afa38 Always restore the `trio` SIGINT handler
Pretty sure this is the final touch to alleviate all our debug lock
headaches! Instead of trying to revert to the "last" handler (as `pdb`
does internally in the stdlib) we always just revert to the handler
`trio` registers during startup. Further, this seems to allow cancelling
the root-side locking task if it's detected as stale IFF we only do this
when the root actor is in a "no more IPC peers" state.

Deatz:
- always set `._debug.Lock._trio_handler` to the `trio` version, not some
  last-used handler, to make sure we're getting the ctrl-c handling we
  want when not in debug mode.
- assign the trio handler in both `open_root_actor()` and
  `._runtime._async_main()` to be sure it's applied in subactors as well
  as the root.
- only do debug lock blocking and root-side-locking-task cancels when
  a "no peers" condition is detected in the root actor: i.e. no IPC
  channels are detected by the root meaning it's impossible any actor
  has a sane lock-state ongoing for debug mode.
2022-10-13 15:17:26 -04:00
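A standalone sketch of the handler-stashing idea described in the commit above (the names below only mirror the commit's wording, they are not tractor's internals): grab whatever SIGINT handler `trio` installed at startup, let something else (eg. a pdb REPL) temporarily swap it out, then always restore the stashed `trio` handler rather than the "last" one.

import signal
from types import FrameType
from typing import Any, Callable, Optional

import trio

# module-level stash, analogous in spirit to `._debug.Lock._trio_handler`
_trio_handler: Callable[[int, Optional[FrameType]], Any] | int | None = None


def noisy_handler(signum: int, frame: Optional[FrameType]) -> None:
    # stand-in for the handler a debugger REPL might install
    print('custom SIGINT handler called')


def unshield_sigint() -> None:
    # always revert to the handler ``trio`` registered, never the prior one
    if _trio_handler is not None:
        signal.signal(signal.SIGINT, _trio_handler)


async def main() -> None:
    global _trio_handler
    # inside the run loop SIGINT is owned by trio; stash its handler
    _trio_handler = signal.getsignal(signal.SIGINT)

    # something swaps in its own handler (as ``pdb`` does on REPL entry)..
    signal.signal(signal.SIGINT, noisy_handler)

    # ..and on exit we go straight back to trio's handler
    unshield_sigint()
    assert signal.getsignal(signal.SIGINT) is _trio_handler


if __name__ == '__main__':
    trio.run(main)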
8 changed files with 136 additions and 65 deletions

View File

@ -83,7 +83,7 @@ jobs:
run: pip list
- name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx --full-trace
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
# We skip 3.10 on windows for now due to not having any collabs to
# debug the CI failures. Anyone wanting to hack and solve them is very

View File

@ -60,24 +60,42 @@ def test_remote_error(arb_addr, args_err):
arbiter_addr=arb_addr,
) as nursery:
# on a remote type error caused by bad input args
# this should raise directly which means we **don't** get
# an exception group outside the nursery since the error
# here and the far end task error are one and the same?
portal = await nursery.run_in_actor(
assert_err, name='errorer', **args
)
# get result(s) from main task
try:
# this means the root actor will also raise a local
# parent task error and thus an eg will propagate out
# of this actor nursery.
await portal.result()
except tractor.RemoteActorError as err:
assert err.type == errtype
print("Look Maa that actor failed hard, hehh")
raise
# ensure boxed errors
if args:
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == errtype
else:
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == errtype
def test_multierror(arb_addr):
'''
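A brief sketch (assuming the post-change behaviour shown in the test diff above) of how calling code now sees a remote failure: when the parent task also errors on `portal.result()`, the errors surface wrapped in a `BaseExceptionGroup` rather than as a bare `RemoteActorError`. The `fail` function and the error type are illustrative assumptions.

from exceptiongroup import BaseExceptionGroup

import trio
import tractor


async def fail() -> None:
    # hypothetical child-actor task which just errors
    raise ValueError('boom from the child actor')


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.run_in_actor(fail, name='errorer')
        # re-raises the boxed ``RemoteActorError`` in the parent task too,
        # so an exception group propagates out of the nursery
        await portal.result()


if __name__ == '__main__':
    try:
        trio.run(main)
    except BaseExceptionGroup as beg:
        for exc in beg.exceptions:
            if isinstance(exc, tractor.RemoteActorError):
                assert exc.type == ValueError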

View File

@ -8,6 +8,7 @@ import builtins
import itertools
import importlib
from exceptiongroup import BaseExceptionGroup
import pytest
import trio
import tractor
@ -409,11 +410,12 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(RemoteActorError) as excinfo:
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == Exception
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr):
@ -442,11 +444,12 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(RemoteActorError) as excinfo:
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == Exception
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
@tractor.context

View File

@ -25,6 +25,7 @@ import signal
from functools import partial
from contextlib import asynccontextmanager as acm
from typing import (
Any,
Optional,
Callable,
AsyncIterator,
@ -75,7 +76,9 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None
_trio_handler: Callable | None = None
_trio_handler: Callable[
[int, Optional[FrameType]], Any
] | int | None = None
# actor-wide variable pointing to current task name using debugger
local_task_in_debug: str | None = None
@ -113,10 +116,10 @@ class Lock:
@classmethod
def unshield_sigint(cls):
# always restore (some) sigint handler, either
# the prior or at least ``trio``'s.
orig = cls._orig_sigint_handler or cls._trio_handler
signal.signal(signal.SIGINT, orig)
# always restore ``trio``'s sigint handler. see notes below in
# the pdb factory about the nightmare that is that code swapping
# out the handler when the repl activates...
signal.signal(signal.SIGINT, cls._trio_handler)
cls._orig_sigint_handler = None
@classmethod

View File

@ -34,7 +34,11 @@ import warnings
from exceptiongroup import BaseExceptionGroup
import trio
from ._runtime import Actor, Arbiter, async_main
from ._runtime import (
Actor,
Arbiter,
async_main,
)
from . import _debug
from . import _spawn
from . import _state

View File

@ -25,14 +25,15 @@ from itertools import chain
import importlib
import importlib.util
import inspect
import uuid
import signal
import sys
from typing import (
Any, Optional,
Union, TYPE_CHECKING,
Callable,
)
import uuid
from types import ModuleType
import sys
import os
from contextlib import ExitStack
import warnings
@ -709,6 +710,14 @@ class Actor:
log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the
@ -722,23 +731,16 @@ class Actor:
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
if (
pdb_lock._root_local_task_cs_in_debug
and not pdb_lock._root_local_task_cs_in_debug.cancel_called
db_cs
and not db_cs.cancel_called
):
log.warning(
f'STALE DEBUG LOCK DETECTED FOR {uid}'
)
# TODO: figure out why this breaks tests..
# pdb_lock._root_local_task_cs_in_debug.cancel()
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
db_cs.cancel()
# XXX: is this necessary (GC should do it)?
if chan.connected():
@ -1229,6 +1231,10 @@ async def async_main(
and when cancelled effectively cancels the actor.
'''
# attempt to retrieve ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False
try:

View File

@ -27,7 +27,8 @@ from typing import (
Optional,
Callable,
AsyncGenerator,
AsyncIterator
AsyncIterator,
TYPE_CHECKING,
)
import warnings
@ -41,6 +42,10 @@ from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__)
@ -269,9 +274,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
@asynccontextmanager
async def subscribe(
self,
) -> AsyncIterator[BroadcastReceiver]:
'''Allocate and return a ``BroadcastReceiver`` which delegates
'''
Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream.
This allows multiple local tasks to receive each their own copy
@ -365,7 +370,8 @@ class Context:
_remote_func_type: Optional[str] = None
# only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa
_portal: Optional[Portal] = None # type: ignore # noqa
_stream: Optional[MsgStream] = None
_result: Optional[Any] = False
_error: Optional[BaseException] = None
@ -425,19 +431,24 @@ class Context:
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
log.error(
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled)
):
log.cancel(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled) and
self._cancel_called
):
if self._cancel_called:
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
self._error = error
@ -473,6 +484,7 @@ class Context:
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
ipc_broken: bool = False
if side == 'caller':
if not self._portal:
@ -490,7 +502,14 @@ class Context:
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
try:
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
except trio.BrokenResourceError:
ipc_broken = True
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
@ -506,7 +525,10 @@ class Context:
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
# callee side remote task
elif ipc_broken:
log.cancel(
"Transport layer was broken before cancel request "
f"{cid} for {self._portal.channel.uid}")
else:
self._cancel_msg = msg
@ -593,10 +615,11 @@ class Context:
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as rchan:
) as stream:
self._stream = stream
if self._portal:
self._portal._streams.add(rchan)
self._portal._streams.add(stream)
try:
self._stream_opened = True
@ -604,7 +627,7 @@ class Context:
# ensure we aren't cancelled before delivering
# the stream
# await trio.lowlevel.checkpoint()
yield rchan
yield stream
# XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
@ -635,25 +658,22 @@ class Context:
if not self._recv_chan._closed: # type: ignore
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
def consume(
msg: dict,
msg = await self._recv_chan.receive()
) -> Optional[dict]:
try:
self._result = msg['return']
break
return msg['return']
except KeyError as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
continue
return
elif 'stop' in msg:
log.debug('Remote stream terminated')
continue
return
# internal error should never get here
assert msg.get('cid'), (
@ -663,6 +683,25 @@ class Context:
msg, self._portal.channel
) from msgerr
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
if self._stream:
async with self._stream.subscribe() as bstream:
async for msg in bstream:
result = consume(msg)
if result:
self._result = result
break
if not self._result:
while True:
msg = await self._recv_chan.receive()
result = consume(msg)
if result:
self._result = result
break
return self._result
async def started(

View File

@ -83,7 +83,7 @@ class ActorNursery:
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], Exception],
errors: dict[tuple[str, str], BaseException],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
@ -347,8 +347,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
anursery._join_procs.set()
except BaseException as err:
errors[actor.uid] = err
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it