forked from goodboy/tractor
Use `DebugStatus` around subactor lock requests
Breaks out all the (sub)actor-local concurrency primitives from `Lock` (which is now only used in and by the root actor) such that there's an explicit distinction between a task that's "consuming" the `Lock` (remotely) vs. the root-side service tasks which do the actual acquire on behalf of the requesters.

`DebugStatus` changeover details:
------ - ------
- move all the actor-local vars over to `DebugStatus`, including:
  - move `_trio_handler` and `_orig_sigint_handler`
  - `local_task_in_debug` is now `repl_task`
  - `_debugger_request_cs` is now `req_cs`
  - `local_pdb_complete` is now `repl_release`
- drop all of the above fields from `Lock.repr()`, obviously.
- move over the `.[un]shield_sigint()` and `.is_main_trio_thread()` methods.
- add some new attrs/methods:
  - `DebugStatus.repl` for the currently running `Pdb` in-actor singleton.
  - `.repr()` for pretty-printing of state (like `Lock`).
- Note that even when a root-actor task is in the REPL, the `DebugStatus` is still used for certain actor-local state management, such as SIGINT handler shielding.
- obviously, change all lock-requester code bits to now use a `DebugStatus` in their local actor-state instead of `Lock`, i.e. change usage from `Lock` in `._runtime` and `._root`.
- use the new `Lock.get_locking_task_cs()` API when checking for sub-in-debug from `._runtime.Actor._stream_handler()`.

Tweaks unrelated to the topic at hand:
------ - ------
- drop the commented bits about hiding `@[a]cm` stack frames from `_debug.pause()` and simplify to only one block with the `shield` passthrough, since we already solved the issue with cancel-scopes using `@pdbp.hideframe` B)
  - this includes all the extra logging about the extra frame for the user (good thing I put in that wasted effort back then, eh..)
