forked from goodboy/tractor
1
0
Fork 0

Use `DebugStatus` around subactor lock requests

Breaks out all the (sub)actor local conc primitives from `Lock` (which
is now only used in and by the root actor) such that there's an explicit
distinction between a task that's "consuming" the `Lock` (remotely) vs.
the root-side service tasks which do the actual acquire on behalf of the
requesters.

`DebugStatus` changeover deats:
------ - ------
- move all the actor-local vars over `DebugStatus` including:
  - move `_trio_handler` and `_orig_sigint_handler`
  - `local_task_in_debug` now `repl_task`
  - `_debugger_request_cs` now `req_cs`
  - `local_pdb_complete` now `repl_release`
- drop all ^ fields from `Lock.repr()` obvi..
- move over the `.[un]shield_sigint()` and
  `.is_main_trio_thread()` methods.
- add some new attrs/meths:
  - `DebugStatus.repl` for the currently running `Pdb` in-actor
    singleton.
  - `.repr()` for pprint of state (like `Lock`).
- Note: that even when a root-actor task is in REPL, the `DebugStatus`
  is still used for certain actor-local state mgmt, such as SIGINT
  handler shielding.
- obvi change all lock-requester code bits to now use a `DebugStatus` in
  their local actor-state instead of `Lock`, i.e. change usage from
  `Lock` in `._runtime` and `._root`.
- use new `Lock.get_locking_task_cs()` API in when checking for
  sub-in-debug from `._runtime.Actor._stream_handler()`.

Unrelated to topic-at-hand tweaks:
------ - ------
- drop the commented bits about hiding `@[a]cm` stack frames from
  `_debug.pause()` and simplify to only one block with the `shield`
  passthrough since we already solved the issue with cancel-scopes using
  `@pdbp.hideframe` B)
  - this includes all the extra logging about the extra frame for the
    user (good thing i put in that wasted effort back then eh..)
- put the `try/except BaseException` with `log.exception()` around the
  whole of `._pause()` to ensure we don't miss in-func errors which can
  cause hangs..
- allow passing in `portal: Portal` to
  `Actor.start_remote_task()` such that `Portal` task spawning methods
  are always denoted correctly in terms of `Context.side`.
- lotsa logging tweaks, decreasing a bit of noise from `.runtime()`s.
runtime_to_msgspec
Tyler Goodlet 2024-04-18 12:47:28 -04:00
parent d0e7610073
commit 77a15ebf19
3 changed files with 322 additions and 354 deletions

View File

