Compare commits

..

5 Commits

Author SHA1 Message Date
Tyler Goodlet 8477919fc9 Don't pass `ipc_msg` for send side MTEs
Just pass `_bad_msg` such that it get's injected to `.msgdata` since
with a send-side `MsgTypeError` we don't have a remote `._ipc_msg:
Error` per say to include.
2024-06-17 10:32:50 -04:00
Tyler Goodlet 872feef24b Add note about using `@acm` as decorator in 3.10 2024-06-17 10:32:38 -04:00
Tyler Goodlet affc210033 Update pld-rx limiting test(s) to use deco input
The tests only use one input spec (conveniently) so there's not much to
change in the logic,
- only pass the `maybe_msg_spec` to the child-side decorator and obvi
  drop the surrounding `msgops.limit_plds()` block in the child.
- tweak a few `MsgDec` asserts, mostly dropping the
  `msg._ops._def_any_spec` state checks since the child-side won't have
  any pre pld-spec state given the runtime now applies the `pld_spec`
  before running the task's func body.
  - also allowed dropping the `finally:` which did a similar check
    outside the `.limit_plds()` block.
2024-06-17 09:24:03 -04:00
Tyler Goodlet 04bd111037 Proxy through `dec_hook` in `.limit_plds()` APIs 2024-06-17 09:23:31 -04:00
Tyler Goodlet a0ee0cc713 Port debug request ep to use `@context(pld_spec)`
Namely passing the `.__pld_spec__` directly to the
`lock_stdio_for_peer()` decorator B)

Also, allows dropping `apply_debug_pldec()` (which was a todo) and
removing a `lock_stdio_for_peer()` indent level.
2024-06-17 09:01:13 -04:00
5 changed files with 181 additions and 209 deletions

View File

@ -7,9 +7,6 @@ related settings around IPC contexts.
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from contextvars import (
Context,
)
from msgspec import ( from msgspec import (
Struct, Struct,
@ -19,6 +16,7 @@ import trio
import tractor import tractor
from tractor import ( from tractor import (
Context,
MsgTypeError, MsgTypeError,
current_ipc_ctx, current_ipc_ctx,
Portal, Portal,
@ -35,7 +33,17 @@ from tractor.msg.types import (
) )
class PldMsg(Struct): class PldMsg(
Struct,
# TODO: with multiple structs in-spec we need to tag them!
# -[ ] offer a built-in `PldMsg` type to inherit from which takes
# case of these details?
#
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag=True,
# tag_field='msg_type',
):
field: str field: str
@ -96,12 +104,14 @@ async def maybe_expect_raises(
) )
@tractor.context @tractor.context(
pld_spec=maybe_msg_spec,
)
async def child( async def child(
ctx: Context, ctx: Context,
started_value: int|PldMsg|None, started_value: int|PldMsg|None,
return_value: str|None, return_value: str|None,
validate_pld_spec: bool, validate_pld_spec: bool,
raise_on_started_mte: bool = True, raise_on_started_mte: bool = True,
) -> None: ) -> None:
@ -116,113 +126,99 @@ async def child(
assert ctx is curr_ctx assert ctx is curr_ctx
rx: msgops.PldRx = ctx._pld_rx rx: msgops.PldRx = ctx._pld_rx
orig_pldec: _codec.MsgDec = rx.pld_dec curr_pldec: _codec.MsgDec = rx.pld_dec
# senity that default pld-spec should be set
assert ( ctx_meta: dict = getattr(
rx.pld_dec child,
is '_tractor_context_meta',
msgops._def_any_pldec None,
) )
if ctx_meta:
assert (
ctx_meta['pld_spec']
is curr_pldec.spec
is curr_pldec.pld_spec
)
# 2 cases: hdndle send-side and recv-only validation
# - when `raise_on_started_mte == True`, send validate
# - else, parent-recv-side only validation
mte: MsgTypeError|None = None
try: try:
with msgops.limit_plds( await ctx.started(
spec=maybe_msg_spec, value=started_value,
) as pldec: validate_pld_spec=validate_pld_spec,
# sanity on `MsgDec` state )
assert rx.pld_dec is pldec
assert pldec.spec is maybe_msg_spec
# 2 cases: hdndle send-side and recv-only validation except MsgTypeError as _mte:
# - when `raise_on_started_mte == True`, send validate mte = _mte
# - else, parent-recv-side only validation log.exception('started()` raised an MTE!\n')
mte: MsgTypeError|None = None if not expect_started_mte:
try: raise RuntimeError(
await ctx.started( 'Child-ctx-task SHOULD NOT HAVE raised an MTE for\n\n'
value=started_value, f'{started_value!r}\n'
validate_pld_spec=validate_pld_spec,
)
except MsgTypeError as _mte:
mte = _mte
log.exception('started()` raised an MTE!\n')
if not expect_started_mte:
raise RuntimeError(
'Child-ctx-task SHOULD NOT HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
boxed_div: str = '------ - ------'
assert boxed_div not in mte._message
assert boxed_div not in mte.tb_str
assert boxed_div not in repr(mte)
assert boxed_div not in str(mte)
mte_repr: str = repr(mte)
for line in mte.message.splitlines():
assert line in mte_repr
# since this is a *local error* there should be no
# boxed traceback content!
assert not mte.tb_str
# propagate to parent?
if raise_on_started_mte:
raise
# no-send-side-error fallthrough
if (
validate_pld_spec
and
expect_started_mte
):
raise RuntimeError(
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
)
assert (
not expect_started_mte
or
not validate_pld_spec
) )
# if wait_for_parent_to_cancel: boxed_div: str = '------ - ------'
# ... assert boxed_div not in mte._message
# assert boxed_div not in mte.tb_str
# ^-TODO-^ logic for diff validation policies on each side: assert boxed_div not in repr(mte)
# assert boxed_div not in str(mte)
# -[ ] ensure that if we don't validate on the send mte_repr: str = repr(mte)
# side, that we are eventually error-cancelled by our for line in mte.message.splitlines():
# parent due to the bad `Started` payload! assert line in mte_repr
# -[ ] the boxed error should be srced from the parent's
# runtime NOT ours!
# -[ ] we should still error on bad `return_value`s
# despite the parent not yet error-cancelling us?
# |_ how do we want the parent side to look in that
# case?
# -[ ] maybe the equiv of "during handling of the
# above error another occurred" for the case where
# the parent sends a MTE to this child and while
# waiting for the child to terminate it gets back
# the MTE for this case?
#
# XXX should always fail on recv side since we can't # since this is a *local error* there should be no
# really do much else beside terminate and relay the # boxed traceback content!
# msg-type-error from this RPC task ;) assert not mte.tb_str
return return_value
finally: # propagate to parent?
# sanity on `limit_plds()` reversion if raise_on_started_mte:
assert ( raise
rx.pld_dec
is # no-send-side-error fallthrough
msgops._def_any_pldec if (
) validate_pld_spec
log.runtime( and
'Reverted to previous pld-spec\n\n' expect_started_mte
f'{orig_pldec}\n' ):
raise RuntimeError(
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
f'{started_value!r}\n'
) )
assert (
not expect_started_mte
or
not validate_pld_spec
)
# if wait_for_parent_to_cancel:
# ...
#
# ^-TODO-^ logic for diff validation policies on each side:
#
# -[ ] ensure that if we don't validate on the send
# side, that we are eventually error-cancelled by our
# parent due to the bad `Started` payload!
# -[ ] the boxed error should be srced from the parent's
# runtime NOT ours!
# -[ ] we should still error on bad `return_value`s
# despite the parent not yet error-cancelling us?
# |_ how do we want the parent side to look in that
# case?
# -[ ] maybe the equiv of "during handling of the
# above error another occurred" for the case where
# the parent sends a MTE to this child and while
# waiting for the child to terminate it gets back
# the MTE for this case?
#
# XXX should always fail on recv side since we can't
# really do much else beside terminate and relay the
# msg-type-error from this RPC task ;)
return return_value
@pytest.mark.parametrize( @pytest.mark.parametrize(
'return_value', 'return_value',
@ -321,7 +317,6 @@ def test_basic_payload_spec(
child, child,
return_value=return_value, return_value=return_value,
started_value=started_value, started_value=started_value,
pld_spec=maybe_msg_spec,
validate_pld_spec=pld_check_started_value, validate_pld_spec=pld_check_started_value,
) as (ctx, first), ) as (ctx, first),
): ):

