Compare commits

..

No commits in common. "8c39b8b124de203f5ff244d20016b0416136fc57" and "f067cf48a73bbf03072921664f6604017ad81e40" have entirely different histories.

12 changed files with 46 additions and 204 deletions

View File

@ -1,4 +1,4 @@
# vim: ft=conf # vim: ft=ini
# pytest.ini for tractor # pytest.ini for tractor
[pytest] [pytest]

View File

@ -18,10 +18,7 @@ from conftest import (
@pytest.mark.parametrize( @pytest.mark.parametrize(
'debug_mode', 'debug_mode',
[False, True], [False, True],
ids=[ ids=['no_debug_mode', 'debug_mode'],
'no_debug_mode',
'debug_mode',
],
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'ipc_break', 'ipc_break',

View File

@ -6,7 +6,6 @@ from collections import Counter
import itertools import itertools
import platform import platform
import pytest
import trio import trio
import tractor import tractor
@ -144,16 +143,8 @@ def test_dynamic_pub_sub():
try: try:
trio.run(main) trio.run(main)
except ( except trio.TooSlowError:
trio.TooSlowError, pass
ExceptionGroup,
) as err:
if isinstance(err, ExceptionGroup):
for suberr in err.exceptions:
if isinstance(suberr, trio.TooSlowError):
break
else:
pytest.fail('Never got a `TooSlowError` ?')
@tractor.context @tractor.context

View File

@ -1024,8 +1024,6 @@ def test_maybe_allow_overruns_stream(
cancel_ctx: bool, cancel_ctx: bool,
slow_side: str, slow_side: str,
allow_overruns_side: str, allow_overruns_side: str,
# conftest wide
loglevel: str, loglevel: str,
debug_mode: bool, debug_mode: bool,
): ):
@ -1149,52 +1147,3 @@ def test_maybe_allow_overruns_stream(
# if this hits the logic blocks from above are not # if this hits the logic blocks from above are not
# exhaustive.. # exhaustive..
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO') pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
def test_ctx_with_self_actor(
loglevel: str,
debug_mode: bool,
):
'''
NOTE: for now this is an INVALID OP!
BUT, eventually presuming we add a "side" key to `Actor.get_context()`,
we might be able to get this working symmetrically, but should we??
Open a context back to the same actor and ensure all cancellation
and error semantics hold the same.
'''
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
enable_modules=[__name__],
) as an:
assert an
async with (
tractor.find_actor('root') as portal,
portal.open_context(
expect_cancelled,
# echo_back_sequence,
# seq=seq,
# wait_for_cancel=cancel_ctx,
# be_slow=(slow_side == 'child'),
# allow_overruns_side=allow_overruns_side,
) as (ctx, sent),
ctx.open_stream() as ipc,
):
assert sent is None
seq = list(range(10))
for i in seq:
await ipc.send(i)
rx: int = await ipc.receive()
assert rx == i
await ctx.cancel()
with pytest.raises(RuntimeError) as excinfo:
trio.run(main)
assert 'Invalid Operation' in repr(excinfo.value)

View File

@ -70,7 +70,7 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
async def asyncio_actor( async def asyncio_actor(
target: str, target: str,
expect_err: Exception|None = None expect_err: Optional[Exception] = None
) -> None: ) -> None:
@ -114,21 +114,10 @@ def test_aio_simple_error(reg_addr):
infect_asyncio=True, infect_asyncio=True,
) )
with pytest.raises( with pytest.raises(RemoteActorError) as excinfo:
expected_exception=(RemoteActorError, ExceptionGroup),
) as excinfo:
trio.run(main) trio.run(main)
err = excinfo.value err = excinfo.value
# might get multiple `trio.Cancelled`s as well inside an inception
if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError),
err.exceptions
))
assert err
assert isinstance(err, RemoteActorError) assert isinstance(err, RemoteActorError)
assert err.type == AssertionError assert err.type == AssertionError
@ -301,22 +290,11 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
infect_asyncio=True, infect_asyncio=True,
) )
with pytest.raises( with pytest.raises(RemoteActorError) as excinfo:
expected_exception=(RemoteActorError, ExceptionGroup),
) as excinfo:
trio.run(main) trio.run(main)
# might get multiple `trio.Cancelled`s as well inside an inception
err = excinfo.value
if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError),
err.exceptions
))
assert err
# ensure boxed error is correct # ensure boxed error is correct
assert err.type == to_asyncio.AsyncioCancelled assert excinfo.value.type == to_asyncio.AsyncioCancelled
# TODO: verify open_channel_from will fail on this.. # TODO: verify open_channel_from will fail on this..

