forked from goodboy/tractor
1
0
Fork 0

Use `DebugStatus` around subactor lock requests

Breaks out all the (sub)actor local conc primitives from `Lock` (which
is now only used in and by the root actor) such that there's an explicit
distinction between a task that's "consuming" the `Lock` (remotely) vs.
the root-side service tasks which do the actual acquire on behalf of the
requesters.

`DebugStatus` changeover deats:
------ - ------
- move all the actor-local vars over `DebugStatus` including:
  - move `_trio_handler` and `_orig_sigint_handler`
  - `local_task_in_debug` now `repl_task`
  - `_debugger_request_cs` now `req_cs`
  - `local_pdb_complete` now `repl_release`
- drop all ^ fields from `Lock.repr()` obvi..
- move over the `.[un]shield_sigint()` and
  `.is_main_trio_thread()` methods.
- add some new attrs/meths:
  - `DebugStatus.repl` for the currently running `Pdb` in-actor
    singleton.
  - `.repr()` for pprint of state (like `Lock`).
- Note: that even when a root-actor task is in REPL, the `DebugStatus`
  is still used for certain actor-local state mgmt, such as SIGINT
  handler shielding.
- obvi change all lock-requester code bits to now use a `DebugStatus` in
  their local actor-state instead of `Lock`, i.e. change usage from
  `Lock` in `._runtime` and `._root`.
- use new `Lock.get_locking_task_cs()` API in when checking for
  sub-in-debug from `._runtime.Actor._stream_handler()`.

Unrelated to topic-at-hand tweaks:
------ - ------
- drop the commented bits about hiding `@[a]cm` stack frames from
  `_debug.pause()` and simplify to only one block with the `shield`
  passthrough since we already solved the issue with cancel-scopes using
  `@pdbp.hideframe` B)
  - this includes all the extra logging about the extra frame for the
    user (good thing i put in that wasted effort back then eh..)
- put the `try/except BaseException` with `log.exception()` around the
  whole of `._pause()` to ensure we don't miss in-func errors which can
  cause hangs..
- allow passing in `portal: Portal` to
  `Actor.start_remote_task()` such that `Portal` task spawning methods
  are always denoted correctly in terms of `Context.side`.
- lotsa logging tweaks, decreasing a bit of noise from `.runtime()`s.
runtime_to_msgspec
Tyler Goodlet 2024-04-18 12:47:28 -04:00
parent d0e7610073
commit 77a15ebf19
3 changed files with 322 additions and 354 deletions

View File

