Compare commits
No commits in common. "8477919fc92b9811ec10bb9931da184319fa4d4c" and "5449bd567309b943a0f3317aabf4a9cb95cb0fd3" have entirely different histories.
8477919fc9
...
5449bd5673
|
@ -7,6 +7,9 @@ related settings around IPC contexts.
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
from contextvars import (
|
||||||
|
Context,
|
||||||
|
)
|
||||||
|
|
||||||
from msgspec import (
|
from msgspec import (
|
||||||
Struct,
|
Struct,
|
||||||
|
@ -16,7 +19,6 @@ import trio
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
from tractor import (
|
from tractor import (
|
||||||
Context,
|
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
current_ipc_ctx,
|
current_ipc_ctx,
|
||||||
Portal,
|
Portal,
|
||||||
|
@ -33,17 +35,7 @@ from tractor.msg.types import (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class PldMsg(
|
class PldMsg(Struct):
|
||||||
Struct,
|
|
||||||
|
|
||||||
# TODO: with multiple structs in-spec we need to tag them!
|
|
||||||
# -[ ] offer a built-in `PldMsg` type to inherit from which takes
|
|
||||||
# case of these details?
|
|
||||||
#
|
|
||||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
|
||||||
# tag=True,
|
|
||||||
# tag_field='msg_type',
|
|
||||||
):
|
|
||||||
field: str
|
field: str
|
||||||
|
|
||||||
|
|
||||||
|
@ -104,9 +96,7 @@ async def maybe_expect_raises(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context(
|
@tractor.context
|
||||||
pld_spec=maybe_msg_spec,
|
|
||||||
)
|
|
||||||
async def child(
|
async def child(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
started_value: int|PldMsg|None,
|
started_value: int|PldMsg|None,
|
||||||
|
@ -126,20 +116,22 @@ async def child(
|
||||||
assert ctx is curr_ctx
|
assert ctx is curr_ctx
|
||||||
|
|
||||||
rx: msgops.PldRx = ctx._pld_rx
|
rx: msgops.PldRx = ctx._pld_rx
|
||||||
curr_pldec: _codec.MsgDec = rx.pld_dec
|
orig_pldec: _codec.MsgDec = rx.pld_dec
|
||||||
|
# senity that default pld-spec should be set
|
||||||
ctx_meta: dict = getattr(
|
|
||||||
child,
|
|
||||||
'_tractor_context_meta',
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
if ctx_meta:
|
|
||||||
assert (
|
assert (
|
||||||
ctx_meta['pld_spec']
|
rx.pld_dec
|
||||||
is curr_pldec.spec
|
is
|
||||||
is curr_pldec.pld_spec
|
msgops._def_any_pldec
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with msgops.limit_plds(
|
||||||
|
spec=maybe_msg_spec,
|
||||||
|
) as pldec:
|
||||||
|
# sanity on `MsgDec` state
|
||||||
|
assert rx.pld_dec is pldec
|
||||||
|
assert pldec.spec is maybe_msg_spec
|
||||||
|
|
||||||
# 2 cases: hdndle send-side and recv-only validation
|
# 2 cases: hdndle send-side and recv-only validation
|
||||||
# - when `raise_on_started_mte == True`, send validate
|
# - when `raise_on_started_mte == True`, send validate
|
||||||
# - else, parent-recv-side only validation
|
# - else, parent-recv-side only validation
|
||||||
|
@ -219,6 +211,18 @@ async def child(
|
||||||
# msg-type-error from this RPC task ;)
|
# msg-type-error from this RPC task ;)
|
||||||
return return_value
|
return return_value
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# sanity on `limit_plds()` reversion
|
||||||
|
assert (
|
||||||
|
rx.pld_dec
|
||||||
|
is
|
||||||
|
msgops._def_any_pldec
|
||||||
|
)
|
||||||
|
log.runtime(
|
||||||
|
'Reverted to previous pld-spec\n\n'
|
||||||
|
f'{orig_pldec}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'return_value',
|
'return_value',
|
||||||
|
@ -317,6 +321,7 @@ def test_basic_payload_spec(
|
||||||
child,
|
child,
|
||||||
return_value=return_value,
|
return_value=return_value,
|
||||||
started_value=started_value,
|
started_value=started_value,
|
||||||
|
pld_spec=maybe_msg_spec,
|
||||||
validate_pld_spec=pld_check_started_value,
|
validate_pld_spec=pld_check_started_value,
|
||||||
) as (ctx, first),
|
) as (ctx, first),
|
||||||
):
|
):
|
||||||
|
|
|
@ -2399,11 +2399,7 @@ def mk_context(
|
||||||
# |_ `return: TypeAlias`,
|
# |_ `return: TypeAlias`,
|
||||||
# |_ `invalid_policy: str|Callable` ?
|
# |_ `invalid_policy: str|Callable` ?
|
||||||
# -[ ] prolly implement the `@acm` wrapper using
|
# -[ ] prolly implement the `@acm` wrapper using
|
||||||
# a `contextlib.ContextDecorator`, i guess not if
|
# a `contextlib.ContextDecorator`?
|
||||||
# we don't need an `__aexit__` block right?
|
|
||||||
# |_ de hecho, @acm can already be used as a decorator as of 3.10
|
|
||||||
# but i dunno how that's gonna play with `trio.Nursery.start[_soon]()`
|
|
||||||
# |_ https://docs.python.org/3/library/contextlib.html#using-a-context-manager-as-a-function-decorator
|
|
||||||
#
|
#
|
||||||
def context(
|
def context(
|
||||||
func: Callable|None = None,
|
func: Callable|None = None,
|
||||||
|
|
|
@ -441,8 +441,7 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
for key in fields:
|
for key in fields:
|
||||||
if (
|
if (
|
||||||
key == 'relay_uid'
|
key == 'relay_uid' and not self.is_inception()
|
||||||
and not self.is_inception()
|
|
||||||
):
|
):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -1292,7 +1291,8 @@ def _mk_msg_type_err(
|
||||||
|
|
||||||
msgtyperr = MsgTypeError(
|
msgtyperr = MsgTypeError(
|
||||||
message=message,
|
message=message,
|
||||||
_bad_msg=msg,
|
ipc_msg=msg,
|
||||||
|
bad_msg=msg,
|
||||||
)
|
)
|
||||||
# ya, might be `None`
|
# ya, might be `None`
|
||||||
msgtyperr.__cause__ = src_type_error
|
msgtyperr.__cause__ = src_type_error
|
||||||
|
|
|
@ -68,7 +68,6 @@ from trio import (
|
||||||
)
|
)
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor._context import Context
|
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
current_actor,
|
current_actor,
|
||||||
is_root_process,
|
is_root_process,
|
||||||
|
@ -84,6 +83,7 @@ if TYPE_CHECKING:
|
||||||
from trio.lowlevel import Task
|
from trio.lowlevel import Task
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
from tractor._ipc import Channel
|
from tractor._ipc import Channel
|
||||||
|
from tractor._context import Context
|
||||||
from tractor._runtime import (
|
from tractor._runtime import (
|
||||||
Actor,
|
Actor,
|
||||||
)
|
)
|
||||||
|
@ -529,10 +529,7 @@ class Lock:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context(
|
@tractor.context
|
||||||
# enable the locking msgspec
|
|
||||||
pld_spec=__pld_spec__,
|
|
||||||
)
|
|
||||||
async def lock_stdio_for_peer(
|
async def lock_stdio_for_peer(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
subactor_task_uid: tuple[str, int],
|
subactor_task_uid: tuple[str, int],
|
||||||
|
@ -600,6 +597,12 @@ async def lock_stdio_for_peer(
|
||||||
# scope despite the shielding we apply below.
|
# scope despite the shielding we apply below.
|
||||||
debug_lock_cs: CancelScope = ctx._scope
|
debug_lock_cs: CancelScope = ctx._scope
|
||||||
|
|
||||||
|
# TODO: use `.msg._ops.maybe_limit_plds()` here instead so we
|
||||||
|
# can merge into a single async with, with the
|
||||||
|
# `Lock.acquire_for_ctx()` enter below?
|
||||||
|
#
|
||||||
|
# enable the locking msgspec
|
||||||
|
with apply_debug_pldec():
|
||||||
async with Lock.acquire_for_ctx(ctx=ctx):
|
async with Lock.acquire_for_ctx(ctx=ctx):
|
||||||
debug_lock_cs.shield = True
|
debug_lock_cs.shield = True
|
||||||
|
|
||||||
|
@ -1034,6 +1037,48 @@ class PdbREPL(pdbp.Pdb):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: prolly remove this and instead finally get our @context API
|
||||||
|
# supporting a msg/pld-spec via type annots as per,
|
||||||
|
# https://github.com/goodboy/tractor/issues/365
|
||||||
|
@cm
|
||||||
|
def apply_debug_pldec() -> _codec.MsgCodec:
|
||||||
|
'''
|
||||||
|
Apply the subactor TTY `Lock`-ing protocol's msgspec temporarily
|
||||||
|
(only in the current task).
|
||||||
|
|
||||||
|
'''
|
||||||
|
from tractor.msg import (
|
||||||
|
_ops as msgops,
|
||||||
|
)
|
||||||
|
cctx: Context = current_ipc_ctx()
|
||||||
|
rx: msgops.PldRx = cctx.pld_rx
|
||||||
|
orig_pldec: msgops.MsgDec = rx.pld_dec
|
||||||
|
|
||||||
|
try:
|
||||||
|
with msgops.limit_plds(
|
||||||
|
spec=__pld_spec__,
|
||||||
|
) as debug_dec:
|
||||||
|
assert (
|
||||||
|
debug_dec
|
||||||
|
is
|
||||||
|
rx.pld_dec
|
||||||
|
)
|
||||||
|
log.runtime(
|
||||||
|
'Applied `.devx._debug` pld-spec\n\n'
|
||||||
|
f'{debug_dec}\n'
|
||||||
|
)
|
||||||
|
yield debug_dec
|
||||||
|
|
||||||
|
finally:
|
||||||
|
assert (
|
||||||
|
rx.pld_dec is orig_pldec
|
||||||
|
)
|
||||||
|
log.runtime(
|
||||||
|
'Reverted to previous pld-spec\n\n'
|
||||||
|
f'{orig_pldec}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def request_root_stdio_lock(
|
async def request_root_stdio_lock(
|
||||||
actor_uid: tuple[str, str],
|
actor_uid: tuple[str, str],
|
||||||
task_uid: tuple[str, int],
|
task_uid: tuple[str, int],
|
||||||
|
@ -1102,7 +1147,6 @@ async def request_root_stdio_lock(
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
lock_stdio_for_peer,
|
lock_stdio_for_peer,
|
||||||
subactor_task_uid=task_uid,
|
subactor_task_uid=task_uid,
|
||||||
|
|
||||||
# NOTE: set it here in the locker request task bc it's
|
# NOTE: set it here in the locker request task bc it's
|
||||||
# possible for multiple such requests for the lock in any
|
# possible for multiple such requests for the lock in any
|
||||||
# single sub-actor AND there will be a race between when the
|
# single sub-actor AND there will be a race between when the
|
||||||
|
@ -1115,7 +1159,7 @@ async def request_root_stdio_lock(
|
||||||
# this IPC-ctx request task, NOT any other task(s)
|
# this IPC-ctx request task, NOT any other task(s)
|
||||||
# including the one that actually enters the REPL. This
|
# including the one that actually enters the REPL. This
|
||||||
# is oc desired bc ow the debugged task will msg-type-error.
|
# is oc desired bc ow the debugged task will msg-type-error.
|
||||||
# pld_spec=__pld_spec__,
|
pld_spec=__pld_spec__,
|
||||||
|
|
||||||
) as (req_ctx, status):
|
) as (req_ctx, status):
|
||||||
|
|
||||||
|
@ -2812,9 +2856,7 @@ def open_crash_handler(
|
||||||
|
|
||||||
|
|
||||||
@cm
|
@cm
|
||||||
def maybe_open_crash_handler(
|
def maybe_open_crash_handler(pdb: bool = False):
|
||||||
pdb: bool = False,
|
|
||||||
):
|
|
||||||
'''
|
'''
|
||||||
Same as `open_crash_handler()` but with bool input flag
|
Same as `open_crash_handler()` but with bool input flag
|
||||||
to allow conditional handling.
|
to allow conditional handling.
|
||||||
|
|
|
@ -27,7 +27,6 @@ from contextlib import (
|
||||||
)
|
)
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
|
||||||
Type,
|
Type,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Union,
|
Union,
|
||||||
|
@ -139,7 +138,6 @@ class PldRx(Struct):
|
||||||
def limit_plds(
|
def limit_plds(
|
||||||
self,
|
self,
|
||||||
spec: Union[Type[Struct]],
|
spec: Union[Type[Struct]],
|
||||||
**dec_kwargs,
|
|
||||||
|
|
||||||
) -> MsgDec:
|
) -> MsgDec:
|
||||||
'''
|
'''
|
||||||
|
@ -149,10 +147,7 @@ class PldRx(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
orig_dec: MsgDec = self._pld_dec
|
orig_dec: MsgDec = self._pld_dec
|
||||||
limit_dec: MsgDec = mk_dec(
|
limit_dec: MsgDec = mk_dec(spec=spec)
|
||||||
spec=spec,
|
|
||||||
**dec_kwargs,
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
self._pld_dec = limit_dec
|
self._pld_dec = limit_dec
|
||||||
yield limit_dec
|
yield limit_dec
|
||||||
|
@ -454,7 +449,7 @@ class PldRx(Struct):
|
||||||
@cm
|
@cm
|
||||||
def limit_plds(
|
def limit_plds(
|
||||||
spec: Union[Type[Struct]],
|
spec: Union[Type[Struct]],
|
||||||
**dec_kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> MsgDec:
|
) -> MsgDec:
|
||||||
'''
|
'''
|
||||||
|
@ -472,7 +467,7 @@ def limit_plds(
|
||||||
|
|
||||||
with rx.limit_plds(
|
with rx.limit_plds(
|
||||||
spec=spec,
|
spec=spec,
|
||||||
**dec_kwargs,
|
**kwargs,
|
||||||
) as pldec:
|
) as pldec:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Applying payload-decoder\n\n'
|
'Applying payload-decoder\n\n'
|
||||||
|
@ -492,9 +487,7 @@ def limit_plds(
|
||||||
async def maybe_limit_plds(
|
async def maybe_limit_plds(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
spec: Union[Type[Struct]]|None = None,
|
spec: Union[Type[Struct]]|None = None,
|
||||||
dec_hook: Callable|None = None,
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> MsgDec|None:
|
) -> MsgDec|None:
|
||||||
'''
|
'''
|
||||||
Async compat maybe-payload type limiter.
|
Async compat maybe-payload type limiter.
|
||||||
|
@ -504,11 +497,7 @@ async def maybe_limit_plds(
|
||||||
used.
|
used.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if (
|
if spec is None:
|
||||||
spec is None
|
|
||||||
and
|
|
||||||
dec_hook is None
|
|
||||||
):
|
|
||||||
yield None
|
yield None
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -516,11 +505,7 @@ async def maybe_limit_plds(
|
||||||
curr_ctx: Context = current_ipc_ctx()
|
curr_ctx: Context = current_ipc_ctx()
|
||||||
assert ctx is curr_ctx
|
assert ctx is curr_ctx
|
||||||
|
|
||||||
with ctx._pld_rx.limit_plds(
|
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
|
||||||
spec=spec,
|
|
||||||
dec_hook=dec_hook,
|
|
||||||
**kwargs,
|
|
||||||
) as msgdec:
|
|
||||||
yield msgdec
|
yield msgdec
|
||||||
|
|
||||||
curr_ctx: Context = current_ipc_ctx()
|
curr_ctx: Context = current_ipc_ctx()
|
||||||
|
|
Loading…
Reference in New Issue