@ -135,7 +135,7 @@ async def open_root_actor(
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state. # on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor # mark top most level process as root actor
_state._runtime_vars['_is_root'] = True _state._runtime_vars['_is_root'] = True

View File

@ -267,10 +267,13 @@ class Actor:
self._listeners: list[trio.abc.Listener] = [] self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Channel|None = None self._parent_chan: Channel|None = None
self._forkserver_info: tuple|None = None self._forkserver_info: tuple|None = None
# track each child/sub-actor in it's locally
# supervising nursery
self._actoruid2nursery: dict[ self._actoruid2nursery: dict[
tuple[str, str], tuple[str, str], # sub-`Actor.uid`
ActorNursery|None, ActorNursery|None,
] = {} # type: ignore # noqa ] = {}
# when provided, init the registry addresses property from # when provided, init the registry addresses property from
# input via the validator. # input via the validator.
@ -659,12 +662,18 @@ class Actor:
# TODO: NEEEDS TO BE TESTED! # TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD # actually, no idea if this ever even enters.. XD
#
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if ( if (
pdb_user_uid pdb_user_uid
and local_nursery and local_nursery
): ):
entry: tuple|None = local_nursery._children.get(pdb_user_uid) entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
)
if entry: if entry:
proc: trio.Process proc: trio.Process
_, proc, _ = entry _, proc, _ = entry
@ -674,10 +683,10 @@ class Actor:
and poll() is None and poll() is None
): ):
log.cancel( log.cancel(
'Root actor reports no-more-peers, BUT ' 'Root actor reports no-more-peers, BUT\n'
'a DISCONNECTED child still has the debug ' 'a DISCONNECTED child still has the debug '
'lock!\n' 'lock!\n\n'
f'root uid: {self.uid}\n' # f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n' f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n' f'locking child uid: {pdb_user_uid}\n'
) )
@ -703,9 +712,8 @@ class Actor:
# if a now stale local task has the TTY lock still # if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for # we cancel it to allow servicing other requests for
# the lock. # the lock.
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
if ( if (
db_cs (db_cs := pdb_lock.get_locking_task_cs())
and not db_cs.cancel_called and not db_cs.cancel_called
and uid == pdb_user_uid and uid == pdb_user_uid
): ):
@ -742,7 +750,7 @@ class Actor:
except KeyError: except KeyError:
log.warning( log.warning(
'Ignoring invalid IPC ctx msg!\n\n' 'Ignoring invalid IPC ctx msg!\n\n'
f'<= sender: {uid}\n' f'<= sender: {uid}\n\n'
# XXX don't need right since it's always in msg? # XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n' # f'=> cid: {cid}\n\n'
@ -796,7 +804,7 @@ class Actor:
cid, cid,
# side, # side,
)] )]
log.runtime( log.debug(
f'Retreived cached IPC ctx for\n' f'Retreived cached IPC ctx for\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'cid:{cid}\n' f'cid:{cid}\n'
@ -835,10 +843,14 @@ class Actor:
nsf: NamespacePath, nsf: NamespacePath,
kwargs: dict, kwargs: dict,
# determines `Context.side: str`
portal: Portal|None = None,
# IPC channel config # IPC channel config
msg_buffer_size: int|None = None, msg_buffer_size: int|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
load_nsf: bool = False, load_nsf: bool = False,
ack_timeout: float = 3,
) -> Context: ) -> Context:
''' '''
@ -863,10 +875,12 @@ class Actor:
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
ctx._portal = portal
if ( if (
'self' in nsf 'self' in nsf
or not load_nsf or
not load_nsf
): ):
ns, _, func = nsf.partition(':') ns, _, func = nsf.partition(':')
else: else:
@ -874,42 +888,29 @@ class Actor:
# -[ ] but, how to do `self:<Actor.meth>`?? # -[ ] but, how to do `self:<Actor.meth>`??
ns, func = nsf.to_tuple() ns, func = nsf.to_tuple()
msg = msgtypes.Start(
ns=ns,
func=func,
kwargs=kwargs,
uid=self.uid,
cid=cid,
)
log.runtime( log.runtime(
'Sending cmd to\n' 'Sending RPC start msg\n\n'
f'peer: {chan.uid} => \n' f'=> peer: {chan.uid}\n'
'\n' f' |_ {ns}.{func}({kwargs})\n'
f'=> {ns}.{func}({kwargs})\n'
) )
await chan.send( await chan.send(msg)
msgtypes.Start(
ns=ns,
func=func,
kwargs=kwargs,
uid=self.uid,
cid=cid,
)
)
# {'cmd': (
# ns,
# func,
# kwargs,
# self.uid,
# cid,
# )}
# )
# Wait on first response msg and validate; this should be # NOTE wait on first `StartAck` response msg and validate;
# immediate. # this should be immediate and does not (yet) wait for the
# first_msg: dict = await ctx._recv_chan.receive() # remote child task to sync via `Context.started()`.
# functype: str = first_msg.get('functype') with trio.fail_after(ack_timeout):
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
try: try:
functype: str = first_msg.functype functype: str = first_msg.functype
except AttributeError: except AttributeError:
raise unpack_error(first_msg, chan) raise unpack_error(first_msg, chan)
# if 'error' in first_msg:
# raise unpack_error(first_msg, chan)
if functype not in ( if functype not in (
'asyncfunc', 'asyncfunc',
@ -917,7 +918,7 @@ class Actor:
'context', 'context',
): ):
raise ValueError( raise ValueError(
f'{first_msg} is an invalid response packet?' f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
) )
ctx._remote_func_type = functype ctx._remote_func_type = functype
@ -1162,7 +1163,7 @@ class Actor:
# kill any debugger request task to avoid deadlock # kill any debugger request task to avoid deadlock
# with the root actor in this tree # with the root actor in this tree
dbcs = _debug.Lock._debugger_request_cs dbcs = _debug.DebugStatus.req_cs
if dbcs is not None: if dbcs is not None:
msg += ( msg += (
'>> Cancelling active debugger request..\n' '>> Cancelling active debugger request..\n'
@ -1237,9 +1238,9 @@ class Actor:
except KeyError: except KeyError:
# NOTE: during msging race conditions this will often # NOTE: during msging race conditions this will often
# emit, some examples: # emit, some examples:
# - callee returns a result before cancel-msg/ctxc-raised # - child returns a result before cancel-msg/ctxc-raised
# - callee self raises ctxc before caller send request, # - child self raises ctxc before parent send request,
# - callee errors prior to cancel req. # - child errors prior to cancel req.
log.cancel( log.cancel(
'Cancel request invalid, RPC task already completed?\n\n' 'Cancel request invalid, RPC task already completed?\n\n'
f'<= canceller: {requesting_uid}\n\n' f'<= canceller: {requesting_uid}\n\n'
@ -1302,15 +1303,15 @@ class Actor:
flow_info: str = ( flow_info: str = (
f'<= canceller: {requesting_uid}\n' f'<= canceller: {requesting_uid}\n'
f'=> ipc-parent: {parent_chan}\n' f'=> ipc-parent: {parent_chan}\n'
f' |_{ctx}\n' f'|_{ctx}\n'
) )
log.runtime( log.runtime(
'Waiting on RPC task to cancel\n' 'Waiting on RPC task to cancel\n\n'
f'{flow_info}' f'{flow_info}'
) )
await is_complete.wait() await is_complete.wait()
log.runtime( log.runtime(
f'Sucessfully cancelled RPC task\n' f'Sucessfully cancelled RPC task\n\n'
f'{flow_info}' f'{flow_info}'
) )
return True return True
@ -1536,8 +1537,8 @@ async def async_main(
''' '''
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state. # on our debugger state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT) _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
is_registered: bool = False is_registered: bool = False
try: try:

View File

@ -160,12 +160,6 @@ class Lock:
# placeholder for function to set a ``trio.Event`` on debugger exit # placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Callable | None = None # pdb_release_hook: Callable | None = None
_trio_handler: (
Callable[[int, FrameType|None], Any]
|int
| None
) = None
remote_task_in_debug: str|None = None remote_task_in_debug: str|None = None
@staticmethod @staticmethod
@ -188,12 +182,6 @@ class Lock:
Lock._locking_task_cs = cs Lock._locking_task_cs = cs
# SUBACTOR ONLY
# ------ - -------
local_task_in_debug: Task|None = None
_debugger_request_cs: trio.CancelScope|None = None
local_pdb_complete: trio.Event|None = None
# ROOT ONLY # ROOT ONLY
# ------ - ------- # ------ - -------
# the root-actor-ONLY singletons for, # the root-actor-ONLY singletons for,
@ -214,16 +202,12 @@ class Lock:
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_blocked: set[tuple[str, str]] = set() # `Actor.uid` block list _blocked: set[tuple[str, str]] = set() # `Actor.uid` block list
# TODO: should go on `PbpREPL`?
_orig_sigint_handler: Callable | None = None
@classmethod @classmethod
def repr(cls) -> str: def repr(cls) -> str:
# both root and subs # both root and subs
fields: str = ( fields: str = (
f'repl: {cls.repl}\n' f'repl: {cls.repl}\n'
f'local_repl_task: {cls.local_task_in_debug}\n'
) )
if is_root_process(): if is_root_process():
@ -238,12 +222,6 @@ class Lock:
f'_debug_lock: {cls._debug_lock}\n' f'_debug_lock: {cls._debug_lock}\n'
f'lock_stats: {lock_stats}\n' f'lock_stats: {lock_stats}\n'
) )
else:
fields += (
f'local_task_in_debug: {cls.local_task_in_debug}\n'
f'local_pdb_complete: {cls.local_pdb_complete}\n'
f'_debugger_request_cs: {cls._debugger_request_cs}\n'
)
body: str = textwrap.indent( body: str = textwrap.indent(
fields, fields,
@ -255,7 +233,101 @@ class Lock:
')>' ')>'
) )
# TODO: move to PdbREPL! @classmethod
def release(cls):
try:
if not DebugStatus.is_main_trio_thread():
trio.from_thread.run_sync(
cls._debug_lock.release
)
else:
cls._debug_lock.release()
except RuntimeError as rte:
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = cls._debug_lock.statistics().owner
# if (
# owner
# and
# cls.remote_task_in_debug is None
# ):
# raise RuntimeError(
# 'Stale `Lock` detected, no remote task active!?\n'
# f'|_{owner}\n'
# # f'{Lock}'
# ) from rte
if owner:
raise rte
# OW suppress, can't member why tho .. XD
# something somethin corrupts a cancel-scope
# somewhere..
try:
# sometimes the ``trio`` might already be terminated in
# which case this call will raise.
if DebugStatus.repl_release is not None:
DebugStatus.repl_release.set()
finally:
cls.repl = None
cls.global_actor_in_debug = None
# restore original sigint handler
DebugStatus.unshield_sigint()
# actor-local state, irrelevant for non-root.
DebugStatus.repl_task = None
# TODO: actually use this instead throughout for subs!
class DebugStatus:
'''
Singleton-state for debugging machinery in a subactor.
Composes conc primitives for syncing with a root actor to
acquire the tree-global (TTY) `Lock` such that only ever one
actor's task can have the REPL active at a given time.
Methods to shield the process' `SIGINT` handler are used
whenever a local task is an active REPL.
'''
repl: PdbREPL|None = None
repl_task: Task|None = None
req_cs: trio.CancelScope|None = None
repl_release: trio.Event|None = None
lock_status: LockStatus|None = None
_orig_sigint_handler: Callable | None = None
_trio_handler: (
Callable[[int, FrameType|None], Any]
|int
| None
) = None
@classmethod
def repr(cls) -> str:
fields: str = (
f'repl: {cls.repl}\n'
f'repl_task: {cls.repl_task}\n'
f'repl_release: {cls.repl_release}\n'
f'req_cs: {cls.req_cs}\n'
)
body: str = textwrap.indent(
fields,
prefix=' |_',
)
return (
f'<{cls.__name__}(\n'
f'{body}'
')>'
)
@classmethod @classmethod
def shield_sigint(cls): def shield_sigint(cls):
''' '''
@ -339,77 +411,6 @@ class Lock:
# is not threading.main_thread() # is not threading.main_thread()
# ) # )
@classmethod
def release(cls):
try:
if not cls.is_main_trio_thread():
trio.from_thread.run_sync(
cls._debug_lock.release
)
else:
cls._debug_lock.release()
except RuntimeError as rte:
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = cls._debug_lock.statistics().owner
# if (
# owner
# and
# cls.remote_task_in_debug is None
# ):
# raise RuntimeError(
# 'Stale `Lock` detected, no remote task active!?\n'
# f'|_{owner}\n'
# # f'{Lock}'
# ) from rte
if owner:
raise rte
# OW suppress, can't member why tho .. XD
# something somethin corrupts a cancel-scope
# somewhere..
try:
# sometimes the ``trio`` might already be terminated in
# which case this call will raise.
if cls.local_pdb_complete is not None:
cls.local_pdb_complete.set()
finally:
# restore original sigint handler
cls.unshield_sigint()
cls.repl = None
# actor-local state, irrelevant for non-root.
cls.global_actor_in_debug = None
cls.local_task_in_debug = None
# TODO: actually use this instead throughout for subs!
class DebugStatus:
'''
Singleton-state for debugging machinery in a subactor.
Composes conc primitives for syncing with a root actor to
acquire the tree-global (TTY) `Lock` such that only ever one
actor's task can have the REPL active at a given time.
'''
repl: PdbREPL|None = None
lock_status: LockStatus|None = None
repl_task: Task|None = None
# local_task_in_debug: Task|None = None
req_cs: trio.CancelScope|None = None
# _debugger_request_cs: trio.CancelScope|None = None
repl_release: trio.Event|None = None
# local_pdb_complete: trio.Event|None = None
class TractorConfig(pdbp.DefaultConfig): class TractorConfig(pdbp.DefaultConfig):
''' '''
@ -445,6 +446,7 @@ class PdbREPL(pdbp.Pdb):
status = DebugStatus status = DebugStatus
# def preloop(self): # def preloop(self):
# print('IN PRELOOP') # print('IN PRELOOP')
# super().preloop() # super().preloop()
@ -660,16 +662,19 @@ async def lock_tty_for_child(
highly reliable at releasing the mutex complete! highly reliable at releasing the mutex complete!
''' '''
req_task_uid: tuple = tuple(subactor_task_uid) req_task_uid: tuple = tuple(subactor_task_uid)
if req_task_uid in Lock._blocked: if req_task_uid in Lock._blocked:
raise RuntimeError( raise RuntimeError(
f'Double lock request!?\n'
f'The same remote task already has an active request for TTY lock ??\n\n' f'The same remote task already has an active request for TTY lock ??\n\n'
f'task uid: {req_task_uid}\n' f'task uid: {req_task_uid}\n'
f'subactor uid: {subactor_uid}\n' f'subactor uid: {subactor_uid}\n\n'
)
Lock._blocked.add(req_task_uid) 'This might be mean that the requesting task '
'in `wait_for_parent_stdin_hijack()` may have crashed?\n'
'Consider that an internal bug exists given the TTY '
'`Lock`ing IPC dialog..\n'
)
root_task_name: str = current_task().name root_task_name: str = current_task().name
if tuple(subactor_uid) in Lock._blocked: if tuple(subactor_uid) in Lock._blocked:
@ -695,8 +700,9 @@ async def lock_tty_for_child(
f'subactor_uid: {subactor_uid}\n' f'subactor_uid: {subactor_uid}\n'
f'remote task: {subactor_task_uid}\n' f'remote task: {subactor_task_uid}\n'
) )
Lock.shield_sigint() DebugStatus.shield_sigint()
try: try:
Lock._blocked.add(req_task_uid)
with ( with (
# NOTE: though a cs is created for every subactor lock # NOTE: though a cs is created for every subactor lock
# REQUEST in this ctx-child task, only the root-task # REQUEST in this ctx-child task, only the root-task
@ -708,6 +714,9 @@ async def lock_tty_for_child(
# used to do so! # used to do so!
trio.CancelScope(shield=True) as debug_lock_cs, trio.CancelScope(shield=True) as debug_lock_cs,
# TODO: make this ONLY limit the pld_spec such that we
# can on-error-decode-`.pld: Raw` fields in
# `Context._deliver_msg()`?
_codec.limit_msg_spec( _codec.limit_msg_spec(
payload_spec=__msg_spec__, payload_spec=__msg_spec__,
) as codec, ) as codec,
@ -763,8 +772,9 @@ async def lock_tty_for_child(
finally: finally:
debug_lock_cs.cancel() debug_lock_cs.cancel()
Lock._blocked.remove(req_task_uid)
Lock.set_locking_task_cs(None) Lock.set_locking_task_cs(None)
Lock.unshield_sigint() DebugStatus.unshield_sigint()
@cm @cm
@ -817,7 +827,7 @@ async def wait_for_parent_stdin_hijack(
trio.CancelScope(shield=True) as cs, trio.CancelScope(shield=True) as cs,
apply_debug_codec(), apply_debug_codec(),
): ):
Lock._debugger_request_cs = cs DebugStatus.req_cs = cs
try: try:
# TODO: merge into sync async with ? # TODO: merge into sync async with ?
async with get_root() as portal: async with get_root() as portal:
@ -829,7 +839,7 @@ async def wait_for_parent_stdin_hijack(
) as (ctx, resp): ) as (ctx, resp):
log.pdb( log.pdb(
'Subactor locked TTY per msg\n' 'Subactor locked TTY with msg\n\n'
f'{resp}\n' f'{resp}\n'
) )
assert resp.subactor_uid == actor_uid assert resp.subactor_uid == actor_uid
@ -837,12 +847,12 @@ async def wait_for_parent_stdin_hijack(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
try: # to unblock local caller try: # to unblock local caller
assert Lock.local_pdb_complete assert DebugStatus.repl_release
task_status.started(cs) task_status.started(cs)
# wait for local task to exit and # wait for local task to exit and
# release the REPL # release the REPL
await Lock.local_pdb_complete.wait() await DebugStatus.repl_release.wait()
finally: finally:
await stream.send( await stream.send(
@ -867,12 +877,12 @@ async def wait_for_parent_stdin_hijack(
raise raise
finally: finally:
Lock.local_task_in_debug = None DebugStatus.repl_task = None
log.debug('Exiting debugger TTY lock request func from child') log.debug('Exiting debugger TTY lock request func from child')
log.cancel('Reverting SIGINT handler!') log.cancel('Reverting SIGINT handler!')
Lock.unshield_sigint() DebugStatus.unshield_sigint()
@ -901,7 +911,7 @@ def mk_mpdb() -> PdbREPL:
# in which case schedule the SIGINT shielding override # in which case schedule the SIGINT shielding override
# to in the main thread. # to in the main thread.
# https://docs.python.org/3/library/signal.html#signals-and-threads # https://docs.python.org/3/library/signal.html#signals-and-threads
Lock.shield_sigint() DebugStatus.shield_sigint()
# XXX: These are the important flags mentioned in # XXX: These are the important flags mentioned in
# https://github.com/python-trio/trio/issues/1155 # https://github.com/python-trio/trio/issues/1155
@ -1036,7 +1046,8 @@ def shield_sigint_handler(
) )
log.warning(message) log.warning(message)
Lock.unshield_sigint() # Lock.unshield_sigint()
DebugStatus.unshield_sigint()
case_handled = True case_handled = True
else: else:
@ -1064,7 +1075,7 @@ def shield_sigint_handler(
if maybe_stale_lock_cs: if maybe_stale_lock_cs:
lock_cs.cancel() lock_cs.cancel()
Lock.unshield_sigint() DebugStatus.unshield_sigint()
case_handled = True case_handled = True
# child actor that has locked the debugger # child actor that has locked the debugger
@ -1086,11 +1097,11 @@ def shield_sigint_handler(
f'{uid_in_debug}\n' f'{uid_in_debug}\n'
'Allowing SIGINT propagation..' 'Allowing SIGINT propagation..'
) )
Lock.unshield_sigint() DebugStatus.unshield_sigint()
# do_cancel() # do_cancel()
case_handled = True case_handled = True
task: str|None = Lock.local_task_in_debug task: str|None = DebugStatus.repl_task
if ( if (
task task
and and
@ -1124,7 +1135,7 @@ def shield_sigint_handler(
+ +
'Reverting handler to `trio` default!\n' 'Reverting handler to `trio` default!\n'
) )
Lock.unshield_sigint() DebugStatus.unshield_sigint()
case_handled = True case_handled = True
# XXX ensure that the reverted-to-handler actually is # XXX ensure that the reverted-to-handler actually is
@ -1200,32 +1211,15 @@ def _set_trace(
pdb pdb
and actor is not None and actor is not None
) )
# or shield
): ):
msg: str = _pause_msg
if shield:
# log.warning(
msg = (
'\n\n'
' ------ - ------\n'
'Debugger invoked with `shield=True` so an extra\n'
'`trio.CancelScope.__exit__()` frame is shown..\n'
'\n'
'Try going up one frame to see your pause point!\n'
'\n'
' SORRY we need to fix this!\n'
' ------ - ------\n\n'
) + msg
# pdbp.set_trace()
# TODO: maybe print the actor supervion tree up to the # TODO: maybe print the actor supervion tree up to the
# root here? Bo # root here? Bo
log.pdb( log.pdb(
f'{msg}\n' f'{_pause_msg}\n'
'|\n' '|\n'
# TODO: make an `Actor.__repr()__` # TODO: make an `Actor.__repr()__`
# f'|_ {current_task()} @ {actor.name}\n' f'|_ {current_task()} @ {actor.uid}\n'
f'|_ {current_task()}\n'
) )
# no f!#$&* idea, but when we're in async land # no f!#$&* idea, but when we're in async land
# we need 2x frames up? # we need 2x frames up?
@ -1286,11 +1280,11 @@ async def _pause(
# task_name: str = task.name # task_name: str = task.name
if ( if (
not Lock.local_pdb_complete not DebugStatus.repl_release
or or
Lock.local_pdb_complete.is_set() DebugStatus.repl_release.is_set()
): ):
Lock.local_pdb_complete = trio.Event() DebugStatus.repl_release = trio.Event()
if debug_func is not None: if debug_func is not None:
debug_func = partial(debug_func) debug_func = partial(debug_func)
@ -1333,71 +1327,14 @@ async def _pause(
Lock.release() Lock.release()
raise raise
except BaseException: try:
log.exception( if is_root_process():
'Failed to engage debugger via `_pause()` ??\n'
)
raise
if is_root_process(): # we also wait in the root-parent for any child that
# may have the tty locked prior
# we also wait in the root-parent for any child that # TODO: wait, what about multiple root tasks acquiring it though?
# may have the tty locked prior if Lock.global_actor_in_debug == actor.uid:
# TODO: wait, what about multiple root tasks acquiring it though? # re-entrant root process already has it: noop.
if Lock.global_actor_in_debug == actor.uid:
# re-entrant root process already has it: noop.
log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n'
f'ignoring..'
)
await trio.lowlevel.checkpoint()
return
# XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm.
if Lock._debug_lock.locked():
log.warning(
'attempting to shield-acquire active TTY lock'
f' owned by {Lock.global_actor_in_debug}'
)
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=True):
await Lock._debug_lock.acquire()
else:
# may be cancelled
await Lock._debug_lock.acquire()
Lock.global_actor_in_debug = actor.uid
Lock.local_task_in_debug = task
Lock.repl = pdb
# enter REPL from root, no TTY locking IPC ctx necessary
_enter_repl_sync(debug_func)
return # next branch is mutex and for subactors
# TODO: need a more robust check for the "root" actor
elif (
not is_root_process()
and actor._parent_chan # a connected child
):
if Lock.local_task_in_debug:
# Recurrence entry case: this task already has the lock and
# is likely recurrently entering a breakpoint
#
# NOTE: noop on recurrent entry case but we want to trigger
# a checkpoint to allow other actors error-propagate and
# potetially avoid infinite re-entries in some
# subactor that would otherwise not bubble until the
# next checkpoint was hit.
if (
(repl_task := Lock.local_task_in_debug)
and
repl_task is task
):
log.warning( log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n' f'{task.name}@{actor.uid} already has TTY lock\n'
f'ignoring..' f'ignoring..'
@ -1405,79 +1342,137 @@ async def _pause(
await trio.lowlevel.checkpoint() await trio.lowlevel.checkpoint()
return return
# if **this** actor is already in debug REPL we want # XXX: since we need to enter pdb synchronously below,
# to maintain actor-local-task mutex access, so block # we have to release the lock manually from pdb completion
# here waiting for the control to be released - this # callbacks. Can't think of a nicer way then this atm.
# -> allows for recursive entries to `tractor.pause()` if Lock._debug_lock.locked():
log.warning( log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n' 'attempting to shield-acquire active TTY lock'
f'waiting for release..' f' owned by {Lock.global_actor_in_debug}'
)
await Lock.local_pdb_complete.wait()
await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
Lock.local_task_in_debug = task
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
# TODO: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
# NOTE: MUST it here bc multiple tasks are spawned by any
# one sub-actor AND there will be a race between when the
# root locking task delivers the `Started(pld=LockStatus)`
# and when the REPL is actually entered here. SO ensure
# the codec is set before either are run!
#
with (
# _codec.limit_msg_spec(
# payload_spec=__msg_spec__,
# ) as debug_codec,
trio.CancelScope(shield=shield),
):
# async with trio.open_nursery() as tn:
# tn.cancel_scope.shield = True
try:
# cs: trio.CancelScope = await tn.start(
cs: trio.CancelScope = await actor._service_n.start(
wait_for_parent_stdin_hijack,
actor.uid,
(task.name, id(task)),
) )
# our locker task should be the one in ctx
# with the root actor
assert Lock._debugger_request_cs is cs
# XXX used by the SIGINT handler to check if # must shield here to avoid hitting a ``Cancelled`` and
# THIS actor is in REPL interaction # a child getting stuck bc we clobbered the tty
Lock.repl = pdb with trio.CancelScope(shield=True):
await Lock._debug_lock.acquire()
else:
# may be cancelled
await Lock._debug_lock.acquire()
except RuntimeError: Lock.global_actor_in_debug = actor.uid
Lock.release() DebugStatus.repl_task = task
DebugStatus.repl = Lock.repl = pdb
if actor._cancel_called: # enter REPL from root, no TTY locking IPC ctx necessary
# service nursery won't be usable and we _enter_repl_sync(debug_func)
# don't want to lock up the root either way since return # next branch is mutex and for subactors
# we're in (the midst of) cancellation.
# TODO: need a more robust check for the "root" actor
elif (
not is_root_process()
and actor._parent_chan # a connected child
):
if DebugStatus.repl_task:
# Recurrence entry case: this task already has the lock and
# is likely recurrently entering a breakpoint
#
# NOTE: noop on recurrent entry case but we want to trigger
# a checkpoint to allow other actors error-propagate and
# potetially avoid infinite re-entries in some
# subactor that would otherwise not bubble until the
# next checkpoint was hit.
if (
(repl_task := DebugStatus.repl_task)
and
repl_task is task
):
log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n'
f'ignoring..'
)
await trio.lowlevel.checkpoint()
return return
raise # if **this** actor is already in debug REPL we want
# to maintain actor-local-task mutex access, so block
# here waiting for the control to be released - this
# -> allows for recursive entries to `tractor.pause()`
log.warning(
f'{task.name}@{actor.uid} already has TTY lock\n'
f'waiting for release..'
)
await DebugStatus.repl_release.wait()
await trio.sleep(0.1)
# enter REPL # mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
DebugStatus.repl_task = task
try: # this **must** be awaited by the caller and is done using the
_enter_repl_sync(debug_func) # root nursery so that the debugger can continue to run without
finally: # being restricted by the scope of a new task nursery.
Lock.unshield_sigint()
# TODO: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
# NOTE: MUST it here bc multiple tasks are spawned by any
# one sub-actor AND there will be a race between when the
# root locking task delivers the `Started(pld=LockStatus)`
# and when the REPL is actually entered here. SO ensure
# the codec is set before either are run!
#
with (
# _codec.limit_msg_spec(
# payload_spec=__msg_spec__,
# ) as debug_codec,
trio.CancelScope(shield=shield),
):
# async with trio.open_nursery() as tn:
# tn.cancel_scope.shield = True
try:
# cs: trio.CancelScope = await tn.start(
cs: trio.CancelScope = await actor._service_n.start(
wait_for_parent_stdin_hijack,
actor.uid,
(task.name, id(task)),
)
# our locker task should be the one in ctx
# with the root actor
assert DebugStatus.req_cs is cs
# XXX used by the SIGINT handler to check if
# THIS actor is in REPL interaction
Lock.repl = pdb
except RuntimeError:
Lock.release()
if actor._cancel_called:
# service nursery won't be usable and we
# don't want to lock up the root either way since
# we're in (the midst of) cancellation.
return
raise
# enter REPL
try:
_enter_repl_sync(debug_func)
finally:
DebugStatus.unshield_sigint()
except BaseException:
log.exception(
'Failed to engage debugger via `_pause()` ??\n'
)
raise
# XXX: apparently we can't do this without showing this frame # XXX: apparently we can't do this without showing this frame
@ -1527,45 +1522,16 @@ async def pause(
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = True
if shield: with trio.CancelScope(
# NOTE XXX: even hard coding this inside the `class CancelScope:` shield=shield,
# doesn't seem to work for me!? ) as cs:
# ^ XXX ^
# def _exit(self, *args, **kwargs): # NOTE: so the caller can always manually cancel even
# __tracebackhide__: bool = True # if shielded!
# super().__exit__(*args, **kwargs) task_status.started(cs)
# trio.CancelScope.__enter__.__tracebackhide__ = True
# trio.CancelScope.__exit__.__tracebackhide__ = True
# import types
# with trio.CancelScope(shield=shield) as cs:
# cs.__exit__ = types.MethodType(_exit, cs)
# cs.__exit__.__tracebackhide__ = True
# TODO: LOL, solved this with the `pdb.hideframe` stuff
# at top-of-mod.. so I guess we can just only use this
# block right?
with trio.CancelScope(
shield=shield,
) as cs:
print(f'debug cs is {cs}\n')
# setattr(cs.__exit__.__func__, '__tracebackhide__', True)
# setattr(cs.__enter__.__func__, '__tracebackhide__', True)
# NOTE: so the caller can always cancel even if shielded
task_status.started(cs)
return await _pause(
debug_func=debug_func,
shield=True,
task_status=task_status,
**_pause_kwargs
)
else:
return await _pause( return await _pause(
debug_func=debug_func, debug_func=debug_func,
shield=False, shield=shield,
task_status=task_status, task_status=task_status,
**_pause_kwargs **_pause_kwargs
) )
@ -1682,7 +1648,7 @@ def pause_from_sync(
) )
) )
# TODO: maybe the `trio.current_task()` id/name if avail? # TODO: maybe the `trio.current_task()` id/name if avail?
Lock.local_task_in_debug: str = str(threading.current_thread()) DebugStatus.repl_task: str = str(threading.current_thread())
else: # we are presumably the `trio.run()` + main thread else: # we are presumably the `trio.run()` + main thread
greenback.await_( greenback.await_(
@ -1692,7 +1658,7 @@ def pause_from_sync(
hide_tb=hide_tb, hide_tb=hide_tb,
) )
) )
Lock.local_task_in_debug: str = current_task() DebugStatus.repl_task: str = current_task()
# TODO: ensure we aggressively make the user aware about # TODO: ensure we aggressively make the user aware about
# entering the global ``breakpoint()`` built-in from sync # entering the global ``breakpoint()`` built-in from sync
@ -1754,7 +1720,8 @@ def _post_mortem(
log.pdb( log.pdb(
f'{_crash_msg}\n' f'{_crash_msg}\n'
'|\n' '|\n'
f'|_ {current_task()}\n' # f'|_ {current_task()}\n'
f'|_ {current_task()} @ {actor.uid}\n'
# f'|_ @{actor.uid}\n' # f'|_ @{actor.uid}\n'
# TODO: make an `Actor.__repr()__` # TODO: make an `Actor.__repr()__`