Compare commits
4 Commits
f067cf48a7
...
8c39b8b124
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 8c39b8b124 | |
Tyler Goodlet | ededa2e88f | |
Tyler Goodlet | dd168184c3 | |
Tyler Goodlet | 37ee477aee |
|
@ -1,4 +1,4 @@
|
|||
# vim: ft=ini
|
||||
# vim: ft=conf
|
||||
# pytest.ini for tractor
|
||||
|
||||
[pytest]
|
||||
|
|
|
@ -18,7 +18,10 @@ from conftest import (
|
|||
@pytest.mark.parametrize(
|
||||
'debug_mode',
|
||||
[False, True],
|
||||
ids=['no_debug_mode', 'debug_mode'],
|
||||
ids=[
|
||||
'no_debug_mode',
|
||||
'debug_mode',
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'ipc_break',
|
||||
|
|
|
@ -6,6 +6,7 @@ from collections import Counter
|
|||
import itertools
|
||||
import platform
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
|
||||
|
@ -143,8 +144,16 @@ def test_dynamic_pub_sub():
|
|||
|
||||
try:
|
||||
trio.run(main)
|
||||
except trio.TooSlowError:
|
||||
pass
|
||||
except (
|
||||
trio.TooSlowError,
|
||||
ExceptionGroup,
|
||||
) as err:
|
||||
if isinstance(err, ExceptionGroup):
|
||||
for suberr in err.exceptions:
|
||||
if isinstance(suberr, trio.TooSlowError):
|
||||
break
|
||||
else:
|
||||
pytest.fail('Never got a `TooSlowError` ?')
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
|
|
@ -1024,6 +1024,8 @@ def test_maybe_allow_overruns_stream(
|
|||
cancel_ctx: bool,
|
||||
slow_side: str,
|
||||
allow_overruns_side: str,
|
||||
|
||||
# conftest wide
|
||||
loglevel: str,
|
||||
debug_mode: bool,
|
||||
):
|
||||
|
@ -1147,3 +1149,52 @@ def test_maybe_allow_overruns_stream(
|
|||
# if this hits the logic blocks from above are not
|
||||
# exhaustive..
|
||||
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
|
||||
|
||||
|
||||
def test_ctx_with_self_actor(
|
||||
loglevel: str,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
NOTE: for now this is an INVALID OP!
|
||||
|
||||
BUT, eventually presuming we add a "side" key to `Actor.get_context()`,
|
||||
we might be able to get this working symmetrically, but should we??
|
||||
|
||||
Open a context back to the same actor and ensure all cancellation
|
||||
and error semantics hold the same.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
enable_modules=[__name__],
|
||||
) as an:
|
||||
assert an
|
||||
async with (
|
||||
tractor.find_actor('root') as portal,
|
||||
portal.open_context(
|
||||
expect_cancelled,
|
||||
# echo_back_sequence,
|
||||
# seq=seq,
|
||||
# wait_for_cancel=cancel_ctx,
|
||||
# be_slow=(slow_side == 'child'),
|
||||
# allow_overruns_side=allow_overruns_side,
|
||||
|
||||
) as (ctx, sent),
|
||||
ctx.open_stream() as ipc,
|
||||
):
|
||||
assert sent is None
|
||||
|
||||
seq = list(range(10))
|
||||
for i in seq:
|
||||
await ipc.send(i)
|
||||
rx: int = await ipc.receive()
|
||||
assert rx == i
|
||||
|
||||
await ctx.cancel()
|
||||
|
||||
with pytest.raises(RuntimeError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
assert 'Invalid Operation' in repr(excinfo.value)
|
||||
|
|
|
@ -70,7 +70,7 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
|
|||
async def asyncio_actor(
|
||||
|
||||
target: str,
|
||||
expect_err: Optional[Exception] = None
|
||||
expect_err: Exception|None = None
|
||||
|
||||
) -> None:
|
||||
|
||||
|
@ -114,10 +114,21 @@ def test_aio_simple_error(reg_addr):
|
|||
infect_asyncio=True,
|
||||
)
|
||||
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
with pytest.raises(
|
||||
expected_exception=(RemoteActorError, ExceptionGroup),
|
||||
) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
err = excinfo.value
|
||||
|
||||
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||
if isinstance(err, ExceptionGroup):
|
||||
err = next(itertools.dropwhile(
|
||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
||||
err.exceptions
|
||||
))
|
||||
assert err
|
||||
|
||||
assert isinstance(err, RemoteActorError)
|
||||
assert err.type == AssertionError
|
||||
|
||||
|
@ -290,11 +301,22 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
|||
infect_asyncio=True,
|
||||
)
|
||||
|
||||
with pytest.raises(RemoteActorError) as excinfo:
|
||||
with pytest.raises(
|
||||
expected_exception=(RemoteActorError, ExceptionGroup),
|
||||
) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||
err = excinfo.value
|
||||
if isinstance(err, ExceptionGroup):
|
||||
err = next(itertools.dropwhile(
|
||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
||||
err.exceptions
|
||||
))
|
||||
assert err
|
||||
|
||||
# ensure boxed error is correct
|
||||
assert excinfo.value.type == to_asyncio.AsyncioCancelled
|
||||
assert err.type == to_asyncio.AsyncioCancelled
|
||||
|
||||
|
||||
# TODO: verify open_channel_from will fail on this..
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
"""
|
||||
RPC related
|
||||
"""
|
||||
'''
|
||||
RPC (or maybe better labelled as "RTS: remote task scheduling"?)
|
||||
related API and error checks.
|
||||
|
||||
'''
|
||||
import itertools
|
||||
|
||||
import pytest
|
||||
|
@ -52,8 +54,13 @@ async def short_sleep():
|
|||
(['tmp_mod'], 'import doggy', ModuleNotFoundError),
|
||||
(['tmp_mod'], '4doggy', SyntaxError),
|
||||
],
|
||||
ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
|
||||
'fail_on_syntax'],
|
||||
ids=[
|
||||
'no_mods',
|
||||
'this_mod',
|
||||
'this_mod_bad_func',
|
||||
'fail_to_import',
|
||||
'fail_on_syntax',
|
||||
],
|
||||
)
|
||||
def test_rpc_errors(
|
||||
reg_addr,
|
||||
|
@ -127,7 +134,9 @@ def test_rpc_errors(
|
|||
run()
|
||||
else:
|
||||
# underlying errors aren't propagated upwards (yet)
|
||||
with pytest.raises(remote_err) as err:
|
||||
with pytest.raises(
|
||||
expected_exception=(remote_err, ExceptionGroup),
|
||||
) as err:
|
||||
run()
|
||||
|
||||
# get raw instance from pytest wrapper
|
||||
|
|
|
@ -1101,6 +1101,8 @@ class Context:
|
|||
chan=self.chan,
|
||||
cid=self.cid,
|
||||
nsf=self._nsf,
|
||||
# side=self.side,
|
||||
|
||||
msg_buffer_size=msg_buffer_size,
|
||||
allow_overruns=allow_overruns,
|
||||
)
|
||||
|
@ -1298,7 +1300,7 @@ class Context:
|
|||
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
||||
# https://stackoverflow.com/a/24752607
|
||||
__tracebackhide__: bool = True
|
||||
raise remote_error from None
|
||||
raise remote_error # from None
|
||||
|
||||
# TODO: change to `.wait_for_result()`?
|
||||
async def result(
|
||||
|
|
|
@ -514,6 +514,16 @@ class Portal:
|
|||
# a new `_context.py` mod.
|
||||
nsf = NamespacePath.from_ref(func)
|
||||
|
||||
# XXX NOTE XXX: currenly we do NOT allow opening a contex
|
||||
# with "self" since the local feeder mem-chan processing
|
||||
# is not built for it.
|
||||
if self.channel.uid == self.actor.uid:
|
||||
raise RuntimeError(
|
||||
'** !! Invalid Operation !! **\n'
|
||||
'Can not open an IPC ctx with the local actor!\n'
|
||||
f'|_{self.actor}\n'
|
||||
)
|
||||
|
||||
ctx: Context = await self.actor.start_remote_task(
|
||||
self.channel,
|
||||
nsf=nsf,
|
||||
|
|
|
@ -108,7 +108,11 @@ async def open_root_actor(
|
|||
_state._runtime_vars['_is_root'] = True
|
||||
|
||||
# caps based rpc list
|
||||
enable_modules = enable_modules or []
|
||||
enable_modules = (
|
||||
enable_modules
|
||||
or
|
||||
[]
|
||||
)
|
||||
|
||||
if rpc_module_paths:
|
||||
warnings.warn(
|
||||
|
|
|
@ -434,6 +434,10 @@ async def _invoke(
|
|||
chan=chan,
|
||||
cid=cid,
|
||||
nsf=NamespacePath.from_ref(func),
|
||||
|
||||
# TODO: if we wanted to get cray and support it?
|
||||
# side='callee',
|
||||
|
||||
# We shouldn't ever need to pass this through right?
|
||||
# it's up to the soon-to-be called rpc task to
|
||||
# open the stream with this option.
|
||||
|
@ -686,9 +690,11 @@ async def _invoke(
|
|||
# don't pop the local context until we know the
|
||||
# associated child isn't in debug any more
|
||||
await maybe_wait_for_debugger()
|
||||
ctx: Context = actor._contexts.pop(
|
||||
(chan.uid, cid)
|
||||
)
|
||||
ctx: Context = actor._contexts.pop((
|
||||
chan.uid,
|
||||
cid,
|
||||
# ctx.side,
|
||||
))
|
||||
|
||||
merr: Exception|None = ctx.maybe_error
|
||||
|
||||
|
@ -879,7 +885,11 @@ class Actor:
|
|||
|
||||
# map {actor uids -> Context}
|
||||
self._contexts: dict[
|
||||
tuple[tuple[str, str], str],
|
||||
tuple[
|
||||
tuple[str, str], # .uid
|
||||
str, # .cid
|
||||
str, # .side
|
||||
],
|
||||
Context
|
||||
] = {}
|
||||
|
||||
|
@ -1363,7 +1373,13 @@ class Actor:
|
|||
uid: tuple[str, str] = chan.uid
|
||||
assert uid, f"`chan.uid` can't be {uid}"
|
||||
try:
|
||||
ctx: Context = self._contexts[(uid, cid)]
|
||||
ctx: Context = self._contexts[(
|
||||
uid,
|
||||
cid,
|
||||
|
||||
# TODO: how to determine this tho?
|
||||
# side,
|
||||
)]
|
||||
except KeyError:
|
||||
log.warning(
|
||||
'Ignoring invalid IPC ctx msg!\n\n'
|
||||
|
@ -1382,6 +1398,16 @@ class Actor:
|
|||
cid: str,
|
||||
nsf: NamespacePath,
|
||||
|
||||
# TODO: support lookup by `Context.side: str` ?
|
||||
# -> would allow making a self-context which might have
|
||||
# certain special use cases where RPC isolation is wanted
|
||||
# between 2 tasks running in the same process?
|
||||
# => prolly needs some deeper though on the real use cases
|
||||
# and whether or not such things should be better
|
||||
# implemented using a `TaskManager` style nursery..
|
||||
#
|
||||
# side: str|None = None,
|
||||
|
||||
msg_buffer_size: int | None = None,
|
||||
allow_overruns: bool = False,
|
||||
|
||||
|
@ -1397,7 +1423,11 @@ class Actor:
|
|||
actor_uid = chan.uid
|
||||
assert actor_uid
|
||||
try:
|
||||
ctx = self._contexts[(actor_uid, cid)]
|
||||
ctx = self._contexts[(
|
||||
actor_uid,
|
||||
cid,
|
||||
# side,
|
||||
)]
|
||||
log.runtime(
|
||||
f'Retreived cached IPC ctx for\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
|
@ -1423,7 +1453,11 @@ class Actor:
|
|||
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
|
||||
_allow_overruns=allow_overruns,
|
||||
)
|
||||
self._contexts[(actor_uid, cid)] = ctx
|
||||
self._contexts[(
|
||||
actor_uid,
|
||||
cid,
|
||||
# side,
|
||||
)] = ctx
|
||||
|
||||
return ctx
|
||||
|
||||
|
@ -1454,6 +1488,8 @@ class Actor:
|
|||
chan=chan,
|
||||
cid=cid,
|
||||
nsf=nsf,
|
||||
|
||||
# side='caller',
|
||||
msg_buffer_size=msg_buffer_size,
|
||||
allow_overruns=allow_overruns,
|
||||
)
|
||||
|
|
|
@ -90,17 +90,27 @@ class MsgStream(trio.abc.Channel):
|
|||
self._closed: bool|trio.ClosedResourceError = False
|
||||
|
||||
# delegate directly to underlying mem channel
|
||||
def receive_nowait(self):
|
||||
msg = self._rx_chan.receive_nowait()
|
||||
def receive_nowait(
|
||||
self,
|
||||
allow_msg_keys: list[str] = ['yield'],
|
||||
):
|
||||
msg: dict = self._rx_chan.receive_nowait()
|
||||
for (
|
||||
i,
|
||||
key,
|
||||
) in enumerate(allow_msg_keys):
|
||||
try:
|
||||
return msg['yield']
|
||||
return msg[key]
|
||||
except KeyError as kerr:
|
||||
if i < (len(allow_msg_keys) - 1):
|
||||
continue
|
||||
|
||||
_raise_from_no_key_in_msg(
|
||||
ctx=self._ctx,
|
||||
msg=msg,
|
||||
src_err=kerr,
|
||||
log=log,
|
||||
expect_key='yield',
|
||||
expect_key=key,
|
||||
stream=self,
|
||||
)
|
||||
|
||||
|
@ -263,7 +273,9 @@ class MsgStream(trio.abc.Channel):
|
|||
drained: list[Exception|dict] = []
|
||||
while not drained:
|
||||
try:
|
||||
maybe_final_msg = self.receive_nowait()
|
||||
maybe_final_msg = self.receive_nowait(
|
||||
allow_msg_keys=['yield', 'return'],
|
||||
)
|
||||
if maybe_final_msg:
|
||||
log.debug(
|
||||
'Drained un-processed stream msg:\n'
|
||||
|
|
Loading…
Reference in New Issue