Compare commits
No commits in common. "8c39b8b124de203f5ff244d20016b0416136fc57" and "f067cf48a73bbf03072921664f6604017ad81e40" have entirely different histories.
8c39b8b124
...
f067cf48a7
|
@ -1,4 +1,4 @@
|
||||||
# vim: ft=conf
|
# vim: ft=ini
|
||||||
# pytest.ini for tractor
|
# pytest.ini for tractor
|
||||||
|
|
||||||
[pytest]
|
[pytest]
|
||||||
|
|
|
@ -18,10 +18,7 @@ from conftest import (
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'debug_mode',
|
'debug_mode',
|
||||||
[False, True],
|
[False, True],
|
||||||
ids=[
|
ids=['no_debug_mode', 'debug_mode'],
|
||||||
'no_debug_mode',
|
|
||||||
'debug_mode',
|
|
||||||
],
|
|
||||||
)
|
)
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'ipc_break',
|
'ipc_break',
|
||||||
|
|
|
@ -6,7 +6,6 @@ from collections import Counter
|
||||||
import itertools
|
import itertools
|
||||||
import platform
|
import platform
|
||||||
|
|
||||||
import pytest
|
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
|
@ -144,16 +143,8 @@ def test_dynamic_pub_sub():
|
||||||
|
|
||||||
try:
|
try:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
except (
|
except trio.TooSlowError:
|
||||||
trio.TooSlowError,
|
pass
|
||||||
ExceptionGroup,
|
|
||||||
) as err:
|
|
||||||
if isinstance(err, ExceptionGroup):
|
|
||||||
for suberr in err.exceptions:
|
|
||||||
if isinstance(suberr, trio.TooSlowError):
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
pytest.fail('Never got a `TooSlowError` ?')
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -1024,8 +1024,6 @@ def test_maybe_allow_overruns_stream(
|
||||||
cancel_ctx: bool,
|
cancel_ctx: bool,
|
||||||
slow_side: str,
|
slow_side: str,
|
||||||
allow_overruns_side: str,
|
allow_overruns_side: str,
|
||||||
|
|
||||||
# conftest wide
|
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
|
@ -1149,52 +1147,3 @@ def test_maybe_allow_overruns_stream(
|
||||||
# if this hits the logic blocks from above are not
|
# if this hits the logic blocks from above are not
|
||||||
# exhaustive..
|
# exhaustive..
|
||||||
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
|
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
|
||||||
|
|
||||||
|
|
||||||
def test_ctx_with_self_actor(
|
|
||||||
loglevel: str,
|
|
||||||
debug_mode: bool,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
NOTE: for now this is an INVALID OP!
|
|
||||||
|
|
||||||
BUT, eventually presuming we add a "side" key to `Actor.get_context()`,
|
|
||||||
we might be able to get this working symmetrically, but should we??
|
|
||||||
|
|
||||||
Open a context back to the same actor and ensure all cancellation
|
|
||||||
and error semantics hold the same.
|
|
||||||
|
|
||||||
'''
|
|
||||||
async def main():
|
|
||||||
async with tractor.open_nursery(
|
|
||||||
debug_mode=debug_mode,
|
|
||||||
enable_modules=[__name__],
|
|
||||||
) as an:
|
|
||||||
assert an
|
|
||||||
async with (
|
|
||||||
tractor.find_actor('root') as portal,
|
|
||||||
portal.open_context(
|
|
||||||
expect_cancelled,
|
|
||||||
# echo_back_sequence,
|
|
||||||
# seq=seq,
|
|
||||||
# wait_for_cancel=cancel_ctx,
|
|
||||||
# be_slow=(slow_side == 'child'),
|
|
||||||
# allow_overruns_side=allow_overruns_side,
|
|
||||||
|
|
||||||
) as (ctx, sent),
|
|
||||||
ctx.open_stream() as ipc,
|
|
||||||
):
|
|
||||||
assert sent is None
|
|
||||||
|
|
||||||
seq = list(range(10))
|
|
||||||
for i in seq:
|
|
||||||
await ipc.send(i)
|
|
||||||
rx: int = await ipc.receive()
|
|
||||||
assert rx == i
|
|
||||||
|
|
||||||
await ctx.cancel()
|
|
||||||
|
|
||||||
with pytest.raises(RuntimeError) as excinfo:
|
|
||||||
trio.run(main)
|
|
||||||
|
|
||||||
assert 'Invalid Operation' in repr(excinfo.value)
|
|
||||||
|
|
|
@ -70,7 +70,7 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
|
||||||
async def asyncio_actor(
|
async def asyncio_actor(
|
||||||
|
|
||||||
target: str,
|
target: str,
|
||||||
expect_err: Exception|None = None
|
expect_err: Optional[Exception] = None
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
@ -114,21 +114,10 @@ def test_aio_simple_error(reg_addr):
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
expected_exception=(RemoteActorError, ExceptionGroup),
|
|
||||||
) as excinfo:
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
err = excinfo.value
|
err = excinfo.value
|
||||||
|
|
||||||
# might get multiple `trio.Cancelled`s as well inside an inception
|
|
||||||
if isinstance(err, ExceptionGroup):
|
|
||||||
err = next(itertools.dropwhile(
|
|
||||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
|
||||||
err.exceptions
|
|
||||||
))
|
|
||||||
assert err
|
|
||||||
|
|
||||||
assert isinstance(err, RemoteActorError)
|
assert isinstance(err, RemoteActorError)
|
||||||
assert err.type == AssertionError
|
assert err.type == AssertionError
|
||||||
|
|
||||||
|
@ -301,22 +290,11 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(RemoteActorError) as excinfo:
|
||||||
expected_exception=(RemoteActorError, ExceptionGroup),
|
|
||||||
) as excinfo:
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# might get multiple `trio.Cancelled`s as well inside an inception
|
|
||||||
err = excinfo.value
|
|
||||||
if isinstance(err, ExceptionGroup):
|
|
||||||
err = next(itertools.dropwhile(
|
|
||||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
|
||||||
err.exceptions
|
|
||||||
))
|
|
||||||
assert err
|
|
||||||
|
|
||||||
# ensure boxed error is correct
|
# ensure boxed error is correct
|
||||||
assert err.type == to_asyncio.AsyncioCancelled
|
assert excinfo.value.type == to_asyncio.AsyncioCancelled
|
||||||
|
|
||||||
|
|
||||||
# TODO: verify open_channel_from will fail on this..
|
# TODO: verify open_channel_from will fail on this..
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
'''
|
"""
|
||||||
RPC (or maybe better labelled as "RTS: remote task scheduling"?)
|
RPC related
|
||||||
related API and error checks.
|
"""
|
||||||
|
|
||||||
'''
|
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -54,13 +52,8 @@ async def short_sleep():
|
||||||
(['tmp_mod'], 'import doggy', ModuleNotFoundError),
|
(['tmp_mod'], 'import doggy', ModuleNotFoundError),
|
||||||
(['tmp_mod'], '4doggy', SyntaxError),
|
(['tmp_mod'], '4doggy', SyntaxError),
|
||||||
],
|
],
|
||||||
ids=[
|
ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
|
||||||
'no_mods',
|
'fail_on_syntax'],
|
||||||
'this_mod',
|
|
||||||
'this_mod_bad_func',
|
|
||||||
'fail_to_import',
|
|
||||||
'fail_on_syntax',
|
|
||||||
],
|
|
||||||
)
|
)
|
||||||
def test_rpc_errors(
|
def test_rpc_errors(
|
||||||
reg_addr,
|
reg_addr,
|
||||||
|
@ -134,9 +127,7 @@ def test_rpc_errors(
|
||||||
run()
|
run()
|
||||||
else:
|
else:
|
||||||
# underlying errors aren't propagated upwards (yet)
|
# underlying errors aren't propagated upwards (yet)
|
||||||
with pytest.raises(
|
with pytest.raises(remote_err) as err:
|
||||||
expected_exception=(remote_err, ExceptionGroup),
|
|
||||||
) as err:
|
|
||||||
run()
|
run()
|
||||||
|
|
||||||
# get raw instance from pytest wrapper
|
# get raw instance from pytest wrapper
|
||||||
|
|
|
@ -1101,8 +1101,6 @@ class Context:
|
||||||
chan=self.chan,
|
chan=self.chan,
|
||||||
cid=self.cid,
|
cid=self.cid,
|
||||||
nsf=self._nsf,
|
nsf=self._nsf,
|
||||||
# side=self.side,
|
|
||||||
|
|
||||||
msg_buffer_size=msg_buffer_size,
|
msg_buffer_size=msg_buffer_size,
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
|
@ -1300,7 +1298,7 @@ class Context:
|
||||||
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
|
||||||
# https://stackoverflow.com/a/24752607
|
# https://stackoverflow.com/a/24752607
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = True
|
||||||
raise remote_error # from None
|
raise remote_error from None
|
||||||
|
|
||||||
# TODO: change to `.wait_for_result()`?
|
# TODO: change to `.wait_for_result()`?
|
||||||
async def result(
|
async def result(
|
||||||
|
|
|
@ -162,8 +162,8 @@ async def query_actor(
|
||||||
@acm
|
@acm
|
||||||
async def find_actor(
|
async def find_actor(
|
||||||
name: str,
|
name: str,
|
||||||
arbiter_sockaddr: tuple[str, int]|None = None,
|
arbiter_sockaddr: tuple[str, int] | None = None,
|
||||||
registry_addrs: list[tuple[str, int]]|None = None,
|
registry_addrs: list[tuple[str, int]] | None = None,
|
||||||
|
|
||||||
only_first: bool = True,
|
only_first: bool = True,
|
||||||
raise_on_none: bool = False,
|
raise_on_none: bool = False,
|
||||||
|
|
|
@ -514,16 +514,6 @@ class Portal:
|
||||||
# a new `_context.py` mod.
|
# a new `_context.py` mod.
|
||||||
nsf = NamespacePath.from_ref(func)
|
nsf = NamespacePath.from_ref(func)
|
||||||
|
|
||||||
# XXX NOTE XXX: currenly we do NOT allow opening a contex
|
|
||||||
# with "self" since the local feeder mem-chan processing
|
|
||||||
# is not built for it.
|
|
||||||
if self.channel.uid == self.actor.uid:
|
|
||||||
raise RuntimeError(
|
|
||||||
'** !! Invalid Operation !! **\n'
|
|
||||||
'Can not open an IPC ctx with the local actor!\n'
|
|
||||||
f'|_{self.actor}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx: Context = await self.actor.start_remote_task(
|
ctx: Context = await self.actor.start_remote_task(
|
||||||
self.channel,
|
self.channel,
|
||||||
nsf=nsf,
|
nsf=nsf,
|
||||||
|
|
|
@ -63,26 +63,26 @@ async def open_root_actor(
|
||||||
|
|
||||||
*,
|
*,
|
||||||
# defaults are above
|
# defaults are above
|
||||||
registry_addrs: list[tuple[str, int]]|None = None,
|
registry_addrs: list[tuple[str, int]] | None = None,
|
||||||
|
|
||||||
# defaults are above
|
# defaults are above
|
||||||
arbiter_addr: tuple[str, int]|None = None,
|
arbiter_addr: tuple[str, int] | None = None,
|
||||||
|
|
||||||
name: str|None = 'root',
|
name: str | None = 'root',
|
||||||
|
|
||||||
# either the `multiprocessing` start method:
|
# either the `multiprocessing` start method:
|
||||||
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
||||||
# OR `trio` (the new default).
|
# OR `trio` (the new default).
|
||||||
start_method: _spawn.SpawnMethodKey|None = None,
|
start_method: _spawn.SpawnMethodKey | None = None,
|
||||||
|
|
||||||
# enables the multi-process debugger support
|
# enables the multi-process debugger support
|
||||||
debug_mode: bool = False,
|
debug_mode: bool = False,
|
||||||
|
|
||||||
# internal logging
|
# internal logging
|
||||||
loglevel: str|None = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
enable_modules: list|None = None,
|
enable_modules: list | None = None,
|
||||||
rpc_module_paths: list|None = None,
|
rpc_module_paths: list | None = None,
|
||||||
|
|
||||||
# NOTE: allow caller to ensure that only one registry exists
|
# NOTE: allow caller to ensure that only one registry exists
|
||||||
# and that this call creates it.
|
# and that this call creates it.
|
||||||
|
@ -108,11 +108,7 @@ async def open_root_actor(
|
||||||
_state._runtime_vars['_is_root'] = True
|
_state._runtime_vars['_is_root'] = True
|
||||||
|
|
||||||
# caps based rpc list
|
# caps based rpc list
|
||||||
enable_modules = (
|
enable_modules = enable_modules or []
|
||||||
enable_modules
|
|
||||||
or
|
|
||||||
[]
|
|
||||||
)
|
|
||||||
|
|
||||||
if rpc_module_paths:
|
if rpc_module_paths:
|
||||||
warnings.warn(
|
warnings.warn(
|
||||||
|
|
|
@ -434,10 +434,6 @@ async def _invoke(
|
||||||
chan=chan,
|
chan=chan,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
nsf=NamespacePath.from_ref(func),
|
nsf=NamespacePath.from_ref(func),
|
||||||
|
|
||||||
# TODO: if we wanted to get cray and support it?
|
|
||||||
# side='callee',
|
|
||||||
|
|
||||||
# We shouldn't ever need to pass this through right?
|
# We shouldn't ever need to pass this through right?
|
||||||
# it's up to the soon-to-be called rpc task to
|
# it's up to the soon-to-be called rpc task to
|
||||||
# open the stream with this option.
|
# open the stream with this option.
|
||||||
|
@ -690,11 +686,9 @@ async def _invoke(
|
||||||
# don't pop the local context until we know the
|
# don't pop the local context until we know the
|
||||||
# associated child isn't in debug any more
|
# associated child isn't in debug any more
|
||||||
await maybe_wait_for_debugger()
|
await maybe_wait_for_debugger()
|
||||||
ctx: Context = actor._contexts.pop((
|
ctx: Context = actor._contexts.pop(
|
||||||
chan.uid,
|
(chan.uid, cid)
|
||||||
cid,
|
)
|
||||||
# ctx.side,
|
|
||||||
))
|
|
||||||
|
|
||||||
merr: Exception|None = ctx.maybe_error
|
merr: Exception|None = ctx.maybe_error
|
||||||
|
|
||||||
|
@ -885,11 +879,7 @@ class Actor:
|
||||||
|
|
||||||
# map {actor uids -> Context}
|
# map {actor uids -> Context}
|
||||||
self._contexts: dict[
|
self._contexts: dict[
|
||||||
tuple[
|
tuple[tuple[str, str], str],
|
||||||
tuple[str, str], # .uid
|
|
||||||
str, # .cid
|
|
||||||
str, # .side
|
|
||||||
],
|
|
||||||
Context
|
Context
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
|
@ -1373,13 +1363,7 @@ class Actor:
|
||||||
uid: tuple[str, str] = chan.uid
|
uid: tuple[str, str] = chan.uid
|
||||||
assert uid, f"`chan.uid` can't be {uid}"
|
assert uid, f"`chan.uid` can't be {uid}"
|
||||||
try:
|
try:
|
||||||
ctx: Context = self._contexts[(
|
ctx: Context = self._contexts[(uid, cid)]
|
||||||
uid,
|
|
||||||
cid,
|
|
||||||
|
|
||||||
# TODO: how to determine this tho?
|
|
||||||
# side,
|
|
||||||
)]
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.warning(
|
log.warning(
|
||||||
'Ignoring invalid IPC ctx msg!\n\n'
|
'Ignoring invalid IPC ctx msg!\n\n'
|
||||||
|
@ -1398,16 +1382,6 @@ class Actor:
|
||||||
cid: str,
|
cid: str,
|
||||||
nsf: NamespacePath,
|
nsf: NamespacePath,
|
||||||
|
|
||||||
# TODO: support lookup by `Context.side: str` ?
|
|
||||||
# -> would allow making a self-context which might have
|
|
||||||
# certain special use cases where RPC isolation is wanted
|
|
||||||
# between 2 tasks running in the same process?
|
|
||||||
# => prolly needs some deeper though on the real use cases
|
|
||||||
# and whether or not such things should be better
|
|
||||||
# implemented using a `TaskManager` style nursery..
|
|
||||||
#
|
|
||||||
# side: str|None = None,
|
|
||||||
|
|
||||||
msg_buffer_size: int | None = None,
|
msg_buffer_size: int | None = None,
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
|
|
||||||
|
@ -1423,11 +1397,7 @@ class Actor:
|
||||||
actor_uid = chan.uid
|
actor_uid = chan.uid
|
||||||
assert actor_uid
|
assert actor_uid
|
||||||
try:
|
try:
|
||||||
ctx = self._contexts[(
|
ctx = self._contexts[(actor_uid, cid)]
|
||||||
actor_uid,
|
|
||||||
cid,
|
|
||||||
# side,
|
|
||||||
)]
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Retreived cached IPC ctx for\n'
|
f'Retreived cached IPC ctx for\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
|
@ -1453,11 +1423,7 @@ class Actor:
|
||||||
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
|
msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
|
||||||
_allow_overruns=allow_overruns,
|
_allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
self._contexts[(
|
self._contexts[(actor_uid, cid)] = ctx
|
||||||
actor_uid,
|
|
||||||
cid,
|
|
||||||
# side,
|
|
||||||
)] = ctx
|
|
||||||
|
|
||||||
return ctx
|
return ctx
|
||||||
|
|
||||||
|
@ -1488,8 +1454,6 @@ class Actor:
|
||||||
chan=chan,
|
chan=chan,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
nsf=nsf,
|
nsf=nsf,
|
||||||
|
|
||||||
# side='caller',
|
|
||||||
msg_buffer_size=msg_buffer_size,
|
msg_buffer_size=msg_buffer_size,
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
|
|
|
@ -90,29 +90,19 @@ class MsgStream(trio.abc.Channel):
|
||||||
self._closed: bool|trio.ClosedResourceError = False
|
self._closed: bool|trio.ClosedResourceError = False
|
||||||
|
|
||||||
# delegate directly to underlying mem channel
|
# delegate directly to underlying mem channel
|
||||||
def receive_nowait(
|
def receive_nowait(self):
|
||||||
self,
|
msg = self._rx_chan.receive_nowait()
|
||||||
allow_msg_keys: list[str] = ['yield'],
|
try:
|
||||||
):
|
return msg['yield']
|
||||||
msg: dict = self._rx_chan.receive_nowait()
|
except KeyError as kerr:
|
||||||
for (
|
_raise_from_no_key_in_msg(
|
||||||
i,
|
ctx=self._ctx,
|
||||||
key,
|
msg=msg,
|
||||||
) in enumerate(allow_msg_keys):
|
src_err=kerr,
|
||||||
try:
|
log=log,
|
||||||
return msg[key]
|
expect_key='yield',
|
||||||
except KeyError as kerr:
|
stream=self,
|
||||||
if i < (len(allow_msg_keys) - 1):
|
)
|
||||||
continue
|
|
||||||
|
|
||||||
_raise_from_no_key_in_msg(
|
|
||||||
ctx=self._ctx,
|
|
||||||
msg=msg,
|
|
||||||
src_err=kerr,
|
|
||||||
log=log,
|
|
||||||
expect_key=key,
|
|
||||||
stream=self,
|
|
||||||
)
|
|
||||||
|
|
||||||
async def receive(self):
|
async def receive(self):
|
||||||
'''
|
'''
|
||||||
|
@ -273,9 +263,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
drained: list[Exception|dict] = []
|
drained: list[Exception|dict] = []
|
||||||
while not drained:
|
while not drained:
|
||||||
try:
|
try:
|
||||||
maybe_final_msg = self.receive_nowait(
|
maybe_final_msg = self.receive_nowait()
|
||||||
allow_msg_keys=['yield', 'return'],
|
|
||||||
)
|
|
||||||
if maybe_final_msg:
|
if maybe_final_msg:
|
||||||
log.debug(
|
log.debug(
|
||||||
'Drained un-processed stream msg:\n'
|
'Drained un-processed stream msg:\n'
|
||||||
|
|
Loading…
Reference in New Issue