Compare commits

..

No commits in common. "8477919fc92b9811ec10bb9931da184319fa4d4c" and "5449bd567309b943a0f3317aabf4a9cb95cb0fd3" have entirely different histories.

5 changed files with 208 additions and 180 deletions

View File

@ -7,6 +7,9 @@ related settings around IPC contexts.
from contextlib import (
asynccontextmanager as acm,
)
from contextvars import (
Context,
)
from msgspec import (
Struct,
@ -16,7 +19,6 @@ import trio
import tractor
from tractor import (
Context,
MsgTypeError,
current_ipc_ctx,
Portal,
@ -33,17 +35,7 @@ from tractor.msg.types import (
)
class PldMsg(
Struct,
# TODO: with multiple structs in-spec we need to tag them!
# -[ ] offer a built-in `PldMsg` type to inherit from which takes
# case of these details?
#
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag=True,
# tag_field='msg_type',
):
class PldMsg(Struct):
field: str
@ -104,9 +96,7 @@ async def maybe_expect_raises(
)
@tractor.context(
pld_spec=maybe_msg_spec,
)
@tractor.context
async def child(
ctx: Context,
started_value: int|PldMsg|None,
@ -126,20 +116,22 @@ async def child(
assert ctx is curr_ctx
rx: msgops.PldRx = ctx._pld_rx
curr_pldec: _codec.MsgDec = rx.pld_dec
ctx_meta: dict = getattr(
child,
'_tractor_context_meta',
None,
)
if ctx_meta:
orig_pldec: _codec.MsgDec = rx.pld_dec
# senity that default pld-spec should be set
assert (
ctx_meta['pld_spec']
is curr_pldec.spec
is curr_pldec.pld_spec
rx.pld_dec
is
msgops._def_any_pldec
)
try:
with msgops.limit_plds(
spec=maybe_msg_spec,
) as pldec:
# sanity on `MsgDec` state
assert rx.pld_dec is pldec
assert pldec.spec is maybe_msg_spec
# 2 cases: hdndle send-side and recv-only validation
# - when `raise_on_started_mte == True`, send validate
# - else, parent-recv-side only validation
@ -219,6 +211,18 @@ async def child(
# msg-type-error from this RPC task ;)
return return_value
finally:
# sanity on `limit_plds()` reversion
assert (
rx.pld_dec
is
msgops._def_any_pldec
)
log.runtime(
'Reverted to previous pld-spec\n\n'
f'{orig_pldec}\n'
)
@pytest.mark.parametrize(
'return_value',
@ -317,6 +321,7 @@ def test_basic_payload_spec(
child,
return_value=return_value,
started_value=started_value,
pld_spec=maybe_msg_spec,
validate_pld_spec=pld_check_started_value,
) as (ctx, first),
):

View File

@ -2399,11 +2399,7 @@ def mk_context(
# |_ `return: TypeAlias`,
# |_ `invalid_policy: str|Callable` ?
# -[ ] prolly implement the `@acm` wrapper using
# a `contextlib.ContextDecorator`, i guess not if
# we don't need an `__aexit__` block right?
# |_ de hecho, @acm can already be used as a decorator as of 3.10
# but i dunno how that's gonna play with `trio.Nursery.start[_soon]()`
# |_ https://docs.python.org/3/library/contextlib.html#using-a-context-manager-as-a-function-decorator
# a `contextlib.ContextDecorator`?
#
def context(
func: Callable|None = None,

View File

@ -441,8 +441,7 @@ class RemoteActorError(Exception):
for key in fields:
if (
key == 'relay_uid'
and not self.is_inception()
key == 'relay_uid' and not self.is_inception()
):
continue
@ -1292,7 +1291,8 @@ def _mk_msg_type_err(
msgtyperr = MsgTypeError(
message=message,
_bad_msg=msg,
ipc_msg=msg,
bad_msg=msg,
)
# ya, might be `None`
msgtyperr.__cause__ = src_type_error

View File

@ -68,7 +68,6 @@ from trio import (
)
import tractor
from tractor.log import get_logger
from tractor._context import Context
from tractor._state import (
current_actor,
is_root_process,
@ -84,6 +83,7 @@ if TYPE_CHECKING:
from trio.lowlevel import Task
from threading import Thread
from tractor._ipc import Channel
from tractor._context import Context
from tractor._runtime import (
Actor,
)
@ -529,10 +529,7 @@ class Lock:
)
@tractor.context(
# enable the locking msgspec
pld_spec=__pld_spec__,
)
@tractor.context
async def lock_stdio_for_peer(
ctx: Context,
subactor_task_uid: tuple[str, int],
@ -600,6 +597,12 @@ async def lock_stdio_for_peer(
# scope despite the shielding we apply below.
debug_lock_cs: CancelScope = ctx._scope
# TODO: use `.msg._ops.maybe_limit_plds()` here instead so we
# can merge into a single async with, with the
# `Lock.acquire_for_ctx()` enter below?
#
# enable the locking msgspec
with apply_debug_pldec():
async with Lock.acquire_for_ctx(ctx=ctx):
debug_lock_cs.shield = True
@ -1034,6 +1037,48 @@ class PdbREPL(pdbp.Pdb):
return None
# TODO: prolly remove this and instead finally get our @context API
# supporting a msg/pld-spec via type annots as per,
# https://github.com/goodboy/tractor/issues/365
@cm
def apply_debug_pldec() -> _codec.MsgCodec:
'''
Apply the subactor TTY `Lock`-ing protocol's msgspec temporarily
(only in the current task).
'''
from tractor.msg import (
_ops as msgops,
)
cctx: Context = current_ipc_ctx()
rx: msgops.PldRx = cctx.pld_rx
orig_pldec: msgops.MsgDec = rx.pld_dec
try:
with msgops.limit_plds(
spec=__pld_spec__,
) as debug_dec:
assert (
debug_dec
is
rx.pld_dec
)
log.runtime(
'Applied `.devx._debug` pld-spec\n\n'
f'{debug_dec}\n'
)
yield debug_dec
finally:
assert (
rx.pld_dec is orig_pldec
)
log.runtime(
'Reverted to previous pld-spec\n\n'
f'{orig_pldec}\n'
)
async def request_root_stdio_lock(
actor_uid: tuple[str, str],
task_uid: tuple[str, int],
@ -1102,7 +1147,6 @@ async def request_root_stdio_lock(
async with portal.open_context(
lock_stdio_for_peer,
subactor_task_uid=task_uid,
# NOTE: set it here in the locker request task bc it's
# possible for multiple such requests for the lock in any
# single sub-actor AND there will be a race between when the
@ -1115,7 +1159,7 @@ async def request_root_stdio_lock(
# this IPC-ctx request task, NOT any other task(s)
# including the one that actually enters the REPL. This
# is oc desired bc ow the debugged task will msg-type-error.
# pld_spec=__pld_spec__,
pld_spec=__pld_spec__,
) as (req_ctx, status):
@ -2812,9 +2856,7 @@ def open_crash_handler(
@cm
def maybe_open_crash_handler(
pdb: bool = False,
):
def maybe_open_crash_handler(pdb: bool = False):
'''
Same as `open_crash_handler()` but with bool input flag
to allow conditional handling.

View File

@ -27,7 +27,6 @@ from contextlib import (
)
from typing import (
Any,
Callable,
Type,
TYPE_CHECKING,
Union,
@ -139,7 +138,6 @@ class PldRx(Struct):
def limit_plds(
self,
spec: Union[Type[Struct]],
**dec_kwargs,
) -> MsgDec:
'''
@ -149,10 +147,7 @@ class PldRx(Struct):
'''
orig_dec: MsgDec = self._pld_dec
limit_dec: MsgDec = mk_dec(
spec=spec,
**dec_kwargs,
)
limit_dec: MsgDec = mk_dec(spec=spec)
try:
self._pld_dec = limit_dec
yield limit_dec
@ -454,7 +449,7 @@ class PldRx(Struct):
@cm
def limit_plds(
spec: Union[Type[Struct]],
**dec_kwargs,
**kwargs,
) -> MsgDec:
'''
@ -472,7 +467,7 @@ def limit_plds(
with rx.limit_plds(
spec=spec,
**dec_kwargs,
**kwargs,
) as pldec:
log.runtime(
'Applying payload-decoder\n\n'
@ -492,9 +487,7 @@ def limit_plds(
async def maybe_limit_plds(
ctx: Context,
spec: Union[Type[Struct]]|None = None,
dec_hook: Callable|None = None,
**kwargs,
) -> MsgDec|None:
'''
Async compat maybe-payload type limiter.
@ -504,11 +497,7 @@ async def maybe_limit_plds(
used.
'''
if (
spec is None
and
dec_hook is None
):
if spec is None:
yield None
return
@ -516,11 +505,7 @@ async def maybe_limit_plds(
curr_ctx: Context = current_ipc_ctx()
assert ctx is curr_ctx
with ctx._pld_rx.limit_plds(
spec=spec,
dec_hook=dec_hook,
**kwargs,
) as msgdec:
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
yield msgdec
curr_ctx: Context = current_ipc_ctx()