forked from goodboy/tractor
1
0
Fork 0

First draft workin minus non-main-thread usage!

pause_from_sync_w_greenback
Tyler Goodlet 2024-03-20 19:13:13 -04:00
parent 8e66f45e23
commit c04d77a3c9
2 changed files with 172 additions and 74 deletions

View File

@ -0,0 +1,69 @@
import trio
import tractor
def sync_pause():
tractor.pause_from_sync()
@tractor.context
async def start_n_sync_pause(
ctx: tractor.Context,
):
# sync to requesting peer
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
print(f'entering SYNC PAUSE in {actor.uid}')
sync_pause()
print(f'back from SYNC PAUSE in {actor.uid}')
async def main() -> None:
from tractor._rpc import maybe_import_gb
async with tractor.open_nursery(
debug_mode=True,
) as an:
# TODO: where to put this?
# => just inside `open_root_actor()` yah?
await maybe_import_gb()
p: tractor.Portal = await an.start_actor(
'subactor',
enable_modules=[__name__],
# infect_asyncio=True,
debug_mode=True,
loglevel='cancel',
)
# TODO: 3 sub-actor usage cases:
# -[ ] via a `.run_in_actor()` call
# -[ ] via a `.run()`
# -[ ] via a `.open_context()`
#
async with p.open_context(
start_n_sync_pause,
) as (ctx, first):
assert first is None
await tractor.pause()
sync_pause()
# TODO: make this work!!
await trio.to_thread.run_sync(
sync_pause,
abandon_on_cancel=False,
)
await ctx.cancel()
# TODO: case where we cancel from trio-side while asyncio task
# has debugger lock?
await p.cancel_actor()
if __name__ == '__main__':
trio.run(main)

View File