View File

@ -2399,7 +2399,11 @@ def mk_context(
# |_ `return: TypeAlias`, # |_ `return: TypeAlias`,
# |_ `invalid_policy: str|Callable` ? # |_ `invalid_policy: str|Callable` ?
# -[ ] prolly implement the `@acm` wrapper using # -[ ] prolly implement the `@acm` wrapper using
# a `contextlib.ContextDecorator`? # a `contextlib.ContextDecorator`, i guess not if
# we don't need an `__aexit__` block right?
# |_ de hecho, @acm can already be used as a decorator as of 3.10
# but i dunno how that's gonna play with `trio.Nursery.start[_soon]()`
# |_ https://docs.python.org/3/library/contextlib.html#using-a-context-manager-as-a-function-decorator
# #
def context( def context(
func: Callable|None = None, func: Callable|None = None,

View File

@ -441,7 +441,8 @@ class RemoteActorError(Exception):
for key in fields: for key in fields:
if ( if (
key == 'relay_uid' and not self.is_inception() key == 'relay_uid'
and not self.is_inception()
): ):
continue continue
@ -1291,8 +1292,7 @@ def _mk_msg_type_err(
msgtyperr = MsgTypeError( msgtyperr = MsgTypeError(
message=message, message=message,
ipc_msg=msg, _bad_msg=msg,
bad_msg=msg,
) )
# ya, might be `None` # ya, might be `None`
msgtyperr.__cause__ = src_type_error msgtyperr.__cause__ = src_type_error

View File

@ -68,6 +68,7 @@ from trio import (
) )
import tractor import tractor
from tractor.log import get_logger from tractor.log import get_logger
from tractor._context import Context
from tractor._state import ( from tractor._state import (
current_actor, current_actor,
is_root_process, is_root_process,
@ -83,7 +84,6 @@ if TYPE_CHECKING:
from trio.lowlevel import Task from trio.lowlevel import Task
from threading import Thread from threading import Thread
from tractor._ipc import Channel from tractor._ipc import Channel
from tractor._context import Context
from tractor._runtime import ( from tractor._runtime import (
Actor, Actor,
) )
@ -529,7 +529,10 @@ class Lock:
) )
@tractor.context @tractor.context(
# enable the locking msgspec
pld_spec=__pld_spec__,
)
async def lock_stdio_for_peer( async def lock_stdio_for_peer(
ctx: Context, ctx: Context,
subactor_task_uid: tuple[str, int], subactor_task_uid: tuple[str, int],
@ -597,61 +600,55 @@ async def lock_stdio_for_peer(
# scope despite the shielding we apply below. # scope despite the shielding we apply below.
debug_lock_cs: CancelScope = ctx._scope debug_lock_cs: CancelScope = ctx._scope
# TODO: use `.msg._ops.maybe_limit_plds()` here instead so we async with Lock.acquire_for_ctx(ctx=ctx):
# can merge into a single async with, with the debug_lock_cs.shield = True
# `Lock.acquire_for_ctx()` enter below?
#
# enable the locking msgspec
with apply_debug_pldec():
async with Lock.acquire_for_ctx(ctx=ctx):
debug_lock_cs.shield = True
log.devx( log.devx(
'Subactor acquired debugger request lock!\n' 'Subactor acquired debugger request lock!\n'
f'root task: {root_task_name}\n' f'root task: {root_task_name}\n'
f'subactor_uid: {subactor_uid}\n' f'subactor_uid: {subactor_uid}\n'
f'remote task: {subactor_task_uid}\n\n' f'remote task: {subactor_task_uid}\n\n'
'Sending `ctx.started(LockStatus)`..\n' 'Sending `ctx.started(LockStatus)`..\n'
)
# indicate to child that we've locked stdio
await ctx.started(
LockStatus(
subactor_uid=subactor_uid,
cid=ctx.cid,
locked=True,
)
)
log.devx(
f'Actor {subactor_uid} acquired `Lock` via debugger request'
)
# wait for unlock pdb by child
async with ctx.open_stream() as stream:
release_msg: LockRelease = await stream.receive()
# TODO: security around only releasing if
# these match?
log.devx(
f'TTY lock released requested\n\n'
f'{release_msg}\n'
)
assert release_msg.cid == ctx.cid
assert release_msg.subactor_uid == tuple(subactor_uid)
log.devx(
f'Actor {subactor_uid} released TTY lock'
)
return LockStatus(
subactor_uid=subactor_uid,
cid=ctx.cid,
locked=False,
) )
# indicate to child that we've locked stdio
await ctx.started(
LockStatus(
subactor_uid=subactor_uid,
cid=ctx.cid,
locked=True,
)
)
log.devx(
f'Actor {subactor_uid} acquired `Lock` via debugger request'
)
# wait for unlock pdb by child
async with ctx.open_stream() as stream:
release_msg: LockRelease = await stream.receive()
# TODO: security around only releasing if
# these match?
log.devx(
f'TTY lock released requested\n\n'
f'{release_msg}\n'
)
assert release_msg.cid == ctx.cid
assert release_msg.subactor_uid == tuple(subactor_uid)
log.devx(
f'Actor {subactor_uid} released TTY lock'
)
return LockStatus(
subactor_uid=subactor_uid,
cid=ctx.cid,
locked=False,
)
except BaseException as req_err: except BaseException as req_err:
message: str = ( message: str = (
f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n' f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
@ -1037,48 +1034,6 @@ class PdbREPL(pdbp.Pdb):
return None return None
# TODO: prolly remove this and instead finally get our @context API
# supporting a msg/pld-spec via type annots as per,
# https://github.com/goodboy/tractor/issues/365
@cm
def apply_debug_pldec() -> _codec.MsgCodec:
'''
Apply the subactor TTY `Lock`-ing protocol's msgspec temporarily
(only in the current task).
'''
from tractor.msg import (
_ops as msgops,
)
cctx: Context = current_ipc_ctx()
rx: msgops.PldRx = cctx.pld_rx
orig_pldec: msgops.MsgDec = rx.pld_dec
try:
with msgops.limit_plds(
spec=__pld_spec__,
) as debug_dec:
assert (
debug_dec
is
rx.pld_dec
)
log.runtime(
'Applied `.devx._debug` pld-spec\n\n'
f'{debug_dec}\n'
)
yield debug_dec
finally:
assert (
rx.pld_dec is orig_pldec
)
log.runtime(
'Reverted to previous pld-spec\n\n'
f'{orig_pldec}\n'
)
async def request_root_stdio_lock( async def request_root_stdio_lock(
actor_uid: tuple[str, str], actor_uid: tuple[str, str],
task_uid: tuple[str, int], task_uid: tuple[str, int],
@ -1147,6 +1102,7 @@ async def request_root_stdio_lock(
async with portal.open_context( async with portal.open_context(
lock_stdio_for_peer, lock_stdio_for_peer,
subactor_task_uid=task_uid, subactor_task_uid=task_uid,
# NOTE: set it here in the locker request task bc it's # NOTE: set it here in the locker request task bc it's
# possible for multiple such requests for the lock in any # possible for multiple such requests for the lock in any
# single sub-actor AND there will be a race between when the # single sub-actor AND there will be a race between when the
@ -1159,7 +1115,7 @@ async def request_root_stdio_lock(
# this IPC-ctx request task, NOT any other task(s) # this IPC-ctx request task, NOT any other task(s)
# including the one that actually enters the REPL. This # including the one that actually enters the REPL. This
# is oc desired bc ow the debugged task will msg-type-error. # is oc desired bc ow the debugged task will msg-type-error.
pld_spec=__pld_spec__, # pld_spec=__pld_spec__,
) as (req_ctx, status): ) as (req_ctx, status):
@ -2856,7 +2812,9 @@ def open_crash_handler(
@cm @cm
def maybe_open_crash_handler(pdb: bool = False): def maybe_open_crash_handler(
pdb: bool = False,
):
''' '''
Same as `open_crash_handler()` but with bool input flag Same as `open_crash_handler()` but with bool input flag
to allow conditional handling. to allow conditional handling.

View File

@ -27,6 +27,7 @@ from contextlib import (
) )
from typing import ( from typing import (
Any, Any,
Callable,
Type, Type,
TYPE_CHECKING, TYPE_CHECKING,
Union, Union,
@ -138,6 +139,7 @@ class PldRx(Struct):
def limit_plds( def limit_plds(
self, self,
spec: Union[Type[Struct]], spec: Union[Type[Struct]],
**dec_kwargs,
) -> MsgDec: ) -> MsgDec:
''' '''
@ -147,7 +149,10 @@ class PldRx(Struct):
''' '''
orig_dec: MsgDec = self._pld_dec orig_dec: MsgDec = self._pld_dec
limit_dec: MsgDec = mk_dec(spec=spec) limit_dec: MsgDec = mk_dec(
spec=spec,
**dec_kwargs,
)
try: try:
self._pld_dec = limit_dec self._pld_dec = limit_dec
yield limit_dec yield limit_dec
@ -449,7 +454,7 @@ class PldRx(Struct):
@cm @cm
def limit_plds( def limit_plds(
spec: Union[Type[Struct]], spec: Union[Type[Struct]],
**kwargs, **dec_kwargs,
) -> MsgDec: ) -> MsgDec:
''' '''
@ -467,7 +472,7 @@ def limit_plds(
with rx.limit_plds( with rx.limit_plds(
spec=spec, spec=spec,
**kwargs, **dec_kwargs,
) as pldec: ) as pldec:
log.runtime( log.runtime(
'Applying payload-decoder\n\n' 'Applying payload-decoder\n\n'
@ -487,7 +492,9 @@ def limit_plds(
async def maybe_limit_plds( async def maybe_limit_plds(
ctx: Context, ctx: Context,
spec: Union[Type[Struct]]|None = None, spec: Union[Type[Struct]]|None = None,
dec_hook: Callable|None = None,
**kwargs, **kwargs,
) -> MsgDec|None: ) -> MsgDec|None:
''' '''
Async compat maybe-payload type limiter. Async compat maybe-payload type limiter.
@ -497,7 +504,11 @@ async def maybe_limit_plds(
used. used.
''' '''
if spec is None: if (
spec is None
and
dec_hook is None
):
yield None yield None
return return
@ -505,7 +516,11 @@ async def maybe_limit_plds(
curr_ctx: Context = current_ipc_ctx() curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx assert ctx is curr_ctx
with ctx._pld_rx.limit_plds(spec=spec) as msgdec: with ctx._pld_rx.limit_plds(
spec=spec,
dec_hook=dec_hook,
**kwargs,
) as msgdec:
yield msgdec yield msgdec
curr_ctx: Context = current_ipc_ctx() curr_ctx: Context = current_ipc_ctx()