Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet daf4b4ee85 Break loop after result retreival 2022-10-13 17:11:35 -04:00
Tyler Goodlet 3f09843951 Log context cancellation using `.cancel()` loglevel 2022-10-13 17:11:21 -04:00
Tyler Goodlet 3483151aa8 Use `MsgStream.subscribe()` in `Context.result()`
The case exists where there is multiple tasks consuming from an open
2-way stream created via `Context.open_stream()` where a sibling task is
pulling from the stream while some other task also calls `.result()`.
Previously the `.result()` call would consume (drain) stream messages
directly from the underlying mem chan which would mean any sibling task
would not receive those same messages. Instead, make `.result()` check
if a stream is open and instead consume (and discard) stream msgs using
a `BroadcastReceiver` (via `MsgStream.subscribe()`) such that all
interested tasks get copies of the same packets.
2022-10-13 17:07:49 -04:00
Tyler Goodlet 4a5f041211 Expect egs in tests which retreive portal results 2022-10-13 17:00:24 -04:00
Tyler Goodlet 7d0186aab9 Drop full tb flag again... 2022-10-13 15:45:17 -04:00
Tyler Goodlet f9b548e4e7 Fix errors table type annot 2022-10-13 15:42:33 -04:00
Tyler Goodlet afbe90bcfa TOSQUASH cancel on no peers 2022-10-13 15:42:01 -04:00
Tyler Goodlet 44538c44b1 Fix handler type annot 2022-10-13 15:41:38 -04:00
Tyler Goodlet 62fc462580 Never double add parent task's error to `ActorNursery` 2022-10-13 15:27:04 -04:00
Tyler Goodlet c5091afa38 Always restore the `trio` SIGINT handler
Pretty sure this is the final touch to alleviate all our debug lock
headaches! Instead of trying to revert to the "last" handler (as `pdb`
does internally in the stdlib) we always just revert to the handler
`trio` registers during startup. Further this seems to allow cancelling
the root-side locking task if it's detected as stale IFF we only do this
when the root actor is in a "no more IPC peers" state.

Deatz:
- always `._debug.Lock._trio_handler` as the `trio` version, not some
  last used handler to make sure we're getting the ctrl-c handling we
  want when not in debug mode.
- assign the trio handler in `open_root_actor()`
  `._runtime._async_main()` to be sure it's applied in subactors as well
  as the root.
- only do debug lock blocking and root-side-locking-task cancels when
  a "no peers" condition is detected in the root actor: i.e. no IPC
  channels are detected by the root meaning it's impossible any actor
  has a sane lock-state ongoing for debug mode.
2022-10-13 15:17:26 -04:00
8 changed files with 136 additions and 65 deletions

View File

@ -83,7 +83,7 @@ jobs:
run: pip list run: pip list
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx --full-trace run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
# We skip 3.10 on windows for now due to not having any collabs to # We skip 3.10 on windows for now due to not having any collabs to
# debug the CI failures. Anyone wanting to hack and solve them is very # debug the CI failures. Anyone wanting to hack and solve them is very

View File

@ -60,24 +60,42 @@ def test_remote_error(arb_addr, args_err):
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
) as nursery: ) as nursery:
# on a remote type error caused by bad input args
# this should raise directly which means we **don't** get
# an exception group outside the nursery since the error
# here and the far end task error are one in the same?
portal = await nursery.run_in_actor( portal = await nursery.run_in_actor(
assert_err, name='errorer', **args assert_err, name='errorer', **args
) )
# get result(s) from main task # get result(s) from main task
try: try:
# this means the root actor will also raise a local
# parent task error and thus an eg will propagate out
# of this actor nursery.
await portal.result() await portal.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == errtype assert err.type == errtype
print("Look Maa that actor failed hard, hehh") print("Look Maa that actor failed hard, hehh")
raise raise
# ensure boxed errors
if args:
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == errtype assert excinfo.value.type == errtype
else:
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main)
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == errtype
def test_multierror(arb_addr): def test_multierror(arb_addr):
''' '''

View File

@ -8,6 +8,7 @@ import builtins
import itertools import itertools
import importlib import importlib
from exceptiongroup import BaseExceptionGroup
import pytest import pytest
import trio import trio
import tractor import tractor
@ -409,11 +410,12 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
# should trigger remote actor error # should trigger remote actor error
await portal.result() await portal.result()
with pytest.raises(RemoteActorError) as excinfo: with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed error is correct # ensure boxed errors
assert excinfo.value.type == Exception for exc in excinfo.value.exceptions:
assert exc.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr): def test_trio_closes_early_and_channel_exits(arb_addr):
@ -442,11 +444,12 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
# should trigger remote actor error # should trigger remote actor error
await portal.result() await portal.result()
with pytest.raises(RemoteActorError) as excinfo: with pytest.raises(BaseExceptionGroup) as excinfo:
trio.run(main) trio.run(main)
# ensure boxed error is correct # ensure boxed errors
assert excinfo.value.type == Exception for exc in excinfo.value.exceptions:
assert exc.type == Exception
@tractor.context @tractor.context

