forked from goodboy/tractor
Use `DebugStatus` around subactor lock requests
Breaks out all the (sub)actor-local concurrency primitives from `Lock` (which is now only used in and by the root actor) such that there's an explicit distinction between a task that's "consuming" the `Lock` (remotely) vs. the root-side service tasks which do the actual acquire on behalf of the requesters.

`DebugStatus` changeover details:
------ - ------
- move all the actor-local vars over to `DebugStatus` including:
  - move `_trio_handler` and `_orig_sigint_handler`
  - `local_task_in_debug` now `repl_task`
  - `_debugger_request_cs` now `req_cs`
  - `local_pdb_complete` now `repl_release`
- drop all ^ fields from `Lock.repr()` obvi..
- move over the `.[un]shield_sigint()` and `.is_main_trio_thread()` methods.
- add some new attrs/meths:
  - `DebugStatus.repl` for the currently running `Pdb` in-actor singleton.
  - `.repr()` for pprint of state (like `Lock`).
- Note: even when a root-actor task is in REPL, the `DebugStatus` is still used for certain actor-local state mgmt, such as SIGINT handler shielding.
- obvi change all lock-requester code bits to now use a `DebugStatus` in their local actor-state instead of `Lock`, i.e. change usage from `Lock` in `._runtime` and `._root`.
- use the new `Lock.get_locking_task_cs()` API when checking for sub-in-debug from `._runtime.Actor._stream_handler()`.

Unrelated to topic-at-hand tweaks:
------ - ------
- drop the commented bits about hiding `@[a]cm` stack frames from `_debug.pause()` and simplify to only one block with the `shield` passthrough since we already solved the issue with cancel-scopes using `@pdbp.hideframe` B)
  - this includes all the extra logging about the extra frame for the user (good thing i put in that wasted effort back then eh..)
