forked from goodboy/tractor
1
0
Fork 0

Add a open-ctx-with-self test

Found exactly why trying this won't work when playing around with
opening workspaces in `modden` using a `Portal.open_context()` back to
the 'bigd' root actor: the RPC machinery only registers one entry in
`Actor._contexts` which will get overwritten by each task's side and
then experience race-based IPC msging errors (eg. rxing `{'started': _}`
on the callee side..). Instead make opening a ctx back to the self-actor
a runtime error describing it as an invalid op.

To match:
- add a new test `test_ctx_with_self_actor()` to the context semantics
  suite.
- tried out adding a new `side: str` to the `Actor.get_context()` (and
  callers) but ran into not being able to determine the value from in
  `._push_result()` where it's needed to figure out which side to push
  to.. So, just leaving the commented arg (passing) in the runtime core
  for now in case we can come back to trying to make it work, tho i'm
  thinking it's not the right hack anyway XD
modden_spawn_from_client_req
Tyler Goodlet 2024-03-11 10:24:44 -04:00
parent 37ee477aee
commit dd168184c3
4 changed files with 107 additions and 8 deletions

View File

@ -1024,6 +1024,8 @@ def test_maybe_allow_overruns_stream(
cancel_ctx: bool, cancel_ctx: bool,
slow_side: str, slow_side: str,
allow_overruns_side: str, allow_overruns_side: str,
# conftest wide
loglevel: str, loglevel: str,
debug_mode: bool, debug_mode: bool,
): ):
@ -1147,3 +1149,52 @@ def test_maybe_allow_overruns_stream(
# if this hits the logic blocks from above are not # if this hits the logic blocks from above are not
# exhaustive.. # exhaustive..
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO') pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
def test_ctx_with_self_actor(
loglevel: str,
debug_mode: bool,
):
'''
NOTE: for now this is an INVALID OP!
BUT, eventually presuming we add a "side" key to `Actor.get_context()`,
we might be able to get this working symmetrically, but should we??
Open a context back to the same actor and ensure all cancellation
and error semantics hold the same.
'''
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
enable_modules=[__name__],
) as an:
assert an
async with (
tractor.find_actor('root') as portal,
portal.open_context(
expect_cancelled,
# echo_back_sequence,
# seq=seq,
# wait_for_cancel=cancel_ctx,
# be_slow=(slow_side == 'child'),
# allow_overruns_side=allow_overruns_side,
) as (ctx, sent),
ctx.open_stream() as ipc,
):
assert sent is None
seq = list(range(10))
for i in seq:
await ipc.send(i)
rx: int = await ipc.receive()
assert rx == i
await ctx.cancel()
with pytest.raises(RuntimeError) as excinfo:
trio.run(main)
assert 'Invalid Operation' in repr(excinfo.value)

View File

@ -1101,6 +1101,8 @@ class Context:
chan=self.chan, chan=self.chan,
cid=self.cid, cid=self.cid,
nsf=self._nsf, nsf=self._nsf,
# side=self.side,
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
@ -1298,7 +1300,7 @@ class Context:
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607 # https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True __tracebackhide__: bool = True
raise remote_error from None raise remote_error # from None
# TODO: change to `.wait_for_result()`? # TODO: change to `.wait_for_result()`?
async def result( async def result(

View File

@ -514,6 +514,16 @@ class Portal:
# a new `_context.py` mod. # a new `_context.py` mod.
nsf = NamespacePath.from_ref(func) nsf = NamespacePath.from_ref(func)
# XXX NOTE XXX: currenly we do NOT allow opening a contex
# with "self" since the local feeder mem-chan processing
# is not built for it.
if self.channel.uid == self.actor.uid:
raise RuntimeError(
'** !! Invalid Operation !! **\n'
'Can not open an IPC ctx with the local actor!\n'
f'|_{self.actor}\n'
)
ctx: Context = await self.actor.start_remote_task( ctx: Context = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=nsf, nsf=nsf,

View File

@ -434,6 +434,10 @@ async def _invoke(
chan=chan, chan=chan,
cid=cid, cid=cid,
nsf=NamespacePath.from_ref(func), nsf=NamespacePath.from_ref(func),
# TODO: if we wanted to get cray and support it?
# side='callee',
# We shouldn't ever need to pass this through right? # We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to # it's up to the soon-to-be called rpc task to
# open the stream with this option. # open the stream with this option.
@ -686,9 +690,11 @@ async def _invoke(
# don't pop the local context until we know the # don't pop the local context until we know the
# associated child isn't in debug any more # associated child isn't in debug any more
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop( ctx: Context = actor._contexts.pop((
(chan.uid, cid) chan.uid,
) cid,
# ctx.side,
))
merr: Exception|None = ctx.maybe_error merr: Exception|None = ctx.maybe_error
@ -879,7 +885,11 @@ class Actor:
# map {actor uids -> Context} # map {actor uids -> Context}
self._contexts: dict[ self._contexts: dict[
tuple[tuple[str, str], str], tuple[
tuple[str, str], # .uid
str, # .cid
str, # .side
],
Context Context
] = {} ] = {}
@ -1363,7 +1373,13 @@ class Actor:
uid: tuple[str, str] = chan.uid uid: tuple[str, str] = chan.uid
assert uid, f"`chan.uid` can't be {uid}" assert uid, f"`chan.uid` can't be {uid}"
try: try:
ctx: Context = self._contexts[(uid, cid)] ctx: Context = self._contexts[(
uid,
cid,
# TODO: how to determine this tho?
# side,
)]
except KeyError: except KeyError:
log.warning( log.warning(
'Ignoring invalid IPC ctx msg!\n\n' 'Ignoring invalid IPC ctx msg!\n\n'
@ -1382,6 +1398,16 @@ class Actor:
cid: str, cid: str,
nsf: NamespacePath, nsf: NamespacePath,
# TODO: support lookup by `Context.side: str` ?
# -> would allow making a self-context which might have
# certain special use cases where RPC isolation is wanted
# between 2 tasks running in the same process?
# => prolly needs some deeper though on the real use cases
# and whether or not such things should be better
# implemented using a `TaskManager` style nursery..
#
# side: str|None = None,
msg_buffer_size: int | None = None, msg_buffer_size: int | None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
@ -1397,7 +1423,11 @@ class Actor:
actor_uid = chan.uid actor_uid = chan.uid
assert actor_uid assert actor_uid
try: try:
ctx = self._contexts[(actor_uid, cid)] ctx = self._contexts[(
actor_uid,
cid,
# side,
)]
log.runtime( log.runtime(
f'Retreived cached IPC ctx for\n' f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
@ -1423,7 +1453,11 @@ class Actor:
msg_buffer_size=msg_buffer_size or self.msg_buffer_size, msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
_allow_overruns=allow_overruns, _allow_overruns=allow_overruns,
) )
self._contexts[(actor_uid, cid)] = ctx self._contexts[(
actor_uid,
cid,
# side,
)] = ctx
return ctx return ctx
@ -1454,6 +1488,8 @@ class Actor:
chan=chan, chan=chan,
cid=cid, cid=cid,
nsf=nsf, nsf=nsf,
# side='caller',
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )