From cbaf4fc05b7e47f8bcf633410b7015f16132643a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Mar 2024 10:24:44 -0400 Subject: [PATCH] Add a open-ctx-with-self test Found exactly why trying this won't work when playing around with opening workspaces in `modden` using a `Portal.open_context()` back to the 'bigd' root actor: the RPC machinery only registers one entry in `Actor._contexts` which will get overwritten by each task's side and then experience race-based IPC msging errors (eg. rxing `{'started': _}` on the callee side..). Instead make opening a ctx back to the self-actor a runtime error describing it as an invalid op. To match: - add a new test `test_ctx_with_self_actor()` to the context semantics suite. - tried out adding a new `side: str` to the `Actor.get_context()` (and callers) but ran into not being able to determine the value from in `._push_result()` where it's needed to figure out which side to push to.. So, just leaving the commented arg (passing) in the runtime core for now in case we can come back to trying to make it work, tho i'm thinking it's not the right hack anyway XD --- tests/test_context_stream_semantics.py | 51 ++++++++++++++++++++++++++ tractor/_context.py | 4 +- tractor/_portal.py | 10 +++++ tractor/_runtime.py | 50 +++++++++++++++++++++---- 4 files changed, 107 insertions(+), 8 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index d8e946bf..d5767eec 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -1024,6 +1024,8 @@ def test_maybe_allow_overruns_stream( cancel_ctx: bool, slow_side: str, allow_overruns_side: str, + + # conftest wide loglevel: str, debug_mode: bool, ): @@ -1147,3 +1149,52 @@ def test_maybe_allow_overruns_stream( # if this hits the logic blocks from above are not # exhaustive.. pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO') + + +def test_ctx_with_self_actor( + loglevel: str, + debug_mode: bool, +): + ''' + NOTE: for now this is an INVALID OP! + + BUT, eventually presuming we add a "side" key to `Actor.get_context()`, + we might be able to get this working symmetrically, but should we?? + + Open a context back to the same actor and ensure all cancellation + and error semantics hold the same. + + ''' + async def main(): + async with tractor.open_nursery( + debug_mode=debug_mode, + enable_modules=[__name__], + ) as an: + assert an + async with ( + tractor.find_actor('root') as portal, + portal.open_context( + expect_cancelled, + # echo_back_sequence, + # seq=seq, + # wait_for_cancel=cancel_ctx, + # be_slow=(slow_side == 'child'), + # allow_overruns_side=allow_overruns_side, + + ) as (ctx, sent), + ctx.open_stream() as ipc, + ): + assert sent is None + + seq = list(range(10)) + for i in seq: + await ipc.send(i) + rx: int = await ipc.receive() + assert rx == i + + await ctx.cancel() + + with pytest.raises(RuntimeError) as excinfo: + trio.run(main) + + assert 'Invalid Operation' in repr(excinfo.value) diff --git a/tractor/_context.py b/tractor/_context.py index a7ce5832..a31c3b1b 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -1101,6 +1101,8 @@ class Context: chan=self.chan, cid=self.cid, nsf=self._nsf, + # side=self.side, + msg_buffer_size=msg_buffer_size, allow_overruns=allow_overruns, ) @@ -1298,7 +1300,7 @@ class Context: # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement # https://stackoverflow.com/a/24752607 __tracebackhide__: bool = True - raise remote_error from None + raise remote_error # from None # TODO: change to `.wait_for_result()`? async def result( diff --git a/tractor/_portal.py b/tractor/_portal.py index 27fbaec1..c9f314f3 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -514,6 +514,16 @@ class Portal: # a new `_context.py` mod. nsf = NamespacePath.from_ref(func) + # XXX NOTE XXX: currenly we do NOT allow opening a contex + # with "self" since the local feeder mem-chan processing + # is not built for it. + if self.channel.uid == self.actor.uid: + raise RuntimeError( + '** !! Invalid Operation !! **\n' + 'Can not open an IPC ctx with the local actor!\n' + f'|_{self.actor}\n' + ) + ctx: Context = await self.actor.start_remote_task( self.channel, nsf=nsf, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 0956042e..a06b5948 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -427,6 +427,10 @@ async def _invoke( chan=chan, cid=cid, nsf=NamespacePath.from_ref(func), + + # TODO: if we wanted to get cray and support it? + # side='callee', + # We shouldn't ever need to pass this through right? # it's up to the soon-to-be called rpc task to # open the stream with this option. @@ -679,9 +683,11 @@ async def _invoke( # don't pop the local context until we know the # associated child isn't in debug any more await _debug.maybe_wait_for_debugger() - ctx: Context = actor._contexts.pop( - (chan.uid, cid) - ) + ctx: Context = actor._contexts.pop(( + chan.uid, + cid, + # ctx.side, + )) merr: Exception|None = ctx.maybe_error @@ -860,7 +866,11 @@ class Actor: # map {actor uids -> Context} self._contexts: dict[ - tuple[tuple[str, str], str], + tuple[ + tuple[str, str], # .uid + str, # .cid + str, # .side + ], Context ] = {} @@ -1302,7 +1312,13 @@ class Actor: uid: tuple[str, str] = chan.uid assert uid, f"`chan.uid` can't be {uid}" try: - ctx: Context = self._contexts[(uid, cid)] + ctx: Context = self._contexts[( + uid, + cid, + + # TODO: how to determine this tho? + # side, + )] except KeyError: log.warning( 'Ignoring invalid IPC ctx msg!\n\n' @@ -1321,6 +1337,16 @@ class Actor: cid: str, nsf: NamespacePath, + # TODO: support lookup by `Context.side: str` ? + # -> would allow making a self-context which might have + # certain special use cases where RPC isolation is wanted + # between 2 tasks running in the same process? + # => prolly needs some deeper though on the real use cases + # and whether or not such things should be better + # implemented using a `TaskManager` style nursery.. + # + # side: str|None = None, + msg_buffer_size: int | None = None, allow_overruns: bool = False, @@ -1336,7 +1362,11 @@ class Actor: actor_uid = chan.uid assert actor_uid try: - ctx = self._contexts[(actor_uid, cid)] + ctx = self._contexts[( + actor_uid, + cid, + # side, + )] log.runtime( f'Retreived cached IPC ctx for\n' f'peer: {chan.uid}\n' @@ -1362,7 +1392,11 @@ class Actor: msg_buffer_size=msg_buffer_size or self.msg_buffer_size, _allow_overruns=allow_overruns, ) - self._contexts[(actor_uid, cid)] = ctx + self._contexts[( + actor_uid, + cid, + # side, + )] = ctx return ctx @@ -1393,6 +1427,8 @@ class Actor: chan=chan, cid=cid, nsf=nsf, + + # side='caller', msg_buffer_size=msg_buffer_size, allow_overruns=allow_overruns, )