View File

@ -1,8 +1,6 @@
''' """
RPC (or maybe better labelled as "RTS: remote task scheduling"?) RPC related
related API and error checks. """
'''
import itertools import itertools
import pytest import pytest
@ -54,13 +52,8 @@ async def short_sleep():
(['tmp_mod'], 'import doggy', ModuleNotFoundError), (['tmp_mod'], 'import doggy', ModuleNotFoundError),
(['tmp_mod'], '4doggy', SyntaxError), (['tmp_mod'], '4doggy', SyntaxError),
], ],
ids=[ ids=['no_mods', 'this_mod', 'this_mod_bad_func', 'fail_to_import',
'no_mods', 'fail_on_syntax'],
'this_mod',
'this_mod_bad_func',
'fail_to_import',
'fail_on_syntax',
],
) )
def test_rpc_errors( def test_rpc_errors(
reg_addr, reg_addr,
@ -134,9 +127,7 @@ def test_rpc_errors(
run() run()
else: else:
# underlying errors aren't propagated upwards (yet) # underlying errors aren't propagated upwards (yet)
with pytest.raises( with pytest.raises(remote_err) as err:
expected_exception=(remote_err, ExceptionGroup),
) as err:
run() run()
# get raw instance from pytest wrapper # get raw instance from pytest wrapper

View File

@ -1101,8 +1101,6 @@ class Context:
chan=self.chan, chan=self.chan,
cid=self.cid, cid=self.cid,
nsf=self._nsf, nsf=self._nsf,
# side=self.side,
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
@ -1300,7 +1298,7 @@ class Context:
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607 # https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True __tracebackhide__: bool = True
raise remote_error # from None raise remote_error from None
# TODO: change to `.wait_for_result()`? # TODO: change to `.wait_for_result()`?
async def result( async def result(

View File

@ -162,8 +162,8 @@ async def query_actor(
@acm @acm
async def find_actor( async def find_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int]|None = None, arbiter_sockaddr: tuple[str, int] | None = None,
registry_addrs: list[tuple[str, int]]|None = None, registry_addrs: list[tuple[str, int]] | None = None,
only_first: bool = True, only_first: bool = True,
raise_on_none: bool = False, raise_on_none: bool = False,

View File

@ -514,16 +514,6 @@ class Portal:
# a new `_context.py` mod. # a new `_context.py` mod.
nsf = NamespacePath.from_ref(func) nsf = NamespacePath.from_ref(func)
# XXX NOTE XXX: currenly we do NOT allow opening a contex
# with "self" since the local feeder mem-chan processing
# is not built for it.
if self.channel.uid == self.actor.uid:
raise RuntimeError(
'** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n'
f'|_{self.actor}\n'
)
ctx: Context = await self.actor.start_remote_task( ctx: Context = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=nsf, nsf=nsf,

View File

@ -63,26 +63,26 @@ async def open_root_actor(
*, *,
# defaults are above # defaults are above
registry_addrs: list[tuple[str, int]]|None = None, registry_addrs: list[tuple[str, int]] | None = None,
# defaults are above # defaults are above
arbiter_addr: tuple[str, int]|None = None, arbiter_addr: tuple[str, int] | None = None,
name: str|None = 'root', name: str | None = 'root',
# either the `multiprocessing` start method: # either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default). # OR `trio` (the new default).
start_method: _spawn.SpawnMethodKey|None = None, start_method: _spawn.SpawnMethodKey | None = None,
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
# internal logging # internal logging
loglevel: str|None = None, loglevel: str | None = None,
enable_modules: list|None = None, enable_modules: list | None = None,
rpc_module_paths: list|None = None, rpc_module_paths: list | None = None,
# NOTE: allow caller to ensure that only one registry exists # NOTE: allow caller to ensure that only one registry exists
# and that this call creates it. # and that this call creates it.
@ -108,11 +108,7 @@ async def open_root_actor(
_state._runtime_vars['_is_root'] = True _state._runtime_vars['_is_root'] = True
# caps based rpc list # caps based rpc list
enable_modules = ( enable_modules = enable_modules or []
enable_modules
or
[]
)
if rpc_module_paths: if rpc_module_paths:
warnings.warn( warnings.warn(

View File

@ -434,10 +434,6 @@ async def _invoke(
chan=chan, chan=chan,
cid=cid, cid=cid,
nsf=NamespacePath.from_ref(func), nsf=NamespacePath.from_ref(func),
# TODO: if we wanted to get cray and support it?
# side='callee',
# We shouldn't ever need to pass this through right? # We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to # it's up to the soon-to-be called rpc task to
# open the stream with this option. # open the stream with this option.
@ -690,11 +686,9 @@ async def _invoke(
# don't pop the local context until we know the # don't pop the local context until we know the
# associated child isn't in debug any more # associated child isn't in debug any more
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop(( ctx: Context = actor._contexts.pop(
chan.uid, (chan.uid, cid)
cid, )
# ctx.side,
))
merr: Exception|None = ctx.maybe_error merr: Exception|None = ctx.maybe_error
@ -885,11 +879,7 @@ class Actor:
# map {actor uids -> Context} # map {actor uids -> Context}
self._contexts: dict[ self._contexts: dict[
tuple[ tuple[tuple[str, str], str],
tuple[str, str], # .uid
str, # .cid
str, # .side
],
Context Context
] = {} ] = {}
@ -1373,13 +1363,7 @@ class Actor:
uid: tuple[str, str] = chan.uid uid: tuple[str, str] = chan.uid
assert uid, f"`chan.uid` can't be {uid}" assert uid, f"`chan.uid` can't be {uid}"
try: try:
ctx: Context = self._contexts[( ctx: Context = self._contexts[(uid, cid)]
uid,
cid,
# TODO: how to determine this tho?
# side,
)]
except KeyError: except KeyError:
log.warning( log.warning(
'Ignoring invalid IPC ctx msg!\n\n' 'Ignoring invalid IPC ctx msg!\n\n'
@ -1398,16 +1382,6 @@ class Actor:
cid: str, cid: str,
nsf: NamespacePath, nsf: NamespacePath,
# TODO: support lookup by `Context.side: str` ?
# -> would allow making a self-context which might have
# certain special use cases where RPC isolation is wanted
# between 2 tasks running in the same process?
# => prolly needs some deeper though on the real use cases
# and whether or not such things should be better
# implemented using a `TaskManager` style nursery..
#
# side: str|None = None,
msg_buffer_size: int | None = None, msg_buffer_size: int | None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
@ -1423,11 +1397,7 @@ class Actor:
actor_uid = chan.uid actor_uid = chan.uid
assert actor_uid assert actor_uid
try: try:
ctx = self._contexts[( ctx = self._contexts[(actor_uid, cid)]
actor_uid,
cid,
# side,
)]
log.runtime( log.runtime(
f'Retreived cached IPC ctx for\n' f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
@ -1453,11 +1423,7 @@ class Actor:
msg_buffer_size=msg_buffer_size or self.msg_buffer_size, msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
_allow_overruns=allow_overruns, _allow_overruns=allow_overruns,
) )
self._contexts[( self._contexts[(actor_uid, cid)] = ctx
actor_uid,
cid,
# side,
)] = ctx
return ctx return ctx
@ -1488,8 +1454,6 @@ class Actor:
chan=chan, chan=chan,
cid=cid, cid=cid,
nsf=nsf, nsf=nsf,
# side='caller',
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )

View File

@ -90,29 +90,19 @@ class MsgStream(trio.abc.Channel):
self._closed: bool|trio.ClosedResourceError = False self._closed: bool|trio.ClosedResourceError = False
# delegate directly to underlying mem channel # delegate directly to underlying mem channel
def receive_nowait( def receive_nowait(self):
self, msg = self._rx_chan.receive_nowait()
allow_msg_keys: list[str] = ['yield'], try:
): return msg['yield']
msg: dict = self._rx_chan.receive_nowait() except KeyError as kerr:
for ( _raise_from_no_key_in_msg(
i, ctx=self._ctx,
key, msg=msg,
) in enumerate(allow_msg_keys): src_err=kerr,
try: log=log,
return msg[key] expect_key='yield',
except KeyError as kerr: stream=self,
if i < (len(allow_msg_keys) - 1): )
continue
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key=key,
stream=self,
)
async def receive(self): async def receive(self):
''' '''
@ -273,9 +263,7 @@ class MsgStream(trio.abc.Channel):
drained: list[Exception|dict] = [] drained: list[Exception|dict] = []
while not drained: while not drained:
try: try:
maybe_final_msg = self.receive_nowait( maybe_final_msg = self.receive_nowait()
allow_msg_keys=['yield', 'return'],
)
if maybe_final_msg: if maybe_final_msg:
log.debug( log.debug(
'Drained un-processed stream msg:\n' 'Drained un-processed stream msg:\n'