Compare commits
4 Commits
f067cf48a7
...
8c39b8b124
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 8c39b8b124 | |
Tyler Goodlet | ededa2e88f | |
Tyler Goodlet | dd168184c3 | |
Tyler Goodlet | 37ee477aee |
|
@ -1,4 +1,4 @@
|
||||||
# vim: ft=ini
|
# vim: ft=conf
|
||||||
# pytest.ini for tractor
|
# pytest.ini for tractor
|
||||||
|
|
||||||
[pytest]
|
[pytest]
|
||||||
|
|
|
@ -18,7 +18,10 @@ from conftest import (
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'debug_mode',
|
'debug_mode',
|
||||||
[False, True],
|
[False, True],
|
||||||
ids=['no_debug_mode', 'debug_mode'],
|
ids=[
|
||||||
|
'no_debug_mode',
|
||||||
|
'debug_mode',
|
||||||
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'ipc_break',
|
'ipc_break',
|
||||||
|
|
|
@ -6,6 +6,7 @@ from collections import Counter
|
||||||
import itertools
|
import itertools
|
||||||
import platform
|
import platform
|
||||||
|
|
||||||
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
|
@ -143,8 +144,16 @@ def test_dynamic_pub_sub():
|
||||||
|
|
||||||
try:
|
try:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
except trio.TooSlowError:
|
except (
|
||||||
pass
|
trio.TooSlowError,
|
||||||
|
ExceptionGroup,
|
||||||
|
) as err:
|
||||||
|
if isinstance(err, ExceptionGroup):
|
||||||
|
for suberr in err.exceptions:
|
||||||
|
if isinstance(suberr, trio.TooSlowError):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
pytest.fail('Never got a `TooSlowError` ?')
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -1024,6 +1024,8 @@ def test_maybe_allow_overruns_stream(
|
||||||
cancel_ctx: bool,
|
cancel_ctx: bool,
|
||||||
slow_side: str,
|
slow_side: str,
|
||||||
allow_overruns_side: str,
|
allow_overruns_side: str,
|
||||||
|
|
||||||
|
# conftest wide
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
|
@ -1147,3 +1149,52 @@ def test_maybe_allow_overruns_stream(
|
||||||
# if this hits the logic blocks from above are not
|
# if this hits the logic blocks from above are not
|
||||||
# exhaustive..
|
# exhaustive..
|
||||||
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
|
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
|
||||||
|
|
||||||
|
|
||||||
|
def test_ctx_with_self_actor(
|
||||||
|
loglevel: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
NOTE: for now this is an INVALID OP!
|
||||||
|
|
||||||
|
BUT, eventually presuming we add a "side" key to `Actor.get_context()`,
|
||||||
|
we might be able to get this working symmetrically, but should we??
|
||||||
|
|
||||||
|
Open a context back to the same actor and ensure all cancellation
|
||||||
|
and error semantics hold the same.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
enable_modules=[__name__],
|
||||||
|
) as an:
|
||||||
|
assert an
|
||||||
|
async with (
|
||||||
|
tractor.find_actor('root') as portal,
|
||||||
|
portal.open_context(
|
||||||
|
expect_cancelled,
|
||||||
|
# echo_back_sequence,
|
||||||
|
# seq=seq,
|
||||||
|
# wait_for_cancel=cancel_ctx,
|
||||||
|
# be_slow=(slow_side == 'child'),
|
||||||
|
# allow_overruns_side=allow_overruns_side,
|
||||||
|
|
||||||
|
) as (ctx, sent),
|
||||||
|
ctx.open_stream() as ipc,
|
||||||
|
):
|
||||||
|
assert sent is None
|
||||||
|
|
||||||
|
seq = list(range(10))
|
||||||
|
for i in seq:
|
||||||
|
await ipc.send(i)
|
||||||
|
rx: int = await ipc.receive()
|
||||||
|
assert rx == i
|
||||||
|
|
||||||
|
await ctx.cancel()
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
assert 'Invalid Operation' in repr(excinfo.value)
|
||||||
|
|
|
@ -70,7 +70,7 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
|
||||||
async def asyncio_actor(
|
async def asyncio_actor(
|
||||||
|
|
||||||
target: str,
|
target: str,
|
||||||
expect_err: Optional[Exception] = None
|
expect_err: Exception|None = None
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
@ -114,10 +114,21 @@ def test_aio_simple_error(reg_addr):
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
with pytest.raises(
|
||||||
|
expected_exception=(RemoteActorError, ExceptionGroup),
|
||||||
|
) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
err = excinfo.value
|
err = excinfo.value
|
||||||
|
|
||||||
|
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||||
|
if isinstance(err, ExceptionGroup):
|
||||||
|
err = next(itertools.dropwhile(
|
||||||
|
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
||||||
|
err.exceptions
|
||||||
|
))
|
||||||
|
assert err
|
||||||
|
|
||||||
assert isinstance(err, RemoteActorError)
|
assert isinstance(err, RemoteActorError)
|
||||||
assert err.type == AssertionError
|
assert err.type == AssertionError
|
||||||
|
|
||||||
|
@ -290,11 +301,22 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pytest.raises(RemoteActorError) as excinfo:
|
with pytest.raises(
|
||||||
|
expected_exception=(RemoteActorError, ExceptionGroup),
|
||||||
|
) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||||
|
err = excinfo.value
|
||||||
|
if isinstance(err, ExceptionGroup):
|
||||||
|
err = next(itertools.dropwhile(
|
||||||
|
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
||||||
|
err.exceptions
|
||||||
|
))
|
||||||
|
assert err
|
||||||
|
|
||||||
# ensure boxed error is correct
|
# ensure boxed error is correct
|
||||||
assert excinfo.value.type == to_asyncio.AsyncioCancelled
|
assert err.type == to_asyncio.AsyncioCancelled
|
||||||
|
|
||||||
|
|
||||||
# TODO: verify open_channel_from will fail on this..
|
# TODO: verify open_channel_from will fail on this..
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
"""
|
'''
|
||||||
RPC related
|
RPC (or maybe better labelled as "RTS: remote task scheduling"?)
|
||||||
"""
|
related API and error checks.
|
||||||
|
|
||||||
|
'''
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -52,8 +54,13 @@ async def short_sleep():
|
||||||
(['tmp_mod'], 'import doggy', ModuleNotFoundError),
|
(['tmp_mod'], 'import doggy', ModuleNotFoundError),
|
||||||
(['tmp_mod'], '4doggy', SyntaxError),
|
(['tmp_mod'], '4doggy', SyntaxError),
|
||||||
],
|
],
|
||||||
ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
|
ids=[
|
||||||
'fail_on_syntax'],
|
'no_mods',
|
||||||
|
'this_mod',
|
||||||
|
'this_mod_bad_func',
|
||||||
|
'fail_to_import',
|
||||||
|
'fail_on_syntax',
|
||||||
|
],
|
||||||
)
|
)
|
||||||
def test_rpc_errors(
|
def test_rpc_errors(
|
||||||
reg_addr,
|
reg_addr,
|
||||||
|
@ -127,7 +134,9 @@ def test_rpc_errors(
|
||||||
run()
|
run()
|
||||||
else:
|
else:
|
||||||
# underlying errors aren't propagated upwards (yet)
|
# underlying errors aren't propagated upwards (yet)
|
||||||
with pytest.raises(remote_err) as err:
|
with pytest.raises(
|
||||||
|
expected_exception=(remote_err, ExceptionGroup),
|
||||||
|
) as err:
|
||||||
run()
|
run()
|
||||||
|
|
||||||
# get raw instance from pytest wrapper
|
# get raw instance from pytest wrapper
|
||||||
|
|
|
@ -1101,6 +1101,8 @@ class Context:
|
||||||
chan=self.chan,
|
chan=self.chan,
|
||||||
cid=self.cid,
|
cid=self.cid,
|
||||||
nsf=self._nsf,
|
nsf=self._nsf,
|
||||||
|
# side=self.side,
|
||||||
|
|
||||||
msg_buffer_size=msg_buffer_size,
|
msg_buffer_size=msg_buffer_size,
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
|
@ -1298,7 +1300,7 @@ class Context:
|
||||||
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
||||||
# https://stackoverflow.com/a/24752607
|
# https://stackoverflow.com/a/24752607
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = True
|
||||||
raise remote_error from None
|
raise remote_error # from None
|
||||||
|
|
||||||
# TODO: change to `.wait_for_result()`?
|
# TODO: change to `.wait_for_result()`?
|
||||||
async def result(
|
async def result(
|
||||||
|
|
|
@ -514,6 +514,16 @@ class Portal:
|
||||||
# a new `_context.py` mod.
|
# a new `_context.py` mod.
|
||||||
nsf = NamespacePath.from_ref(func)
|
nsf = NamespacePath.from_ref(func)
|
||||||
|
|
||||||
|
# XXX NOTE XXX: currenly we do NOT allow opening a contex
|
||||||
|
# with "self" since the local feeder mem-chan processing
|
||||||
|
# is not built for it.
|
||||||
|
if self.channel.uid == self.actor.uid:
|
||||||
|
raise RuntimeError(
|
||||||
|
'** !! Invalid Operation !! **\n'
|
||||||
|
'Can not open an IPC ctx with the local actor!\n'
|
||||||
|
f'|_{self.actor}\n'
|
||||||
|
)
|
||||||
|
|
||||||
ctx: Context = await self.actor.start_remote_task(
|
ctx: Context = await self.actor.start_remote_task(
|
||||||
self.channel,
|
self.channel,
|
||||||
nsf=nsf,
|
nsf=nsf,
|
||||||
|
|
|
@ -108,7 +108,11 @@ async def open_root_actor(
|
||||||
_state._runtime_vars['_is_root'] = True
|
_state._runtime_vars['_is_root'] = True
|
||||||
|
|
||||||
# caps based rpc list
|
# caps based rpc list
|
||||||
enable_modules = enable_modules or []
|
enable_modules = (
|
||||||
|
enable_modules
|
||||||
|
or
|
||||||
|
[]
|
||||||
|
)
|
||||||
|
|
||||||
if rpc_module_paths:
|
if rpc_module_paths:
|
||||||
warnings.warn(
|
warnings.warn(
|
||||||
|
|
|
@ -434,6 +434,10 @@ async def _invoke(
|
||||||
chan=chan,
|
chan=chan,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
nsf=NamespacePath.from_ref(func),
|
nsf=NamespacePath.from_ref(func),
|
||||||
|
|
||||||
|
# TODO: if we wanted to get cray and support it?
|
||||||
|
# side='callee',
|
||||||
|
|
||||||
# We shouldn't ever need to pass this through right?
|
# We shouldn't ever need to pass this through right?
|
||||||
# it's up to the soon-to-be called rpc task to
|
# it's up to the soon-to-be called rpc task to
|
||||||
# open the stream with this option.
|
# open the stream with this option.
|
||||||
|
@ -686,9 +690,11 @@ async def _invoke(
|
||||||
# don't pop the local context until we know the
|
# don't pop the local context until we know the
|
||||||
# associated child isn't in debug any more
|
# associated child isn't in debug any more
|
||||||
await maybe_wait_for_debugger()
|
await maybe_wait_for_debugger()
|
||||||
ctx: Context = actor._contexts.pop(
|
ctx: Context = actor._contexts.pop((
|
||||||
(chan.uid, cid)
|
chan.uid,
|
||||||
)
|
cid,
|
||||||
|
# ctx.side,
|
||||||
|
))
|
||||||
|
|
||||||
merr: Exception|None = ctx.maybe_error
|
merr: Exception|None = ctx.maybe_error
|
||||||
|
|
||||||
|
@ -879,7 +885,11 @@ class Actor:
|
||||||
|
|
||||||
# map {actor uids -> Context}
|
# map {actor uids -> Context}
|
||||||
self._contexts: dict[
|
self._contexts: dict[
|
||||||
tuple[tuple[str, str], str],
|
tuple[
|
||||||
|
tuple[str, str], # .uid
|
||||||
|
str, # .cid
|
||||||
|
str, # .side
|
||||||
|
],
|
||||||
Context
|
Context
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
|
@ -1363,7 +1373,13 @@ class Actor:
|
||||||
uid: tuple[str, str] = chan.uid
|
uid: tuple[str, str] = chan.uid
|
||||||
assert uid, f"`chan.uid` can't be {uid}"
|
assert uid, f"`chan.uid` can't be {uid}"
|
||||||
try:
|
try:
|
||||||
ctx: Context = self._contexts[(uid, cid)]
|
ctx: Context = self._contexts[(
|
||||||
|
uid,
|
||||||
|
cid,
|
||||||
|
|
||||||
|
# TODO: how to determine this tho?
|
||||||
|
# side,
|
||||||
|
)]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.warning(
|
log.warning(
|
||||||
'Ignoring invalid IPC ctx msg!\n\n'
|
'Ignoring invalid IPC ctx msg!\n\n'
|
||||||
|
@ -1382,6 +1398,16 @@ class Actor:
|
||||||
cid: str,
|
cid: str,
|
||||||
nsf: NamespacePath,
|
nsf: NamespacePath,
|
||||||
|
|
||||||
|
# TODO: support lookup by `Context.side: str` ?
|
||||||
|
# -> would allow making a self-context which might have
|
||||||
|
# certain special use cases where RPC isolation is wanted
|
||||||
|
# between 2 tasks running in the same process?
|
||||||
|
# => prolly needs some deeper though on the real use cases
|
||||||
|
# and whether or not such things should be better
|
||||||
|
# implemented using a `TaskManager` style nursery..
|
||||||
|
#
|
||||||
|
# side: str|None = None,
|
||||||
|
|
||||||
msg_buffer_size: int | None = None,
|
msg_buffer_size: int | None = None,
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
|
|
||||||
|
@ -1397,7 +1423,11 @@ class Actor:
|
||||||
actor_uid = chan.uid
|
actor_uid = chan.uid
|
||||||
assert actor_uid
|
assert actor_uid
|
||||||
try:
|
try:
|
||||||
ctx = self._contexts[(actor_uid, cid)]
|
ctx = self._contexts[(
|
||||||
|
actor_uid,
|
||||||
|
cid,
|
||||||
|
# side,
|
||||||
|
)]
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Retreived cached IPC ctx for\n'
|
f'Retreived cached IPC ctx for\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
|
@ -1423,7 +1453,11 @@ class Actor:
|
||||||
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
|
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
|
||||||
_allow_overruns=allow_overruns,
|
_allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
self._contexts[(actor_uid, cid)] = ctx
|
self._contexts[(
|
||||||
|
actor_uid,
|
||||||
|
cid,
|
||||||
|
# side,
|
||||||
|
)] = ctx
|
||||||
|
|
||||||
return ctx
|
return ctx
|
||||||
|
|
||||||
|
@ -1454,6 +1488,8 @@ class Actor:
|
||||||
chan=chan,
|
chan=chan,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
nsf=nsf,
|
nsf=nsf,
|
||||||
|
|
||||||
|
# side='caller',
|
||||||
msg_buffer_size=msg_buffer_size,
|
msg_buffer_size=msg_buffer_size,
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
|
|
|
@ -90,17 +90,27 @@ class MsgStream(trio.abc.Channel):
|
||||||
self._closed: bool|trio.ClosedResourceError = False
|
self._closed: bool|trio.ClosedResourceError = False
|
||||||
|
|
||||||
# delegate directly to underlying mem channel
|
# delegate directly to underlying mem channel
|
||||||
def receive_nowait(self):
|
def receive_nowait(
|
||||||
msg = self._rx_chan.receive_nowait()
|
self,
|
||||||
|
allow_msg_keys: list[str] = ['yield'],
|
||||||
|
):
|
||||||
|
msg: dict = self._rx_chan.receive_nowait()
|
||||||
|
for (
|
||||||
|
i,
|
||||||
|
key,
|
||||||
|
) in enumerate(allow_msg_keys):
|
||||||
try:
|
try:
|
||||||
return msg['yield']
|
return msg[key]
|
||||||
except KeyError as kerr:
|
except KeyError as kerr:
|
||||||
|
if i < (len(allow_msg_keys) - 1):
|
||||||
|
continue
|
||||||
|
|
||||||
_raise_from_no_key_in_msg(
|
_raise_from_no_key_in_msg(
|
||||||
ctx=self._ctx,
|
ctx=self._ctx,
|
||||||
msg=msg,
|
msg=msg,
|
||||||
src_err=kerr,
|
src_err=kerr,
|
||||||
log=log,
|
log=log,
|
||||||
expect_key='yield',
|
expect_key=key,
|
||||||
stream=self,
|
stream=self,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -263,7 +273,9 @@ class MsgStream(trio.abc.Channel):
|
||||||
drained: list[Exception|dict] = []
|
drained: list[Exception|dict] = []
|
||||||
while not drained:
|
while not drained:
|
||||||
try:
|
try:
|
||||||
maybe_final_msg = self.receive_nowait()
|
maybe_final_msg = self.receive_nowait(
|
||||||
|
allow_msg_keys=['yield', 'return'],
|
||||||
|
)
|
||||||
if maybe_final_msg:
|
if maybe_final_msg:
|
||||||
log.debug(
|
log.debug(
|
||||||
'Drained un-processed stream msg:\n'
|
'Drained un-processed stream msg:\n'
|
||||||
|
|
Loading…
Reference in New Issue