Use `DebugStatus` around subactor lock requests

Breaks out all the (sub)actor-local concurrency primitives from `Lock` (which
is now only used in and by the root actor) such that there's an explicit
distinction between a task that's "consuming" the `Lock` (remotely) vs. the
root-side service tasks which do the actual acquire on behalf of the
requesters.

`DebugStatus` changeover deats:
------ - ------
- move all the actor-local vars over to `DebugStatus` (see the sketch
  after this list), including:
  - move `_trio_handler` and `_orig_sigint_handler`
  - `local_task_in_debug` now `repl_task`
  - `_debugger_request_cs` now `req_cs`
  - `local_pdb_complete` now `repl_release`
- drop all ^ fields from `Lock.repr()` obvi..
- move over the `.[un]shield_sigint()` and
  `.is_main_trio_thread()` methods.
- add some new attrs/meths:
  - `DebugStatus.repl` for the currently running `Pdb` in-actor
    singleton.
  - `.repr()` for pprint of state (like `Lock`).
- Note that even when a root-actor task is in the REPL, `DebugStatus`
  is still used for certain actor-local state mgmt, such as SIGINT
  handler shielding.
- obvi change all lock-requester code bits to now use a `DebugStatus` in
  their local actor-state instead of `Lock`, i.e. switch the usage away
  from `Lock` in `._runtime` and `._root`.
- use the new `Lock.get_locking_task_cs()` API when checking for
  a sub-in-debug from `._runtime.Actor._stream_handler()`.
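
To make the changeover concrete, here's a minimal sketch of the new
actor-local container with the renamed fields (taken from the class added
later in this diff; method bodies and a couple of attrs are elided, and
`Any` stands in for the project-internal `PdbREPL`/`Task` types):

```python
from typing import Any, Callable
import trio

class DebugStatus:
    '''
    Actor-local singleton-state for the debugging machinery (sketch).

    '''
    repl: Any = None                              # in-actor `PdbREPL` singleton
    repl_task: Any = None                         # was `Lock.local_task_in_debug`
    req_cs: trio.CancelScope | None = None        # was `Lock._debugger_request_cs`
    repl_release: trio.Event | None = None        # was `Lock.local_pdb_complete`
    _orig_sigint_handler: Callable | None = None  # moved over from `Lock`
    _trio_handler: Callable | int | None = None   # moved over from `Lock`

    # `.shield_sigint()`, `.unshield_sigint()`, `.is_main_trio_thread()`
    # and a `Lock`-style `.repr()` also live here now (bodies elided).
```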

Tweaks unrelated to the topic-at-hand:
------ - ------
- drop the commented bits about hiding `@[a]cm` stack frames from
  `_debug.pause()` and simplify to only one block with the `shield`
  passthrough since we already solved the issue with cancel-scopes using
  `@pdbp.hideframe` B)
  - this includes dropping all the extra logging about the extra frame
    shown to the user (good thing I put in that wasted effort back then eh..)
- put the `try/except BaseException` with `log.exception()` around the
  whole of `._pause()` to ensure we don't miss in-func errors which can
  cause hangs.. (a sketch of the pattern follows this list)
- allow passing in `portal: Portal` to
  `Actor.start_remote_task()` such that `Portal` task spawning methods
  are always denoted correctly in terms of `Context.side`.
- lotsa logging tweaks, decreasing a bit of noise from `.runtime()`s.
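
A rough sketch of that `._pause()` error-wrapping pattern (signature
abbreviated; the stdlib logger below stands in for the project's
actor-aware logger, and the elided body is the full lock-request/REPL
machinery):

```python
import logging

log = logging.getLogger(__name__)  # stand-in for the tractor logger

async def _pause(debug_func, **kwargs) -> None:
    # Wrap the *whole* body so any in-func error is logged loudly
    # (instead of silently hanging the actor tree) and then re-raised.
    try:
        ...  # TTY-lock request, SIGINT shielding, REPL entry, etc.
    except BaseException:
        log.exception(
            'Failed to engage debugger via `_pause()` ??\n'
        )
        raise
```
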
runtime_to_msgspec
Tyler Goodlet 2024-04-18 12:47:28 -04:00
parent d0e7610073
commit 77a15ebf19
3 changed files with 322 additions and 354 deletions

View File

@ -135,7 +135,7 @@ async def open_root_actor(
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True

View File

@ -267,10 +267,13 @@ class Actor:
self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Channel|None = None
self._forkserver_info: tuple|None = None
# track each child/sub-actor in it's locally
# supervising nursery
self._actoruid2nursery: dict[
tuple[str, str],
tuple[str, str], # sub-`Actor.uid`
ActorNursery|None,
] = {} # type: ignore # noqa
] = {}
# when provided, init the registry addresses property from
# input via the validator.
@ -659,12 +662,18 @@ class Actor:
# TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD
#
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if (
pdb_user_uid
and local_nursery
):
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
)
if entry:
proc: trio.Process
_, proc, _ = entry
@ -674,10 +683,10 @@ class Actor:
and poll() is None
):
log.cancel(
'Root actor reports no-more-peers, BUT '
'Root actor reports no-more-peers, BUT\n'
'a DISCONNECTED child still has the debug '
'lock!\n'
f'root uid: {self.uid}\n'
'lock!\n\n'
# f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n'
)
@ -703,9 +712,8 @@ class Actor:
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
if (
db_cs
(db_cs := pdb_lock.get_locking_task_cs())
and not db_cs.cancel_called
and uid == pdb_user_uid
):
@ -742,7 +750,7 @@ class Actor:
except KeyError:
log.warning(
'Ignoring invalid IPC ctx msg!\n\n'
f'<= sender: {uid}\n'
f'<= sender: {uid}\n\n'
# XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n'
@ -796,7 +804,7 @@ class Actor:
cid,
# side,
)]
log.runtime(
log.debug(
f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n'
f'cid:{cid}\n'
@ -835,10 +843,14 @@ class Actor:
nsf: NamespacePath,
kwargs: dict,
# determines `Context.side: str`
portal: Portal|None = None,
# IPC channel config
msg_buffer_size: int|None = None,
allow_overruns: bool = False,
load_nsf: bool = False,
ack_timeout: float = 3,
) -> Context:
'''
@ -863,10 +875,12 @@ class Actor:
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
ctx._portal = portal
if (
'self' in nsf
or not load_nsf
or
not load_nsf
):
ns, _, func = nsf.partition(':')
else:
@ -874,42 +888,29 @@ class Actor:
# -[ ] but, how to do `self:<Actor.meth>`??
ns, func = nsf.to_tuple()
msg = msgtypes.Start(
ns=ns,
func=func,
kwargs=kwargs,
uid=self.uid,
cid=cid,
)
log.runtime(
'Sending cmd to\n'
f'peer: {chan.uid} => \n'
'\n'
f'=> {ns}.{func}({kwargs})\n'
'Sending RPC start msg\n\n'
f'=> peer: {chan.uid}\n'
f' |_ {ns}.{func}({kwargs})\n'
)
await chan.send(
msgtypes.Start(
ns=ns,
func=func,
kwargs=kwargs,
uid=self.uid,
cid=cid,
)
)
# {'cmd': (
# ns,
# func,
# kwargs,
# self.uid,
# cid,
# )}
# )
await chan.send(msg)
# Wait on first response msg and validate; this should be
# immediate.
# first_msg: dict = await ctx._recv_chan.receive()
# functype: str = first_msg.get('functype')
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
# NOTE wait on first `StartAck` response msg and validate;
# this should be immediate and does not (yet) wait for the
# remote child task to sync via `Context.started()`.
with trio.fail_after(ack_timeout):
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
try:
functype: str = first_msg.functype
except AttributeError:
raise unpack_error(first_msg, chan)
# if 'error' in first_msg:
# raise unpack_error(first_msg, chan)
if functype not in (
'asyncfunc',
@ -917,7 +918,7 @@ class Actor:
'context',
):
raise ValueError(
f'{first_msg} is an invalid response packet?'
f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
)
ctx._remote_func_type = functype
@ -1162,7 +1163,7 @@ class Actor:
# kill any debugger request task to avoid deadlock
# with the root actor in this tree
dbcs = _debug.Lock._debugger_request_cs
dbcs = _debug.DebugStatus.req_cs
if dbcs is not None:
msg += (
'>> Cancelling active debugger request..\n'
@ -1237,9 +1238,9 @@ class Actor:
except KeyError:
# NOTE: during msging race conditions this will often
# emit, some examples:
# - callee returns a result before cancel-msg/ctxc-raised
# - callee self raises ctxc before caller send request,
# - callee errors prior to cancel req.
# - child returns a result before cancel-msg/ctxc-raised
# - child self raises ctxc before parent send request,
# - child errors prior to cancel req.
log.cancel(
'Cancel request invalid, RPC task already completed?\n\n'
f'<= canceller: {requesting_uid}\n\n'
@ -1302,15 +1303,15 @@ class Actor:
flow_info: str = (
f'<= canceller: {requesting_uid}\n'
f'=> ipc-parent: {parent_chan}\n'
f' |_{ctx}\n'
f'|_{ctx}\n'
)
log.runtime(
'Waiting on RPC task to cancel\n'
'Waiting on RPC task to cancel\n\n'
f'{flow_info}'
)
await is_complete.wait()
log.runtime(
f'Sucessfully cancelled RPC task\n'
f'Sucessfully cancelled RPC task\n\n'
f'{flow_info}'
)
return True
@ -1536,8 +1537,8 @@ async def async_main(
'''
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
# on our debugger state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
is_registered: bool = False
try:

View File

@ -160,12 +160,6 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Callable | None = None
_trio_handler: (
Callable[[int, FrameType|None], Any]
|int
| None
) = None
remote_task_in_debug: str|None = None
@staticmethod
@ -188,12 +182,6 @@ class Lock:
Lock._locking_task_cs = cs
# SUBACTOR ONLY
# ------ - -------
local_task_in_debug: Task|None = None
_debugger_request_cs: trio.CancelScope|None = None
local_pdb_complete: trio.Event|None = None
# ROOT ONLY
# ------ - -------
# the root-actor-ONLY singletons for,
@ -214,16 +202,12 @@ class Lock:
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_blocked: set[tuple[str, str]] = set() # `Actor.uid` block list
# TODO: should go on `PbpREPL`?
_orig_sigint_handler: Callable | None = None
@classmethod
def repr(cls) -> str:
# both root and subs
fields: str = (
f'repl: {cls.repl}\n'
f'local_repl_task: {cls.local_task_in_debug}\n'
)
if is_root_process():
@ -238,12 +222,6 @@ class Lock:
f'_debug_lock: {cls._debug_lock}\n'
f'lock_stats: {lock_stats}\n'
)
else:
fields += (
f'local_task_in_debug: {cls.local_task_in_debug}\n'
f'local_pdb_complete: {cls.local_pdb_complete}\n'
f'_debugger_request_cs: {cls._debugger_request_cs}\n'
)
body: str = textwrap.indent(
fields,
@ -255,7 +233,101 @@ class Lock:
')>'
)
# TODO: move to PdbREPL!
@classmethod
def release(cls):
try:
if not DebugStatus.is_main_trio_thread():
trio.from_thread.run_sync(
cls._debug_lock.release
)
else:
cls._debug_lock.release()
except RuntimeError as rte:
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = cls._debug_lock.statistics().owner
# if (
# owner
# and
# cls.remote_task_in_debug is None
# ):
# raise RuntimeError(
# 'Stale `Lock` detected, no remote task active!?\n'
# f'|_{owner}\n'
# # f'{Lock}'
# ) from rte
if owner:
raise rte
# OW suppress, can't member why tho .. XD
# something somethin corrupts a cancel-scope
# somewhere..
try:
# sometimes the ``trio`` might already be terminated in
# which case this call will raise.
if DebugStatus.repl_release is not None:
DebugStatus.repl_release.set()
finally:
cls.repl = None
cls.global_actor_in_debug = None
# restore original sigint handler
DebugStatus.unshield_sigint()
# actor-local state, irrelevant for non-root.
DebugStatus.repl_task = None
# TODO: actually use this instead throughout for subs!
class DebugStatus:
'''
Singleton-state for debugging machinery in a subactor.
Composes conc primitives for syncing with a root actor to
acquire the tree-global (TTY) `Lock` such that only ever one
actor's task can have the REPL active at a given time.
Methods to shield the process' `SIGINT` handler are used
whenever a local task is an active REPL.
'''
repl: PdbREPL|None = None
repl_task: Task|None = None
req_cs: trio.CancelScope|None = None
repl_release: trio.Event|None = None
lock_status: LockStatus|None = None
_orig_sigint_handler: Callable | None = None
_trio_handler: (
Callable[[int, FrameType|None], Any]
|int
| None
) = None
@classmethod
def repr(cls) -> str:
fields: str = (
f'repl: {cls.repl}\n'
f'repl_task: {cls.repl_task}\n'
f'repl_release: {cls.repl_release}\n'
f'req_cs: {cls.req_cs}\n'
)
body: str = textwrap.indent(
fields,
prefix=' |_',
)
return (
f'<{cls.__name__}(\n'
f'{body}'
')>'
)
@classmethod
def shield_sigint(cls):
'''
@ -339,77 +411,6 @@ class Lock:
# is not threading.main_thread()
# )
@classmethod
def release(cls):
try:
if not cls.is_main_trio_thread():
trio.from_thread.run_sync(
cls._debug_lock.release
)
else:
cls._debug_lock.release()
except RuntimeError as rte:
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = cls._debug_lock.statistics().owner
# if (
# owner
# and
# cls.remote_task_in_debug is None
# ):
# raise RuntimeError(
# 'Stale `Lock` detected, no remote task active!?\n'
# f'|_{owner}\n'
# # f'{Lock}'
# ) from rte
if owner:
raise rte
# OW suppress, can't member why tho .. XD
# something somethin corrupts a cancel-scope
# somewhere..
try:
# sometimes the ``trio`` might already be terminated in
# which case this call will raise.
if cls.local_pdb_complete is not None:
cls.local_pdb_complete.set()
finally:
# restore original sigint handler
cls.unshield_sigint()
cls.repl = None
# actor-local state, irrelevant for non-root.
cls.global_actor_in_debug = None
cls.local_task_in_debug = None
# TODO: actually use this instead throughout for subs!
class DebugStatus:
'''
Singleton-state for debugging machinery in a subactor.
Composes conc primitives for syncing with a root actor to
acquire the tree-global (TTY) `Lock` such that only ever one
actor's task can have the REPL active at a given time.
'''
repl: PdbREPL|None = None
lock_status: LockStatus|None = None
repl_task: Task|None = None
# local_task_in_debug: Task|None = None
req_cs: trio.CancelScope|None = None
# _debugger_request_cs: trio.CancelScope|None = None
repl_release: trio.Event|None = None
# local_pdb_complete: trio.Event|None = None
class TractorConfig(pdbp.DefaultConfig):
'''
@ -445,6 +446,7 @@ class PdbREPL(pdbp.Pdb):
status = DebugStatus
# def preloop(self):
# print('IN PRELOOP')
# super().preloop()
@ -660,16 +662,19 @@ async def lock_tty_for_child(
highly reliable at releasing the mutex complete!
'''
req_task_uid: tuple = tuple(subactor_task_uid)
if req_task_uid in Lock._blocked:
raise RuntimeError(
f'Double lock request!?\n'
f'The same remote task already has an active request for TTY lock ??\n\n'
f'task uid: {req_task_uid}\n'
f'subactor uid: {subactor_uid}\n'
)
f'subactor uid: {subactor_uid}\n\n'
Lock._blocked.add(req_task_uid)
'This might be mean that the requesting task '
'in `wait_for_parent_stdin_hijack()` may have crashed?\n'
'Consider that an internal bug exists given the TTY '
'`Lock`ing IPC dialog..\n'
)
root_task_name: str = current_task().name
if tuple(subactor_uid) in Lock._blocked:
@ -695,8 +700,9 @@ async def lock_tty_for_child(
f'subactor_uid: {subactor_uid}\n'
f'remote task: {subactor_task_uid}\n'
)
Lock.shield_sigint()
DebugStatus.shield_sigint()
try:
Lock._blocked.add(req_task_uid)
with (
# NOTE: though a cs is created for every subactor lock
# REQUEST in this ctx-child task, only the root-task
@ -708,6 +714,9 @@ async def lock_tty_for_child(
# used to do so!
trio.CancelScope(shield=True) as debug_lock_cs,
# TODO: make this ONLY limit the pld_spec such that we
# can on-error-decode-`.pld: Raw` fields in
# `Context._deliver_msg()`?
_codec.limit_msg_spec(
payload_spec=__msg_spec__,
) as codec,
@ -763,8 +772,9 @@ async def lock_tty_for_child(
finally:
debug_lock_cs.cancel()
Lock._blocked.remove(req_task_uid)
Lock.set_locking_task_cs(None)
Lock.unshield_sigint()
DebugStatus.unshield_sigint()
@cm
@ -817,7 +827,7 @@ async def wait_for_parent_stdin_hijack(
trio.CancelScope(shield=True) as cs,
apply_debug_codec(),
):
Lock._debugger_request_cs = cs
DebugStatus.req_cs = cs
try:
# TODO: merge into sync async with ?
async with get_root() as portal:
@ -829,7 +839,7 @@ async def wait_for_parent_stdin_hijack(
) as (ctx, resp):
log.pdb(
'Subactor locked TTY per msg\n'
'Subactor locked TTY with msg\n\n'
f'{resp}\n'
)
assert resp.subactor_uid == actor_uid
@ -837,12 +847,12 @@ async def wait_for_parent_stdin_hijack(
async with ctx.open_stream() as stream:
try: # to unblock local caller
assert Lock.local_pdb_complete
assert DebugStatus.repl_release
task_status.started(cs)
# wait for local task to exit and
# release the REPL
await Lock.local_pdb_complete.wait()
await DebugStatus.repl_release.wait()
finally:
await stream.send(
@ -867,12 +877,12 @@ async def wait_for_parent_stdin_hijack(
raise
finally:
Lock.local_task_in_debug = None
DebugStatus.repl_task = None
log.debug('Exiting debugger TTY lock request func from child')
log.cancel('Reverting SIGINT handler!')
Lock.unshield_sigint()
DebugStatus.unshield_sigint()
@ -901,7 +911,7 @@ def mk_mpdb() -> PdbREPL:
# in which case schedule the SIGINT shielding override
# to in the main thread.
# https://docs.python.org/3/library/signal.html#signals-and-threads
Lock.shield_sigint()
DebugStatus.shield_sigint()
# XXX: These are the important flags mentioned in
# https://github.com/python-trio/trio/issues/1155
@ -1036,7 +1046,8 @@ def shield_sigint_handler(
)
log.warning(message)
Lock.unshield_sigint()
# Lock.unshield_sigint()
DebugStatus.unshield_sigint()
case_handled = True
else:
@ -1064,7 +1075,7 @@ def shield_sigint_handler(
if maybe_stale_lock_cs:
lock_cs.cancel()
Lock.unshield_sigint()
DebugStatus.unshield_sigint()
case_handled = True
# child actor that has locked the debugger
@ -1086,11 +1097,11 @@ def shield_sigint_handler(
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
Lock.unshield_sigint()
DebugStatus.unshield_sigint()
# do_cancel()
case_handled = True
task: str|None = Lock.local_task_in_debug
task: str|None = DebugStatus.repl_task
if (
task
and
@ -1124,7 +1135,7 @@ def shield_sigint_handler(
+
'Reverting handler to `trio` default!\n'
)
Lock.unshield_sigint()
DebugStatus.unshield_sigint()
case_handled = True
# XXX ensure that the reverted-to-handler actually is
@ -1200,32 +1211,15 @@ def _set_trace(
pdb
and actor is not None
)
# or shield
):
msg: str = _pause_msg
if shield:
# log.warning(
msg = (
'\n\n'
' ------ - ------\n'
'Debugger invoked with `shield=True` so an extra\n'
'`trio.CancelScope.__exit__()` frame is shown..\n'
'\n'
'Try going up one frame to see your pause point!\n'
'\n'
' SORRY we need to fix this!\n'
' ------ - ------\n\n'
) + msg
# pdbp.set_trace()
# TODO: maybe print the actor supervion tree up to the
# root here? Bo
log.pdb(
f'{msg}\n'
f'{_pause_msg}\n'
'|\n'
# TODO: make an `Actor.__repr()__`
# f'|_ {current_task()} @ {actor.name}\n'
f'|_ {current_task()}\n'
f'|_ {current_task()} @ {actor.uid}\n'
)
# no f!#$&* idea, but when we're in async land
# we need 2x frames up?
@ -1286,11 +1280,11 @@ async def _pause(
# task_name: str = task.name
if (
not Lock.local_pdb_complete
not DebugStatus.repl_release
or
Lock.local_pdb_complete.is_set()
DebugStatus.repl_release.is_set()
):
Lock.local_pdb_complete = trio.Event()
DebugStatus.repl_release = trio.Event()
if debug_func is not None:
debug_func = partial(debug_func)
@ -1333,71 +1327,14 @@ async def _pause(
Lock.release()
raise
except BaseException:
log.exception(
'Failed to engage debugger via `_pause()` ??\n'
)
raise
try:
if is_root_process():
if is_root_process():
# we also wait in the root-parent for any child that
# may have the tty locked prior
# TODO: wait, what about multiple root tasks acquiring it though?
if Lock.global_actor_in_debug == actor.uid:
# re-entrant root process already has it: noop.
log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n'
f'ignoring..'
)
await trio.lowlevel.checkpoint()
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
if Lock._debug_lock.locked():
log.warning(
'attempting to shield-acquire active TTY lock'
f' owned by {Lock.global_actor_in_debug}'
)
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=True):
await Lock._debug_lock.acquire()
else:
# may be cancelled
await Lock._debug_lock.acquire()
Lock.global_actor_in_debug = actor.uid
Lock.local_task_in_debug = task
Lock.repl = pdb
# enter REPL from root, no TTY locking IPC ctx necessary
_enter_repl_sync(debug_func)
return # next branch is mutex and for subactors
# TODO: need a more robust check for the "root" actor
elif (
not is_root_process()
and actor._parent_chan # a connected child
):
if Lock.local_task_in_debug:
# Recurrence entry case: this task already has the lock and
# is likely recurrently entering a breakpoint
#
# NOTE: noop on recurrent entry case but we want to trigger
# a checkpoint to allow other actors error-propagate and
# potetially avoid infinite re-entries in some
# subactor that would otherwise not bubble until the
# next checkpoint was hit.
if (
(repl_task := Lock.local_task_in_debug)
and
repl_task is task
):
# we also wait in the root-parent for any child that
# may have the tty locked prior
# TODO: wait, what about multiple root tasks acquiring it though?
if Lock.global_actor_in_debug == actor.uid:
# re-entrant root process already has it: noop.
log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n'
f'ignoring..'
@ -1405,79 +1342,137 @@ async def _pause(
await trio.lowlevel.checkpoint()
return
# if **this** actor is already in debug REPL we want
# to maintain actor-local-task mutex access, so block
# here waiting for the control to be released - this
# -> allows for recursive entries to `tractor.pause()`
log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n'
f'waiting for release..'
)
await Lock.local_pdb_complete.wait()
await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
Lock.local_task_in_debug = task
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
# TODO: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
# NOTE: MUST it here bc multiple tasks are spawned by any
# one sub-actor AND there will be a race between when the
# root locking task delivers the `Started(pld=LockStatus)`
# and when the REPL is actually entered here. SO ensure
# the codec is set before either are run!
#
with (
# _codec.limit_msg_spec(
# payload_spec=__msg_spec__,
# ) as debug_codec,
trio.CancelScope(shield=shield),
):
# async with trio.open_nursery() as tn:
# tn.cancel_scope.shield = True
try:
# cs: trio.CancelScope = await tn.start(
cs: trio.CancelScope = await actor._service_n.start(
wait_for_parent_stdin_hijack,
actor.uid,
(task.name, id(task)),
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
if Lock._debug_lock.locked():
log.warning(
'attempting to shield-acquire active TTY lock'
f' owned by {Lock.global_actor_in_debug}'
)
# our locker task should be the one in ctx
# with the root actor
assert Lock._debugger_request_cs is cs
# XXX used by the SIGINT handler to check if
# THIS actor is in REPL interaction
Lock.repl = pdb
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=True):
await Lock._debug_lock.acquire()
else:
# may be cancelled
await Lock._debug_lock.acquire()
except RuntimeError:
Lock.release()
Lock.global_actor_in_debug = actor.uid
DebugStatus.repl_task = task
DebugStatus.repl = Lock.repl = pdb
if actor._cancel_called:
# service nursery won't be usable and we
# don't want to lock up the root either way since
# we're in (the midst of) cancellation.
# enter REPL from root, no TTY locking IPC ctx necessary
_enter_repl_sync(debug_func)
return # next branch is mutex and for subactors
# TODO: need a more robust check for the "root" actor
elif (
not is_root_process()
and actor._parent_chan # a connected child
):
if DebugStatus.repl_task:
# Recurrence entry case: this task already has the lock and
# is likely recurrently entering a breakpoint
#
# NOTE: noop on recurrent entry case but we want to trigger
# a checkpoint to allow other actors error-propagate and
# potetially avoid infinite re-entries in some
# subactor that would otherwise not bubble until the
# next checkpoint was hit.
if (
(repl_task := DebugStatus.repl_task)
and
repl_task is task
):
log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n'
f'ignoring..'
)
await trio.lowlevel.checkpoint()
return
raise
# if **this** actor is already in debug REPL we want
# to maintain actor-local-task mutex access, so block
# here waiting for the control to be released - this
# -> allows for recursive entries to `tractor.pause()`
log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n'
f'waiting for release..'
)
await DebugStatus.repl_release.wait()
await trio.sleep(0.1)
# enter REPL
# mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
DebugStatus.repl_task = task
try:
_enter_repl_sync(debug_func)
finally:
Lock.unshield_sigint()
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
# TODO: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
# NOTE: MUST it here bc multiple tasks are spawned by any
# one sub-actor AND there will be a race between when the
# root locking task delivers the `Started(pld=LockStatus)`
# and when the REPL is actually entered here. SO ensure
# the codec is set before either are run!
#
with (
# _codec.limit_msg_spec(
# payload_spec=__msg_spec__,
# ) as debug_codec,
trio.CancelScope(shield=shield),
):
# async with trio.open_nursery() as tn:
# tn.cancel_scope.shield = True
try:
# cs: trio.CancelScope = await tn.start(
cs: trio.CancelScope = await actor._service_n.start(
wait_for_parent_stdin_hijack,
actor.uid,
(task.name, id(task)),
)
# our locker task should be the one in ctx
# with the root actor
assert DebugStatus.req_cs is cs
# XXX used by the SIGINT handler to check if
# THIS actor is in REPL interaction
Lock.repl = pdb
except RuntimeError:
Lock.release()
if actor._cancel_called:
# service nursery won't be usable and we
# don't want to lock up the root either way since
# we're in (the midst of) cancellation.
return
raise
# enter REPL
try:
_enter_repl_sync(debug_func)
finally:
DebugStatus.unshield_sigint()
except BaseException:
log.exception(
'Failed to engage debugger via `_pause()` ??\n'
)
raise
# XXX: apparently we can't do this without showing this frame
@ -1527,45 +1522,16 @@ async def pause(
'''
__tracebackhide__: bool = True
if shield:
# NOTE XXX: even hard coding this inside the `class CancelScope:`
# doesn't seem to work for me!?
# ^ XXX ^
with trio.CancelScope(
shield=shield,
) as cs:
# def _exit(self, *args, **kwargs):
# __tracebackhide__: bool = True
# super().__exit__(*args, **kwargs)
# trio.CancelScope.__enter__.__tracebackhide__ = True
# trio.CancelScope.__exit__.__tracebackhide__ = True
# import types
# with trio.CancelScope(shield=shield) as cs:
# cs.__exit__ = types.MethodType(_exit, cs)
# cs.__exit__.__tracebackhide__ = True
# TODO: LOL, solved this with the `pdb.hideframe` stuff
# at top-of-mod.. so I guess we can just only use this
# block right?
with trio.CancelScope(
shield=shield,
) as cs:
print(f'debug cs is {cs}\n')
# setattr(cs.__exit__.__func__, '__tracebackhide__', True)
# setattr(cs.__enter__.__func__, '__tracebackhide__', True)
# NOTE: so the caller can always cancel even if shielded
task_status.started(cs)
return await _pause(
debug_func=debug_func,
shield=True,
task_status=task_status,
**_pause_kwargs
)
else:
# NOTE: so the caller can always manually cancel even
# if shielded!
task_status.started(cs)
return await _pause(
debug_func=debug_func,
shield=False,
shield=shield,
task_status=task_status,
**_pause_kwargs
)
@ -1682,7 +1648,7 @@ def pause_from_sync(
)
)
# TODO: maybe the `trio.current_task()` id/name if avail?
Lock.local_task_in_debug: str = str(threading.current_thread())
DebugStatus.repl_task: str = str(threading.current_thread())
else: # we are presumably the `trio.run()` + main thread
greenback.await_(
@ -1692,7 +1658,7 @@ def pause_from_sync(
hide_tb=hide_tb,
)
)
Lock.local_task_in_debug: str = current_task()
DebugStatus.repl_task: str = current_task()
# TODO: ensure we aggressively make the user aware about
# entering the global ``breakpoint()`` built-in from sync
@ -1754,7 +1720,8 @@ def _post_mortem(
log.pdb(
f'{_crash_msg}\n'
'|\n'
f'|_ {current_task()}\n'
# f'|_ {current_task()}\n'
f'|_ {current_task()} @ {actor.uid}\n'
# f'|_ @{actor.uid}\n'
# TODO: make an `Actor.__repr()__`