Compare commits

..

4 Commits

Author SHA1 Message Date
Tyler Goodlet 8c39b8b124 Tweak some tests for spurious failues
With the seeming cause that some cases occasionally raise
`ExceptionGroup` instead of a (collapsed out) single error which, in
those cases at least try to check that `.exceptions` has the original
error.
2024-03-11 10:37:34 -04:00
Tyler Goodlet ededa2e88f More spaceless union type annots 2024-03-11 10:33:06 -04:00
Tyler Goodlet dd168184c3 Add a open-ctx-with-self test
Found exactly why trying this won't work when playing around with
opening workspaces in `modden` using a `Portal.open_context()` back to
the 'bigd' root actor: the RPC machinery only registers one entry in
`Actor._contexts` which will get overwritten by each task's side and
then experience race-based IPC msging errors (eg. rxing `{'started': _}`
on the callee side..). Instead make opening a ctx back to the self-actor
a runtime error describing it as an invalid op.

To match:
- add a new test `test_ctx_with_self_actor()` to the context semantics
  suite.
- tried out adding a new `side: str` to the `Actor.get_context()` (and
  callers) but ran into not being able to determine the value from in
  `._push_result()` where it's needed to figure out which side to push
  to.. So, just leaving the commented arg (passing) in the runtime core
  for now in case we can come back to trying to make it work, tho i'm
  thinking it's not the right hack anyway XD
2024-03-11 10:29:42 -04:00
Tyler Goodlet 37ee477aee Let `MsgStream.receive_nowait()` take in msg key list
Call it `allow_msg_keys: list[str] = ['yield']` and set it to accept
`['yield', 'return']` from the drain loop in `.aclose()`. Only pass the
last key error to `_raise_from_no_key_in_msg()` in the fall-through
case.

Somehow this seems to prevent all the intermittent test failures i was
seeing in local runs including when running the entire suite all in
sequence; i ain't complaining B)
2024-03-11 10:20:55 -04:00
12 changed files with 204 additions and 46 deletions

View File

@ -1,4 +1,4 @@
# vim: ft=ini # vim: ft=conf
# pytest.ini for tractor # pytest.ini for tractor
[pytest] [pytest]

View File

@ -18,7 +18,10 @@ from conftest import (
@pytest.mark.parametrize( @pytest.mark.parametrize(
'debug_mode', 'debug_mode',
[False, True], [False, True],
ids=['no_debug_mode', 'debug_mode'], ids=[
'no_debug_mode',
'debug_mode',
],
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'ipc_break', 'ipc_break',

View File

@ -6,6 +6,7 @@ from collections import Counter
import itertools import itertools
import platform import platform
import pytest
import trio import trio
import tractor import tractor
@ -143,8 +144,16 @@ def test_dynamic_pub_sub():
try: try:
trio.run(main) trio.run(main)
except trio.TooSlowError: except (
pass trio.TooSlowError,
ExceptionGroup,
) as err:
if isinstance(err, ExceptionGroup):
for suberr in err.exceptions:
if isinstance(suberr, trio.TooSlowError):
break
else:
pytest.fail('Never got a `TooSlowError` ?')
@tractor.context @tractor.context

View File

@ -1024,6 +1024,8 @@ def test_maybe_allow_overruns_stream(
cancel_ctx: bool, cancel_ctx: bool,
slow_side: str, slow_side: str,
allow_overruns_side: str, allow_overruns_side: str,
# conftest wide
loglevel: str, loglevel: str,
debug_mode: bool, debug_mode: bool,
): ):
@ -1147,3 +1149,52 @@ def test_maybe_allow_overruns_stream(
# if this hits the logic blocks from above are not # if this hits the logic blocks from above are not
# exhaustive.. # exhaustive..
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO') pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
def test_ctx_with_self_actor(
loglevel: str,
debug_mode: bool,
):
'''
NOTE: for now this is an INVALID OP!
BUT, eventually presuming we add a "side" key to `Actor.get_context()`,
we might be able to get this working symmetrically, but should we??
Open a context back to the same actor and ensure all cancellation
and error semantics hold the same.
'''
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
enable_modules=[__name__],
) as an:
assert an
async with (
tractor.find_actor('root') as portal,
portal.open_context(
expect_cancelled,
# echo_back_sequence,
# seq=seq,
# wait_for_cancel=cancel_ctx,
# be_slow=(slow_side == 'child'),
# allow_overruns_side=allow_overruns_side,
) as (ctx, sent),
ctx.open_stream() as ipc,
):
assert sent is None
seq = list(range(10))
for i in seq:
await ipc.send(i)
rx: int = await ipc.receive()
assert rx == i
await ctx.cancel()
with pytest.raises(RuntimeError) as excinfo:
trio.run(main)
assert 'Invalid Operation' in repr(excinfo.value)

View File

@ -70,7 +70,7 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
async def asyncio_actor( async def asyncio_actor(
target: str, target: str,
expect_err: Optional[Exception] = None expect_err: Exception|None = None
) -> None: ) -> None:
@ -114,10 +114,21 @@ def test_aio_simple_error(reg_addr):
infect_asyncio=True, infect_asyncio=True,
) )
with pytest.raises(RemoteActorError) as excinfo: with pytest.raises(
expected_exception=(RemoteActorError, ExceptionGroup),
) as excinfo:
trio.run(main) trio.run(main)
err = excinfo.value err = excinfo.value
# might get multiple `trio.Cancelled`s as well inside an inception
if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError),
err.exceptions
))
assert err
assert isinstance(err, RemoteActorError) assert isinstance(err, RemoteActorError)
assert err.type == AssertionError assert err.type == AssertionError
@ -290,11 +301,22 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
infect_asyncio=True, infect_asyncio=True,
) )
with pytest.raises(RemoteActorError) as excinfo: with pytest.raises(
expected_exception=(RemoteActorError, ExceptionGroup),
) as excinfo:
trio.run(main) trio.run(main)
# might get multiple `trio.Cancelled`s as well inside an inception
err = excinfo.value
if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError),
err.exceptions
))
assert err
# ensure boxed error is correct # ensure boxed error is correct
assert excinfo.value.type == to_asyncio.AsyncioCancelled assert err.type == to_asyncio.AsyncioCancelled
# TODO: verify open_channel_from will fail on this.. # TODO: verify open_channel_from will fail on this..

