Compare commits

..

No commits in common. "8c39b8b124de203f5ff244d20016b0416136fc57" and "f067cf48a73bbf03072921664f6604017ad81e40" have entirely different histories.

12 changed files with 46 additions and 204 deletions

View File

@ -1,4 +1,4 @@
# vim: ft=conf
# vim: ft=ini
# pytest.ini for tractor
[pytest]

View File

@ -18,10 +18,7 @@ from conftest import (
@pytest.mark.parametrize(
'debug_mode',
[False, True],
ids=[
'no_debug_mode',
'debug_mode',
],
ids=['no_debug_mode', 'debug_mode'],
)
@pytest.mark.parametrize(
'ipc_break',

View File

@ -6,7 +6,6 @@ from collections import Counter
import itertools
import platform
import pytest
import trio
import tractor
@ -144,16 +143,8 @@ def test_dynamic_pub_sub():
try:
trio.run(main)
except (
trio.TooSlowError,
ExceptionGroup,
) as err:
if isinstance(err, ExceptionGroup):
for suberr in err.exceptions:
if isinstance(suberr, trio.TooSlowError):
break
else:
pytest.fail('Never got a `TooSlowError` ?')
except trio.TooSlowError:
pass
@tractor.context

View File

@ -1024,8 +1024,6 @@ def test_maybe_allow_overruns_stream(
cancel_ctx: bool,
slow_side: str,
allow_overruns_side: str,
# conftest wide
loglevel: str,
debug_mode: bool,
):
@ -1149,52 +1147,3 @@ def test_maybe_allow_overruns_stream(
# if this hits the logic blocks from above are not
# exhaustive..
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
def test_ctx_with_self_actor(
loglevel: str,
debug_mode: bool,
):
'''
NOTE: for now this is an INVALID OP!
BUT, eventually presuming we add a "side" key to `Actor.get_context()`,
we might be able to get this working symmetrically, but should we??
Open a context back to the same actor and ensure all cancellation
and error semantics hold the same.
'''
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
enable_modules=[__name__],
) as an:
assert an
async with (
tractor.find_actor('root') as portal,
portal.open_context(
expect_cancelled,
# echo_back_sequence,
# seq=seq,
# wait_for_cancel=cancel_ctx,
# be_slow=(slow_side == 'child'),
# allow_overruns_side=allow_overruns_side,
) as (ctx, sent),
ctx.open_stream() as ipc,
):
assert sent is None
seq = list(range(10))
for i in seq:
await ipc.send(i)
rx: int = await ipc.receive()
assert rx == i
await ctx.cancel()
with pytest.raises(RuntimeError) as excinfo:
trio.run(main)
assert 'Invalid Operation' in repr(excinfo.value)

View File

@ -70,7 +70,7 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
async def asyncio_actor(
target: str,
expect_err: Exception|None = None
expect_err: Optional[Exception] = None
) -> None:
@ -114,21 +114,10 @@ def test_aio_simple_error(reg_addr):
infect_asyncio=True,
)
with pytest.raises(
expected_exception=(RemoteActorError, ExceptionGroup),
) as excinfo:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
err = excinfo.value
# might get multiple `trio.Cancelled`s as well inside an inception
if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError),
err.exceptions
))
assert err
assert isinstance(err, RemoteActorError)
assert err.type == AssertionError
@ -301,22 +290,11 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
infect_asyncio=True,
)
with pytest.raises(
expected_exception=(RemoteActorError, ExceptionGroup),
) as excinfo:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
# might get multiple `trio.Cancelled`s as well inside an inception
err = excinfo.value
if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError),
err.exceptions
))
assert err
# ensure boxed error is correct
assert err.type == to_asyncio.AsyncioCancelled
assert excinfo.value.type == to_asyncio.AsyncioCancelled
# TODO: verify open_channel_from will fail on this..

View File