View File

@ -25,6 +25,7 @@ import signal
from functools import partial from functools import partial
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from typing import ( from typing import (
Any,
Optional, Optional,
Callable, Callable,
AsyncIterator, AsyncIterator,
@ -75,7 +76,9 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit # placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None # pdb_release_hook: Optional[Callable] = None
_trio_handler: Callable | None = None _trio_handler: Callable[
[int, Optional[FrameType]], Any
] | int | None = None
# actor-wide variable pointing to current task name using debugger # actor-wide variable pointing to current task name using debugger
local_task_in_debug: str | None = None local_task_in_debug: str | None = None
@ -113,10 +116,10 @@ class Lock:
@classmethod @classmethod
def unshield_sigint(cls): def unshield_sigint(cls):
# always restore (some) sigint handler, either # always restore ``trio``'s sigint handler. see notes below in
# the prior or at least ``trio``'s. # the pdb factory about the nightmare that is that code swapping
orig = cls._orig_sigint_handler or cls._trio_handler # out the handler when the repl activates...
signal.signal(signal.SIGINT, orig) signal.signal(signal.SIGINT, cls._trio_handler)
cls._orig_sigint_handler = None cls._orig_sigint_handler = None
@classmethod @classmethod

View File

@ -34,7 +34,11 @@ import warnings
from exceptiongroup import BaseExceptionGroup from exceptiongroup import BaseExceptionGroup
import trio import trio
from ._runtime import Actor, Arbiter, async_main from ._runtime import (
Actor,
Arbiter,
async_main,
)
from . import _debug from . import _debug
from . import _spawn from . import _spawn
from . import _state from . import _state

View File

@ -25,14 +25,15 @@ from itertools import chain
import importlib import importlib
import importlib.util import importlib.util
import inspect import inspect
import uuid import signal
import sys
from typing import ( from typing import (
Any, Optional, Any, Optional,
Union, TYPE_CHECKING, Union, TYPE_CHECKING,
Callable, Callable,
) )
import uuid
from types import ModuleType from types import ModuleType
import sys
import os import os
from contextlib import ExitStack from contextlib import ExitStack
import warnings import warnings
@ -709,6 +710,14 @@ class Actor:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(uid, None) self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# NOTE: block this actor from acquiring the # NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we # debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the # cancelled it and further there is no way to ensure the
@ -722,23 +731,16 @@ class Actor:
# if a now stale local task has the TTY lock still # if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for # we cancel it to allow servicing other requests for
# the lock. # the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
if ( if (
pdb_lock._root_local_task_cs_in_debug db_cs
and not pdb_lock._root_local_task_cs_in_debug.cancel_called and not db_cs.cancel_called
): ):
log.warning( log.warning(
f'STALE DEBUG LOCK DETECTED FOR {uid}' f'STALE DEBUG LOCK DETECTED FOR {uid}'
) )
# TODO: figure out why this breaks tests.. # TODO: figure out why this breaks tests..
# pdb_lock._root_local_task_cs_in_debug.cancel() db_cs.cancel()
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
self._no_more_peers.set()
# XXX: is this necessary (GC should do it)? # XXX: is this necessary (GC should do it)?
if chan.connected(): if chan.connected():
@ -1229,6 +1231,10 @@ async def async_main(
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
''' '''
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False registered_with_arbiter = False
try: try:

View File

@ -27,7 +27,8 @@ from typing import (
Optional, Optional,
Callable, Callable,
AsyncGenerator, AsyncGenerator,
AsyncIterator AsyncIterator,
TYPE_CHECKING,
) )
import warnings import warnings
@ -41,6 +42,10 @@ from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver from .trionics import broadcast_receiver, BroadcastReceiver
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__) log = get_logger(__name__)
@ -269,9 +274,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
@asynccontextmanager @asynccontextmanager
async def subscribe( async def subscribe(
self, self,
) -> AsyncIterator[BroadcastReceiver]: ) -> AsyncIterator[BroadcastReceiver]:
'''Allocate and return a ``BroadcastReceiver`` which delegates '''
Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream. to this message stream.
This allows multiple local tasks to receive each their own copy This allows multiple local tasks to receive each their own copy
@ -365,7 +370,8 @@ class Context:
_remote_func_type: Optional[str] = None _remote_func_type: Optional[str] = None
# only set on the caller side # only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa _portal: Optional[Portal] = None # type: ignore # noqa
_stream: Optional[MsgStream] = None
_result: Optional[Any] = False _result: Optional[Any] = False
_error: Optional[BaseException] = None _error: Optional[BaseException] = None
@ -425,19 +431,24 @@ class Context:
# (currently) that other portal APIs (``Portal.run()``, # (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point # ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing. # of the call and result processing.
log.error( error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled)
):
log.cancel(
f'Remote context error for {self.chan.uid}:{self.cid}:\n' f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}' f'{msg["error"]["tb_str"]}'
) )
error = unpack_error(msg, self.chan) if self._cancel_called:
if (
isinstance(error, ContextCancelled) and
self._cancel_called
):
# this is an expected cancel request response message # this is an expected cancel request response message
# and we don't need to raise it in scope since it will # and we don't need to raise it in scope since it will
# potentially override a real error # potentially override a real error
return return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
self._error = error self._error = error
@ -473,6 +484,7 @@ class Context:
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True self._cancel_called = True
ipc_broken: bool = False
if side == 'caller': if side == 'caller':
if not self._portal: if not self._portal:
@ -490,7 +502,14 @@ class Context:
# NOTE: we're telling the far end actor to cancel a task # NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel # corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly. # instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid) try:
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
except trio.BrokenResourceError:
ipc_broken = True
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed # XXX: there's no way to know if the remote task was indeed
@ -506,7 +525,10 @@ class Context:
"Timed out on cancelling remote task " "Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}") f"{cid} for {self._portal.channel.uid}")
# callee side remote task elif ipc_broken:
log.cancel(
"Transport layer was broken before cancel request "
f"{cid} for {self._portal.channel.uid}")
else: else:
self._cancel_msg = msg self._cancel_msg = msg
@ -593,10 +615,11 @@ class Context:
async with MsgStream( async with MsgStream(
ctx=self, ctx=self,
rx_chan=ctx._recv_chan, rx_chan=ctx._recv_chan,
) as rchan: ) as stream:
self._stream = stream
if self._portal: if self._portal:
self._portal._streams.add(rchan) self._portal._streams.add(stream)
try: try:
self._stream_opened = True self._stream_opened = True
@ -604,7 +627,7 @@ class Context:
# ensure we aren't cancelled before delivering # ensure we aren't cancelled before delivering
# the stream # the stream
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield rchan yield stream
# XXX: Make the stream "one-shot use". On exit, signal # XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
@ -635,25 +658,22 @@ class Context:
if not self._recv_chan._closed: # type: ignore if not self._recv_chan._closed: # type: ignore
# wait for a final context result consuming def consume(
# and discarding any bi dir stream msgs still msg: dict,
# in transit from the far end.
while True:
msg = await self._recv_chan.receive() ) -> Optional[dict]:
try: try:
self._result = msg['return'] return msg['return']
break
except KeyError as msgerr: except KeyError as msgerr:
if 'yield' in msg: if 'yield' in msg:
# far end task is still streaming to us so discard # far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}') log.warning(f'Discarding stream delivered {msg}')
continue return
elif 'stop' in msg: elif 'stop' in msg:
log.debug('Remote stream terminated') log.debug('Remote stream terminated')
continue return
# internal error should never get here # internal error should never get here
assert msg.get('cid'), ( assert msg.get('cid'), (
@ -663,6 +683,25 @@ class Context:
msg, self._portal.channel msg, self._portal.channel
) from msgerr ) from msgerr
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
if self._stream:
async with self._stream.subscribe() as bstream:
async for msg in bstream:
result = consume(msg)
if result:
self._result = result
break
if not self._result:
while True:
msg = await self._recv_chan.receive()
result = consume(msg)
if result:
self._result = result
break
return self._result return self._result
async def started( async def started(

View File

@ -83,7 +83,7 @@ class ActorNursery:
actor: Actor, actor: Actor,
ria_nursery: trio.Nursery, ria_nursery: trio.Nursery,
da_nursery: trio.Nursery, da_nursery: trio.Nursery,
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], BaseException],
) -> None: ) -> None:
# self.supervisor = supervisor # TODO # self.supervisor = supervisor # TODO
self._actor: Actor = actor self._actor: Actor = actor
@ -347,8 +347,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
anursery._join_procs.set() anursery._join_procs.set()
except BaseException as err: except BaseException as err:
errors[actor.uid] = err
# If we error in the root but the debugger is # If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and # engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it # thus clobber access to) the local tty since it