@ -135,7 +135,7 @@ async def open_root_actor(
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state. # on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor # mark top most level process as root actor
_state._runtime_vars['_is_root'] = True _state._runtime_vars['_is_root'] = True

View File

@ -267,10 +267,13 @@ class Actor:
self._listeners: list[trio.abc.Listener] = [] self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Channel|None = None self._parent_chan: Channel|None = None
self._forkserver_info: tuple|None = None self._forkserver_info: tuple|None = None
# track each child/sub-actor in it's locally
# supervising nursery
self._actoruid2nursery: dict[ self._actoruid2nursery: dict[
tuple[str, str], tuple[str, str], # sub-`Actor.uid`
ActorNursery|None, ActorNursery|None,
] = {} # type: ignore # noqa ] = {}
# when provided, init the registry addresses property from # when provided, init the registry addresses property from
# input via the validator. # input via the validator.
@ -659,12 +662,18 @@ class Actor:
# TODO: NEEEDS TO BE TESTED! # TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD # actually, no idea if this ever even enters.. XD
#
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if ( if (
pdb_user_uid pdb_user_uid
and local_nursery and local_nursery
): ):
entry: tuple|None = local_nursery._children.get(pdb_user_uid) entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
)
if entry: if entry:
proc: trio.Process proc: trio.Process
_, proc, _ = entry _, proc, _ = entry
@ -674,10 +683,10 @@ class Actor:
and poll() is None and poll() is None
): ):
log.cancel( log.cancel(
'Root actor reports no-more-peers, BUT ' 'Root actor reports no-more-peers, BUT\n'
'a DISCONNECTED child still has the debug ' 'a DISCONNECTED child still has the debug '
'lock!\n' 'lock!\n\n'
f'root uid: {self.uid}\n' # f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n' f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n' f'locking child uid: {pdb_user_uid}\n'
) )
@ -703,9 +712,8 @@ class Actor:
# if a now stale local task has the TTY lock still # if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for # we cancel it to allow servicing other requests for
# the lock. # the lock.
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
if ( if (
db_cs (db_cs := pdb_lock.get_locking_task_cs())
and not db_cs.cancel_called and not db_cs.cancel_called
and uid == pdb_user_uid and uid == pdb_user_uid
): ):
@ -742,7 +750,7 @@ class Actor:
except KeyError: except KeyError:
log.warning( log.warning(
'Ignoring invalid IPC ctx msg!\n\n' 'Ignoring invalid IPC ctx msg!\n\n'
f'<= sender: {uid}\n' f'<= sender: {uid}\n\n'
# XXX don't need right since it's always in msg? # XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n' # f'=> cid: {cid}\n\n'
@ -796,7 +804,7 @@ class Actor:
cid, cid,
# side, # side,
)] )]
log.runtime( log.debug(
f'Retreived cached IPC ctx for\n' f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'cid:{cid}\n' f'cid:{cid}\n'
@ -835,10 +843,14 @@ class Actor:
nsf: NamespacePath, nsf: NamespacePath,
kwargs: dict, kwargs: dict,
# determines `Context.side: str`
portal: Portal|None = None,
# IPC channel config # IPC channel config
msg_buffer_size: int|None = None, msg_buffer_size: int|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
load_nsf: bool = False, load_nsf: bool = False,
ack_timeout: float = 3,
) -> Context: ) -> Context:
''' '''
@ -863,10 +875,12 @@ class Actor:
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
ctx._portal = portal
if ( if (
'self' in nsf 'self' in nsf
or not load_nsf or
not load_nsf
): ):
ns, _, func = nsf.partition(':') ns, _, func = nsf.partition(':')
else: else:
@ -874,42 +888,29 @@ class Actor:
# -[ ] but, how to do `self:<Actor.meth>`?? # -[ ] but, how to do `self:<Actor.meth>`??
ns, func = nsf.to_tuple() ns, func = nsf.to_tuple()
log.runtime( msg = msgtypes.Start(
'Sending cmd to\n'
f'peer: {chan.uid} => \n'
'\n'
f'=> {ns}.{func}({kwargs})\n'
)
await chan.send(
msgtypes.Start(
ns=ns, ns=ns,
func=func, func=func,
kwargs=kwargs, kwargs=kwargs,
uid=self.uid, uid=self.uid,
cid=cid, cid=cid,
) )
log.runtime(
'Sending RPC start msg\n\n'
f'=> peer: {chan.uid}\n'
f' |_ {ns}.{func}({kwargs})\n'
) )
# {'cmd': ( await chan.send(msg)
# ns,
# func,
# kwargs,
# self.uid,
# cid,
# )}
# )
# Wait on first response msg and validate; this should be
# immediate.
# first_msg: dict = await ctx._recv_chan.receive()
# functype: str = first_msg.get('functype')
# NOTE wait on first `StartAck` response msg and validate;
# this should be immediate and does not (yet) wait for the
# remote child task to sync via `Context.started()`.
with trio.fail_after(ack_timeout):
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive() first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
try: try:
functype: str = first_msg.functype functype: str = first_msg.functype
except AttributeError: except AttributeError:
raise unpack_error(first_msg, chan) raise unpack_error(first_msg, chan)
# if 'error' in first_msg:
# raise unpack_error(first_msg, chan)
if functype not in ( if functype not in (
'asyncfunc', 'asyncfunc',
@ -917,7 +918,7 @@ class Actor:
'context', 'context',
): ):
raise ValueError( raise ValueError(
f'{first_msg} is an invalid response packet?' f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
) )
ctx._remote_func_type = functype ctx._remote_func_type = functype
@ -1162,7 +1163,7 @@ class Actor:
# kill any debugger request task to avoid deadlock # kill any debugger request task to avoid deadlock
# with the root actor in this tree # with the root actor in this tree
dbcs = _debug.Lock._debugger_request_cs dbcs = _debug.DebugStatus.req_cs
if dbcs is not None: if dbcs is not None:
msg += ( msg += (
'>> Cancelling active debugger request..\n' '>> Cancelling active debugger request..\n'
@ -1237,9 +1238,9 @@ class Actor:
except KeyError: except KeyError:
# NOTE: during msging race conditions this will often # NOTE: during msging race conditions this will often
# emit, some examples: # emit, some examples:
# - callee returns a result before cancel-msg/ctxc-raised # - child returns a result before cancel-msg/ctxc-raised
# - callee self raises ctxc before caller send request, # - child self raises ctxc before parent send request,
# - callee errors prior to cancel req. # - child errors prior to cancel req.
log.cancel( log.cancel(
'Cancel request invalid, RPC task already completed?\n\n' 'Cancel request invalid, RPC task already completed?\n\n'
f'<= canceller: {requesting_uid}\n\n' f'<= canceller: {requesting_uid}\n\n'
@ -1305,12 +1306,12 @@ class Actor:
f'|_{ctx}\n' f'|_{ctx}\n'
) )
log.runtime( log.runtime(
'Waiting on RPC task to cancel\n' 'Waiting on RPC task to cancel\n\n'
f'{flow_info}' f'{flow_info}'
) )
await is_complete.wait() await is_complete.wait()
log.runtime( log.runtime(
f'Sucessfully cancelled RPC task\n' f'Sucessfully cancelled RPC task\n\n'
f'{flow_info}' f'{flow_info}'
) )
return True return True
@ -1536,8 +1537,8 @@ async def async_main(
''' '''
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state. # on our debugger state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
is_registered: bool = False is_registered: bool = False
try: try:

View File

@ -160,12 +160,6 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit # placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Callable | None = None # pdb_release_hook: Callable | None = None
_trio_handler: (
Callable[[int, FrameType|None], Any]
|int
| None
) = None
remote_task_in_debug: str|None = None remote_task_in_debug: str|None = None
@staticmethod @staticmethod
@ -188,12 +182,6 @@ class Lock:
Lock._locking_task_cs = cs Lock._locking_task_cs = cs
# SUBACTOR ONLY
# ------ - -------
local_task_in_debug: Task|None = None
_debugger_request_cs: trio.CancelScope|None = None
local_pdb_complete: trio.Event|None = None
# ROOT ONLY # ROOT ONLY
# ------ - ------- # ------ - -------
# the root-actor-ONLY singletons for, # the root-actor-ONLY singletons for,
@ -214,16 +202,12 @@ class Lock:
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_blocked: set[tuple[str, str]] = set() # `Actor.uid` block list _blocked: set[tuple[str, str]] = set() # `Actor.uid` block list
# TODO: should go on `PbpREPL`?
_orig_sigint_handler: Callable | None = None
@classmethod @classmethod
def repr(cls) -> str: def repr(cls) -> str:
# both root and subs # both root and subs
fields: str = ( fields: str = (
f'repl: {cls.repl}\n' f'repl: {cls.repl}\n'
f'local_repl_task: {cls.local_task_in_debug}\n'
) )
if is_root_process(): if is_root_process():
@ -238,12 +222,6 @@ class Lock:
f'_debug_lock: {cls._debug_lock}\n' f'_debug_lock: {cls._debug_lock}\n'
f'lock_stats: {lock_stats}\n' f'lock_stats: {lock_stats}\n'
) )
else:
fields += (
f'local_task_in_debug: {cls.local_task_in_debug}\n'
f'local_pdb_complete: {cls.local_pdb_complete}\n'
f'_debugger_request_cs: {cls._debugger_request_cs}\n'
)
body: str = textwrap.indent( body: str = textwrap.indent(
fields, fields,
@ -255,7 +233,101 @@ class Lock:
')>' ')>'
) )
# TODO: move to PdbREPL! @classmethod
def release(cls):
try:
if not DebugStatus.is_main_trio_thread():
trio.from_thread.run_sync(
cls._debug_lock.release
)
else:
cls._debug_lock.release()
except RuntimeError as rte:
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = cls._debug_lock.statistics().owner
# if (
# owner
# and
# cls.remote_task_in_debug is None
# ):
# raise RuntimeError(
# 'Stale `Lock` detected, no remote task active!?\n'
# f'|_{owner}\n'
# # f'{Lock}'
# ) from rte
if owner:
raise rte
# OW suppress, can't member why tho .. XD
# something somethin corrupts a cancel-scope
# somewhere..
try:
# sometimes the ``trio`` might already be terminated in
# which case this call will raise.
if DebugStatus.repl_release is not None:
DebugStatus.repl_release.set()
finally:
cls.repl = None
cls.global_actor_in_debug = None
# restore original sigint handler
DebugStatus.unshield_sigint()
# actor-local state, irrelevant for non-root.
DebugStatus.repl_task = None
# TODO: actually use this instead throughout for subs!
class DebugStatus:
'''
Singleton-state for debugging machinery in a subactor.
Composes conc primitives for syncing with a root actor to
acquire the tree-global (TTY) `Lock` such that only ever one
actor's task can have the REPL active at a given time.
Methods to shield the process' `SIGINT` handler are used
whenever a local task is an active REPL.
'''
repl: PdbREPL|None = None
repl_task: Task|None = None
req_cs: trio.CancelScope|None = None
repl_release: trio.Event|None = None
lock_status: LockStatus|None = None
_orig_sigint_handler: Callable | None = None
_trio_handler: (
Callable[[int, FrameType|None], Any]
|int
| None
) = None
@classmethod
def repr(cls) -> str:
fields: str = (
f'repl: {cls.repl}\n'
f'repl_task: {cls.repl_task}\n'
f'repl_release: {cls.repl_release}\n'
f'req_cs: {cls.req_cs}\n'
)
body: str = textwrap.indent(
fields,
prefix=' |_',
)
return (
f'<{cls.__name__}(\n'
f'{body}'
')>'
)
@classmethod @classmethod
def shield_sigint(cls): def shield_sigint(cls):
''' '''
@ -339,77 +411,6 @@ class Lock:
# is not threading.main_thread() # is not threading.main_thread()
# ) # )
@classmethod
def release(cls):
try:
if not cls.is_main_trio_thread():
trio.from_thread.run_sync(
cls._debug_lock.release
)
else:
cls._debug_lock.release()
except RuntimeError as rte:
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = cls._debug_lock.statistics().owner
# if (
# owner
# and
# cls.remote_task_in_debug is None
# ):
# raise RuntimeError(
# 'Stale `Lock` detected, no remote task active!?\n'
# f'|_{owner}\n'
# # f'{Lock}'
# ) from rte
if owner:
raise rte
# OW suppress, can't member why tho .. XD
# something somethin corrupts a cancel-scope
# somewhere..
try:
# sometimes the ``trio`` might already be terminated in
# which case this call will raise.
if cls.local_pdb_complete is not None:
cls.local_pdb_complete.set()
finally:
# restore original sigint handler
cls.unshield_sigint()
cls.repl = None
# actor-local state, irrelevant for non-root.
cls.global_actor_in_debug = None
cls.local_task_in_debug = None
# TODO: actually use this instead throughout for subs!
class DebugStatus:
'''
Singleton-state for debugging machinery in a subactor.
Composes conc primitives for syncing with a root actor to
acquire the tree-global (TTY) `Lock` such that only ever one
actor's task can have the REPL active at a given time.
'''
repl: PdbREPL|None = None
lock_status: LockStatus|None = None
repl_task: Task|None = None
# local_task_in_debug: Task|None = None
req_cs: trio.CancelScope|None = None
# _debugger_request_cs: trio.CancelScope|None = None
repl_release: trio.Event|None = None
# local_pdb_complete: trio.Event|None = None
class TractorConfig(pdbp.DefaultConfig): class TractorConfig(pdbp.DefaultConfig):
''' '''
@ -445,6 +446,7 @@ class PdbREPL(pdbp.Pdb):
status = DebugStatus status = DebugStatus
# def preloop(self): # def preloop(self):
# print('IN PRELOOP') # print('IN PRELOOP')
# super().preloop() # super().preloop()
@ -660,16 +662,19 @@ async def lock_tty_for_child(
highly reliable at releasing the mutex complete! highly reliable at releasing the mutex complete!
''' '''
req_task_uid: tuple = tuple(subactor_task_uid) req_task_uid: tuple = tuple(subactor_task_uid)
if req_task_uid in Lock._blocked: if req_task_uid in Lock._blocked:
raise RuntimeError( raise RuntimeError(
f'Double lock request!?\n'
f'The same remote task already has an active request for TTY lock ??\n\n' f'The same remote task already has an active request for TTY lock ??\n\n'
f'task uid: {req_task_uid}\n' f'task uid: {req_task_uid}\n'
f'subactor uid: {subactor_uid}\n' f'subactor uid: {subactor_uid}\n\n'
)
Lock._blocked.add(req_task_uid) 'This might be mean that the requesting task '
'in `wait_for_parent_stdin_hijack()` may have crashed?\n'
'Consider that an internal bug exists given the TTY '
'`Lock`ing IPC dialog..\n'
)
root_task_name: str = current_task().name root_task_name: str = current_task().name
if tuple(subactor_uid) in Lock._blocked: if tuple(subactor_uid) in Lock._blocked:
@ -695,8 +700,9 @@ async def lock_tty_for_child(
f'subactor_uid: {subactor_uid}\n' f'subactor_uid: {subactor_uid}\n'
f'remote task: {subactor_task_uid}\n' f'remote task: {subactor_task_uid}\n'
) )
Lock.shield_sigint() DebugStatus.shield_sigint()
try: try:
Lock._blocked.add(req_task_uid)
with ( with (
# NOTE: though a cs is created for every subactor lock # NOTE: though a cs is created for every subactor lock
# REQUEST in this ctx-child task, only the root-task # REQUEST in this ctx-child task, only the root-task
@ -708,6 +714,9 @@ async def lock_tty_for_child(
# used to do so! # used to do so!
trio.CancelScope(shield=True) as debug_lock_cs, trio.CancelScope(shield=True) as debug_lock_cs,
# TODO: make this ONLY limit the pld_spec such that we
# can on-error-decode-`.pld: Raw` fields in
# `Context._deliver_msg()`?
_codec.limit_msg_spec( _codec.limit_msg_spec(
payload_spec=__msg_spec__, payload_spec=__msg_spec__,
) as codec, ) as codec,
@ -763,8 +772,9 @@ async def lock_tty_for_child(
finally: finally:
debug_lock_cs.cancel() debug_lock_cs.cancel()
Lock._blocked.remove(req_task_uid)
Lock.set_locking_task_cs(None) Lock.set_locking_task_cs(None)
Lock.unshield_sigint() DebugStatus.unshield_sigint()
@cm @cm
@ -817,7 +827,7 @@ async def wait_for_parent_stdin_hijack(
trio.CancelScope(shield=True) as cs, trio.CancelScope(shield=True) as cs,
apply_debug_codec(), apply_debug_codec(),
): ):
Lock._debugger_request_cs = cs DebugStatus.req_cs = cs
try: try:
# TODO: merge into sync async with ? # TODO: merge into sync async with ?
async with get_root() as portal: async with get_root() as portal:
@ -829,7 +839,7 @@ async def wait_for_parent_stdin_hijack(
) as (ctx, resp): ) as (ctx, resp):
log.pdb( log.pdb(
'Subactor locked TTY per msg\n' 'Subactor locked TTY with msg\n\n'
f'{resp}\n' f'{resp}\n'
) )
assert resp.subactor_uid == actor_uid assert resp.subactor_uid == actor_uid
@ -837,12 +847,12 @@ async def wait_for_parent_stdin_hijack(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
try: # to unblock local caller try: # to unblock local caller
assert Lock.local_pdb_complete assert DebugStatus.repl_release
task_status.started(cs) task_status.started(cs)
# wait for local task to exit and # wait for local task to exit and
# release the REPL # release the REPL
await Lock.local_pdb_complete.wait() await DebugStatus.repl_release.wait()
finally: finally:
await stream.send( await stream.send(
@ -867,12 +877,12 @@ async def wait_for_parent_stdin_hijack(
raise raise
finally: finally:
Lock.local_task_in_debug = None DebugStatus.repl_task = None
log.debug('Exiting debugger TTY lock request func from child') log.debug('Exiting debugger TTY lock request func from child')
log.cancel('Reverting SIGINT handler!') log.cancel('Reverting SIGINT handler!')
Lock.unshield_sigint() DebugStatus.unshield_sigint()
@ -901,7 +911,7 @@ def mk_mpdb() -> PdbREPL:
# in which case schedule the SIGINT shielding override # in which case schedule the SIGINT shielding override
# to in the main thread. # to in the main thread.
# https://docs.python.org/3/library/signal.html#signals-and-threads # https://docs.python.org/3/library/signal.html#signals-and-threads
Lock.shield_sigint() DebugStatus.shield_sigint()
# XXX: These are the important flags mentioned in # XXX: These are the important flags mentioned in
# https://github.com/python-trio/trio/issues/1155 # https://github.com/python-trio/trio/issues/1155
@ -1036,7 +1046,8 @@ def shield_sigint_handler(
) )
log.warning(message) log.warning(message)
Lock.unshield_sigint() # Lock.unshield_sigint()
DebugStatus.unshield_sigint()
case_handled = True case_handled = True
else: else:
@ -1064,7 +1075,7 @@ def shield_sigint_handler(
if maybe_stale_lock_cs: if maybe_stale_lock_cs:
lock_cs.cancel() lock_cs.cancel()
Lock.unshield_sigint() DebugStatus.unshield_sigint()
case_handled = True case_handled = True
# child actor that has locked the debugger # child actor that has locked the debugger
@ -1086,11 +1097,11 @@ def shield_sigint_handler(
f'{uid_in_debug}\n' f'{uid_in_debug}\n'
'Allowing SIGINT propagation..' 'Allowing SIGINT propagation..'
) )
Lock.unshield_sigint() DebugStatus.unshield_sigint()
# do_cancel() # do_cancel()
case_handled = True case_handled = True
task: str|None = Lock.local_task_in_debug task: str|None = DebugStatus.repl_task
if ( if (
task task
and and
@ -1124,7 +1135,7 @@ def shield_sigint_handler(
+ +
'Reverting handler to `trio` default!\n' 'Reverting handler to `trio` default!\n'
) )
Lock.unshield_sigint() DebugStatus.unshield_sigint()
case_handled = True case_handled = True
# XXX ensure that the reverted-to-handler actually is # XXX ensure that the reverted-to-handler actually is
@ -1200,32 +1211,15 @@ def _set_trace(
pdb pdb
and actor is not None and actor is not None
) )
# or shield
): ):
msg: str = _pause_msg
if shield:
# log.warning(
msg = (
'\n\n'
' ------ - ------\n'
'Debugger invoked with `shield=True` so an extra\n'
'`trio.CancelScope.__exit__()` frame is shown..\n'
'\n'
'Try going up one frame to see your pause point!\n'
'\n'
' SORRY we need to fix this!\n'
' ------ - ------\n\n'
) + msg
# pdbp.set_trace()
# TODO: maybe print the actor supervion tree up to the # TODO: maybe print the actor supervion tree up to the
# root here? Bo # root here? Bo
log.pdb( log.pdb(
f'{msg}\n' f'{_pause_msg}\n'
'|\n' '|\n'
# TODO: make an `Actor.__repr()__` # TODO: make an `Actor.__repr()__`
# f'|_ {current_task()} @ {actor.name}\n' f'|_ {current_task()} @ {actor.uid}\n'
f'|_ {current_task()}\n'
) )
# no f!#$&* idea, but when we're in async land # no f!#$&* idea, but when we're in async land
# we need 2x frames up? # we need 2x frames up?
@ -1286,11 +1280,11 @@ async def _pause(
# task_name: str = task.name # task_name: str = task.name
if ( if (
not Lock.local_pdb_complete not DebugStatus.repl_release
or or
Lock.local_pdb_complete.is_set() DebugStatus.repl_release.is_set()
): ):
Lock.local_pdb_complete = trio.Event() DebugStatus.repl_release = trio.Event()
if debug_func is not None: if debug_func is not None:
debug_func = partial(debug_func) debug_func = partial(debug_func)
@ -1333,12 +1327,7 @@ async def _pause(
Lock.release() Lock.release()
raise raise
except BaseException: try:
log.exception(
'Failed to engage debugger via `_pause()` ??\n'
)
raise
if is_root_process(): if is_root_process():
# we also wait in the root-parent for any child that # we also wait in the root-parent for any child that
@ -1371,8 +1360,8 @@ async def _pause(
await Lock._debug_lock.acquire() await Lock._debug_lock.acquire()
Lock.global_actor_in_debug = actor.uid Lock.global_actor_in_debug = actor.uid
Lock.local_task_in_debug = task DebugStatus.repl_task = task
Lock.repl = pdb DebugStatus.repl = Lock.repl = pdb
# enter REPL from root, no TTY locking IPC ctx necessary # enter REPL from root, no TTY locking IPC ctx necessary
_enter_repl_sync(debug_func) _enter_repl_sync(debug_func)
@ -1383,7 +1372,7 @@ async def _pause(
not is_root_process() not is_root_process()
and actor._parent_chan # a connected child and actor._parent_chan # a connected child
): ):
if Lock.local_task_in_debug: if DebugStatus.repl_task:
# Recurrence entry case: this task already has the lock and # Recurrence entry case: this task already has the lock and
# is likely recurrently entering a breakpoint # is likely recurrently entering a breakpoint
@ -1394,7 +1383,7 @@ async def _pause(
# subactor that would otherwise not bubble until the # subactor that would otherwise not bubble until the
# next checkpoint was hit. # next checkpoint was hit.
if ( if (
(repl_task := Lock.local_task_in_debug) (repl_task := DebugStatus.repl_task)
and and
repl_task is task repl_task is task
): ):
@ -1413,12 +1402,12 @@ async def _pause(
f'{task.name}@{actor.uid} already has TTY lock\n' f'{task.name}@{actor.uid} already has TTY lock\n'
f'waiting for release..' f'waiting for release..'
) )
await Lock.local_pdb_complete.wait() await DebugStatus.repl_release.wait()
await trio.sleep(0.1) await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent # mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process # entries/requests to the root process
Lock.local_task_in_debug = task DebugStatus.repl_task = task
# this **must** be awaited by the caller and is done using the # this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without # root nursery so that the debugger can continue to run without
@ -1455,7 +1444,7 @@ async def _pause(
) )
# our locker task should be the one in ctx # our locker task should be the one in ctx
# with the root actor # with the root actor
assert Lock._debugger_request_cs is cs assert DebugStatus.req_cs is cs
# XXX used by the SIGINT handler to check if # XXX used by the SIGINT handler to check if
# THIS actor is in REPL interaction # THIS actor is in REPL interaction
@ -1477,7 +1466,13 @@ async def _pause(
try: try:
_enter_repl_sync(debug_func) _enter_repl_sync(debug_func)
finally: finally:
Lock.unshield_sigint() DebugStatus.unshield_sigint()
except BaseException:
log.exception(
'Failed to engage debugger via `_pause()` ??\n'
)
raise
# XXX: apparently we can't do this without showing this frame # XXX: apparently we can't do this without showing this frame
@ -1527,45 +1522,16 @@ async def pause(
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = True
if shield:
# NOTE XXX: even hard coding this inside the `class CancelScope:`
# doesn't seem to work for me!?
# ^ XXX ^
# def _exit(self, *args, **kwargs):
# __tracebackhide__: bool = True
# super().__exit__(*args, **kwargs)
# trio.CancelScope.__enter__.__tracebackhide__ = True
# trio.CancelScope.__exit__.__tracebackhide__ = True
# import types
# with trio.CancelScope(shield=shield) as cs:
# cs.__exit__ = types.MethodType(_exit, cs)
# cs.__exit__.__tracebackhide__ = True
# TODO: LOL, solved this with the `pdb.hideframe` stuff
# at top-of-mod.. so I guess we can just only use this
# block right?
with trio.CancelScope( with trio.CancelScope(
shield=shield, shield=shield,
) as cs: ) as cs:
print(f'debug cs is {cs}\n')
# setattr(cs.__exit__.__func__, '__tracebackhide__', True)
# setattr(cs.__enter__.__func__, '__tracebackhide__', True)
# NOTE: so the caller can always cancel even if shielded # NOTE: so the caller can always manually cancel even
# if shielded!
task_status.started(cs) task_status.started(cs)
return await _pause( return await _pause(
debug_func=debug_func, debug_func=debug_func,
shield=True, shield=shield,
task_status=task_status,
**_pause_kwargs
)
else:
return await _pause(
debug_func=debug_func,
shield=False,
task_status=task_status, task_status=task_status,
**_pause_kwargs **_pause_kwargs
) )
@ -1682,7 +1648,7 @@ def pause_from_sync(
) )
) )
# TODO: maybe the `trio.current_task()` id/name if avail? # TODO: maybe the `trio.current_task()` id/name if avail?
Lock.local_task_in_debug: str = str(threading.current_thread()) DebugStatus.repl_task: str = str(threading.current_thread())
else: # we are presumably the `trio.run()` + main thread else: # we are presumably the `trio.run()` + main thread
greenback.await_( greenback.await_(
@ -1692,7 +1658,7 @@ def pause_from_sync(
hide_tb=hide_tb, hide_tb=hide_tb,
) )
) )
Lock.local_task_in_debug: str = current_task() DebugStatus.repl_task: str = current_task()
# TODO: ensure we aggressively make the user aware about # TODO: ensure we aggressively make the user aware about
# entering the global ``breakpoint()`` built-in from sync # entering the global ``breakpoint()`` built-in from sync
@ -1754,7 +1720,8 @@ def _post_mortem(
log.pdb( log.pdb(
f'{_crash_msg}\n' f'{_crash_msg}\n'
'|\n' '|\n'
f'|_ {current_task()}\n' # f'|_ {current_task()}\n'
f'|_ {current_task()} @ {actor.uid}\n'
# f'|_ @{actor.uid}\n' # f'|_ @{actor.uid}\n'
# TODO: make an `Actor.__repr()__` # TODO: make an `Actor.__repr()__`