@ -46,7 +46,7 @@ import pdbp
import tractor import tractor
import trio import trio
from trio.lowlevel import current_task from trio.lowlevel import current_task
from trio_typing import ( from trio import (
TaskStatus, TaskStatus,
# Task, # Task,
) )
@ -400,7 +400,6 @@ async def wait_for_parent_stdin_hijack(
# this syncs to child's ``Context.started()`` call. # this syncs to child's ``Context.started()`` call.
async with portal.open_context( async with portal.open_context(
lock_tty_for_child, lock_tty_for_child,
subactor_uid=actor_uid, subactor_uid=actor_uid,
@ -682,7 +681,10 @@ def _set_trace(
async def _pause( async def _pause(
debug_func: Callable = _set_trace, debug_func: Callable = _set_trace,
release_lock_signal: trio.Event | None = None,
# NOTE: must be passed in the `.pause_from_sync()` case!
pdb: MultiActorPdb|None = None,
undo_sigint: Callable|None = None,
# TODO: allow caller to pause despite task cancellation, # TODO: allow caller to pause despite task cancellation,
# exactly the same as wrapping with: # exactly the same as wrapping with:
@ -691,8 +693,7 @@ async def _pause(
# => the REMAINING ISSUE is that the scope's .__exit__() frame # => the REMAINING ISSUE is that the scope's .__exit__() frame
# is always show in the debugger on entry.. and there seems to # is always show in the debugger on entry.. and there seems to
# be no way to override it?.. # be no way to override it?..
# shield: bool = False, #
shield: bool = False, shield: bool = False,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
@ -707,7 +708,6 @@ async def _pause(
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = True
actor = current_actor() actor = current_actor()
pdb, undo_sigint = mk_mpdb()
task_name: str = trio.lowlevel.current_task().name task_name: str = trio.lowlevel.current_task().name
if ( if (
@ -716,10 +716,15 @@ async def _pause(
): ):
Lock.local_pdb_complete = trio.Event() Lock.local_pdb_complete = trio.Event()
if debug_func is not None:
debug_func = partial( debug_func = partial(
debug_func, debug_func,
) )
if pdb is None:
assert undo_sigint is None, 'You must pass both!?!'
pdb, undo_sigint = mk_mpdb()
# TODO: need a more robust check for the "root" actor # TODO: need a more robust check for the "root" actor
if ( if (
not is_root_process() not is_root_process()
@ -761,12 +766,14 @@ async def _pause(
# ``` # ```
# but not entirely sure if that's a sane way to implement it? # but not entirely sure if that's a sane way to implement it?
try: try:
print("ACQUIRING TTY LOCK from CHILD")
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await actor._service_n.start( await actor._service_n.start(
wait_for_parent_stdin_hijack, wait_for_parent_stdin_hijack,
actor.uid, actor.uid,
) )
Lock.repl = pdb Lock.repl = pdb
except RuntimeError: except RuntimeError:
Lock.release() Lock.release()
@ -779,11 +786,13 @@ async def _pause(
raise raise
elif is_root_process(): elif is_root_process():
print("ROOT TTY LOCK BRANCH")
# we also wait in the root-parent for any child that # we also wait in the root-parent for any child that
# may have the tty locked prior # may have the tty locked prior
# TODO: wait, what about multiple root tasks acquiring it though? # TODO: wait, what about multiple root tasks acquiring it though?
if Lock.global_actor_in_debug == actor.uid: if Lock.global_actor_in_debug == actor.uid:
print("ROOT ALREADY HAS TTY?")
# re-entrant root process already has it: noop. # re-entrant root process already has it: noop.
return return
@ -797,11 +806,14 @@ async def _pause(
# must shield here to avoid hitting a ``Cancelled`` and # must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty # a child getting stuck bc we clobbered the tty
print("ACQUIRING TTY LOCK from ROOT")
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await Lock._debug_lock.acquire() await Lock._debug_lock.acquire()
else: else:
# may be cancelled # may be cancelled
print("ROOT TRYING LOCK ACQUIRE")
await Lock._debug_lock.acquire() await Lock._debug_lock.acquire()
print("ROOT LOCKED TTY")
Lock.global_actor_in_debug = actor.uid Lock.global_actor_in_debug = actor.uid
Lock.local_task_in_debug = task_name Lock.local_task_in_debug = task_name
@ -811,16 +823,11 @@ async def _pause(
# TODO: do we want to support using this **just** for the # TODO: do we want to support using this **just** for the
# locking / common code (prolly to help address #320)? # locking / common code (prolly to help address #320)?
# #
# if debug_func is None: if debug_func is None:
# assert release_lock_signal, ( task_status.started(Lock)
# 'Must pass `release_lock_signal: trio.Event` if no ' print("ROOT .started(Lock) now!")
# 'trace func provided!'
# )
# print(f"{actor.uid} ENTERING WAIT")
# with trio.CancelScope(shield=True):
# await release_lock_signal.wait()
# else: else:
# block here one (at the appropriate frame *up*) where # block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio. # ``breakpoint()`` was awaited and begin handling stdio.
log.debug('Entering sync world of the `pdb` REPL..') log.debug('Entering sync world of the `pdb` REPL..')
@ -862,8 +869,7 @@ async def _pause(
async def pause( async def pause(
debug_func: Callable = _set_trace, debug_func: Callable|None = _set_trace,
release_lock_signal: trio.Event | None = None,
# TODO: allow caller to pause despite task cancellation, # TODO: allow caller to pause despite task cancellation,
# exactly the same as wrapping with: # exactly the same as wrapping with:
@ -872,10 +878,11 @@ async def pause(
# => the REMAINING ISSUE is that the scope's .__exit__() frame # => the REMAINING ISSUE is that the scope's .__exit__() frame
# is always show in the debugger on entry.. and there seems to # is always show in the debugger on entry.. and there seems to
# be no way to override it?.. # be no way to override it?..
# shield: bool = False, #
shield: bool = False, shield: bool = False,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
**_pause_kwargs,
) -> None: ) -> None:
''' '''
@ -920,16 +927,16 @@ async def pause(
task_status.started(cs) task_status.started(cs)
return await _pause( return await _pause(
debug_func=debug_func, debug_func=debug_func,
release_lock_signal=release_lock_signal,
shield=True, shield=True,
task_status=task_status, task_status=task_status,
**_pause_kwargs
) )
else: else:
return await _pause( return await _pause(
debug_func=debug_func, debug_func=debug_func,
release_lock_signal=release_lock_signal,
shield=False, shield=False,
task_status=task_status, task_status=task_status,
**_pause_kwargs
) )
@ -938,46 +945,64 @@ async def pause(
# TODO: allow pausing from sync code. # TODO: allow pausing from sync code.
# normally by remapping python's builtin breakpoint() hook to this # normally by remapping python's builtin breakpoint() hook to this
# runtime aware version which takes care of all . # runtime aware version which takes care of all .
def pause_from_sync() -> None: def pause_from_sync(
print("ENTER SYNC PAUSE") hide_tb: bool = True
) -> None:
__tracebackhide__: bool = hide_tb
actor: tractor.Actor = current_actor( actor: tractor.Actor = current_actor(
err_on_no_runtime=False, err_on_no_runtime=False,
) )
if actor: print(
f'{actor.uid}: JUST ENTERED `tractor.pause_from_sync()`'
f'|_{actor}\n'
)
if not actor:
raise RuntimeError(
'Not inside the `tractor`-runtime?\n'
'`tractor.pause_from_sync()` is not functional without a wrapping\n'
'- `async with tractor.open_nursery()` or,\n'
'- `async with tractor.open_root_actor()`\n'
)
try: try:
import greenback import greenback
# __tracebackhide__ = True except ModuleNotFoundError:
raise RuntimeError(
'The `greenback` lib is required to use `tractor.pause_from_sync()`!\n'
'https://github.com/oremanj/greenback\n'
)
# out = greenback.await_(
# task_can_release_tty_lock = trio.Event() # actor._service_n.start(partial(
# pause,
# debug_func=None,
# release_lock_signal=task_can_release_tty_lock,
# ))
# )
# spawn bg task which will lock out the TTY, we poll # spawn bg task which will lock out the TTY, we poll
# just below until the release event is reporting that task as # just below until the release event is reporting that task as
# waiting.. not the most ideal but works for now ;) # waiting.. not the most ideal but works for now ;)
db, undo_sigint = mk_mpdb()
greenback.await_( greenback.await_(
actor._service_n.start(partial( pause(
pause,
debug_func=None, debug_func=None,
# release_lock_signal=task_can_release_tty_lock, pdb=db,
)) undo_sigint=undo_sigint,
)
) )
except ModuleNotFoundError:
log.warning('NO GREENBACK FOUND')
else:
log.warning('Not inside actor-runtime')
db, undo_sigint = mk_mpdb()
Lock.local_task_in_debug = 'sync' Lock.local_task_in_debug = 'sync'
# db.config.enable_hidden_frames = True
# we entered the global ``breakpoint()`` built-in from sync # TODO: ensure we aggressively make the user aware about
# entering the global ``breakpoint()`` built-in from sync
# code? # code?
frame: FrameType | None = sys._getframe() frame: FrameType | None = sys._getframe()
# print(f'FRAME: {str(frame)}')
# assert not db._is_hidden(frame)
frame: FrameType = frame.f_back # type: ignore frame: FrameType = frame.f_back # type: ignore
# db.config.enable_hidden_frames = True
# assert not db._is_hidden(frame)
# print(f'FRAME: {str(frame)}') # print(f'FRAME: {str(frame)}')
# if not db._is_hidden(frame): # if not db._is_hidden(frame):
# pdbp.set_trace() # pdbp.set_trace()
@ -985,17 +1010,21 @@ def pause_from_sync() -> None:
# (frame, frame.f_lineno) # (frame, frame.f_lineno)
# ) # )
db.set_trace(frame=frame) db.set_trace(frame=frame)
# NOTE XXX: see the `@pdbp.hideframe` decoration
# on `Lock.unshield_sigint()`.. I have NO CLUE why # XXX NOTE XXX no other LOC can be here without it
# showing up in the REPL's last stack frame !?!
# -[ ] tried to use `@pdbp.hideframe` decoration but
# still doesn't work
#
# FROM BEFORE: on `Lock.unshield_sigint()`.. I have NO CLUE why
# the next instruction's def frame is being shown # the next instruction's def frame is being shown
# in the tb but it seems to be something wonky with # in the tb but it seems to be something wonky with
# the way `pdb` core works? # the way `pdb` core works?
#
# NOTE: not needed any more anyway since it's all in
# `Lock.release()` now!
# undo_sigint() # undo_sigint()
# Lock.global_actor_in_debug = actor.uid
# Lock.release()
# task_can_release_tty_lock.set()
# using the "pause" semantics instead since # using the "pause" semantics instead since
# that better covers actually somewhat "pausing the runtime" # that better covers actually somewhat "pausing the runtime"