Compare commits
5 Commits
5449bd5673
...
8477919fc9
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 8477919fc9 | |
Tyler Goodlet | 872feef24b | |
Tyler Goodlet | affc210033 | |
Tyler Goodlet | 04bd111037 | |
Tyler Goodlet | a0ee0cc713 |
|
@ -7,9 +7,6 @@ related settings around IPC contexts.
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from contextvars import (
|
||||
Context,
|
||||
)
|
||||
|
||||
from msgspec import (
|
||||
Struct,
|
||||
|
@ -19,6 +16,7 @@ import trio
|
|||
|
||||
import tractor
|
||||
from tractor import (
|
||||
Context,
|
||||
MsgTypeError,
|
||||
current_ipc_ctx,
|
||||
Portal,
|
||||
|
@ -35,7 +33,17 @@ from tractor.msg.types import (
|
|||
)
|
||||
|
||||
|
||||
class PldMsg(Struct):
|
||||
class PldMsg(
|
||||
Struct,
|
||||
|
||||
# TODO: with multiple structs in-spec we need to tag them!
|
||||
# -[ ] offer a built-in `PldMsg` type to inherit from which takes
|
||||
# case of these details?
|
||||
#
|
||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
# tag=True,
|
||||
# tag_field='msg_type',
|
||||
):
|
||||
field: str
|
||||
|
||||
|
||||
|
@ -96,12 +104,14 @@ async def maybe_expect_raises(
|
|||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
@tractor.context(
|
||||
pld_spec=maybe_msg_spec,
|
||||
)
|
||||
async def child(
|
||||
ctx: Context,
|
||||
started_value: int|PldMsg|None,
|
||||
return_value: str|None,
|
||||
validate_pld_spec: bool,
|
||||
validate_pld_spec: bool,
|
||||
raise_on_started_mte: bool = True,
|
||||
|
||||
) -> None:
|
||||
|
@ -116,113 +126,99 @@ async def child(
|
|||
assert ctx is curr_ctx
|
||||
|
||||
rx: msgops.PldRx = ctx._pld_rx
|
||||
orig_pldec: _codec.MsgDec = rx.pld_dec
|
||||
# senity that default pld-spec should be set
|
||||
assert (
|
||||
rx.pld_dec
|
||||
is
|
||||
msgops._def_any_pldec
|
||||
curr_pldec: _codec.MsgDec = rx.pld_dec
|
||||
|
||||
ctx_meta: dict = getattr(
|
||||
child,
|
||||
'_tractor_context_meta',
|
||||
None,
|
||||
)
|
||||
if ctx_meta:
|
||||
assert (
|
||||
ctx_meta['pld_spec']
|
||||
is curr_pldec.spec
|
||||
is curr_pldec.pld_spec
|
||||
)
|
||||
|
||||
# 2 cases: hdndle send-side and recv-only validation
|
||||
# - when `raise_on_started_mte == True`, send validate
|
||||
# - else, parent-recv-side only validation
|
||||
mte: MsgTypeError|None = None
|
||||
try:
|
||||
with msgops.limit_plds(
|
||||
spec=maybe_msg_spec,
|
||||
) as pldec:
|
||||
# sanity on `MsgDec` state
|
||||
assert rx.pld_dec is pldec
|
||||
assert pldec.spec is maybe_msg_spec
|
||||
await ctx.started(
|
||||
value=started_value,
|
||||
validate_pld_spec=validate_pld_spec,
|
||||
)
|
||||
|
||||
# 2 cases: hdndle send-side and recv-only validation
|
||||
# - when `raise_on_started_mte == True`, send validate
|
||||
# - else, parent-recv-side only validation
|
||||
mte: MsgTypeError|None = None
|
||||
try:
|
||||
await ctx.started(
|
||||
value=started_value,
|
||||
validate_pld_spec=validate_pld_spec,
|
||||
)
|
||||
|
||||
except MsgTypeError as _mte:
|
||||
mte = _mte
|
||||
log.exception('started()` raised an MTE!\n')
|
||||
if not expect_started_mte:
|
||||
raise RuntimeError(
|
||||
'Child-ctx-task SHOULD NOT HAVE raised an MTE for\n\n'
|
||||
f'{started_value!r}\n'
|
||||
)
|
||||
|
||||
boxed_div: str = '------ - ------'
|
||||
assert boxed_div not in mte._message
|
||||
assert boxed_div not in mte.tb_str
|
||||
assert boxed_div not in repr(mte)
|
||||
assert boxed_div not in str(mte)
|
||||
mte_repr: str = repr(mte)
|
||||
for line in mte.message.splitlines():
|
||||
assert line in mte_repr
|
||||
|
||||
# since this is a *local error* there should be no
|
||||
# boxed traceback content!
|
||||
assert not mte.tb_str
|
||||
|
||||
# propagate to parent?
|
||||
if raise_on_started_mte:
|
||||
raise
|
||||
|
||||
# no-send-side-error fallthrough
|
||||
if (
|
||||
validate_pld_spec
|
||||
and
|
||||
expect_started_mte
|
||||
):
|
||||
raise RuntimeError(
|
||||
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
|
||||
f'{started_value!r}\n'
|
||||
)
|
||||
|
||||
assert (
|
||||
not expect_started_mte
|
||||
or
|
||||
not validate_pld_spec
|
||||
except MsgTypeError as _mte:
|
||||
mte = _mte
|
||||
log.exception('started()` raised an MTE!\n')
|
||||
if not expect_started_mte:
|
||||
raise RuntimeError(
|
||||
'Child-ctx-task SHOULD NOT HAVE raised an MTE for\n\n'
|
||||
f'{started_value!r}\n'
|
||||
)
|
||||
|
||||
# if wait_for_parent_to_cancel:
|
||||
# ...
|
||||
#
|
||||
# ^-TODO-^ logic for diff validation policies on each side:
|
||||
#
|
||||
# -[ ] ensure that if we don't validate on the send
|
||||
# side, that we are eventually error-cancelled by our
|
||||
# parent due to the bad `Started` payload!
|
||||
# -[ ] the boxed error should be srced from the parent's
|
||||
# runtime NOT ours!
|
||||
# -[ ] we should still error on bad `return_value`s
|
||||
# despite the parent not yet error-cancelling us?
|
||||
# |_ how do we want the parent side to look in that
|
||||
# case?
|
||||
# -[ ] maybe the equiv of "during handling of the
|
||||
# above error another occurred" for the case where
|
||||
# the parent sends a MTE to this child and while
|
||||
# waiting for the child to terminate it gets back
|
||||
# the MTE for this case?
|
||||
#
|
||||
boxed_div: str = '------ - ------'
|
||||
assert boxed_div not in mte._message
|
||||
assert boxed_div not in mte.tb_str
|
||||
assert boxed_div not in repr(mte)
|
||||
assert boxed_div not in str(mte)
|
||||
mte_repr: str = repr(mte)
|
||||
for line in mte.message.splitlines():
|
||||
assert line in mte_repr
|
||||
|
||||
# XXX should always fail on recv side since we can't
|
||||
# really do much else beside terminate and relay the
|
||||
# msg-type-error from this RPC task ;)
|
||||
return return_value
|
||||
# since this is a *local error* there should be no
|
||||
# boxed traceback content!
|
||||
assert not mte.tb_str
|
||||
|
||||
finally:
|
||||
# sanity on `limit_plds()` reversion
|
||||
assert (
|
||||
rx.pld_dec
|
||||
is
|
||||
msgops._def_any_pldec
|
||||
)
|
||||
log.runtime(
|
||||
'Reverted to previous pld-spec\n\n'
|
||||
f'{orig_pldec}\n'
|
||||
# propagate to parent?
|
||||
if raise_on_started_mte:
|
||||
raise
|
||||
|
||||
# no-send-side-error fallthrough
|
||||
if (
|
||||
validate_pld_spec
|
||||
and
|
||||
expect_started_mte
|
||||
):
|
||||
raise RuntimeError(
|
||||
'Child-ctx-task SHOULD HAVE raised an MTE for\n\n'
|
||||
f'{started_value!r}\n'
|
||||
)
|
||||
|
||||
assert (
|
||||
not expect_started_mte
|
||||
or
|
||||
not validate_pld_spec
|
||||
)
|
||||
|
||||
# if wait_for_parent_to_cancel:
|
||||
# ...
|
||||
#
|
||||
# ^-TODO-^ logic for diff validation policies on each side:
|
||||
#
|
||||
# -[ ] ensure that if we don't validate on the send
|
||||
# side, that we are eventually error-cancelled by our
|
||||
# parent due to the bad `Started` payload!
|
||||
# -[ ] the boxed error should be srced from the parent's
|
||||
# runtime NOT ours!
|
||||
# -[ ] we should still error on bad `return_value`s
|
||||
# despite the parent not yet error-cancelling us?
|
||||
# |_ how do we want the parent side to look in that
|
||||
# case?
|
||||
# -[ ] maybe the equiv of "during handling of the
|
||||
# above error another occurred" for the case where
|
||||
# the parent sends a MTE to this child and while
|
||||
# waiting for the child to terminate it gets back
|
||||
# the MTE for this case?
|
||||
#
|
||||
|
||||
# XXX should always fail on recv side since we can't
|
||||
# really do much else beside terminate and relay the
|
||||
# msg-type-error from this RPC task ;)
|
||||
return return_value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'return_value',
|
||||
|
@ -321,7 +317,6 @@ def test_basic_payload_spec(
|
|||
child,
|
||||
return_value=return_value,
|
||||
started_value=started_value,
|
||||
pld_spec=maybe_msg_spec,
|
||||
validate_pld_spec=pld_check_started_value,
|
||||
) as (ctx, first),
|
||||
):
|
||||
|
|
|
@ -2399,7 +2399,11 @@ def mk_context(
|
|||
# |_ `return: TypeAlias`,
|
||||
# |_ `invalid_policy: str|Callable` ?
|
||||
# -[ ] prolly implement the `@acm` wrapper using
|
||||
# a `contextlib.ContextDecorator`?
|
||||
# a `contextlib.ContextDecorator`, i guess not if
|
||||
# we don't need an `__aexit__` block right?
|
||||
# |_ de hecho, @acm can already be used as a decorator as of 3.10
|
||||
# but i dunno how that's gonna play with `trio.Nursery.start[_soon]()`
|
||||
# |_ https://docs.python.org/3/library/contextlib.html#using-a-context-manager-as-a-function-decorator
|
||||
#
|
||||
def context(
|
||||
func: Callable|None = None,
|
||||
|
|
|
@ -441,7 +441,8 @@ class RemoteActorError(Exception):
|
|||
|
||||
for key in fields:
|
||||
if (
|
||||
key == 'relay_uid' and not self.is_inception()
|
||||
key == 'relay_uid'
|
||||
and not self.is_inception()
|
||||
):
|
||||
continue
|
||||
|
||||
|
@ -1291,8 +1292,7 @@ def _mk_msg_type_err(
|
|||
|
||||
msgtyperr = MsgTypeError(
|
||||
message=message,
|
||||
ipc_msg=msg,
|
||||
bad_msg=msg,
|
||||
_bad_msg=msg,
|
||||
)
|
||||
# ya, might be `None`
|
||||
msgtyperr.__cause__ = src_type_error
|
||||
|
|
|
@ -68,6 +68,7 @@ from trio import (
|
|||
)
|
||||
import tractor
|
||||
from tractor.log import get_logger
|
||||
from tractor._context import Context
|
||||
from tractor._state import (
|
||||
current_actor,
|
||||
is_root_process,
|
||||
|
@ -83,7 +84,6 @@ if TYPE_CHECKING:
|
|||
from trio.lowlevel import Task
|
||||
from threading import Thread
|
||||
from tractor._ipc import Channel
|
||||
from tractor._context import Context
|
||||
from tractor._runtime import (
|
||||
Actor,
|
||||
)
|
||||
|
@ -529,7 +529,10 @@ class Lock:
|
|||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
@tractor.context(
|
||||
# enable the locking msgspec
|
||||
pld_spec=__pld_spec__,
|
||||
)
|
||||
async def lock_stdio_for_peer(
|
||||
ctx: Context,
|
||||
subactor_task_uid: tuple[str, int],
|
||||
|
@ -597,61 +600,55 @@ async def lock_stdio_for_peer(
|
|||
# scope despite the shielding we apply below.
|
||||
debug_lock_cs: CancelScope = ctx._scope
|
||||
|
||||
# TODO: use `.msg._ops.maybe_limit_plds()` here instead so we
|
||||
# can merge into a single async with, with the
|
||||
# `Lock.acquire_for_ctx()` enter below?
|
||||
#
|
||||
# enable the locking msgspec
|
||||
with apply_debug_pldec():
|
||||
async with Lock.acquire_for_ctx(ctx=ctx):
|
||||
debug_lock_cs.shield = True
|
||||
async with Lock.acquire_for_ctx(ctx=ctx):
|
||||
debug_lock_cs.shield = True
|
||||
|
||||
log.devx(
|
||||
'Subactor acquired debugger request lock!\n'
|
||||
f'root task: {root_task_name}\n'
|
||||
f'subactor_uid: {subactor_uid}\n'
|
||||
f'remote task: {subactor_task_uid}\n\n'
|
||||
log.devx(
|
||||
'Subactor acquired debugger request lock!\n'
|
||||
f'root task: {root_task_name}\n'
|
||||
f'subactor_uid: {subactor_uid}\n'
|
||||
f'remote task: {subactor_task_uid}\n\n'
|
||||
|
||||
'Sending `ctx.started(LockStatus)`..\n'
|
||||
'Sending `ctx.started(LockStatus)`..\n'
|
||||
|
||||
)
|
||||
|
||||
# indicate to child that we've locked stdio
|
||||
await ctx.started(
|
||||
LockStatus(
|
||||
subactor_uid=subactor_uid,
|
||||
cid=ctx.cid,
|
||||
locked=True,
|
||||
)
|
||||
)
|
||||
|
||||
log.devx(
|
||||
f'Actor {subactor_uid} acquired `Lock` via debugger request'
|
||||
)
|
||||
|
||||
# wait for unlock pdb by child
|
||||
async with ctx.open_stream() as stream:
|
||||
release_msg: LockRelease = await stream.receive()
|
||||
|
||||
# TODO: security around only releasing if
|
||||
# these match?
|
||||
log.devx(
|
||||
f'TTY lock released requested\n\n'
|
||||
f'{release_msg}\n'
|
||||
)
|
||||
assert release_msg.cid == ctx.cid
|
||||
assert release_msg.subactor_uid == tuple(subactor_uid)
|
||||
|
||||
log.devx(
|
||||
f'Actor {subactor_uid} released TTY lock'
|
||||
)
|
||||
|
||||
return LockStatus(
|
||||
subactor_uid=subactor_uid,
|
||||
cid=ctx.cid,
|
||||
locked=False,
|
||||
)
|
||||
|
||||
# indicate to child that we've locked stdio
|
||||
await ctx.started(
|
||||
LockStatus(
|
||||
subactor_uid=subactor_uid,
|
||||
cid=ctx.cid,
|
||||
locked=True,
|
||||
)
|
||||
)
|
||||
|
||||
log.devx(
|
||||
f'Actor {subactor_uid} acquired `Lock` via debugger request'
|
||||
)
|
||||
|
||||
# wait for unlock pdb by child
|
||||
async with ctx.open_stream() as stream:
|
||||
release_msg: LockRelease = await stream.receive()
|
||||
|
||||
# TODO: security around only releasing if
|
||||
# these match?
|
||||
log.devx(
|
||||
f'TTY lock released requested\n\n'
|
||||
f'{release_msg}\n'
|
||||
)
|
||||
assert release_msg.cid == ctx.cid
|
||||
assert release_msg.subactor_uid == tuple(subactor_uid)
|
||||
|
||||
log.devx(
|
||||
f'Actor {subactor_uid} released TTY lock'
|
||||
)
|
||||
|
||||
return LockStatus(
|
||||
subactor_uid=subactor_uid,
|
||||
cid=ctx.cid,
|
||||
locked=False,
|
||||
)
|
||||
|
||||
except BaseException as req_err:
|
||||
message: str = (
|
||||
f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
|
||||
|
@ -1037,48 +1034,6 @@ class PdbREPL(pdbp.Pdb):
|
|||
return None
|
||||
|
||||
|
||||
# TODO: prolly remove this and instead finally get our @context API
|
||||
# supporting a msg/pld-spec via type annots as per,
|
||||
# https://github.com/goodboy/tractor/issues/365
|
||||
@cm
|
||||
def apply_debug_pldec() -> _codec.MsgCodec:
|
||||
'''
|
||||
Apply the subactor TTY `Lock`-ing protocol's msgspec temporarily
|
||||
(only in the current task).
|
||||
|
||||
'''
|
||||
from tractor.msg import (
|
||||
_ops as msgops,
|
||||
)
|
||||
cctx: Context = current_ipc_ctx()
|
||||
rx: msgops.PldRx = cctx.pld_rx
|
||||
orig_pldec: msgops.MsgDec = rx.pld_dec
|
||||
|
||||
try:
|
||||
with msgops.limit_plds(
|
||||
spec=__pld_spec__,
|
||||
) as debug_dec:
|
||||
assert (
|
||||
debug_dec
|
||||
is
|
||||
rx.pld_dec
|
||||
)
|
||||
log.runtime(
|
||||
'Applied `.devx._debug` pld-spec\n\n'
|
||||
f'{debug_dec}\n'
|
||||
)
|
||||
yield debug_dec
|
||||
|
||||
finally:
|
||||
assert (
|
||||
rx.pld_dec is orig_pldec
|
||||
)
|
||||
log.runtime(
|
||||
'Reverted to previous pld-spec\n\n'
|
||||
f'{orig_pldec}\n'
|
||||
)
|
||||
|
||||
|
||||
async def request_root_stdio_lock(
|
||||
actor_uid: tuple[str, str],
|
||||
task_uid: tuple[str, int],
|
||||
|
@ -1147,6 +1102,7 @@ async def request_root_stdio_lock(
|
|||
async with portal.open_context(
|
||||
lock_stdio_for_peer,
|
||||
subactor_task_uid=task_uid,
|
||||
|
||||
# NOTE: set it here in the locker request task bc it's
|
||||
# possible for multiple such requests for the lock in any
|
||||
# single sub-actor AND there will be a race between when the
|
||||
|
@ -1159,7 +1115,7 @@ async def request_root_stdio_lock(
|
|||
# this IPC-ctx request task, NOT any other task(s)
|
||||
# including the one that actually enters the REPL. This
|
||||
# is oc desired bc ow the debugged task will msg-type-error.
|
||||
pld_spec=__pld_spec__,
|
||||
# pld_spec=__pld_spec__,
|
||||
|
||||
) as (req_ctx, status):
|
||||
|
||||
|
@ -2856,7 +2812,9 @@ def open_crash_handler(
|
|||
|
||||
|
||||
@cm
|
||||
def maybe_open_crash_handler(pdb: bool = False):
|
||||
def maybe_open_crash_handler(
|
||||
pdb: bool = False,
|
||||
):
|
||||
'''
|
||||
Same as `open_crash_handler()` but with bool input flag
|
||||
to allow conditional handling.
|
||||
|
|
|
@ -27,6 +27,7 @@ from contextlib import (
|
|||
)
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
Union,
|
||||
|
@ -138,6 +139,7 @@ class PldRx(Struct):
|
|||
def limit_plds(
|
||||
self,
|
||||
spec: Union[Type[Struct]],
|
||||
**dec_kwargs,
|
||||
|
||||
) -> MsgDec:
|
||||
'''
|
||||
|
@ -147,7 +149,10 @@ class PldRx(Struct):
|
|||
|
||||
'''
|
||||
orig_dec: MsgDec = self._pld_dec
|
||||
limit_dec: MsgDec = mk_dec(spec=spec)
|
||||
limit_dec: MsgDec = mk_dec(
|
||||
spec=spec,
|
||||
**dec_kwargs,
|
||||
)
|
||||
try:
|
||||
self._pld_dec = limit_dec
|
||||
yield limit_dec
|
||||
|
@ -449,7 +454,7 @@ class PldRx(Struct):
|
|||
@cm
|
||||
def limit_plds(
|
||||
spec: Union[Type[Struct]],
|
||||
**kwargs,
|
||||
**dec_kwargs,
|
||||
|
||||
) -> MsgDec:
|
||||
'''
|
||||
|
@ -467,7 +472,7 @@ def limit_plds(
|
|||
|
||||
with rx.limit_plds(
|
||||
spec=spec,
|
||||
**kwargs,
|
||||
**dec_kwargs,
|
||||
) as pldec:
|
||||
log.runtime(
|
||||
'Applying payload-decoder\n\n'
|
||||
|
@ -487,7 +492,9 @@ def limit_plds(
|
|||
async def maybe_limit_plds(
|
||||
ctx: Context,
|
||||
spec: Union[Type[Struct]]|None = None,
|
||||
dec_hook: Callable|None = None,
|
||||
**kwargs,
|
||||
|
||||
) -> MsgDec|None:
|
||||
'''
|
||||
Async compat maybe-payload type limiter.
|
||||
|
@ -497,7 +504,11 @@ async def maybe_limit_plds(
|
|||
used.
|
||||
|
||||
'''
|
||||
if spec is None:
|
||||
if (
|
||||
spec is None
|
||||
and
|
||||
dec_hook is None
|
||||
):
|
||||
yield None
|
||||
return
|
||||
|
||||
|
@ -505,7 +516,11 @@ async def maybe_limit_plds(
|
|||
curr_ctx: Context = current_ipc_ctx()
|
||||
assert ctx is curr_ctx
|
||||
|
||||
with ctx._pld_rx.limit_plds(spec=spec) as msgdec:
|
||||
with ctx._pld_rx.limit_plds(
|
||||
spec=spec,
|
||||
dec_hook=dec_hook,
|
||||
**kwargs,
|
||||
) as msgdec:
|
||||
yield msgdec
|
||||
|
||||
curr_ctx: Context = current_ipc_ctx()
|
||||
|
|
Loading…
Reference in New Issue