diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py
index d8e946b..d5767ee 100644
--- a/tests/test_context_stream_semantics.py
+++ b/tests/test_context_stream_semantics.py
@@ -1024,6 +1024,8 @@ def test_maybe_allow_overruns_stream(
     cancel_ctx: bool,
     slow_side: str,
     allow_overruns_side: str,
+
+    # conftest wide
     loglevel: str,
     debug_mode: bool,
 ):
@@ -1147,3 +1149,52 @@ def test_maybe_allow_overruns_stream(
         # if this hits the logic blocks from above are not
         # exhaustive..
         pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
+
+
+def test_ctx_with_self_actor(
+    loglevel: str,
+    debug_mode: bool,
+):
+    '''
+    NOTE: for now this is an INVALID OP!
+
+    BUT, eventually presuming we add a "side" key to `Actor.get_context()`,
+    we might be able to get this working symmetrically, but should we??
+
+    Open a context back to the same actor and ensure all cancellation
+    and error semantics hold the same.
+
+    '''
+    async def main():
+        async with tractor.open_nursery(
+            debug_mode=debug_mode,
+            enable_modules=[__name__],
+        ) as an:
+            assert an
+            async with (
+                tractor.find_actor('root') as portal,
+                portal.open_context(
+                    expect_cancelled,
+                    # echo_back_sequence,
+                    # seq=seq,
+                    # wait_for_cancel=cancel_ctx,
+                    # be_slow=(slow_side == 'child'),
+                    # allow_overruns_side=allow_overruns_side,
+
+                ) as (ctx, sent),
+                ctx.open_stream() as ipc,
+            ):
+                assert sent is None
+
+                seq = list(range(10))
+                for i in seq:
+                    await ipc.send(i)
+                    rx: int = await ipc.receive()
+                    assert rx == i
+
+                await ctx.cancel()
+
+    with pytest.raises(RuntimeError) as excinfo:
+        trio.run(main)
+
+    assert 'Invalid Operation' in repr(excinfo.value)
diff --git a/tractor/_context.py b/tractor/_context.py
index a7ce583..a31c3b1 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -1101,6 +1101,8 @@ class Context:
             chan=self.chan,
             cid=self.cid,
             nsf=self._nsf,
+            # side=self.side,
+
             msg_buffer_size=msg_buffer_size,
             allow_overruns=allow_overruns,
         )
@@ -1298,7 +1300,7 @@ class Context:
             # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
             # https://stackoverflow.com/a/24752607
             __tracebackhide__: bool = True
-            raise remote_error from None
+            raise remote_error  # from None
 
     # TODO: change to `.wait_for_result()`?
     async def result(
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 5e5fd81..7ac5711 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -514,6 +514,16 @@ class Portal:
         # a new `_context.py` mod.
         nsf = NamespacePath.from_ref(func)
 
+        # XXX NOTE XXX: currently we do NOT allow opening a context
+        # with "self" since the local feeder mem-chan processing
+        # is not built for it.
+        if self.channel.uid == self.actor.uid:
+            raise RuntimeError(
+                '** !! Invalid Operation !! **\n'
+                'Can not open an IPC ctx with the local actor!\n'
+                f'|_{self.actor}\n'
+            )
+
         ctx: Context = await self.actor.start_remote_task(
             self.channel,
             nsf=nsf,
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index 607f98c..307dacd 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -434,6 +434,10 @@ async def _invoke(
         chan=chan,
         cid=cid,
         nsf=NamespacePath.from_ref(func),
+
+        # TODO: if we wanted to get cray and support it?
+        # side='callee',
+
         # We shouldn't ever need to pass this through right?
         # it's up to the soon-to-be called rpc task to
         # open the stream with this option.
@@ -686,9 +690,11 @@ async def _invoke(
             # don't pop the local context until we know the
             # associated child isn't in debug any more
             await maybe_wait_for_debugger()
-            ctx: Context = actor._contexts.pop(
-                (chan.uid, cid)
-            )
+            ctx: Context = actor._contexts.pop((
+                chan.uid,
+                cid,
+                # ctx.side,
+            ))
 
             merr: Exception|None = ctx.maybe_error
 
@@ -879,7 +885,11 @@ class Actor:
 
         # map {actor uids -> Context}
         self._contexts: dict[
-            tuple[tuple[str, str], str],
+            tuple[
+                tuple[str, str],  # .uid
+                str,  # .cid
+                str,  # .side
+            ],
             Context
         ] = {}
 
@@ -1363,7 +1373,13 @@ class Actor:
         uid: tuple[str, str] = chan.uid
         assert uid, f"`chan.uid` can't be {uid}"
         try:
-            ctx: Context = self._contexts[(uid, cid)]
+            ctx: Context = self._contexts[(
+                uid,
+                cid,
+
+                # TODO: how to determine this tho?
+                # side,
+            )]
         except KeyError:
             log.warning(
                 'Ignoring invalid IPC ctx msg!\n\n'
@@ -1382,6 +1398,16 @@ class Actor:
         cid: str,
         nsf: NamespacePath,
 
+        # TODO: support lookup by `Context.side: str` ?
+        # -> would allow making a self-context which might have
+        # certain special use cases where RPC isolation is wanted
+        # between 2 tasks running in the same process?
+        # => prolly needs some deeper thought on the real use cases
+        # and whether or not such things should be better
+        # implemented using a `TaskManager` style nursery..
+        #
+        # side: str|None = None,
+
         msg_buffer_size: int | None = None,
         allow_overruns: bool = False,
 
@@ -1397,7 +1423,11 @@ class Actor:
         actor_uid = chan.uid
         assert actor_uid
         try:
-            ctx = self._contexts[(actor_uid, cid)]
+            ctx = self._contexts[(
+                actor_uid,
+                cid,
+                # side,
+            )]
             log.runtime(
                 f'Retreived cached IPC ctx for\n'
                 f'peer: {chan.uid}\n'
@@ -1423,7 +1453,11 @@ class Actor:
                 msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
                 _allow_overruns=allow_overruns,
             )
-            self._contexts[(actor_uid, cid)] = ctx
+            self._contexts[(
+                actor_uid,
+                cid,
+                # side,
+            )] = ctx
 
         return ctx
 
@@ -1454,6 +1488,8 @@ class Actor:
             chan=chan,
             cid=cid,
             nsf=nsf,
+
+            # side='caller',
             msg_buffer_size=msg_buffer_size,
             allow_overruns=allow_overruns,
         )
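
For reference, below is a minimal standalone sketch (not part of the patch) of the behaviour the new `test_ctx_with_self_actor` test pins down: any `Portal.open_context()` call against a portal whose channel points back at the local actor should now fail fast with the `'Invalid Operation'` `RuntimeError` added in `_portal.py` above. The `@tractor.context` endpoint `echo` is a hypothetical stand-in for the test suite's `expect_cancelled` fixture.

import trio
import tractor


@tractor.context
async def echo(ctx: tractor.Context) -> None:
    # hypothetical endpoint; never actually reached since the self-ctx
    # guard in `Portal.open_context()` trips before any RPC task starts.
    await ctx.started()


async def main():
    async with tractor.open_nursery(enable_modules=[__name__]):
        # `find_actor('root')` from inside the root actor hands back a
        # portal to *this* actor, so `portal.channel.uid` matches the
        # local actor's uid and the new guard raises on ctx entry.
        async with (
            tractor.find_actor('root') as portal,
            portal.open_context(echo) as (ctx, first),
        ):
            pass  # unreachable


try:
    trio.run(main)
except RuntimeError as rte:
    assert 'Invalid Operation' in repr(rte)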