- put the `try/except BaseException` with `log.exception()` around the whole of `._pause()` to ensure we don't miss in-func errors which can cause hangs.
- allow passing in `portal: Portal` to `Actor.start_remote_task()` such that `Portal` task-spawning methods are always denoted correctly in terms of `Context.side`.
- lots of logging tweaks, decreasing a bit of noise from `.runtime()` calls.
runtime_to_msgspec
parent
d0e7610073
commit
77a15ebf19
|
@ -135,7 +135,7 @@ async def open_root_actor(
|
||||||
|
|
||||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||||
# on our debugger lock state.
|
# on our debugger lock state.
|
||||||
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
|
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
|
||||||
|
|
||||||
# mark top most level process as root actor
|
# mark top most level process as root actor
|
||||||
_state._runtime_vars['_is_root'] = True
|
_state._runtime_vars['_is_root'] = True
|
||||||
|
|
|
@ -267,10 +267,13 @@ class Actor:
|
||||||
self._listeners: list[trio.abc.Listener] = []
|
self._listeners: list[trio.abc.Listener] = []
|
||||||
self._parent_chan: Channel|None = None
|
self._parent_chan: Channel|None = None
|
||||||
self._forkserver_info: tuple|None = None
|
self._forkserver_info: tuple|None = None
|
||||||
|
|
||||||
|
# track each child/sub-actor in it's locally
|
||||||
|
# supervising nursery
|
||||||
self._actoruid2nursery: dict[
|
self._actoruid2nursery: dict[
|
||||||
tuple[str, str],
|
tuple[str, str], # sub-`Actor.uid`
|
||||||
ActorNursery|None,
|
ActorNursery|None,
|
||||||
] = {} # type: ignore # noqa
|
] = {}
|
||||||
|
|
||||||
# when provided, init the registry addresses property from
|
# when provided, init the registry addresses property from
|
||||||
# input via the validator.
|
# input via the validator.
|
||||||
|
@ -659,12 +662,18 @@ class Actor:
|
||||||
|
|
||||||
# TODO: NEEEDS TO BE TESTED!
|
# TODO: NEEEDS TO BE TESTED!
|
||||||
# actually, no idea if this ever even enters.. XD
|
# actually, no idea if this ever even enters.. XD
|
||||||
|
#
|
||||||
|
# XXX => YES IT DOES, when i was testing ctl-c
|
||||||
|
# from broken debug TTY locking due to
|
||||||
|
# msg-spec races on application using RunVar...
|
||||||
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
|
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
|
||||||
if (
|
if (
|
||||||
pdb_user_uid
|
pdb_user_uid
|
||||||
and local_nursery
|
and local_nursery
|
||||||
):
|
):
|
||||||
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
|
entry: tuple|None = local_nursery._children.get(
|
||||||
|
tuple(pdb_user_uid)
|
||||||
|
)
|
||||||
if entry:
|
if entry:
|
||||||
proc: trio.Process
|
proc: trio.Process
|
||||||
_, proc, _ = entry
|
_, proc, _ = entry
|
||||||
|
@ -674,10 +683,10 @@ class Actor:
|
||||||
and poll() is None
|
and poll() is None
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Root actor reports no-more-peers, BUT '
|
'Root actor reports no-more-peers, BUT\n'
|
||||||
'a DISCONNECTED child still has the debug '
|
'a DISCONNECTED child still has the debug '
|
||||||
'lock!\n'
|
'lock!\n\n'
|
||||||
f'root uid: {self.uid}\n'
|
# f'root uid: {self.uid}\n'
|
||||||
f'last disconnected child uid: {uid}\n'
|
f'last disconnected child uid: {uid}\n'
|
||||||
f'locking child uid: {pdb_user_uid}\n'
|
f'locking child uid: {pdb_user_uid}\n'
|
||||||
)
|
)
|
||||||
|
@ -703,9 +712,8 @@ class Actor:
|
||||||
# if a now stale local task has the TTY lock still
|
# if a now stale local task has the TTY lock still
|
||||||
# we cancel it to allow servicing other requests for
|
# we cancel it to allow servicing other requests for
|
||||||
# the lock.
|
# the lock.
|
||||||
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
|
|
||||||
if (
|
if (
|
||||||
db_cs
|
(db_cs := pdb_lock.get_locking_task_cs())
|
||||||
and not db_cs.cancel_called
|
and not db_cs.cancel_called
|
||||||
and uid == pdb_user_uid
|
and uid == pdb_user_uid
|
||||||
):
|
):
|
||||||
|
@ -742,7 +750,7 @@ class Actor:
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.warning(
|
log.warning(
|
||||||
'Ignoring invalid IPC ctx msg!\n\n'
|
'Ignoring invalid IPC ctx msg!\n\n'
|
||||||
f'<= sender: {uid}\n'
|
f'<= sender: {uid}\n\n'
|
||||||
# XXX don't need right since it's always in msg?
|
# XXX don't need right since it's always in msg?
|
||||||
# f'=> cid: {cid}\n\n'
|
# f'=> cid: {cid}\n\n'
|
||||||
|
|
||||||
|
@ -796,7 +804,7 @@ class Actor:
|
||||||
cid,
|
cid,
|
||||||
# side,
|
# side,
|
||||||
)]
|
)]
|
||||||
log.runtime(
|
log.debug(
|
||||||
f'Retreived cached IPC ctx for\n'
|
f'Retreived cached IPC ctx for\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
f'cid:{cid}\n'
|
f'cid:{cid}\n'
|
||||||
|
@ -835,10 +843,14 @@ class Actor:
|
||||||
nsf: NamespacePath,
|
nsf: NamespacePath,
|
||||||
kwargs: dict,
|
kwargs: dict,
|
||||||
|
|
||||||
|
# determines `Context.side: str`
|
||||||
|
portal: Portal|None = None,
|
||||||
|
|
||||||
# IPC channel config
|
# IPC channel config
|
||||||
msg_buffer_size: int|None = None,
|
msg_buffer_size: int|None = None,
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
load_nsf: bool = False,
|
load_nsf: bool = False,
|
||||||
|
ack_timeout: float = 3,
|
||||||
|
|
||||||
) -> Context:
|
) -> Context:
|
||||||
'''
|
'''
|
||||||
|
@ -863,10 +875,12 @@ class Actor:
|
||||||
msg_buffer_size=msg_buffer_size,
|
msg_buffer_size=msg_buffer_size,
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
|
ctx._portal = portal
|
||||||
|
|
||||||
if (
|
if (
|
||||||
'self' in nsf
|
'self' in nsf
|
||||||
or not load_nsf
|
or
|
||||||
|
not load_nsf
|
||||||
):
|
):
|
||||||
ns, _, func = nsf.partition(':')
|
ns, _, func = nsf.partition(':')
|
||||||
else:
|
else:
|
||||||
|
@ -874,42 +888,29 @@ class Actor:
|
||||||
# -[ ] but, how to do `self:<Actor.meth>`??
|
# -[ ] but, how to do `self:<Actor.meth>`??
|
||||||
ns, func = nsf.to_tuple()
|
ns, func = nsf.to_tuple()
|
||||||
|
|
||||||
log.runtime(
|
msg = msgtypes.Start(
|
||||||
'Sending cmd to\n'
|
|
||||||
f'peer: {chan.uid} => \n'
|
|
||||||
'\n'
|
|
||||||
f'=> {ns}.{func}({kwargs})\n'
|
|
||||||
)
|
|
||||||
await chan.send(
|
|
||||||
msgtypes.Start(
|
|
||||||
ns=ns,
|
ns=ns,
|
||||||
func=func,
|
func=func,
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
uid=self.uid,
|
uid=self.uid,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
)
|
)
|
||||||
|
log.runtime(
|
||||||
|
'Sending RPC start msg\n\n'
|
||||||
|
f'=> peer: {chan.uid}\n'
|
||||||
|
f' |_ {ns}.{func}({kwargs})\n'
|
||||||
)
|
)
|
||||||
# {'cmd': (
|
await chan.send(msg)
|
||||||
# ns,
|
|
||||||
# func,
|
|
||||||
# kwargs,
|
|
||||||
# self.uid,
|
|
||||||
# cid,
|
|
||||||
# )}
|
|
||||||
# )
|
|
||||||
|
|
||||||
# Wait on first response msg and validate; this should be
|
|
||||||
# immediate.
|
|
||||||
# first_msg: dict = await ctx._recv_chan.receive()
|
|
||||||
# functype: str = first_msg.get('functype')
|
|
||||||
|
|
||||||
|
# NOTE wait on first `StartAck` response msg and validate;
|
||||||
|
# this should be immediate and does not (yet) wait for the
|
||||||
|
# remote child task to sync via `Context.started()`.
|
||||||
|
with trio.fail_after(ack_timeout):
|
||||||
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
|
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
|
||||||
try:
|
try:
|
||||||
functype: str = first_msg.functype
|
functype: str = first_msg.functype
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
raise unpack_error(first_msg, chan)
|
raise unpack_error(first_msg, chan)
|
||||||
# if 'error' in first_msg:
|
|
||||||
# raise unpack_error(first_msg, chan)
|
|
||||||
|
|
||||||
if functype not in (
|
if functype not in (
|
||||||
'asyncfunc',
|
'asyncfunc',
|
||||||
|
@ -917,7 +918,7 @@ class Actor:
|
||||||
'context',
|
'context',
|
||||||
):
|
):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f'{first_msg} is an invalid response packet?'
|
f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx._remote_func_type = functype
|
ctx._remote_func_type = functype
|
||||||
|
@ -1162,7 +1163,7 @@ class Actor:
|
||||||
|
|
||||||
# kill any debugger request task to avoid deadlock
|
# kill any debugger request task to avoid deadlock
|
||||||
# with the root actor in this tree
|
# with the root actor in this tree
|
||||||
dbcs = _debug.Lock._debugger_request_cs
|
dbcs = _debug.DebugStatus.req_cs
|
||||||
if dbcs is not None:
|
if dbcs is not None:
|
||||||
msg += (
|
msg += (
|
||||||
'>> Cancelling active debugger request..\n'
|
'>> Cancelling active debugger request..\n'
|
||||||
|
@ -1237,9 +1238,9 @@ class Actor:
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# NOTE: during msging race conditions this will often
|
# NOTE: during msging race conditions this will often
|
||||||
# emit, some examples:
|
# emit, some examples:
|
||||||
# - callee returns a result before cancel-msg/ctxc-raised
|
# - child returns a result before cancel-msg/ctxc-raised
|
||||||
# - callee self raises ctxc before caller send request,
|
# - child self raises ctxc before parent send request,
|
||||||
# - callee errors prior to cancel req.
|
# - child errors prior to cancel req.
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Cancel request invalid, RPC task already completed?\n\n'
|
'Cancel request invalid, RPC task already completed?\n\n'
|
||||||
f'<= canceller: {requesting_uid}\n\n'
|
f'<= canceller: {requesting_uid}\n\n'
|
||||||
|
@ -1305,12 +1306,12 @@ class Actor:
|
||||||
f'|_{ctx}\n'
|
f'|_{ctx}\n'
|
||||||
)
|
)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Waiting on RPC task to cancel\n'
|
'Waiting on RPC task to cancel\n\n'
|
||||||
f'{flow_info}'
|
f'{flow_info}'
|
||||||
)
|
)
|
||||||
await is_complete.wait()
|
await is_complete.wait()
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Sucessfully cancelled RPC task\n'
|
f'Sucessfully cancelled RPC task\n\n'
|
||||||
f'{flow_info}'
|
f'{flow_info}'
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
@ -1536,8 +1537,8 @@ async def async_main(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||||
# on our debugger lock state.
|
# on our debugger state.
|
||||||
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
|
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
|
||||||
|
|
||||||
is_registered: bool = False
|
is_registered: bool = False
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -160,12 +160,6 @@ class Lock:
|
||||||
# placeholder for function to set a ``trio.Event`` on debugger exit
|
# placeholder for function to set a ``trio.Event`` on debugger exit
|
||||||
# pdb_release_hook: Callable | None = None
|
# pdb_release_hook: Callable | None = None
|
||||||
|
|
||||||
_trio_handler: (
|
|
||||||
Callable[[int, FrameType|None], Any]
|
|
||||||
|int
|
|
||||||
| None
|
|
||||||
) = None
|
|
||||||
|
|
||||||
remote_task_in_debug: str|None = None
|
remote_task_in_debug: str|None = None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -188,12 +182,6 @@ class Lock:
|
||||||
|
|
||||||
Lock._locking_task_cs = cs
|
Lock._locking_task_cs = cs
|
||||||
|
|
||||||
# SUBACTOR ONLY
|
|
||||||
# ------ - -------
|
|
||||||
local_task_in_debug: Task|None = None
|
|
||||||
_debugger_request_cs: trio.CancelScope|None = None
|
|
||||||
local_pdb_complete: trio.Event|None = None
|
|
||||||
|
|
||||||
# ROOT ONLY
|
# ROOT ONLY
|
||||||
# ------ - -------
|
# ------ - -------
|
||||||
# the root-actor-ONLY singletons for,
|
# the root-actor-ONLY singletons for,
|
||||||
|
@ -214,16 +202,12 @@ class Lock:
|
||||||
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
||||||
_blocked: set[tuple[str, str]] = set() # `Actor.uid` block list
|
_blocked: set[tuple[str, str]] = set() # `Actor.uid` block list
|
||||||
|
|
||||||
# TODO: should go on `PbpREPL`?
|
|
||||||
_orig_sigint_handler: Callable | None = None
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def repr(cls) -> str:
|
def repr(cls) -> str:
|
||||||
|
|
||||||
# both root and subs
|
# both root and subs
|
||||||
fields: str = (
|
fields: str = (
|
||||||
f'repl: {cls.repl}\n'
|
f'repl: {cls.repl}\n'
|
||||||
f'local_repl_task: {cls.local_task_in_debug}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if is_root_process():
|
if is_root_process():
|
||||||
|
@ -238,12 +222,6 @@ class Lock:
|
||||||
f'_debug_lock: {cls._debug_lock}\n'
|
f'_debug_lock: {cls._debug_lock}\n'
|
||||||
f'lock_stats: {lock_stats}\n'
|
f'lock_stats: {lock_stats}\n'
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
fields += (
|
|
||||||
f'local_task_in_debug: {cls.local_task_in_debug}\n'
|
|
||||||
f'local_pdb_complete: {cls.local_pdb_complete}\n'
|
|
||||||
f'_debugger_request_cs: {cls._debugger_request_cs}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
body: str = textwrap.indent(
|
body: str = textwrap.indent(
|
||||||
fields,
|
fields,
|
||||||
|
@ -255,7 +233,101 @@ class Lock:
|
||||||
')>'
|
')>'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: move to PdbREPL!
|
@classmethod
|
||||||
|
def release(cls):
|
||||||
|
try:
|
||||||
|
if not DebugStatus.is_main_trio_thread():
|
||||||
|
trio.from_thread.run_sync(
|
||||||
|
cls._debug_lock.release
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
cls._debug_lock.release()
|
||||||
|
|
||||||
|
except RuntimeError as rte:
|
||||||
|
# uhhh makes no sense but been seeing the non-owner
|
||||||
|
# release error even though this is definitely the task
|
||||||
|
# that locked?
|
||||||
|
owner = cls._debug_lock.statistics().owner
|
||||||
|
# if (
|
||||||
|
# owner
|
||||||
|
# and
|
||||||
|
# cls.remote_task_in_debug is None
|
||||||
|
# ):
|
||||||
|
# raise RuntimeError(
|
||||||
|
# 'Stale `Lock` detected, no remote task active!?\n'
|
||||||
|
# f'|_{owner}\n'
|
||||||
|
# # f'{Lock}'
|
||||||
|
# ) from rte
|
||||||
|
|
||||||
|
if owner:
|
||||||
|
raise rte
|
||||||
|
|
||||||
|
# OW suppress, can't member why tho .. XD
|
||||||
|
# something somethin corrupts a cancel-scope
|
||||||
|
# somewhere..
|
||||||
|
|
||||||
|
try:
|
||||||
|
# sometimes the ``trio`` might already be terminated in
|
||||||
|
# which case this call will raise.
|
||||||
|
if DebugStatus.repl_release is not None:
|
||||||
|
DebugStatus.repl_release.set()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
cls.repl = None
|
||||||
|
cls.global_actor_in_debug = None
|
||||||
|
|
||||||
|
# restore original sigint handler
|
||||||
|
DebugStatus.unshield_sigint()
|
||||||
|
# actor-local state, irrelevant for non-root.
|
||||||
|
DebugStatus.repl_task = None
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: actually use this instead throughout for subs!
|
||||||
|
class DebugStatus:
|
||||||
|
'''
|
||||||
|
Singleton-state for debugging machinery in a subactor.
|
||||||
|
|
||||||
|
Composes conc primitives for syncing with a root actor to
|
||||||
|
acquire the tree-global (TTY) `Lock` such that only ever one
|
||||||
|
actor's task can have the REPL active at a given time.
|
||||||
|
|
||||||
|
Methods to shield the process' `SIGINT` handler are used
|
||||||
|
whenever a local task is an active REPL.
|
||||||
|
|
||||||
|
'''
|
||||||
|
repl: PdbREPL|None = None
|
||||||
|
repl_task: Task|None = None
|
||||||
|
req_cs: trio.CancelScope|None = None
|
||||||
|
repl_release: trio.Event|None = None
|
||||||
|
|
||||||
|
lock_status: LockStatus|None = None
|
||||||
|
|
||||||
|
_orig_sigint_handler: Callable | None = None
|
||||||
|
_trio_handler: (
|
||||||
|
Callable[[int, FrameType|None], Any]
|
||||||
|
|int
|
||||||
|
| None
|
||||||
|
) = None
|
||||||
|
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def repr(cls) -> str:
|
||||||
|
fields: str = (
|
||||||
|
f'repl: {cls.repl}\n'
|
||||||
|
f'repl_task: {cls.repl_task}\n'
|
||||||
|
f'repl_release: {cls.repl_release}\n'
|
||||||
|
f'req_cs: {cls.req_cs}\n'
|
||||||
|
)
|
||||||
|
body: str = textwrap.indent(
|
||||||
|
fields,
|
||||||
|
prefix=' |_',
|
||||||
|
)
|
||||||
|
return (
|
||||||
|
f'<{cls.__name__}(\n'
|
||||||
|
f'{body}'
|
||||||
|
')>'
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def shield_sigint(cls):
|
def shield_sigint(cls):
|
||||||
'''
|
'''
|
||||||
|
@ -339,77 +411,6 @@ class Lock:
|
||||||
# is not threading.main_thread()
|
# is not threading.main_thread()
|
||||||
# )
|
# )
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def release(cls):
|
|
||||||
try:
|
|
||||||
if not cls.is_main_trio_thread():
|
|
||||||
trio.from_thread.run_sync(
|
|
||||||
cls._debug_lock.release
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
cls._debug_lock.release()
|
|
||||||
|
|
||||||
except RuntimeError as rte:
|
|
||||||
# uhhh makes no sense but been seeing the non-owner
|
|
||||||
# release error even though this is definitely the task
|
|
||||||
# that locked?
|
|
||||||
owner = cls._debug_lock.statistics().owner
|
|
||||||
# if (
|
|
||||||
# owner
|
|
||||||
# and
|
|
||||||
# cls.remote_task_in_debug is None
|
|
||||||
# ):
|
|
||||||
# raise RuntimeError(
|
|
||||||
# 'Stale `Lock` detected, no remote task active!?\n'
|
|
||||||
# f'|_{owner}\n'
|
|
||||||
# # f'{Lock}'
|
|
||||||
# ) from rte
|
|
||||||
|
|
||||||
if owner:
|
|
||||||
raise rte
|
|
||||||
|
|
||||||
# OW suppress, can't member why tho .. XD
|
|
||||||
# something somethin corrupts a cancel-scope
|
|
||||||
# somewhere..
|
|
||||||
|
|
||||||
try:
|
|
||||||
# sometimes the ``trio`` might already be terminated in
|
|
||||||
# which case this call will raise.
|
|
||||||
if cls.local_pdb_complete is not None:
|
|
||||||
cls.local_pdb_complete.set()
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# restore original sigint handler
|
|
||||||
cls.unshield_sigint()
|
|
||||||
cls.repl = None
|
|
||||||
|
|
||||||
# actor-local state, irrelevant for non-root.
|
|
||||||
cls.global_actor_in_debug = None
|
|
||||||
cls.local_task_in_debug = None
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: actually use this instead throughout for subs!
|
|
||||||
class DebugStatus:
|
|
||||||
'''
|
|
||||||
Singleton-state for debugging machinery in a subactor.
|
|
||||||
|
|
||||||
Composes conc primitives for syncing with a root actor to
|
|
||||||
acquire the tree-global (TTY) `Lock` such that only ever one
|
|
||||||
actor's task can have the REPL active at a given time.
|
|
||||||
|
|
||||||
'''
|
|
||||||
repl: PdbREPL|None = None
|
|
||||||
lock_status: LockStatus|None = None
|
|
||||||
|
|
||||||
repl_task: Task|None = None
|
|
||||||
# local_task_in_debug: Task|None = None
|
|
||||||
|
|
||||||
req_cs: trio.CancelScope|None = None
|
|
||||||
# _debugger_request_cs: trio.CancelScope|None = None
|
|
||||||
|
|
||||||
repl_release: trio.Event|None = None
|
|
||||||
# local_pdb_complete: trio.Event|None = None
|
|
||||||
|
|
||||||
|
|
||||||
class TractorConfig(pdbp.DefaultConfig):
|
class TractorConfig(pdbp.DefaultConfig):
|
||||||
'''
|
'''
|
||||||
|
@ -445,6 +446,7 @@ class PdbREPL(pdbp.Pdb):
|
||||||
|
|
||||||
status = DebugStatus
|
status = DebugStatus
|
||||||
|
|
||||||
|
|
||||||
# def preloop(self):
|
# def preloop(self):
|
||||||
# print('IN PRELOOP')
|
# print('IN PRELOOP')
|
||||||
# super().preloop()
|
# super().preloop()
|
||||||
|
@ -660,16 +662,19 @@ async def lock_tty_for_child(
|
||||||
highly reliable at releasing the mutex complete!
|
highly reliable at releasing the mutex complete!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
req_task_uid: tuple = tuple(subactor_task_uid)
|
req_task_uid: tuple = tuple(subactor_task_uid)
|
||||||
if req_task_uid in Lock._blocked:
|
if req_task_uid in Lock._blocked:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
|
f'Double lock request!?\n'
|
||||||
f'The same remote task already has an active request for TTY lock ??\n\n'
|
f'The same remote task already has an active request for TTY lock ??\n\n'
|
||||||
f'task uid: {req_task_uid}\n'
|
f'task uid: {req_task_uid}\n'
|
||||||
f'subactor uid: {subactor_uid}\n'
|
f'subactor uid: {subactor_uid}\n\n'
|
||||||
)
|
|
||||||
|
|
||||||
Lock._blocked.add(req_task_uid)
|
'This might be mean that the requesting task '
|
||||||
|
'in `wait_for_parent_stdin_hijack()` may have crashed?\n'
|
||||||
|
'Consider that an internal bug exists given the TTY '
|
||||||
|
'`Lock`ing IPC dialog..\n'
|
||||||
|
)
|
||||||
|
|
||||||
root_task_name: str = current_task().name
|
root_task_name: str = current_task().name
|
||||||
if tuple(subactor_uid) in Lock._blocked:
|
if tuple(subactor_uid) in Lock._blocked:
|
||||||
|
@ -695,8 +700,9 @@ async def lock_tty_for_child(
|
||||||
f'subactor_uid: {subactor_uid}\n'
|
f'subactor_uid: {subactor_uid}\n'
|
||||||
f'remote task: {subactor_task_uid}\n'
|
f'remote task: {subactor_task_uid}\n'
|
||||||
)
|
)
|
||||||
Lock.shield_sigint()
|
DebugStatus.shield_sigint()
|
||||||
try:
|
try:
|
||||||
|
Lock._blocked.add(req_task_uid)
|
||||||
with (
|
with (
|
||||||
# NOTE: though a cs is created for every subactor lock
|
# NOTE: though a cs is created for every subactor lock
|
||||||
# REQUEST in this ctx-child task, only the root-task
|
# REQUEST in this ctx-child task, only the root-task
|
||||||
|
@ -708,6 +714,9 @@ async def lock_tty_for_child(
|
||||||
# used to do so!
|
# used to do so!
|
||||||
trio.CancelScope(shield=True) as debug_lock_cs,
|
trio.CancelScope(shield=True) as debug_lock_cs,
|
||||||
|
|
||||||
|
# TODO: make this ONLY limit the pld_spec such that we
|
||||||
|
# can on-error-decode-`.pld: Raw` fields in
|
||||||
|
# `Context._deliver_msg()`?
|
||||||
_codec.limit_msg_spec(
|
_codec.limit_msg_spec(
|
||||||
payload_spec=__msg_spec__,
|
payload_spec=__msg_spec__,
|
||||||
) as codec,
|
) as codec,
|
||||||
|
@ -763,8 +772,9 @@ async def lock_tty_for_child(
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
debug_lock_cs.cancel()
|
debug_lock_cs.cancel()
|
||||||
|
Lock._blocked.remove(req_task_uid)
|
||||||
Lock.set_locking_task_cs(None)
|
Lock.set_locking_task_cs(None)
|
||||||
Lock.unshield_sigint()
|
DebugStatus.unshield_sigint()
|
||||||
|
|
||||||
|
|
||||||
@cm
|
@cm
|
||||||
|
@ -817,7 +827,7 @@ async def wait_for_parent_stdin_hijack(
|
||||||
trio.CancelScope(shield=True) as cs,
|
trio.CancelScope(shield=True) as cs,
|
||||||
apply_debug_codec(),
|
apply_debug_codec(),
|
||||||
):
|
):
|
||||||
Lock._debugger_request_cs = cs
|
DebugStatus.req_cs = cs
|
||||||
try:
|
try:
|
||||||
# TODO: merge into sync async with ?
|
# TODO: merge into sync async with ?
|
||||||
async with get_root() as portal:
|
async with get_root() as portal:
|
||||||
|
@ -829,7 +839,7 @@ async def wait_for_parent_stdin_hijack(
|
||||||
|
|
||||||
) as (ctx, resp):
|
) as (ctx, resp):
|
||||||
log.pdb(
|
log.pdb(
|
||||||
'Subactor locked TTY per msg\n'
|
'Subactor locked TTY with msg\n\n'
|
||||||
f'{resp}\n'
|
f'{resp}\n'
|
||||||
)
|
)
|
||||||
assert resp.subactor_uid == actor_uid
|
assert resp.subactor_uid == actor_uid
|
||||||
|
@ -837,12 +847,12 @@ async def wait_for_parent_stdin_hijack(
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
try: # to unblock local caller
|
try: # to unblock local caller
|
||||||
assert Lock.local_pdb_complete
|
assert DebugStatus.repl_release
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
|
|
||||||
# wait for local task to exit and
|
# wait for local task to exit and
|
||||||
# release the REPL
|
# release the REPL
|
||||||
await Lock.local_pdb_complete.wait()
|
await DebugStatus.repl_release.wait()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
await stream.send(
|
await stream.send(
|
||||||
|
@ -867,12 +877,12 @@ async def wait_for_parent_stdin_hijack(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
Lock.local_task_in_debug = None
|
DebugStatus.repl_task = None
|
||||||
log.debug('Exiting debugger TTY lock request func from child')
|
log.debug('Exiting debugger TTY lock request func from child')
|
||||||
|
|
||||||
|
|
||||||
log.cancel('Reverting SIGINT handler!')
|
log.cancel('Reverting SIGINT handler!')
|
||||||
Lock.unshield_sigint()
|
DebugStatus.unshield_sigint()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -901,7 +911,7 @@ def mk_mpdb() -> PdbREPL:
|
||||||
# in which case schedule the SIGINT shielding override
|
# in which case schedule the SIGINT shielding override
|
||||||
# to in the main thread.
|
# to in the main thread.
|
||||||
# https://docs.python.org/3/library/signal.html#signals-and-threads
|
# https://docs.python.org/3/library/signal.html#signals-and-threads
|
||||||
Lock.shield_sigint()
|
DebugStatus.shield_sigint()
|
||||||
|
|
||||||
# XXX: These are the important flags mentioned in
|
# XXX: These are the important flags mentioned in
|
||||||
# https://github.com/python-trio/trio/issues/1155
|
# https://github.com/python-trio/trio/issues/1155
|
||||||
|
@ -1036,7 +1046,8 @@ def shield_sigint_handler(
|
||||||
)
|
)
|
||||||
|
|
||||||
log.warning(message)
|
log.warning(message)
|
||||||
Lock.unshield_sigint()
|
# Lock.unshield_sigint()
|
||||||
|
DebugStatus.unshield_sigint()
|
||||||
case_handled = True
|
case_handled = True
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -1064,7 +1075,7 @@ def shield_sigint_handler(
|
||||||
if maybe_stale_lock_cs:
|
if maybe_stale_lock_cs:
|
||||||
lock_cs.cancel()
|
lock_cs.cancel()
|
||||||
|
|
||||||
Lock.unshield_sigint()
|
DebugStatus.unshield_sigint()
|
||||||
case_handled = True
|
case_handled = True
|
||||||
|
|
||||||
# child actor that has locked the debugger
|
# child actor that has locked the debugger
|
||||||
|
@ -1086,11 +1097,11 @@ def shield_sigint_handler(
|
||||||
f'{uid_in_debug}\n'
|
f'{uid_in_debug}\n'
|
||||||
'Allowing SIGINT propagation..'
|
'Allowing SIGINT propagation..'
|
||||||
)
|
)
|
||||||
Lock.unshield_sigint()
|
DebugStatus.unshield_sigint()
|
||||||
# do_cancel()
|
# do_cancel()
|
||||||
case_handled = True
|
case_handled = True
|
||||||
|
|
||||||
task: str|None = Lock.local_task_in_debug
|
task: str|None = DebugStatus.repl_task
|
||||||
if (
|
if (
|
||||||
task
|
task
|
||||||
and
|
and
|
||||||
|
@ -1124,7 +1135,7 @@ def shield_sigint_handler(
|
||||||
+
|
+
|
||||||
'Reverting handler to `trio` default!\n'
|
'Reverting handler to `trio` default!\n'
|
||||||
)
|
)
|
||||||
Lock.unshield_sigint()
|
DebugStatus.unshield_sigint()
|
||||||
case_handled = True
|
case_handled = True
|
||||||
|
|
||||||
# XXX ensure that the reverted-to-handler actually is
|
# XXX ensure that the reverted-to-handler actually is
|
||||||
|
@ -1200,32 +1211,15 @@ def _set_trace(
|
||||||
pdb
|
pdb
|
||||||
and actor is not None
|
and actor is not None
|
||||||
)
|
)
|
||||||
# or shield
|
|
||||||
):
|
):
|
||||||
msg: str = _pause_msg
|
|
||||||
if shield:
|
|
||||||
# log.warning(
|
|
||||||
msg = (
|
|
||||||
'\n\n'
|
|
||||||
' ------ - ------\n'
|
|
||||||
'Debugger invoked with `shield=True` so an extra\n'
|
|
||||||
'`trio.CancelScope.__exit__()` frame is shown..\n'
|
|
||||||
'\n'
|
|
||||||
'Try going up one frame to see your pause point!\n'
|
|
||||||
'\n'
|
|
||||||
' SORRY we need to fix this!\n'
|
|
||||||
' ------ - ------\n\n'
|
|
||||||
) + msg
|
|
||||||
|
|
||||||
# pdbp.set_trace()
|
|
||||||
# TODO: maybe print the actor supervion tree up to the
|
# TODO: maybe print the actor supervion tree up to the
|
||||||
# root here? Bo
|
# root here? Bo
|
||||||
|
|
||||||
log.pdb(
|
log.pdb(
|
||||||
f'{msg}\n'
|
f'{_pause_msg}\n'
|
||||||
'|\n'
|
'|\n'
|
||||||
# TODO: make an `Actor.__repr()__`
|
# TODO: make an `Actor.__repr()__`
|
||||||
# f'|_ {current_task()} @ {actor.name}\n'
|
f'|_ {current_task()} @ {actor.uid}\n'
|
||||||
f'|_ {current_task()}\n'
|
|
||||||
)
|
)
|
||||||
# no f!#$&* idea, but when we're in async land
|
# no f!#$&* idea, but when we're in async land
|
||||||
# we need 2x frames up?
|
# we need 2x frames up?
|
||||||
|
@ -1286,11 +1280,11 @@ async def _pause(
|
||||||
# task_name: str = task.name
|
# task_name: str = task.name
|
||||||
|
|
||||||
if (
|
if (
|
||||||
not Lock.local_pdb_complete
|
not DebugStatus.repl_release
|
||||||
or
|
or
|
||||||
Lock.local_pdb_complete.is_set()
|
DebugStatus.repl_release.is_set()
|
||||||
):
|
):
|
||||||
Lock.local_pdb_complete = trio.Event()
|
DebugStatus.repl_release = trio.Event()
|
||||||
|
|
||||||
if debug_func is not None:
|
if debug_func is not None:
|
||||||
debug_func = partial(debug_func)
|
debug_func = partial(debug_func)
|
||||||
|
@ -1333,12 +1327,7 @@ async def _pause(
|
||||||
Lock.release()
|
Lock.release()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except BaseException:
|
try:
|
||||||
log.exception(
|
|
||||||
'Failed to engage debugger via `_pause()` ??\n'
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
|
|
||||||
if is_root_process():
|
if is_root_process():
|
||||||
|
|
||||||
# we also wait in the root-parent for any child that
|
# we also wait in the root-parent for any child that
|
||||||
|
@ -1371,8 +1360,8 @@ async def _pause(
|
||||||
await Lock._debug_lock.acquire()
|
await Lock._debug_lock.acquire()
|
||||||
|
|
||||||
Lock.global_actor_in_debug = actor.uid
|
Lock.global_actor_in_debug = actor.uid
|
||||||
Lock.local_task_in_debug = task
|
DebugStatus.repl_task = task
|
||||||
Lock.repl = pdb
|
DebugStatus.repl = Lock.repl = pdb
|
||||||
|
|
||||||
# enter REPL from root, no TTY locking IPC ctx necessary
|
# enter REPL from root, no TTY locking IPC ctx necessary
|
||||||
_enter_repl_sync(debug_func)
|
_enter_repl_sync(debug_func)
|
||||||
|
@ -1383,7 +1372,7 @@ async def _pause(
|
||||||
not is_root_process()
|
not is_root_process()
|
||||||
and actor._parent_chan # a connected child
|
and actor._parent_chan # a connected child
|
||||||
):
|
):
|
||||||
if Lock.local_task_in_debug:
|
if DebugStatus.repl_task:
|
||||||
|
|
||||||
# Recurrence entry case: this task already has the lock and
|
# Recurrence entry case: this task already has the lock and
|
||||||
# is likely recurrently entering a breakpoint
|
# is likely recurrently entering a breakpoint
|
||||||
|
@ -1394,7 +1383,7 @@ async def _pause(
|
||||||
# subactor that would otherwise not bubble until the
|
# subactor that would otherwise not bubble until the
|
||||||
# next checkpoint was hit.
|
# next checkpoint was hit.
|
||||||
if (
|
if (
|
||||||
(repl_task := Lock.local_task_in_debug)
|
(repl_task := DebugStatus.repl_task)
|
||||||
and
|
and
|
||||||
repl_task is task
|
repl_task is task
|
||||||
):
|
):
|
||||||
|
@ -1413,12 +1402,12 @@ async def _pause(
|
||||||
f'{task.name}@{actor.uid} already has TTY lock\n'
|
f'{task.name}@{actor.uid} already has TTY lock\n'
|
||||||
f'waiting for release..'
|
f'waiting for release..'
|
||||||
)
|
)
|
||||||
await Lock.local_pdb_complete.wait()
|
await DebugStatus.repl_release.wait()
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
# mark local actor as "in debug mode" to avoid recurrent
|
# mark local actor as "in debug mode" to avoid recurrent
|
||||||
# entries/requests to the root process
|
# entries/requests to the root process
|
||||||
Lock.local_task_in_debug = task
|
DebugStatus.repl_task = task
|
||||||
|
|
||||||
# this **must** be awaited by the caller and is done using the
|
# this **must** be awaited by the caller and is done using the
|
||||||
# root nursery so that the debugger can continue to run without
|
# root nursery so that the debugger can continue to run without
|
||||||
|
@ -1455,7 +1444,7 @@ async def _pause(
|
||||||
)
|
)
|
||||||
# our locker task should be the one in ctx
|
# our locker task should be the one in ctx
|
||||||
# with the root actor
|
# with the root actor
|
||||||
assert Lock._debugger_request_cs is cs
|
assert DebugStatus.req_cs is cs
|
||||||
|
|
||||||
# XXX used by the SIGINT handler to check if
|
# XXX used by the SIGINT handler to check if
|
||||||
# THIS actor is in REPL interaction
|
# THIS actor is in REPL interaction
|
||||||
|
@ -1477,7 +1466,13 @@ async def _pause(
|
||||||
try:
|
try:
|
||||||
_enter_repl_sync(debug_func)
|
_enter_repl_sync(debug_func)
|
||||||
finally:
|
finally:
|
||||||
Lock.unshield_sigint()
|
DebugStatus.unshield_sigint()
|
||||||
|
|
||||||
|
except BaseException:
|
||||||
|
log.exception(
|
||||||
|
'Failed to engage debugger via `_pause()` ??\n'
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
# XXX: apparently we can't do this without showing this frame
|
# XXX: apparently we can't do this without showing this frame
|
||||||
|
@ -1527,45 +1522,16 @@ async def pause(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = True
|
__tracebackhide__: bool = True
|
||||||
|
|
||||||
if shield:
|
|
||||||
# NOTE XXX: even hard coding this inside the `class CancelScope:`
|
|
||||||
# doesn't seem to work for me!?
|
|
||||||
# ^ XXX ^
|
|
||||||
|
|
||||||
# def _exit(self, *args, **kwargs):
|
|
||||||
# __tracebackhide__: bool = True
|
|
||||||
# super().__exit__(*args, **kwargs)
|
|
||||||
|
|
||||||
# trio.CancelScope.__enter__.__tracebackhide__ = True
|
|
||||||
# trio.CancelScope.__exit__.__tracebackhide__ = True
|
|
||||||
|
|
||||||
# import types
|
|
||||||
# with trio.CancelScope(shield=shield) as cs:
|
|
||||||
# cs.__exit__ = types.MethodType(_exit, cs)
|
|
||||||
# cs.__exit__.__tracebackhide__ = True
|
|
||||||
|
|
||||||
# TODO: LOL, solved this with the `pdb.hideframe` stuff
|
|
||||||
# at top-of-mod.. so I guess we can just only use this
|
|
||||||
# block right?
|
|
||||||
with trio.CancelScope(
|
with trio.CancelScope(
|
||||||
shield=shield,
|
shield=shield,
|
||||||
) as cs:
|
) as cs:
|
||||||
print(f'debug cs is {cs}\n')
|
|
||||||
# setattr(cs.__exit__.__func__, '__tracebackhide__', True)
|
|
||||||
# setattr(cs.__enter__.__func__, '__tracebackhide__', True)
|
|
||||||
|
|
||||||
# NOTE: so the caller can always cancel even if shielded
|
# NOTE: so the caller can always manually cancel even
|
||||||
|
# if shielded!
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
return await _pause(
|
return await _pause(
|
||||||
debug_func=debug_func,
|
debug_func=debug_func,
|
||||||
shield=True,
|
shield=shield,
|
||||||
task_status=task_status,
|
|
||||||
**_pause_kwargs
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
return await _pause(
|
|
||||||
debug_func=debug_func,
|
|
||||||
shield=False,
|
|
||||||
task_status=task_status,
|
task_status=task_status,
|
||||||
**_pause_kwargs
|
**_pause_kwargs
|
||||||
)
|
)
|
||||||
|
@ -1682,7 +1648,7 @@ def pause_from_sync(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# TODO: maybe the `trio.current_task()` id/name if avail?
|
# TODO: maybe the `trio.current_task()` id/name if avail?
|
||||||
Lock.local_task_in_debug: str = str(threading.current_thread())
|
DebugStatus.repl_task: str = str(threading.current_thread())
|
||||||
|
|
||||||
else: # we are presumably the `trio.run()` + main thread
|
else: # we are presumably the `trio.run()` + main thread
|
||||||
greenback.await_(
|
greenback.await_(
|
||||||
|
@ -1692,7 +1658,7 @@ def pause_from_sync(
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
Lock.local_task_in_debug: str = current_task()
|
DebugStatus.repl_task: str = current_task()
|
||||||
|
|
||||||
# TODO: ensure we aggressively make the user aware about
|
# TODO: ensure we aggressively make the user aware about
|
||||||
# entering the global ``breakpoint()`` built-in from sync
|
# entering the global ``breakpoint()`` built-in from sync
|
||||||
|
@ -1754,7 +1720,8 @@ def _post_mortem(
|
||||||
log.pdb(
|
log.pdb(
|
||||||
f'{_crash_msg}\n'
|
f'{_crash_msg}\n'
|
||||||
'|\n'
|
'|\n'
|
||||||
f'|_ {current_task()}\n'
|
# f'|_ {current_task()}\n'
|
||||||
|
f'|_ {current_task()} @ {actor.uid}\n'
|
||||||
|
|
||||||
# f'|_ @{actor.uid}\n'
|
# f'|_ @{actor.uid}\n'
|
||||||
# TODO: make an `Actor.__repr()__`
|
# TODO: make an `Actor.__repr()__`
|
||||||
|
|
Loading…
Reference in New Issue