Compare commits
10 Commits
f6ac0c2eb7
...
daf4b4ee85
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | daf4b4ee85 | |
Tyler Goodlet | 3f09843951 | |
Tyler Goodlet | 3483151aa8 | |
Tyler Goodlet | 4a5f041211 | |
Tyler Goodlet | 7d0186aab9 | |
Tyler Goodlet | f9b548e4e7 | |
Tyler Goodlet | afbe90bcfa | |
Tyler Goodlet | 44538c44b1 | |
Tyler Goodlet | 62fc462580 | |
Tyler Goodlet | c5091afa38 |
|
@ -83,7 +83,7 @@ jobs:
|
||||||
run: pip list
|
run: pip list
|
||||||
|
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx --full-trace
|
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
|
||||||
|
|
||||||
# We skip 3.10 on windows for now due to not having any collabs to
|
# We skip 3.10 on windows for now due to not having any collabs to
|
||||||
# debug the CI failures. Anyone wanting to hack and solve them is very
|
# debug the CI failures. Anyone wanting to hack and solve them is very
|
||||||
|
|
|
@ -60,24 +60,42 @@ def test_remote_error(arb_addr, args_err):
|
||||||
arbiter_addr=arb_addr,
|
arbiter_addr=arb_addr,
|
||||||
) as nursery:
|
) as nursery:
|
||||||
|
|
||||||
|
# on a remote type error caused by bad input args
|
||||||
|
# this should raise directly which means we **don't** get
|
||||||
|
# an exception group outside the nursery since the error
|
||||||
|
# here and the far end task error are one in the same?
|
||||||
portal = await nursery.run_in_actor(
|
portal = await nursery.run_in_actor(
|
||||||
assert_err, name='errorer', **args
|
assert_err, name='errorer', **args
|
||||||
)
|
)
|
||||||
|
|
||||||
# get result(s) from main task
|
# get result(s) from main task
|
||||||
try:
|
try:
|
||||||
|
# this means the root actor will also raise a local
|
||||||
|
# parent task error and thus an eg will propagate out
|
||||||
|
# of this actor nursery.
|
||||||
await portal.result()
|
await portal.result()
|
||||||
except tractor.RemoteActorError as err:
|
except tractor.RemoteActorError as err:
|
||||||
assert err.type == errtype
|
assert err.type == errtype
|
||||||
print("Look Maa that actor failed hard, hehh")
|
print("Look Maa that actor failed hard, hehh")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
# ensure boxed errors
|
||||||
|
if args:
|
||||||
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed error is correct
|
|
||||||
assert excinfo.value.type == errtype
|
assert excinfo.value.type == errtype
|
||||||
|
|
||||||
|
else:
|
||||||
|
# the root task will also error on the `.result()` call
|
||||||
|
# so we expect an error from there AND the child.
|
||||||
|
with pytest.raises(BaseExceptionGroup) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
# ensure boxed errors
|
||||||
|
for exc in excinfo.value.exceptions:
|
||||||
|
assert exc.type == errtype
|
||||||
|
|
||||||
|
|
||||||
def test_multierror(arb_addr):
|
def test_multierror(arb_addr):
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -8,6 +8,7 @@ import builtins
|
||||||
import itertools
|
import itertools
|
||||||
import importlib
|
import importlib
|
||||||
|
|
||||||
|
from exceptiongroup import BaseExceptionGroup
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
@ -409,11 +410,12 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
|
||||||
# should trigger remote actor error
|
# should trigger remote actor error
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
with pytest.raises(BaseExceptionGroup) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed error is correct
|
# ensure boxed errors
|
||||||
assert excinfo.value.type == Exception
|
for exc in excinfo.value.exceptions:
|
||||||
|
assert exc.type == Exception
|
||||||
|
|
||||||
|
|
||||||
def test_trio_closes_early_and_channel_exits(arb_addr):
|
def test_trio_closes_early_and_channel_exits(arb_addr):
|
||||||
|
@ -442,11 +444,12 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
|
||||||
# should trigger remote actor error
|
# should trigger remote actor error
|
||||||
await portal.result()
|
await portal.result()
|
||||||
|
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
with pytest.raises(BaseExceptionGroup) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# ensure boxed error is correct
|
# ensure boxed errors
|
||||||
assert excinfo.value.type == Exception
|
for exc in excinfo.value.exceptions:
|
||||||
|
assert exc.type == Exception
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -25,6 +25,7 @@ import signal
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from typing import (
|
from typing import (
|
||||||
|
Any,
|
||||||
Optional,
|
Optional,
|
||||||
Callable,
|
Callable,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
|
@ -75,7 +76,9 @@ class Lock:
|
||||||
# placeholder for function to set a ``trio.Event`` on debugger exit
|
# placeholder for function to set a ``trio.Event`` on debugger exit
|
||||||
# pdb_release_hook: Optional[Callable] = None
|
# pdb_release_hook: Optional[Callable] = None
|
||||||
|
|
||||||
_trio_handler: Callable | None = None
|
_trio_handler: Callable[
|
||||||
|
[int, Optional[FrameType]], Any
|
||||||
|
] | int | None = None
|
||||||
|
|
||||||
# actor-wide variable pointing to current task name using debugger
|
# actor-wide variable pointing to current task name using debugger
|
||||||
local_task_in_debug: str | None = None
|
local_task_in_debug: str | None = None
|
||||||
|
@ -113,10 +116,10 @@ class Lock:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def unshield_sigint(cls):
|
def unshield_sigint(cls):
|
||||||
# always restore (some) sigint handler, either
|
# always restore ``trio``'s sigint handler. see notes below in
|
||||||
# the prior or at least ``trio``'s.
|
# the pdb factory about the nightmare that is that code swapping
|
||||||
orig = cls._orig_sigint_handler or cls._trio_handler
|
# out the handler when the repl activates...
|
||||||
signal.signal(signal.SIGINT, orig)
|
signal.signal(signal.SIGINT, cls._trio_handler)
|
||||||
cls._orig_sigint_handler = None
|
cls._orig_sigint_handler = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
|
@ -34,7 +34,11 @@ import warnings
|
||||||
from exceptiongroup import BaseExceptionGroup
|
from exceptiongroup import BaseExceptionGroup
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from ._runtime import Actor, Arbiter, async_main
|
from ._runtime import (
|
||||||
|
Actor,
|
||||||
|
Arbiter,
|
||||||
|
async_main,
|
||||||
|
)
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from . import _spawn
|
from . import _spawn
|
||||||
from . import _state
|
from . import _state
|
||||||
|
|
|
@ -25,14 +25,15 @@ from itertools import chain
|
||||||
import importlib
|
import importlib
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import inspect
|
import inspect
|
||||||
import uuid
|
import signal
|
||||||
|
import sys
|
||||||
from typing import (
|
from typing import (
|
||||||
Any, Optional,
|
Any, Optional,
|
||||||
Union, TYPE_CHECKING,
|
Union, TYPE_CHECKING,
|
||||||
Callable,
|
Callable,
|
||||||
)
|
)
|
||||||
|
import uuid
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
import sys
|
|
||||||
import os
|
import os
|
||||||
from contextlib import ExitStack
|
from contextlib import ExitStack
|
||||||
import warnings
|
import warnings
|
||||||
|
@ -709,6 +710,14 @@ class Actor:
|
||||||
log.runtime(f"No more channels for {chan.uid}")
|
log.runtime(f"No more channels for {chan.uid}")
|
||||||
self._peers.pop(uid, None)
|
self._peers.pop(uid, None)
|
||||||
|
|
||||||
|
log.runtime(f"Peers is {self._peers}")
|
||||||
|
|
||||||
|
# No more channels to other actors (at all) registered
|
||||||
|
# as connected.
|
||||||
|
if not self._peers:
|
||||||
|
log.runtime("Signalling no more peer channel connections")
|
||||||
|
self._no_more_peers.set()
|
||||||
|
|
||||||
# NOTE: block this actor from acquiring the
|
# NOTE: block this actor from acquiring the
|
||||||
# debugger-TTY-lock since we have no way to know if we
|
# debugger-TTY-lock since we have no way to know if we
|
||||||
# cancelled it and further there is no way to ensure the
|
# cancelled it and further there is no way to ensure the
|
||||||
|
@ -722,23 +731,16 @@ class Actor:
|
||||||
# if a now stale local task has the TTY lock still
|
# if a now stale local task has the TTY lock still
|
||||||
# we cancel it to allow servicing other requests for
|
# we cancel it to allow servicing other requests for
|
||||||
# the lock.
|
# the lock.
|
||||||
|
db_cs = pdb_lock._root_local_task_cs_in_debug
|
||||||
if (
|
if (
|
||||||
pdb_lock._root_local_task_cs_in_debug
|
db_cs
|
||||||
and not pdb_lock._root_local_task_cs_in_debug.cancel_called
|
and not db_cs.cancel_called
|
||||||
):
|
):
|
||||||
log.warning(
|
log.warning(
|
||||||
f'STALE DEBUG LOCK DETECTED FOR {uid}'
|
f'STALE DEBUG LOCK DETECTED FOR {uid}'
|
||||||
)
|
)
|
||||||
# TODO: figure out why this breaks tests..
|
# TODO: figure out why this breaks tests..
|
||||||
# pdb_lock._root_local_task_cs_in_debug.cancel()
|
db_cs.cancel()
|
||||||
|
|
||||||
log.runtime(f"Peers is {self._peers}")
|
|
||||||
|
|
||||||
# No more channels to other actors (at all) registered
|
|
||||||
# as connected.
|
|
||||||
if not self._peers:
|
|
||||||
log.runtime("Signalling no more peer channel connections")
|
|
||||||
self._no_more_peers.set()
|
|
||||||
|
|
||||||
# XXX: is this necessary (GC should do it)?
|
# XXX: is this necessary (GC should do it)?
|
||||||
if chan.connected():
|
if chan.connected():
|
||||||
|
@ -1229,6 +1231,10 @@ async def async_main(
|
||||||
and when cancelled effectively cancels the actor.
|
and when cancelled effectively cancels the actor.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||||
|
# on our debugger lock state.
|
||||||
|
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
|
||||||
|
|
||||||
registered_with_arbiter = False
|
registered_with_arbiter = False
|
||||||
try:
|
try:
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,8 @@ from typing import (
|
||||||
Optional,
|
Optional,
|
||||||
Callable,
|
Callable,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
AsyncIterator
|
AsyncIterator,
|
||||||
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
import warnings
|
import warnings
|
||||||
|
@ -41,6 +42,10 @@ from .log import get_logger
|
||||||
from .trionics import broadcast_receiver, BroadcastReceiver
|
from .trionics import broadcast_receiver, BroadcastReceiver
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ._portal import Portal
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -269,9 +274,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def subscribe(
|
async def subscribe(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
) -> AsyncIterator[BroadcastReceiver]:
|
) -> AsyncIterator[BroadcastReceiver]:
|
||||||
'''Allocate and return a ``BroadcastReceiver`` which delegates
|
'''
|
||||||
|
Allocate and return a ``BroadcastReceiver`` which delegates
|
||||||
to this message stream.
|
to this message stream.
|
||||||
|
|
||||||
This allows multiple local tasks to receive each their own copy
|
This allows multiple local tasks to receive each their own copy
|
||||||
|
@ -365,7 +370,8 @@ class Context:
|
||||||
_remote_func_type: Optional[str] = None
|
_remote_func_type: Optional[str] = None
|
||||||
|
|
||||||
# only set on the caller side
|
# only set on the caller side
|
||||||
_portal: Optional['Portal'] = None # type: ignore # noqa
|
_portal: Optional[Portal] = None # type: ignore # noqa
|
||||||
|
_stream: Optional[MsgStream] = None
|
||||||
_result: Optional[Any] = False
|
_result: Optional[Any] = False
|
||||||
_error: Optional[BaseException] = None
|
_error: Optional[BaseException] = None
|
||||||
|
|
||||||
|
@ -425,19 +431,24 @@ class Context:
|
||||||
# (currently) that other portal APIs (``Portal.run()``,
|
# (currently) that other portal APIs (``Portal.run()``,
|
||||||
# ``.run_in_actor()``) do their own error checking at the point
|
# ``.run_in_actor()``) do their own error checking at the point
|
||||||
# of the call and result processing.
|
# of the call and result processing.
|
||||||
log.error(
|
error = unpack_error(msg, self.chan)
|
||||||
|
if (
|
||||||
|
isinstance(error, ContextCancelled)
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
||||||
f'{msg["error"]["tb_str"]}'
|
f'{msg["error"]["tb_str"]}'
|
||||||
)
|
)
|
||||||
error = unpack_error(msg, self.chan)
|
if self._cancel_called:
|
||||||
if (
|
|
||||||
isinstance(error, ContextCancelled) and
|
|
||||||
self._cancel_called
|
|
||||||
):
|
|
||||||
# this is an expected cancel request response message
|
# this is an expected cancel request response message
|
||||||
# and we don't need to raise it in scope since it will
|
# and we don't need to raise it in scope since it will
|
||||||
# potentially override a real error
|
# potentially override a real error
|
||||||
return
|
return
|
||||||
|
else:
|
||||||
|
log.error(
|
||||||
|
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
||||||
|
f'{msg["error"]["tb_str"]}'
|
||||||
|
)
|
||||||
|
|
||||||
self._error = error
|
self._error = error
|
||||||
|
|
||||||
|
@ -473,6 +484,7 @@ class Context:
|
||||||
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
|
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
|
||||||
|
|
||||||
self._cancel_called = True
|
self._cancel_called = True
|
||||||
|
ipc_broken: bool = False
|
||||||
|
|
||||||
if side == 'caller':
|
if side == 'caller':
|
||||||
if not self._portal:
|
if not self._portal:
|
||||||
|
@ -490,7 +502,14 @@ class Context:
|
||||||
# NOTE: we're telling the far end actor to cancel a task
|
# NOTE: we're telling the far end actor to cancel a task
|
||||||
# corresponding to *this actor*. The far end local channel
|
# corresponding to *this actor*. The far end local channel
|
||||||
# instance is passed to `Actor._cancel_task()` implicitly.
|
# instance is passed to `Actor._cancel_task()` implicitly.
|
||||||
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
|
try:
|
||||||
|
await self._portal.run_from_ns(
|
||||||
|
'self',
|
||||||
|
'_cancel_task',
|
||||||
|
cid=cid,
|
||||||
|
)
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
ipc_broken = True
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
# XXX: there's no way to know if the remote task was indeed
|
# XXX: there's no way to know if the remote task was indeed
|
||||||
|
@ -506,7 +525,10 @@ class Context:
|
||||||
"Timed out on cancelling remote task "
|
"Timed out on cancelling remote task "
|
||||||
f"{cid} for {self._portal.channel.uid}")
|
f"{cid} for {self._portal.channel.uid}")
|
||||||
|
|
||||||
# callee side remote task
|
elif ipc_broken:
|
||||||
|
log.cancel(
|
||||||
|
"Transport layer was broken before cancel request "
|
||||||
|
f"{cid} for {self._portal.channel.uid}")
|
||||||
else:
|
else:
|
||||||
self._cancel_msg = msg
|
self._cancel_msg = msg
|
||||||
|
|
||||||
|
@ -593,10 +615,11 @@ class Context:
|
||||||
async with MsgStream(
|
async with MsgStream(
|
||||||
ctx=self,
|
ctx=self,
|
||||||
rx_chan=ctx._recv_chan,
|
rx_chan=ctx._recv_chan,
|
||||||
) as rchan:
|
) as stream:
|
||||||
|
self._stream = stream
|
||||||
|
|
||||||
if self._portal:
|
if self._portal:
|
||||||
self._portal._streams.add(rchan)
|
self._portal._streams.add(stream)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._stream_opened = True
|
self._stream_opened = True
|
||||||
|
@ -604,7 +627,7 @@ class Context:
|
||||||
# ensure we aren't cancelled before delivering
|
# ensure we aren't cancelled before delivering
|
||||||
# the stream
|
# the stream
|
||||||
# await trio.lowlevel.checkpoint()
|
# await trio.lowlevel.checkpoint()
|
||||||
yield rchan
|
yield stream
|
||||||
|
|
||||||
# XXX: Make the stream "one-shot use". On exit, signal
|
# XXX: Make the stream "one-shot use". On exit, signal
|
||||||
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
|
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
|
||||||
|
@ -635,25 +658,22 @@ class Context:
|
||||||
|
|
||||||
if not self._recv_chan._closed: # type: ignore
|
if not self._recv_chan._closed: # type: ignore
|
||||||
|
|
||||||
# wait for a final context result consuming
|
def consume(
|
||||||
# and discarding any bi dir stream msgs still
|
msg: dict,
|
||||||
# in transit from the far end.
|
|
||||||
while True:
|
|
||||||
|
|
||||||
msg = await self._recv_chan.receive()
|
) -> Optional[dict]:
|
||||||
try:
|
try:
|
||||||
self._result = msg['return']
|
return msg['return']
|
||||||
break
|
|
||||||
except KeyError as msgerr:
|
except KeyError as msgerr:
|
||||||
|
|
||||||
if 'yield' in msg:
|
if 'yield' in msg:
|
||||||
# far end task is still streaming to us so discard
|
# far end task is still streaming to us so discard
|
||||||
log.warning(f'Discarding stream delivered {msg}')
|
log.warning(f'Discarding stream delivered {msg}')
|
||||||
continue
|
return
|
||||||
|
|
||||||
elif 'stop' in msg:
|
elif 'stop' in msg:
|
||||||
log.debug('Remote stream terminated')
|
log.debug('Remote stream terminated')
|
||||||
continue
|
return
|
||||||
|
|
||||||
# internal error should never get here
|
# internal error should never get here
|
||||||
assert msg.get('cid'), (
|
assert msg.get('cid'), (
|
||||||
|
@ -663,6 +683,25 @@ class Context:
|
||||||
msg, self._portal.channel
|
msg, self._portal.channel
|
||||||
) from msgerr
|
) from msgerr
|
||||||
|
|
||||||
|
# wait for a final context result consuming
|
||||||
|
# and discarding any bi dir stream msgs still
|
||||||
|
# in transit from the far end.
|
||||||
|
if self._stream:
|
||||||
|
async with self._stream.subscribe() as bstream:
|
||||||
|
async for msg in bstream:
|
||||||
|
result = consume(msg)
|
||||||
|
if result:
|
||||||
|
self._result = result
|
||||||
|
break
|
||||||
|
|
||||||
|
if not self._result:
|
||||||
|
while True:
|
||||||
|
msg = await self._recv_chan.receive()
|
||||||
|
result = consume(msg)
|
||||||
|
if result:
|
||||||
|
self._result = result
|
||||||
|
break
|
||||||
|
|
||||||
return self._result
|
return self._result
|
||||||
|
|
||||||
async def started(
|
async def started(
|
||||||
|
|
|
@ -83,7 +83,7 @@ class ActorNursery:
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
ria_nursery: trio.Nursery,
|
ria_nursery: trio.Nursery,
|
||||||
da_nursery: trio.Nursery,
|
da_nursery: trio.Nursery,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[tuple[str, str], BaseException],
|
||||||
) -> None:
|
) -> None:
|
||||||
# self.supervisor = supervisor # TODO
|
# self.supervisor = supervisor # TODO
|
||||||
self._actor: Actor = actor
|
self._actor: Actor = actor
|
||||||
|
@ -347,8 +347,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
anursery._join_procs.set()
|
anursery._join_procs.set()
|
||||||
|
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
|
|
||||||
errors[actor.uid] = err
|
|
||||||
# If we error in the root but the debugger is
|
# If we error in the root but the debugger is
|
||||||
# engaged we don't want to prematurely kill (and
|
# engaged we don't want to prematurely kill (and
|
||||||
# thus clobber access to) the local tty since it
|
# thus clobber access to) the local tty since it
|
||||||
|
|
Loading…
Reference in New Issue