@ -1,8 +1,6 @@
'''
RPC (or maybe better labelled as "RTS: remote task scheduling"?)
related API and error checks.
'''
"""
RPC related
"""
import itertools
import pytest
@ -54,13 +52,8 @@ async def short_sleep():
(['tmp_mod'], 'import doggy', ModuleNotFoundError),
(['tmp_mod'], '4doggy', SyntaxError),
],
ids=[
'no_mods',
'this_mod',
'this_mod_bad_func',
'fail_to_import',
'fail_on_syntax',
],
ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
'fail_on_syntax'],
)
def test_rpc_errors(
reg_addr,
@ -134,9 +127,7 @@ def test_rpc_errors(
run()
else:
# underlying errors aren't propagated upwards (yet)
with pytest.raises(
expected_exception=(remote_err, ExceptionGroup),
) as err:
with pytest.raises(remote_err) as err:
run()
# get raw instance from pytest wrapper

View File

@ -1101,8 +1101,6 @@ class Context:
chan=self.chan,
cid=self.cid,
nsf=self._nsf,
# side=self.side,
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
@ -1300,7 +1298,7 @@ class Context:
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True
raise remote_error # from None
raise remote_error from None
# TODO: change to `.wait_for_result()`?
async def result(

View File

@ -162,8 +162,8 @@ async def query_actor(
@acm
async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int]|None = None,
registry_addrs: list[tuple[str, int]]|None = None,
arbiter_sockaddr: tuple[str, int] | None = None,
registry_addrs: list[tuple[str, int]] | None = None,
only_first: bool = True,
raise_on_none: bool = False,

View File

@ -514,16 +514,6 @@ class Portal:
# a new `_context.py` mod.
nsf = NamespacePath.from_ref(func)
# XXX NOTE XXX: currenly we do NOT allow opening a contex
# with "self" since the local feeder mem-chan processing
# is not built for it.
if self.channel.uid == self.actor.uid:
raise RuntimeError(
'** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n'
f'|_{self.actor}\n'
)
ctx: Context = await self.actor.start_remote_task(
self.channel,
nsf=nsf,

View File

@ -63,26 +63,26 @@ async def open_root_actor(
*,
# defaults are above
registry_addrs: list[tuple[str, int]]|None = None,
registry_addrs: list[tuple[str, int]] | None = None,
# defaults are above
arbiter_addr: tuple[str, int]|None = None,
arbiter_addr: tuple[str, int] | None = None,
name: str|None = 'root',
name: str | None = 'root',
# either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default).
start_method: _spawn.SpawnMethodKey|None = None,
start_method: _spawn.SpawnMethodKey | None = None,
# enables the multi-process debugger support
debug_mode: bool = False,
# internal logging
loglevel: str|None = None,
loglevel: str | None = None,
enable_modules: list|None = None,
rpc_module_paths: list|None = None,
enable_modules: list | None = None,
rpc_module_paths: list | None = None,
# NOTE: allow caller to ensure that only one registry exists
# and that this call creates it.
@ -108,11 +108,7 @@ async def open_root_actor(
_state._runtime_vars['_is_root'] = True
# caps based rpc list
enable_modules = (
enable_modules
or
[]
)
enable_modules = enable_modules or []
if rpc_module_paths:
warnings.warn(

View File

@ -434,10 +434,6 @@ async def _invoke(
chan=chan,
cid=cid,
nsf=NamespacePath.from_ref(func),
# TODO: if we wanted to get cray and support it?
# side='callee',
# We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to
# open the stream with this option.
@ -690,11 +686,9 @@ async def _invoke(
# don't pop the local context until we know the
# associated child isn't in debug any more
await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop((
chan.uid,
cid,
# ctx.side,
))
ctx: Context = actor._contexts.pop(
(chan.uid, cid)
)
merr: Exception|None = ctx.maybe_error
@ -885,11 +879,7 @@ class Actor:
# map {actor uids -> Context}
self._contexts: dict[
tuple[
tuple[str, str], # .uid
str, # .cid
str, # .side
],
tuple[tuple[str, str], str],
Context
] = {}
@ -1373,13 +1363,7 @@ class Actor:
uid: tuple[str, str] = chan.uid
assert uid, f"`chan.uid` can't be {uid}"
try:
ctx: Context = self._contexts[(
uid,
cid,
# TODO: how to determine this tho?
# side,
)]
ctx: Context = self._contexts[(uid, cid)]
except KeyError:
log.warning(
'Ignoring invalid IPC ctx msg!\n\n'
@ -1398,16 +1382,6 @@ class Actor:
cid: str,
nsf: NamespacePath,
# TODO: support lookup by `Context.side: str` ?
# -> would allow making a self-context which might have
# certain special use cases where RPC isolation is wanted
# between 2 tasks running in the same process?
# => prolly needs some deeper though on the real use cases
# and whether or not such things should be better
# implemented using a `TaskManager` style nursery..
#
# side: str|None = None,
msg_buffer_size: int | None = None,
allow_overruns: bool = False,
@ -1423,11 +1397,7 @@ class Actor:
actor_uid = chan.uid
assert actor_uid
try:
ctx = self._contexts[(
actor_uid,
cid,
# side,
)]
ctx = self._contexts[(actor_uid, cid)]
log.runtime(
f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n'
@ -1453,11 +1423,7 @@ class Actor:
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
_allow_overruns=allow_overruns,
)
self._contexts[(
actor_uid,
cid,
# side,
)] = ctx
self._contexts[(actor_uid, cid)] = ctx
return ctx
@ -1488,8 +1454,6 @@ class Actor:
chan=chan,
cid=cid,
nsf=nsf,
# side='caller',
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)

View File

@ -90,29 +90,19 @@ class MsgStream(trio.abc.Channel):
self._closed: bool|trio.ClosedResourceError = False
# delegate directly to underlying mem channel
def receive_nowait(
self,
allow_msg_keys: list[str] = ['yield'],
):
msg: dict = self._rx_chan.receive_nowait()
for (
i,
key,
) in enumerate(allow_msg_keys):
try:
return msg[key]
except KeyError as kerr:
if i < (len(allow_msg_keys) - 1):
continue
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key=key,
stream=self,
)
def receive_nowait(self):
msg = self._rx_chan.receive_nowait()
try:
return msg['yield']
except KeyError as kerr:
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
async def receive(self):
'''
@ -273,9 +263,7 @@ class MsgStream(trio.abc.Channel):
drained: list[Exception|dict] = []
while not drained:
try:
maybe_final_msg = self.receive_nowait(
allow_msg_keys=['yield', 'return'],
)
maybe_final_msg = self.receive_nowait()
if maybe_final_msg:
log.debug(
'Drained un-processed stream msg:\n'