Port debug request ep to use `@context(pld_spec)`

Namely passing the `.__pld_spec__` directly to the
`lock_stdio_for_peer()` decorator B)

Also, allows dropping `apply_debug_pldec()` (which was a todo) and
removing a `lock_stdio_for_peer()` indent level.
runtime_to_msgspec
Tyler Goodlet 2024-06-17 09:01:13 -04:00
parent 5449bd5673
commit a0ee0cc713
1 changed files with 54 additions and 96 deletions

View File

@ -68,6 +68,7 @@ from trio import (
)
import tractor
from tractor.log import get_logger
from tractor._context import Context
from tractor._state import (
current_actor,
is_root_process,
@ -83,7 +84,6 @@ if TYPE_CHECKING:
from trio.lowlevel import Task
from threading import Thread
from tractor._ipc import Channel
from tractor._context import Context
from tractor._runtime import (
Actor,
)
@ -529,7 +529,10 @@ class Lock:
)
@tractor.context
@tractor.context(
# enable the locking msgspec
pld_spec=__pld_spec__,
)
async def lock_stdio_for_peer(
ctx: Context,
subactor_task_uid: tuple[str, int],
@ -597,61 +600,55 @@ async def lock_stdio_for_peer(
# scope despite the shielding we apply below.
debug_lock_cs: CancelScope = ctx._scope
# TODO: use `.msg._ops.maybe_limit_plds()` here instead so we
# can merge into a single async with, with the
# `Lock.acquire_for_ctx()` enter below?
#
# enable the locking msgspec
with apply_debug_pldec():
async with Lock.acquire_for_ctx(ctx=ctx):
debug_lock_cs.shield = True
async with Lock.acquire_for_ctx(ctx=ctx):
debug_lock_cs.shield = True
log.devx(
'Subactor acquired debugger request lock!\n'
f'root task: {root_task_name}\n'
f'subactor_uid: {subactor_uid}\n'
f'remote task: {subactor_task_uid}\n\n'
log.devx(
'Subactor acquired debugger request lock!\n'
f'root task: {root_task_name}\n'
f'subactor_uid: {subactor_uid}\n'
f'remote task: {subactor_task_uid}\n\n'
'Sending `ctx.started(LockStatus)`..\n'
'Sending `ctx.started(LockStatus)`..\n'
)
# indicate to child that we've locked stdio
await ctx.started(
LockStatus(
subactor_uid=subactor_uid,
cid=ctx.cid,
locked=True,
)
)
log.devx(
f'Actor {subactor_uid} acquired `Lock` via debugger request'
)
# wait for unlock pdb by child
async with ctx.open_stream() as stream:
release_msg: LockRelease = await stream.receive()
# TODO: security around only releasing if
# these match?
log.devx(
f'TTY lock released requested\n\n'
f'{release_msg}\n'
)
assert release_msg.cid == ctx.cid
assert release_msg.subactor_uid == tuple(subactor_uid)
log.devx(
f'Actor {subactor_uid} released TTY lock'
)
return LockStatus(
subactor_uid=subactor_uid,
cid=ctx.cid,
locked=False,
)
# indicate to child that we've locked stdio
await ctx.started(
LockStatus(
subactor_uid=subactor_uid,
cid=ctx.cid,
locked=True,
)
)
log.devx(
f'Actor {subactor_uid} acquired `Lock` via debugger request'
)
# wait for unlock pdb by child
async with ctx.open_stream() as stream:
release_msg: LockRelease = await stream.receive()
# TODO: security around only releasing if
# these match?
log.devx(
f'TTY lock released requested\n\n'
f'{release_msg}\n'
)
assert release_msg.cid == ctx.cid
assert release_msg.subactor_uid == tuple(subactor_uid)
log.devx(
f'Actor {subactor_uid} released TTY lock'
)
return LockStatus(
subactor_uid=subactor_uid,
cid=ctx.cid,
locked=False,
)
except BaseException as req_err:
message: str = (
f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
@ -1037,48 +1034,6 @@ class PdbREPL(pdbp.Pdb):
return None
# TODO: prolly remove this and instead finally get our @context API
# supporting a msg/pld-spec via type annots as per,
# https://github.com/goodboy/tractor/issues/365
@cm
def apply_debug_pldec() -> _codec.MsgCodec:
'''
Apply the subactor TTY `Lock`-ing protocol's msgspec temporarily
(only in the current task).
'''
from tractor.msg import (
_ops as msgops,
)
cctx: Context = current_ipc_ctx()
rx: msgops.PldRx = cctx.pld_rx
orig_pldec: msgops.MsgDec = rx.pld_dec
try:
with msgops.limit_plds(
spec=__pld_spec__,
) as debug_dec:
assert (
debug_dec
is
rx.pld_dec
)
log.runtime(
'Applied `.devx._debug` pld-spec\n\n'
f'{debug_dec}\n'
)
yield debug_dec
finally:
assert (
rx.pld_dec is orig_pldec
)
log.runtime(
'Reverted to previous pld-spec\n\n'
f'{orig_pldec}\n'
)
async def request_root_stdio_lock(
actor_uid: tuple[str, str],
task_uid: tuple[str, int],
@ -1147,6 +1102,7 @@ async def request_root_stdio_lock(
async with portal.open_context(
lock_stdio_for_peer,
subactor_task_uid=task_uid,
# NOTE: set it here in the locker request task bc it's
# possible for multiple such requests for the lock in any
# single sub-actor AND there will be a race between when the
@ -1159,7 +1115,7 @@ async def request_root_stdio_lock(
# this IPC-ctx request task, NOT any other task(s)
# including the one that actually enters the REPL. This
# is oc desired bc ow the debugged task will msg-type-error.
pld_spec=__pld_spec__,
# pld_spec=__pld_spec__,
) as (req_ctx, status):
@ -2856,7 +2812,9 @@ def open_crash_handler(
@cm
def maybe_open_crash_handler(pdb: bool = False):
def maybe_open_crash_handler(
pdb: bool = False,
):
'''
Same as `open_crash_handler()` but with bool input flag
to allow conditional handling.