forked from goodboy/tractor
1
0
Fork 0

Fix up var naming and typing

windows_bi_streaming
Tyler Goodlet 2021-05-12 12:01:43 -04:00
parent 7eb76e8d97
commit b0bcb430bf
1 changed files with 48 additions and 40 deletions
tractor

View File

@ -1,11 +1,12 @@
""" """
Multi-core debugging for da peeps! Multi-core debugging for da peeps!
""" """
import bdb import bdb
import sys import sys
from functools import partial from functools import partial
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator from typing import Tuple, Optional, Callable, AsyncIterator
import tractor import tractor
import trio import trio
@ -30,16 +31,21 @@ log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem'] __all__ = ['breakpoint', 'post_mortem']
# TODO: wrap all these in a static global class: ``DebugLock`` maybe?
# placeholder for function to set a ``trio.Event`` on debugger exit # placeholder for function to set a ``trio.Event`` on debugger exit
_pdb_release_hook: Optional[Callable] = None _pdb_release_hook: Optional[Callable] = None
# actor-wide variable pointing to current task name using debugger # actor-wide variable pointing to current task name using debugger
_in_debug = False _local_task_in_debug: Optional[str] = None
# actor tree-wide actor uid that supposedly has the tty lock
_global_actor_in_debug: Optional[Tuple[str, str]] = None
# lock in root actor preventing multi-access to local tty # lock in root actor preventing multi-access to local tty
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_debug_lock._uid = None _pdb_complete: Optional[trio.Event] = None
_pdb_complete: trio.Event = None
# XXX: set by the current task waiting on the root tty lock # XXX: set by the current task waiting on the root tty lock
# and must be cancelled if this actor is cancelled via message # and must be cancelled if this actor is cancelled via message
@ -62,19 +68,19 @@ class PdbwTeardown(pdbpp.Pdb):
# TODO: figure out how to dissallow recursive .set_trace() entry # TODO: figure out how to dissallow recursive .set_trace() entry
# since that'll cause deadlock for us. # since that'll cause deadlock for us.
def set_continue(self): def set_continue(self):
global _in_debug
try: try:
super().set_continue() super().set_continue()
finally: finally:
_in_debug = False global _local_task_in_debug
_local_task_in_debug = None
_pdb_release_hook() _pdb_release_hook()
def set_quit(self): def set_quit(self):
global _in_debug
try: try:
super().set_quit() super().set_quit()
finally: finally:
_in_debug = False global _local_task_in_debug
_local_task_in_debug = None
_pdb_release_hook() _pdb_release_hook()
@ -120,21 +126,22 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
"""Acquire a actor local FIFO lock meant to mutex entry to a local """Acquire a actor local FIFO lock meant to mutex entry to a local
debugger entry point to avoid tty clobbering by multiple processes. debugger entry point to avoid tty clobbering by multiple processes.
""" """
global _debug_lock global _debug_lock, _global_actor_in_debug
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
log.runtime( log.debug(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}") f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
async with _debug_lock: async with _debug_lock:
_debug_lock._uid = uid # _debug_lock._uid = uid
log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}") _global_actor_in_debug = uid
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
yield yield
_debug_lock._uid = None _global_actor_in_debug = None
log.runtime(f"TTY lock released, remote task: {task_name}:{uid}") log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
# @contextmanager # @contextmanager
@ -151,10 +158,10 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
@tractor.context @tractor.context
async def _hijack_stdin_relay_to_child( async def _hijack_stdin_relay_to_child(
ctx: tractor.context, ctx: tractor.Context,
subactor_uid: Tuple[str, str] subactor_uid: Tuple[str, str]
) -> AsyncIterator[str]: ) -> None:
global _pdb_complete global _pdb_complete
@ -168,7 +175,7 @@ async def _hijack_stdin_relay_to_child(
f"remote task: {task_name}:{subactor_uid}" f"remote task: {task_name}:{subactor_uid}"
) )
log.runtime(f"Actor {subactor_uid} is WAITING on stdin hijack lock") log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
async with _acquire_debug_lock(subactor_uid): async with _acquire_debug_lock(subactor_uid):
@ -176,30 +183,29 @@ async def _hijack_stdin_relay_to_child(
# indicate to child that we've locked stdio # indicate to child that we've locked stdio
await ctx.started('Locked') await ctx.started('Locked')
log.runtime(f"Actor {subactor_uid} ACQUIRED stdin hijack lock") log.runtime( # type: ignore
f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# wait for unlock pdb by child # wait for unlock pdb by child
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
assert await stream.receive() == 'Unlock' assert await stream.receive() == 'Unlock'
log.runtime( log.debug(
f"TTY lock released, remote task: {task_name}:{subactor_uid}") f"TTY lock released, remote task: {task_name}:{subactor_uid}")
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock") log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
# XXX: We only make this sync in case someone wants to async def _breakpoint(debug_func) -> None:
# overload the ``breakpoint()`` built-in.
async def _breakpoint(debug_func) -> Awaitable[None]:
"""``tractor`` breakpoint entry for engaging pdb machinery """``tractor`` breakpoint entry for engaging pdb machinery
in subactors. in subactors.
""" """
actor = tractor.current_actor() actor = tractor.current_actor()
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
global _pdb_complete global _pdb_complete, _pdb_release_hook
global _pdb_release_hook global _local_task_in_debug, _global_actor_in_debug
global _in_debug
async def wait_for_parent_stdin_hijack( async def wait_for_parent_stdin_hijack(
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
@ -232,8 +238,8 @@ async def _breakpoint(debug_func) -> Awaitable[None]:
finally: finally:
log.debug(f"Exiting debugger for actor {actor}") log.debug(f"Exiting debugger for actor {actor}")
global _in_debug global _local_task_in_debug
_in_debug = False _local_task_in_debug = None
log.debug(f"Child {actor} released parent stdio lock") log.debug(f"Child {actor} released parent stdio lock")
if not _pdb_complete or _pdb_complete.is_set(): if not _pdb_complete or _pdb_complete.is_set():
@ -241,8 +247,8 @@ async def _breakpoint(debug_func) -> Awaitable[None]:
# TODO: need a more robust check for the "root" actor # TODO: need a more robust check for the "root" actor
if actor._parent_chan and not is_root_process(): if actor._parent_chan and not is_root_process():
if _in_debug: if _local_task_in_debug:
if _in_debug == task_name: if _local_task_in_debug == task_name:
# this task already has the lock and is # this task already has the lock and is
# likely recurrently entering a breakpoint # likely recurrently entering a breakpoint
return return
@ -250,14 +256,14 @@ async def _breakpoint(debug_func) -> Awaitable[None]:
# if **this** actor is already in debug mode block here # if **this** actor is already in debug mode block here
# waiting for the control to be released - this allows # waiting for the control to be released - this allows
# support for recursive entries to `tractor.breakpoint()` # support for recursive entries to `tractor.breakpoint()`
log.warning( log.warning(f"{actor.uid} already has a debug lock, waiting...")
f"Actor {actor.uid} already has a debug lock, waiting...")
await _pdb_complete.wait() await _pdb_complete.wait()
await trio.sleep(0.1) await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent # mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process # entries/requests to the root process
_in_debug = task_name _local_task_in_debug = task_name
# assign unlock callback for debugger teardown hooks # assign unlock callback for debugger teardown hooks
_pdb_release_hook = _pdb_complete.set _pdb_release_hook = _pdb_complete.set
@ -276,7 +282,7 @@ async def _breakpoint(debug_func) -> Awaitable[None]:
# TODO: wait, what about multiple root tasks acquiring # TODO: wait, what about multiple root tasks acquiring
# it though.. shrug? # it though.. shrug?
# root process (us) already has it; ignore # root process (us) already has it; ignore
if _debug_lock._uid == actor.uid: if _global_actor_in_debug == actor.uid:
return return
# XXX: since we need to enter pdb synchronously below, # XXX: since we need to enter pdb synchronously below,
@ -284,15 +290,17 @@ async def _breakpoint(debug_func) -> Awaitable[None]:
# callbacks. Can't think of a nicer way then this atm. # callbacks. Can't think of a nicer way then this atm.
await _debug_lock.acquire() await _debug_lock.acquire()
_debug_lock._uid = actor.uid _global_actor_in_debug = actor.uid
_local_task_in_debug = task_name
# the lock must be released on pdb completion # the lock must be released on pdb completion
def teardown(): def teardown():
global _pdb_complete global _pdb_complete, _debug_lock
global _debug_lock global _global_actor_in_debug, _local_task_in_debug
_debug_lock.release() _debug_lock.release()
_debug_lock._uid = None _global_actor_in_debug = None
_local_task_in_debug = None
_pdb_complete.set() _pdb_complete.set()
_pdb_release_hook = teardown _pdb_release_hook = teardown
@ -321,7 +329,7 @@ def _set_trace(actor=None):
pdb = _mk_pdb() pdb = _mk_pdb()
if actor is not None: if actor is not None:
log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") # type: ignore
pdb.set_trace( pdb.set_trace(
# start 2 levels up in user code # start 2 levels up in user code
@ -330,8 +338,8 @@ def _set_trace(actor=None):
else: else:
# we entered the global ``breakpoint()`` built-in from sync code # we entered the global ``breakpoint()`` built-in from sync code
global _in_debug, _pdb_release_hook global _local_task_in_debug, _pdb_release_hook
_in_debug = 'sync' _local_task_in_debug = 'sync'
def nuttin(): def nuttin():
pass pass