View File

@ -1,6 +1,8 @@
""" '''
RPC related RPC (or maybe better labelled as "RTS: remote task scheduling"?)
""" related API and error checks.
'''
import itertools import itertools
import pytest import pytest
@ -52,8 +54,13 @@ async def short_sleep():
(['tmp_mod'], 'import doggy', ModuleNotFoundError), (['tmp_mod'], 'import doggy', ModuleNotFoundError),
(['tmp_mod'], '4doggy', SyntaxError), (['tmp_mod'], '4doggy', SyntaxError),
], ],
ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import', ids=[
'fail_on_syntax'], 'no_mods',
'this_mod',
'this_mod_bad_func',
'fail_to_import',
'fail_on_syntax',
],
) )
def test_rpc_errors( def test_rpc_errors(
reg_addr, reg_addr,
@ -127,7 +134,9 @@ def test_rpc_errors(
run() run()
else: else:
# underlying errors aren't propagated upwards (yet) # underlying errors aren't propagated upwards (yet)
with pytest.raises(remote_err) as err: with pytest.raises(
expected_exception=(remote_err, ExceptionGroup),
) as err:
run() run()
# get raw instance from pytest wrapper # get raw instance from pytest wrapper

View File

@ -1101,6 +1101,8 @@ class Context:
chan=self.chan, chan=self.chan,
cid=self.cid, cid=self.cid,
nsf=self._nsf, nsf=self._nsf,
# side=self.side,
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
@ -1298,7 +1300,7 @@ class Context:
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607 # https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True __tracebackhide__: bool = True
raise remote_error from None raise remote_error # from None
# TODO: change to `.wait_for_result()`? # TODO: change to `.wait_for_result()`?
async def result( async def result(

View File

@ -162,8 +162,8 @@ async def query_actor(
@acm @acm
async def find_actor( async def find_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] | None = None, arbiter_sockaddr: tuple[str, int]|None = None,
registry_addrs: list[tuple[str, int]] | None = None, registry_addrs: list[tuple[str, int]]|None = None,
only_first: bool = True, only_first: bool = True,
raise_on_none: bool = False, raise_on_none: bool = False,

View File

@ -514,6 +514,16 @@ class Portal:
# a new `_context.py` mod. # a new `_context.py` mod.
nsf = NamespacePath.from_ref(func) nsf = NamespacePath.from_ref(func)
# XXX NOTE XXX: currenly we do NOT allow opening a contex
# with "self" since the local feeder mem-chan processing
# is not built for it.
if self.channel.uid == self.actor.uid:
raise RuntimeError(
'** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n'
f'|_{self.actor}\n'
)
ctx: Context = await self.actor.start_remote_task( ctx: Context = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=nsf, nsf=nsf,

View File

@ -63,26 +63,26 @@ async def open_root_actor(
*, *,
# defaults are above # defaults are above
registry_addrs: list[tuple[str, int]] | None = None, registry_addrs: list[tuple[str, int]]|None = None,
# defaults are above # defaults are above
arbiter_addr: tuple[str, int] | None = None, arbiter_addr: tuple[str, int]|None = None,
name: str | None = 'root', name: str|None = 'root',
# either the `multiprocessing` start method: # either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default). # OR `trio` (the new default).
start_method: _spawn.SpawnMethodKey | None = None, start_method: _spawn.SpawnMethodKey|None = None,
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
# internal logging # internal logging
loglevel: str | None = None, loglevel: str|None = None,
enable_modules: list | None = None, enable_modules: list|None = None,
rpc_module_paths: list | None = None, rpc_module_paths: list|None = None,
# NOTE: allow caller to ensure that only one registry exists # NOTE: allow caller to ensure that only one registry exists
# and that this call creates it. # and that this call creates it.
@ -108,7 +108,11 @@ async def open_root_actor(
_state._runtime_vars['_is_root'] = True _state._runtime_vars['_is_root'] = True
# caps based rpc list # caps based rpc list
enable_modules = enable_modules or [] enable_modules = (
enable_modules
or
[]
)
if rpc_module_paths: if rpc_module_paths:
warnings.warn( warnings.warn(

View File

@ -434,6 +434,10 @@ async def _invoke(
chan=chan, chan=chan,
cid=cid, cid=cid,
nsf=NamespacePath.from_ref(func), nsf=NamespacePath.from_ref(func),
# TODO: if we wanted to get cray and support it?
# side='callee',
# We shouldn't ever need to pass this through right? # We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to # it's up to the soon-to-be called rpc task to
# open the stream with this option. # open the stream with this option.
@ -686,9 +690,11 @@ async def _invoke(
# don't pop the local context until we know the # don't pop the local context until we know the
# associated child isn't in debug any more # associated child isn't in debug any more
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop( ctx: Context = actor._contexts.pop((
(chan.uid, cid) chan.uid,
) cid,
# ctx.side,
))
merr: Exception|None = ctx.maybe_error merr: Exception|None = ctx.maybe_error
@ -879,7 +885,11 @@ class Actor:
# map {actor uids -> Context} # map {actor uids -> Context}
self._contexts: dict[ self._contexts: dict[
tuple[tuple[str, str], str], tuple[
tuple[str, str], # .uid
str, # .cid
str, # .side
],
Context Context
] = {} ] = {}
@ -1363,7 +1373,13 @@ class Actor:
uid: tuple[str, str] = chan.uid uid: tuple[str, str] = chan.uid
assert uid, f"`chan.uid` can't be {uid}" assert uid, f"`chan.uid` can't be {uid}"
try: try:
ctx: Context = self._contexts[(uid, cid)] ctx: Context = self._contexts[(
uid,
cid,
# TODO: how to determine this tho?
# side,
)]
except KeyError: except KeyError:
log.warning( log.warning(
'Ignoring invalid IPC ctx msg!\n\n' 'Ignoring invalid IPC ctx msg!\n\n'
@ -1382,6 +1398,16 @@ class Actor:
cid: str, cid: str,
nsf: NamespacePath, nsf: NamespacePath,
# TODO: support lookup by `Context.side: str` ?
# -> would allow making a self-context which might have
# certain special use cases where RPC isolation is wanted
# between 2 tasks running in the same process?
# => prolly needs some deeper though on the real use cases
# and whether or not such things should be better
# implemented using a `TaskManager` style nursery..
#
# side: str|None = None,
msg_buffer_size: int | None = None, msg_buffer_size: int | None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
@ -1397,7 +1423,11 @@ class Actor:
actor_uid = chan.uid actor_uid = chan.uid
assert actor_uid assert actor_uid
try: try:
ctx = self._contexts[(actor_uid, cid)] ctx = self._contexts[(
actor_uid,
cid,
# side,
)]
log.runtime( log.runtime(
f'Retreived cached IPC ctx for\n' f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
@ -1423,7 +1453,11 @@ class Actor:
msg_buffer_size=msg_buffer_size or self.msg_buffer_size, msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
_allow_overruns=allow_overruns, _allow_overruns=allow_overruns,
) )
self._contexts[(actor_uid, cid)] = ctx self._contexts[(
actor_uid,
cid,
# side,
)] = ctx
return ctx return ctx
@ -1454,6 +1488,8 @@ class Actor:
chan=chan, chan=chan,
cid=cid, cid=cid,
nsf=nsf, nsf=nsf,
# side='caller',
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )

View File

@ -90,17 +90,27 @@ class MsgStream(trio.abc.Channel):
self._closed: bool|trio.ClosedResourceError = False self._closed: bool|trio.ClosedResourceError = False
# delegate directly to underlying mem channel # delegate directly to underlying mem channel
def receive_nowait(self): def receive_nowait(
msg = self._rx_chan.receive_nowait() self,
allow_msg_keys: list[str] = ['yield'],
):
msg: dict = self._rx_chan.receive_nowait()
for (
i,
key,
) in enumerate(allow_msg_keys):
try: try:
return msg['yield'] return msg[key]
except KeyError as kerr: except KeyError as kerr:
if i < (len(allow_msg_keys) - 1):
continue
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=self._ctx, ctx=self._ctx,
msg=msg, msg=msg,
src_err=kerr, src_err=kerr,
log=log, log=log,
expect_key='yield', expect_key=key,
stream=self, stream=self,
) )
@ -263,7 +273,9 @@ class MsgStream(trio.abc.Channel):
drained: list[Exception|dict] = [] drained: list[Exception|dict] = []
while not drained: while not drained:
try: try:
maybe_final_msg = self.receive_nowait() maybe_final_msg = self.receive_nowait(
allow_msg_keys=['yield', 'return'],
)
if maybe_final_msg: if maybe_final_msg:
log.debug( log.debug(
'Drained un-processed stream msg:\n' 'Drained un-processed stream msg:\n'