- put the `try/except BaseException` with `log.exception()` around the whole of `._pause()` to ensure we don't miss in-func errors which can cause hangs..
- allow passing in `portal: Portal` to `Actor.start_remote_task()` such that `Portal` task-spawning methods are always denoted correctly in terms of `Context.side`.
- lotsa logging tweaks, decreasing a bit of noise from `.runtime()`s.
runtime_to_msgspec
parent
d0e7610073
commit
77a15ebf19
|
@ -135,7 +135,7 @@ async def open_root_actor(
|
|||
|
||||
# attempt to retrieve ``trio``'s sigint handler and stash it
|
||||
# on our debugger lock state.
|
||||
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
|
||||
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
|
||||
|
||||
# mark top most level process as root actor
|
||||
_state._runtime_vars['_is_root'] = True
|
||||
|
|
|
@ -267,10 +267,13 @@ class Actor:
|
|||
self._listeners: list[trio.abc.Listener] = []
|
||||
self._parent_chan: Channel|None = None
|
||||
self._forkserver_info: tuple|None = None
|
||||
|
||||
# track each child/sub-actor in its locally
|
||||
# supervising nursery
|
||||
self._actoruid2nursery: dict[
|
||||
tuple[str, str],
|
||||
tuple[str, str], # sub-`Actor.uid`
|
||||
ActorNursery|None,
|
||||
] = {} # type: ignore # noqa
|
||||
] = {}
|
||||
|
||||
# when provided, init the registry addresses property from
|
||||
# input via the validator.
|
||||
|
@ -659,12 +662,18 @@ class Actor:
|
|||
|
||||
# TODO: NEEEDS TO BE TESTED!
|
||||
# actually, no idea if this ever even enters.. XD
|
||||
#
|
||||
# XXX => YES IT DOES, when i was testing ctl-c
|
||||
# from broken debug TTY locking due to
|
||||
# msg-spec races on application using RunVar...
|
||||
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
|
||||
if (
|
||||
pdb_user_uid
|
||||
and local_nursery
|
||||
):
|
||||
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
|
||||
entry: tuple|None = local_nursery._children.get(
|
||||
tuple(pdb_user_uid)
|
||||
)
|
||||
if entry:
|
||||
proc: trio.Process
|
||||
_, proc, _ = entry
|
||||
|
@ -674,10 +683,10 @@ class Actor:
|
|||
and poll() is None
|
||||
):
|
||||
log.cancel(
|
||||
'Root actor reports no-more-peers, BUT '
|
||||
'Root actor reports no-more-peers, BUT\n'
|
||||
'a DISCONNECTED child still has the debug '
|
||||
'lock!\n'
|
||||
f'root uid: {self.uid}\n'
|
||||
'lock!\n\n'
|
||||
# f'root uid: {self.uid}\n'
|
||||
f'last disconnected child uid: {uid}\n'
|
||||
f'locking child uid: {pdb_user_uid}\n'
|
||||
)
|
||||
|
@ -703,9 +712,8 @@ class Actor:
|
|||
# if a now stale local task has the TTY lock still
|
||||
# we cancel it to allow servicing other requests for
|
||||
# the lock.
|
||||
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
|
||||
if (
|
||||
db_cs
|
||||
(db_cs := pdb_lock.get_locking_task_cs())
|
||||
and not db_cs.cancel_called
|
||||
and uid == pdb_user_uid
|
||||
):
|
||||
|
@ -742,7 +750,7 @@ class Actor:
|
|||
except KeyError:
|
||||
log.warning(
|
||||
'Ignoring invalid IPC ctx msg!\n\n'
|
||||
f'<= sender: {uid}\n'
|
||||
f'<= sender: {uid}\n\n'
|
||||
# XXX don't need right since it's always in msg?
|
||||
# f'=> cid: {cid}\n\n'
|
||||
|
||||
|
@ -796,7 +804,7 @@ class Actor:
|
|||
cid,
|
||||
# side,
|
||||
)]
|
||||
log.runtime(
|
||||
log.debug(
|
||||
f'Retreived cached IPC ctx for\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'cid:{cid}\n'
|
||||
|
@ -835,10 +843,14 @@ class Actor:
|
|||
nsf: NamespacePath,
|
||||
kwargs: dict,
|
||||
|
||||
# determines `Context.side: str`
|
||||
portal: Portal|None = None,
|
||||
|
||||
# IPC channel config
|
||||
msg_buffer_size: int|None = None,
|
||||
allow_overruns: bool = False,
|
||||
load_nsf: bool = False,
|
||||
ack_timeout: float = 3,
|
||||
|
||||
) -> Context:
|
||||
'''
|
||||
|
@ -863,10 +875,12 @@ class Actor:
|
|||
msg_buffer_size=msg_buffer_size,
|
||||
allow_overruns=allow_overruns,
|
||||
)
|
||||
ctx._portal = portal
|
||||
|
||||
if (
|
||||
'self' in nsf
|
||||
or not load_nsf
|
||||
or
|
||||
not load_nsf
|
||||
):
|
||||
ns, _, func = nsf.partition(':')
|
||||
else:
|
||||
|
@ -874,42 +888,29 @@ class Actor:
|
|||
# -[ ] but, how to do `self:<Actor.meth>`??
|
||||
ns, func = nsf.to_tuple()
|
||||
|
||||
msg = msgtypes.Start(
|
||||
ns=ns,
|
||||
func=func,
|
||||
kwargs=kwargs,
|
||||
uid=self.uid,
|
||||
cid=cid,
|
||||
)
|
||||
log.runtime(
|
||||
'Sending cmd to\n'
|
||||
f'peer: {chan.uid} => \n'
|
||||
'\n'
|
||||
f'=> {ns}.{func}({kwargs})\n'
|
||||
'Sending RPC start msg\n\n'
|
||||
f'=> peer: {chan.uid}\n'
|
||||
f' |_ {ns}.{func}({kwargs})\n'
|
||||
)
|
||||
await chan.send(
|
||||
msgtypes.Start(
|
||||
ns=ns,
|
||||
func=func,
|
||||
kwargs=kwargs,
|
||||
uid=self.uid,
|
||||
cid=cid,
|
||||
)
|
||||
)
|
||||
# {'cmd': (
|
||||
# ns,
|
||||
# func,
|
||||
# kwargs,
|
||||
# self.uid,
|
||||
# cid,
|
||||
# )}
|
||||
# )
|
||||
await chan.send(msg)
|
||||
|
||||
# Wait on first response msg and validate; this should be
|
||||
# immediate.
|
||||
# first_msg: dict = await ctx._recv_chan.receive()
|
||||
# functype: str = first_msg.get('functype')
|
||||
|
||||
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
|
||||
# NOTE wait on first `StartAck` response msg and validate;
|
||||
# this should be immediate and does not (yet) wait for the
|
||||
# remote child task to sync via `Context.started()`.
|
||||
with trio.fail_after(ack_timeout):
|
||||
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
|
||||
try:
|
||||
functype: str = first_msg.functype
|
||||
except AttributeError:
|
||||
raise unpack_error(first_msg, chan)
|
||||
# if 'error' in first_msg:
|
||||
# raise unpack_error(first_msg, chan)
|
||||
|
||||
if functype not in (
|
||||
'asyncfunc',
|
||||
|
@ -917,7 +918,7 @@ class Actor:
|
|||
'context',
|
||||
):
|
||||
raise ValueError(
|
||||
f'{first_msg} is an invalid response packet?'
|
||||
f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
|
||||
)
|
||||
|
||||
ctx._remote_func_type = functype
|
||||
|
@ -1162,7 +1163,7 @@ class Actor:
|
|||
|
||||
# kill any debugger request task to avoid deadlock
|
||||
# with the root actor in this tree
|
||||
dbcs = _debug.Lock._debugger_request_cs
|
||||
dbcs = _debug.DebugStatus.req_cs
|
||||
if dbcs is not None:
|
||||
msg += (
|
||||
'>> Cancelling active debugger request..\n'
|
||||
|
@ -1237,9 +1238,9 @@ class Actor:
|
|||
except KeyError:
|
||||
# NOTE: during msging race conditions this will often
|
||||
# emit, some examples:
|
||||
# - callee returns a result before cancel-msg/ctxc-raised
|
||||
# - callee self raises ctxc before caller send request,
|
||||
# - callee errors prior to cancel req.
|
||||
# - child returns a result before cancel-msg/ctxc-raised
|
||||
# - child self raises ctxc before parent send request,
|
||||
# - child errors prior to cancel req.
|
||||
log.cancel(
|
||||
'Cancel request invalid, RPC task already completed?\n\n'
|
||||
f'<= canceller: {requesting_uid}\n\n'
|
||||
|
@ -1302,15 +1303,15 @@ class Actor:
|
|||
flow_info: str = (
|
||||
f'<= canceller: {requesting_uid}\n'
|
||||
f'=> ipc-parent: {parent_chan}\n'
|
||||
f' |_{ctx}\n'
|
||||
f'|_{ctx}\n'
|
||||
)
|
||||
log.runtime(
|
||||
'Waiting on RPC task to cancel\n'
|
||||
'Waiting on RPC task to cancel\n\n'
|
||||
f'{flow_info}'
|
||||
)
|
||||
await is_complete.wait()
|
||||
log.runtime(
|
||||
f'Sucessfully cancelled RPC task\n'
|
||||
f'Sucessfully cancelled RPC task\n\n'
|
||||
f'{flow_info}'
|
||||
)
|
||||
return True
|
||||
|
@ -1536,8 +1537,8 @@ async def async_main(
|
|||
|
||||
'''
|
||||
# attempt to retrieve ``trio``'s sigint handler and stash it
|
||||
# on our debugger lock state.
|
||||
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
|
||||
# on our debugger state.
|
||||
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
|
||||
|
||||
is_registered: bool = False
|
||||
try:
|
||||
|
|
|
@ -160,12 +160,6 @@ class Lock:
|
|||
# placeholder for function to set a ``trio.Event`` on debugger exit
|
||||
# pdb_release_hook: Callable | None = None
|
||||
|
||||
_trio_handler: (
|
||||
Callable[[int, FrameType|None], Any]
|
||||
|int
|
||||
| None
|
||||
) = None
|
||||
|
||||
remote_task_in_debug: str|None = None
|
||||
|
||||
@staticmethod
|
||||
|
@ -188,12 +182,6 @@ class Lock:
|
|||
|
||||
Lock._locking_task_cs = cs
|
||||
|
||||
# SUBACTOR ONLY
|
||||
# ------ - -------
|
||||
local_task_in_debug: Task|None = None
|
||||
_debugger_request_cs: trio.CancelScope|None = None
|
||||
local_pdb_complete: trio.Event|None = None
|
||||
|
||||
# ROOT ONLY
|
||||
# ------ - -------
|
||||
# the root-actor-ONLY singletons for,
|
||||
|
@ -214,16 +202,12 @@ class Lock:
|
|||
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
||||
_blocked: set[tuple[str, str]] = set() # `Actor.uid` block list
|
||||
|
||||
# TODO: should go on `PbpREPL`?
|
||||
_orig_sigint_handler: Callable | None = None
|
||||
|
||||
@classmethod
|
||||
def repr(cls) -> str:
|
||||
|
||||
# both root and subs
|
||||
fields: str = (
|
||||
f'repl: {cls.repl}\n'
|
||||
f'local_repl_task: {cls.local_task_in_debug}\n'
|
||||
)
|
||||
|
||||
if is_root_process():
|
||||
|
@ -238,12 +222,6 @@ class Lock:
|
|||
f'_debug_lock: {cls._debug_lock}\n'
|
||||
f'lock_stats: {lock_stats}\n'
|
||||
)
|
||||
else:
|
||||
fields += (
|
||||
f'local_task_in_debug: {cls.local_task_in_debug}\n'
|
||||
f'local_pdb_complete: {cls.local_pdb_complete}\n'
|
||||
f'_debugger_request_cs: {cls._debugger_request_cs}\n'
|
||||
)
|
||||
|
||||
body: str = textwrap.indent(
|
||||
fields,
|
||||
|
@ -255,7 +233,101 @@ class Lock:
|
|||
')>'
|
||||
)
|
||||
|
||||
# TODO: move to PdbREPL!
|
||||
@classmethod
|
||||
def release(cls):
|
||||
try:
|
||||
if not DebugStatus.is_main_trio_thread():
|
||||
trio.from_thread.run_sync(
|
||||
cls._debug_lock.release
|
||||
)
|
||||
else:
|
||||
cls._debug_lock.release()
|
||||
|
||||
except RuntimeError as rte:
|
||||
# uhhh makes no sense but been seeing the non-owner
|
||||
# release error even though this is definitely the task
|
||||
# that locked?
|
||||
owner = cls._debug_lock.statistics().owner
|
||||
# if (
|
||||
# owner
|
||||
# and
|
||||
# cls.remote_task_in_debug is None
|
||||
# ):
|
||||
# raise RuntimeError(
|
||||
# 'Stale `Lock` detected, no remote task active!?\n'
|
||||
# f'|_{owner}\n'
|
||||
# # f'{Lock}'
|
||||
# ) from rte
|
||||
|
||||
if owner:
|
||||
raise rte
|
||||
|
||||
# OW suppress, can't member why tho .. XD
|
||||
# something somethin corrupts a cancel-scope
|
||||
# somewhere..
|
||||
|
||||
try:
|
||||
# sometimes the ``trio`` might already be terminated in
|
||||
# which case this call will raise.
|
||||
if DebugStatus.repl_release is not None:
|
||||
DebugStatus.repl_release.set()
|
||||
|
||||
finally:
|
||||
cls.repl = None
|
||||
cls.global_actor_in_debug = None
|
||||
|
||||
# restore original sigint handler
|
||||
DebugStatus.unshield_sigint()
|
||||
# actor-local state, irrelevant for non-root.
|
||||
DebugStatus.repl_task = None
|
||||
|
||||
|
||||
# TODO: actually use this instead throughout for subs!
|
||||
class DebugStatus:
|
||||
'''
|
||||
Singleton-state for debugging machinery in a subactor.
|
||||
|
||||
Composes conc primitives for syncing with a root actor to
|
||||
acquire the tree-global (TTY) `Lock` such that only ever one
|
||||
actor's task can have the REPL active at a given time.
|
||||
|
||||
Methods to shield the process' `SIGINT` handler are used
|
||||
whenever a local task is an active REPL.
|
||||
|
||||
'''
|
||||
repl: PdbREPL|None = None
|
||||
repl_task: Task|None = None
|
||||
req_cs: trio.CancelScope|None = None
|
||||
repl_release: trio.Event|None = None
|
||||
|
||||
lock_status: LockStatus|None = None
|
||||
|
||||
_orig_sigint_handler: Callable | None = None
|
||||
_trio_handler: (
|
||||
Callable[[int, FrameType|None], Any]
|
||||
|int
|
||||
| None
|
||||
) = None
|
||||
|
||||
|
||||
@classmethod
|
||||
def repr(cls) -> str:
|
||||
fields: str = (
|
||||
f'repl: {cls.repl}\n'
|
||||
f'repl_task: {cls.repl_task}\n'
|
||||
f'repl_release: {cls.repl_release}\n'
|
||||
f'req_cs: {cls.req_cs}\n'
|
||||
)
|
||||
body: str = textwrap.indent(
|
||||
fields,
|
||||
prefix=' |_',
|
||||
)
|
||||
return (
|
||||
f'<{cls.__name__}(\n'
|
||||
f'{body}'
|
||||
')>'
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def shield_sigint(cls):
|
||||
'''
|
||||
|
@ -339,77 +411,6 @@ class Lock:
|
|||
# is not threading.main_thread()
|
||||
# )
|
||||
|
||||
@classmethod
|
||||
def release(cls):
|
||||
try:
|
||||
if not cls.is_main_trio_thread():
|
||||
trio.from_thread.run_sync(
|
||||
cls._debug_lock.release
|
||||
)
|
||||
else:
|
||||
cls._debug_lock.release()
|
||||
|
||||
except RuntimeError as rte:
|
||||
# uhhh makes no sense but been seeing the non-owner
|
||||
# release error even though this is definitely the task
|
||||
# that locked?
|
||||
owner = cls._debug_lock.statistics().owner
|
||||
# if (
|
||||
# owner
|
||||
# and
|
||||
# cls.remote_task_in_debug is None
|
||||
# ):
|
||||
# raise RuntimeError(
|
||||
# 'Stale `Lock` detected, no remote task active!?\n'
|
||||
# f'|_{owner}\n'
|
||||
# # f'{Lock}'
|
||||
# ) from rte
|
||||
|
||||
if owner:
|
||||
raise rte
|
||||
|
||||
# OW suppress, can't member why tho .. XD
|
||||
# something somethin corrupts a cancel-scope
|
||||
# somewhere..
|
||||
|
||||
try:
|
||||
# sometimes the ``trio`` might already be terminated in
|
||||
# which case this call will raise.
|
||||
if cls.local_pdb_complete is not None:
|
||||
cls.local_pdb_complete.set()
|
||||
|
||||
finally:
|
||||
# restore original sigint handler
|
||||
cls.unshield_sigint()
|
||||
cls.repl = None
|
||||
|
||||
# actor-local state, irrelevant for non-root.
|
||||
cls.global_actor_in_debug = None
|
||||
cls.local_task_in_debug = None
|
||||
|
||||
|
||||
# TODO: actually use this instead throughout for subs!
|
||||
class DebugStatus:
|
||||
'''
|
||||
Singleton-state for debugging machinery in a subactor.
|
||||
|
||||
Composes conc primitives for syncing with a root actor to
|
||||
acquire the tree-global (TTY) `Lock` such that only ever one
|
||||
actor's task can have the REPL active at a given time.
|
||||
|
||||
'''
|
||||
repl: PdbREPL|None = None
|
||||
lock_status: LockStatus|None = None
|
||||
|
||||
repl_task: Task|None = None
|
||||
# local_task_in_debug: Task|None = None
|
||||
|
||||
req_cs: trio.CancelScope|None = None
|
||||
# _debugger_request_cs: trio.CancelScope|None = None
|
||||
|
||||
repl_release: trio.Event|None = None
|
||||
# local_pdb_complete: trio.Event|None = None
|
||||
|
||||
|
||||
class TractorConfig(pdbp.DefaultConfig):
|
||||
'''
|
||||
|
@ -445,6 +446,7 @@ class PdbREPL(pdbp.Pdb):
|
|||
|
||||
status = DebugStatus
|
||||
|
||||
|
||||
# def preloop(self):
|
||||
# print('IN PRELOOP')
|
||||
# super().preloop()
|
||||
|
@ -660,16 +662,19 @@ async def lock_tty_for_child(
|
|||
highly reliable at releasing the mutex complete!
|
||||
|
||||
'''
|
||||
|
||||
req_task_uid: tuple = tuple(subactor_task_uid)
|
||||
if req_task_uid in Lock._blocked:
|
||||
raise RuntimeError(
|
||||
f'Double lock request!?\n'
|
||||
f'The same remote task already has an active request for TTY lock ??\n\n'
|
||||
f'task uid: {req_task_uid}\n'
|
||||
f'subactor uid: {subactor_uid}\n'
|
||||
)
|
||||
f'subactor uid: {subactor_uid}\n\n'
|
||||
|
||||
Lock._blocked.add(req_task_uid)
|
||||
'This might be mean that the requesting task '
|
||||
'in `wait_for_parent_stdin_hijack()` may have crashed?\n'
|
||||
'Consider that an internal bug exists given the TTY '
|
||||
'`Lock`ing IPC dialog..\n'
|
||||
)
|
||||
|
||||
root_task_name: str = current_task().name
|
||||
if tuple(subactor_uid) in Lock._blocked:
|
||||
|
@ -695,8 +700,9 @@ async def lock_tty_for_child(
|
|||
f'subactor_uid: {subactor_uid}\n'
|
||||
f'remote task: {subactor_task_uid}\n'
|
||||
)
|
||||
Lock.shield_sigint()
|
||||
DebugStatus.shield_sigint()
|
||||
try:
|
||||
Lock._blocked.add(req_task_uid)
|
||||
with (
|
||||
# NOTE: though a cs is created for every subactor lock
|
||||
# REQUEST in this ctx-child task, only the root-task
|
||||
|
@ -708,6 +714,9 @@ async def lock_tty_for_child(
|
|||
# used to do so!
|
||||
trio.CancelScope(shield=True) as debug_lock_cs,
|
||||
|
||||
# TODO: make this ONLY limit the pld_spec such that we
|
||||
# can on-error-decode-`.pld: Raw` fields in
|
||||
# `Context._deliver_msg()`?
|
||||
_codec.limit_msg_spec(
|
||||
payload_spec=__msg_spec__,
|
||||
) as codec,
|
||||
|
@ -763,8 +772,9 @@ async def lock_tty_for_child(
|
|||
|
||||
finally:
|
||||
debug_lock_cs.cancel()
|
||||
Lock._blocked.remove(req_task_uid)
|
||||
Lock.set_locking_task_cs(None)
|
||||
Lock.unshield_sigint()
|
||||
DebugStatus.unshield_sigint()
|
||||
|
||||
|
||||
@cm
|
||||
|
@ -817,7 +827,7 @@ async def wait_for_parent_stdin_hijack(
|
|||
trio.CancelScope(shield=True) as cs,
|
||||
apply_debug_codec(),
|
||||
):
|
||||
Lock._debugger_request_cs = cs
|
||||
DebugStatus.req_cs = cs
|
||||
try:
|
||||
# TODO: merge into sync async with ?
|
||||
async with get_root() as portal:
|
||||
|
@ -829,7 +839,7 @@ async def wait_for_parent_stdin_hijack(
|
|||
|
||||
) as (ctx, resp):
|
||||
log.pdb(
|
||||
'Subactor locked TTY per msg\n'
|
||||
'Subactor locked TTY with msg\n\n'
|
||||
f'{resp}\n'
|
||||
)
|
||||
assert resp.subactor_uid == actor_uid
|
||||
|
@ -837,12 +847,12 @@ async def wait_for_parent_stdin_hijack(
|
|||
|
||||
async with ctx.open_stream() as stream:
|
||||
try: # to unblock local caller
|
||||
assert Lock.local_pdb_complete
|
||||
assert DebugStatus.repl_release
|
||||
task_status.started(cs)
|
||||
|
||||
# wait for local task to exit and
|
||||
# release the REPL
|
||||
await Lock.local_pdb_complete.wait()
|
||||
await DebugStatus.repl_release.wait()
|
||||
|
||||
finally:
|
||||
await stream.send(
|
||||
|
@ -867,12 +877,12 @@ async def wait_for_parent_stdin_hijack(
|
|||
raise
|
||||
|
||||
finally:
|
||||
Lock.local_task_in_debug = None
|
||||
DebugStatus.repl_task = None
|
||||
log.debug('Exiting debugger TTY lock request func from child')
|
||||
|
||||
|
||||
log.cancel('Reverting SIGINT handler!')
|
||||
Lock.unshield_sigint()
|
||||
DebugStatus.unshield_sigint()
|
||||
|
||||
|
||||
|
||||
|
@ -901,7 +911,7 @@ def mk_mpdb() -> PdbREPL:
|
|||
# in which case schedule the SIGINT shielding override
|
||||
# to in the main thread.
|
||||
# https://docs.python.org/3/library/signal.html#signals-and-threads
|
||||
Lock.shield_sigint()
|
||||
DebugStatus.shield_sigint()
|
||||
|
||||
# XXX: These are the important flags mentioned in
|
||||
# https://github.com/python-trio/trio/issues/1155
|
||||
|
@ -1036,7 +1046,8 @@ def shield_sigint_handler(
|
|||
)
|
||||
|
||||
log.warning(message)
|
||||
Lock.unshield_sigint()
|
||||
# Lock.unshield_sigint()
|
||||
DebugStatus.unshield_sigint()
|
||||
case_handled = True
|
||||
|
||||
else:
|
||||
|
@ -1064,7 +1075,7 @@ def shield_sigint_handler(
|
|||
if maybe_stale_lock_cs:
|
||||
lock_cs.cancel()
|
||||
|
||||
Lock.unshield_sigint()
|
||||
DebugStatus.unshield_sigint()
|
||||
case_handled = True
|
||||
|
||||
# child actor that has locked the debugger
|
||||
|
@ -1086,11 +1097,11 @@ def shield_sigint_handler(
|
|||
f'{uid_in_debug}\n'
|
||||
'Allowing SIGINT propagation..'
|
||||
)
|
||||
Lock.unshield_sigint()
|
||||
DebugStatus.unshield_sigint()
|
||||
# do_cancel()
|
||||
case_handled = True
|
||||
|
||||
task: str|None = Lock.local_task_in_debug
|
||||
task: str|None = DebugStatus.repl_task
|
||||
if (
|
||||
task
|
||||
and
|
||||
|
@ -1124,7 +1135,7 @@ def shield_sigint_handler(
|
|||
+
|
||||
'Reverting handler to `trio` default!\n'
|
||||
)
|
||||
Lock.unshield_sigint()
|
||||
DebugStatus.unshield_sigint()
|
||||
case_handled = True
|
||||
|
||||
# XXX ensure that the reverted-to-handler actually is
|
||||
|
@ -1200,32 +1211,15 @@ def _set_trace(
|
|||
pdb
|
||||
and actor is not None
|
||||
)
|
||||
# or shield
|
||||
):
|
||||
msg: str = _pause_msg
|
||||
if shield:
|
||||
# log.warning(
|
||||
msg = (
|
||||
'\n\n'
|
||||
' ------ - ------\n'
|
||||
'Debugger invoked with `shield=True` so an extra\n'
|
||||
'`trio.CancelScope.__exit__()` frame is shown..\n'
|
||||
'\n'
|
||||
'Try going up one frame to see your pause point!\n'
|
||||
'\n'
|
||||
' SORRY we need to fix this!\n'
|
||||
' ------ - ------\n\n'
|
||||
) + msg
|
||||
|
||||
# pdbp.set_trace()
|
||||
# TODO: maybe print the actor supervision tree up to the
|
||||
# root here? Bo
|
||||
|
||||
log.pdb(
|
||||
f'{msg}\n'
|
||||
f'{_pause_msg}\n'
|
||||
'|\n'
|
||||
# TODO: make an `Actor.__repr()__`
|
||||
# f'|_ {current_task()} @ {actor.name}\n'
|
||||
f'|_ {current_task()}\n'
|
||||
f'|_ {current_task()} @ {actor.uid}\n'
|
||||
)
|
||||
# no f!#$&* idea, but when we're in async land
|
||||
# we need 2x frames up?
|
||||
|
@ -1286,11 +1280,11 @@ async def _pause(
|
|||
# task_name: str = task.name
|
||||
|
||||
if (
|
||||
not Lock.local_pdb_complete
|
||||
not DebugStatus.repl_release
|
||||
or
|
||||
Lock.local_pdb_complete.is_set()
|
||||
DebugStatus.repl_release.is_set()
|
||||
):
|
||||
Lock.local_pdb_complete = trio.Event()
|
||||
DebugStatus.repl_release = trio.Event()
|
||||
|
||||
if debug_func is not None:
|
||||
debug_func = partial(debug_func)
|
||||
|
@ -1333,71 +1327,14 @@ async def _pause(
|
|||
Lock.release()
|
||||
raise
|
||||
|
||||
except BaseException:
|
||||
log.exception(
|
||||
'Failed to engage debugger via `_pause()` ??\n'
|
||||
)
|
||||
raise
|
||||
try:
|
||||
if is_root_process():
|
||||
|
||||
if is_root_process():
|
||||
|
||||
# we also wait in the root-parent for any child that
|
||||
# may have the tty locked prior
|
||||
# TODO: wait, what about multiple root tasks acquiring it though?
|
||||
if Lock.global_actor_in_debug == actor.uid:
|
||||
# re-entrant root process already has it: noop.
|
||||
log.warning(
|
||||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||
f'ignoring..'
|
||||
)
|
||||
await trio.lowlevel.checkpoint()
|
||||
return
|
||||
|
||||
# XXX: since we need to enter pdb synchronously below,
|
||||
# we have to release the lock manually from pdb completion
|
||||
# callbacks. Can't think of a nicer way then this atm.
|
||||
if Lock._debug_lock.locked():
|
||||
log.warning(
|
||||
'attempting to shield-acquire active TTY lock'
|
||||
f' owned by {Lock.global_actor_in_debug}'
|
||||
)
|
||||
|
||||
# must shield here to avoid hitting a ``Cancelled`` and
|
||||
# a child getting stuck bc we clobbered the tty
|
||||
with trio.CancelScope(shield=True):
|
||||
await Lock._debug_lock.acquire()
|
||||
else:
|
||||
# may be cancelled
|
||||
await Lock._debug_lock.acquire()
|
||||
|
||||
Lock.global_actor_in_debug = actor.uid
|
||||
Lock.local_task_in_debug = task
|
||||
Lock.repl = pdb
|
||||
|
||||
# enter REPL from root, no TTY locking IPC ctx necessary
|
||||
_enter_repl_sync(debug_func)
|
||||
return # next branch is mutex and for subactors
|
||||
|
||||
# TODO: need a more robust check for the "root" actor
|
||||
elif (
|
||||
not is_root_process()
|
||||
and actor._parent_chan # a connected child
|
||||
):
|
||||
if Lock.local_task_in_debug:
|
||||
|
||||
# Recurrence entry case: this task already has the lock and
|
||||
# is likely recurrently entering a breakpoint
|
||||
#
|
||||
# NOTE: noop on recurrent entry case but we want to trigger
|
||||
# a checkpoint to allow other actors error-propagate and
|
||||
# potentially avoid infinite re-entries in some
|
||||
# subactor that would otherwise not bubble until the
|
||||
# next checkpoint was hit.
|
||||
if (
|
||||
(repl_task := Lock.local_task_in_debug)
|
||||
and
|
||||
repl_task is task
|
||||
):
|
||||
# we also wait in the root-parent for any child that
|
||||
# may have the tty locked prior
|
||||
# TODO: wait, what about multiple root tasks acquiring it though?
|
||||
if Lock.global_actor_in_debug == actor.uid:
|
||||
# re-entrant root process already has it: noop.
|
||||
log.warning(
|
||||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||
f'ignoring..'
|
||||
|
@ -1405,79 +1342,137 @@ async def _pause(
|
|||
await trio.lowlevel.checkpoint()
|
||||
return
|
||||
|
||||
# if **this** actor is already in debug REPL we want
|
||||
# to maintain actor-local-task mutex access, so block
|
||||
# here waiting for the control to be released - this
|
||||
# -> allows for recursive entries to `tractor.pause()`
|
||||
log.warning(
|
||||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||
f'waiting for release..'
|
||||
)
|
||||
await Lock.local_pdb_complete.wait()
|
||||
await trio.sleep(0.1)
|
||||
|
||||
# mark local actor as "in debug mode" to avoid recurrent
|
||||
# entries/requests to the root process
|
||||
Lock.local_task_in_debug = task
|
||||
|
||||
# this **must** be awaited by the caller and is done using the
|
||||
# root nursery so that the debugger can continue to run without
|
||||
# being restricted by the scope of a new task nursery.
|
||||
|
||||
# TODO: if we want to debug a trio.Cancelled triggered exception
|
||||
# we have to figure out how to avoid having the service nursery
|
||||
# cancel on this task start? I *think* this works below:
|
||||
# ```python
|
||||
# actor._service_n.cancel_scope.shield = shield
|
||||
# ```
|
||||
# but not entirely sure if that's a sane way to implement it?
|
||||
|
||||
# NOTE: MUST it here bc multiple tasks are spawned by any
|
||||
# one sub-actor AND there will be a race between when the
|
||||
# root locking task delivers the `Started(pld=LockStatus)`
|
||||
# and when the REPL is actually entered here. SO ensure
|
||||
# the codec is set before either are run!
|
||||
#
|
||||
with (
|
||||
# _codec.limit_msg_spec(
|
||||
# payload_spec=__msg_spec__,
|
||||
# ) as debug_codec,
|
||||
trio.CancelScope(shield=shield),
|
||||
):
|
||||
# async with trio.open_nursery() as tn:
|
||||
# tn.cancel_scope.shield = True
|
||||
try:
|
||||
# cs: trio.CancelScope = await tn.start(
|
||||
cs: trio.CancelScope = await actor._service_n.start(
|
||||
wait_for_parent_stdin_hijack,
|
||||
actor.uid,
|
||||
(task.name, id(task)),
|
||||
# XXX: since we need to enter pdb synchronously below,
|
||||
# we have to release the lock manually from pdb completion
|
||||
# callbacks. Can't think of a nicer way then this atm.
|
||||
if Lock._debug_lock.locked():
|
||||
log.warning(
|
||||
'attempting to shield-acquire active TTY lock'
|
||||
f' owned by {Lock.global_actor_in_debug}'
|
||||
)
|
||||
# our locker task should be the one in ctx
|
||||
# with the root actor
|
||||
assert Lock._debugger_request_cs is cs
|
||||
|
||||
# XXX used by the SIGINT handler to check if
|
||||
# THIS actor is in REPL interaction
|
||||
Lock.repl = pdb
|
||||
# must shield here to avoid hitting a ``Cancelled`` and
|
||||
# a child getting stuck bc we clobbered the tty
|
||||
with trio.CancelScope(shield=True):
|
||||
await Lock._debug_lock.acquire()
|
||||
else:
|
||||
# may be cancelled
|
||||
await Lock._debug_lock.acquire()
|
||||
|
||||
except RuntimeError:
|
||||
Lock.release()
|
||||
Lock.global_actor_in_debug = actor.uid
|
||||
DebugStatus.repl_task = task
|
||||
DebugStatus.repl = Lock.repl = pdb
|
||||
|
||||
if actor._cancel_called:
|
||||
# service nursery won't be usable and we
|
||||
# don't want to lock up the root either way since
|
||||
# we're in (the midst of) cancellation.
|
||||
# enter REPL from root, no TTY locking IPC ctx necessary
|
||||
_enter_repl_sync(debug_func)
|
||||
return # next branch is mutex and for subactors
|
||||
|
||||
# TODO: need a more robust check for the "root" actor
|
||||
elif (
|
||||
not is_root_process()
|
||||
and actor._parent_chan # a connected child
|
||||
):
|
||||
if DebugStatus.repl_task:
|
||||
|
||||
# Recurrence entry case: this task already has the lock and
|
||||
# is likely recurrently entering a breakpoint
|
||||
#
|
||||
# NOTE: noop on recurrent entry case but we want to trigger
|
||||
# a checkpoint to allow other actors error-propagate and
|
||||
# potentially avoid infinite re-entries in some
|
||||
# subactor that would otherwise not bubble until the
|
||||
# next checkpoint was hit.
|
||||
if (
|
||||
(repl_task := DebugStatus.repl_task)
|
||||
and
|
||||
repl_task is task
|
||||
):
|
||||
log.warning(
|
||||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||
f'ignoring..'
|
||||
)
|
||||
await trio.lowlevel.checkpoint()
|
||||
return
|
||||
|
||||
raise
|
||||
# if **this** actor is already in debug REPL we want
|
||||
# to maintain actor-local-task mutex access, so block
|
||||
# here waiting for the control to be released - this
|
||||
# -> allows for recursive entries to `tractor.pause()`
|
||||
log.warning(
|
||||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||
f'waiting for release..'
|
||||
)
|
||||
await DebugStatus.repl_release.wait()
|
||||
await trio.sleep(0.1)
|
||||
|
||||
# enter REPL
|
||||
# mark local actor as "in debug mode" to avoid recurrent
|
||||
# entries/requests to the root process
|
||||
DebugStatus.repl_task = task
|
||||
|
||||
try:
|
||||
_enter_repl_sync(debug_func)
|
||||
finally:
|
||||
Lock.unshield_sigint()
|
||||
# this **must** be awaited by the caller and is done using the
|
||||
# root nursery so that the debugger can continue to run without
|
||||
# being restricted by the scope of a new task nursery.
|
||||
|
||||
# TODO: if we want to debug a trio.Cancelled triggered exception
|
||||
# we have to figure out how to avoid having the service nursery
|
||||
# cancel on this task start? I *think* this works below:
|
||||
# ```python
|
||||
# actor._service_n.cancel_scope.shield = shield
|
||||
# ```
|
||||
# but not entirely sure if that's a sane way to implement it?
|
||||
|
||||
# NOTE: MUST it here bc multiple tasks are spawned by any
|
||||
# one sub-actor AND there will be a race between when the
|
||||
# root locking task delivers the `Started(pld=LockStatus)`
|
||||
# and when the REPL is actually entered here. SO ensure
|
||||
# the codec is set before either are run!
|
||||
#
|
||||
with (
|
||||
# _codec.limit_msg_spec(
|
||||
# payload_spec=__msg_spec__,
|
||||
# ) as debug_codec,
|
||||
trio.CancelScope(shield=shield),
|
||||
):
|
||||
# async with trio.open_nursery() as tn:
|
||||
# tn.cancel_scope.shield = True
|
||||
try:
|
||||
# cs: trio.CancelScope = await tn.start(
|
||||
cs: trio.CancelScope = await actor._service_n.start(
|
||||
wait_for_parent_stdin_hijack,
|
||||
actor.uid,
|
||||
(task.name, id(task)),
|
||||
)
|
||||
# our locker task should be the one in ctx
|
||||
# with the root actor
|
||||
assert DebugStatus.req_cs is cs
|
||||
|
||||
# XXX used by the SIGINT handler to check if
|
||||
# THIS actor is in REPL interaction
|
||||
Lock.repl = pdb
|
||||
|
||||
except RuntimeError:
|
||||
Lock.release()
|
||||
|
||||
if actor._cancel_called:
|
||||
# service nursery won't be usable and we
|
||||
# don't want to lock up the root either way since
|
||||
# we're in (the midst of) cancellation.
|
||||
return
|
||||
|
||||
raise
|
||||
|
||||
# enter REPL
|
||||
|
||||
try:
|
||||
_enter_repl_sync(debug_func)
|
||||
finally:
|
||||
DebugStatus.unshield_sigint()
|
||||
|
||||
except BaseException:
|
||||
log.exception(
|
||||
'Failed to engage debugger via `_pause()` ??\n'
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
# XXX: apparently we can't do this without showing this frame
|
||||
|
@ -1527,45 +1522,16 @@ async def pause(
|
|||
'''
|
||||
__tracebackhide__: bool = True
|
||||
|
||||
if shield:
|
||||
# NOTE XXX: even hard coding this inside the `class CancelScope:`
|
||||
# doesn't seem to work for me!?
|
||||
# ^ XXX ^
|
||||
with trio.CancelScope(
|
||||
shield=shield,
|
||||
) as cs:
|
||||
|
||||
# def _exit(self, *args, **kwargs):
|
||||
# __tracebackhide__: bool = True
|
||||
# super().__exit__(*args, **kwargs)
|
||||
|
||||
# trio.CancelScope.__enter__.__tracebackhide__ = True
|
||||
# trio.CancelScope.__exit__.__tracebackhide__ = True
|
||||
|
||||
# import types
|
||||
# with trio.CancelScope(shield=shield) as cs:
|
||||
# cs.__exit__ = types.MethodType(_exit, cs)
|
||||
# cs.__exit__.__tracebackhide__ = True
|
||||
|
||||
# TODO: LOL, solved this with the `pdb.hideframe` stuff
|
||||
# at top-of-mod.. so I guess we can just only use this
|
||||
# block right?
|
||||
with trio.CancelScope(
|
||||
shield=shield,
|
||||
) as cs:
|
||||
print(f'debug cs is {cs}\n')
|
||||
# setattr(cs.__exit__.__func__, '__tracebackhide__', True)
|
||||
# setattr(cs.__enter__.__func__, '__tracebackhide__', True)
|
||||
|
||||
# NOTE: so the caller can always cancel even if shielded
|
||||
task_status.started(cs)
|
||||
return await _pause(
|
||||
debug_func=debug_func,
|
||||
shield=True,
|
||||
task_status=task_status,
|
||||
**_pause_kwargs
|
||||
)
|
||||
else:
|
||||
# NOTE: so the caller can always manually cancel even
|
||||
# if shielded!
|
||||
task_status.started(cs)
|
||||
return await _pause(
|
||||
debug_func=debug_func,
|
||||
shield=False,
|
||||
shield=shield,
|
||||
task_status=task_status,
|
||||
**_pause_kwargs
|
||||
)
|
||||
|
@ -1682,7 +1648,7 @@ def pause_from_sync(
|
|||
)
|
||||
)
|
||||
# TODO: maybe the `trio.current_task()` id/name if avail?
|
||||
Lock.local_task_in_debug: str = str(threading.current_thread())
|
||||
DebugStatus.repl_task: str = str(threading.current_thread())
|
||||
|
||||
else: # we are presumably the `trio.run()` + main thread
|
||||
greenback.await_(
|
||||
|
@ -1692,7 +1658,7 @@ def pause_from_sync(
|
|||
hide_tb=hide_tb,
|
||||
)
|
||||
)
|
||||
Lock.local_task_in_debug: str = current_task()
|
||||
DebugStatus.repl_task: str = current_task()
|
||||
|
||||
# TODO: ensure we aggressively make the user aware about
|
||||
# entering the global ``breakpoint()`` built-in from sync
|
||||
|
@ -1754,7 +1720,8 @@ def _post_mortem(
|
|||
log.pdb(
|
||||
f'{_crash_msg}\n'
|
||||
'|\n'
|
||||
f'|_ {current_task()}\n'
|
||||
# f'|_ {current_task()}\n'
|
||||
f'|_ {current_task()} @ {actor.uid}\n'
|
||||
|
||||
# f'|_ @{actor.uid}\n'
|
||||
# TODO: make an `Actor.__repr()__`
|
||||
|
|
Loading…
Reference in New Issue