Compare commits
9 Commits
main
...
asyncio_de
Author | SHA1 | Date |
---|---|---|
|
697900deb1 | |
|
2e55c124b1 | |
|
0f21c8ba6a | |
|
7b7410bc0f | |
|
b59cba74cd | |
|
7e39ef7ed1 | |
|
c8ea0fdf53 | |
|
885319e9ae | |
|
b815b61707 |
|
@ -3,8 +3,8 @@
|
|||
|gh_actions|
|
||||
|docs|
|
||||
|
||||
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
|
||||
built on trio_.
|
||||
``tractor`` is a `structured concurrent`_, (optionally
|
||||
distributed_) multi-processing_ runtime built on trio_.
|
||||
|
||||
Fundamentally, ``tractor`` gives you parallelism via
|
||||
``trio``-"*actors*": independent Python processes (aka
|
||||
|
@ -17,11 +17,20 @@ protocol" constructed on top of multiple Pythons each running a ``trio``
|
|||
scheduled runtime - a call to ``trio.run()``.
|
||||
|
||||
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
|
||||
but likely *does not* look like what *you* probably think an "actor
|
||||
model" looks like, and that's *intentional*.
|
||||
but likely **does not** look like what **you** probably *think* an "actor
|
||||
model" looks like, and that's **intentional**.
|
||||
|
||||
The first step to grok ``tractor`` is to get the basics of ``trio`` down.
|
||||
A great place to start is the `trio docs`_ and this `blog post`_.
|
||||
|
||||
Where do i start!?
|
||||
------------------
|
||||
The first step to grok ``tractor`` is to get an intermediate
|
||||
knowledge of ``trio`` and **structured concurrency** B)
|
||||
|
||||
Some great places to start are,
|
||||
- the seminal `blog post`_
|
||||
- obviously the `trio docs`_
|
||||
- wikipedia's nascent SC_ page
|
||||
- the fancy diagrams @ libdill-docs_
|
||||
|
||||
|
||||
Features
|
||||
|
@ -593,6 +602,7 @@ matrix seems too hip, we're also mostly all in the the `trio gitter
|
|||
channel`_!
|
||||
|
||||
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
|
||||
.. _distributed: https://en.wikipedia.org/wiki/Distributed_computing
|
||||
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
|
||||
.. _trio: https://github.com/python-trio/trio
|
||||
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
|
||||
|
@ -611,8 +621,9 @@ channel`_!
|
|||
.. _trio docs: https://trio.readthedocs.io/en/latest/
|
||||
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
|
||||
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
|
||||
.. _SC: https://en.wikipedia.org/wiki/Structured_concurrency
|
||||
.. _libdill-docs: https://sustrik.github.io/libdill/structured-concurrency.html
|
||||
.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency
|
||||
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
|
||||
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
|
||||
.. _async generators: https://www.python.org/dev/peps/pep-0525/
|
||||
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
import asyncio
|
||||
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import to_asyncio
|
||||
|
||||
|
||||
async def aio_sleep_forever():
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
|
||||
async def bp_then_error(
|
||||
to_trio: trio.MemorySendChannel,
|
||||
from_trio: asyncio.Queue,
|
||||
|
||||
raise_after_bp: bool = True,
|
||||
|
||||
) -> None:
|
||||
|
||||
# sync with ``trio``-side (caller) task
|
||||
to_trio.send_nowait('start')
|
||||
|
||||
# NOTE: what happens here inside the hook needs some refinement..
|
||||
# => seems like it's still `._debug._set_trace()` but
|
||||
# we set `Lock.local_task_in_debug = 'sync'`, we probably want
|
||||
# some further, at least, meta-data about the task/actoq in debug
|
||||
# in terms of making it clear it's asyncio mucking about.
|
||||
breakpoint()
|
||||
|
||||
# short checkpoint / delay
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
if raise_after_bp:
|
||||
raise ValueError('blah')
|
||||
|
||||
# TODO: test case with this so that it gets cancelled?
|
||||
else:
|
||||
# XXX NOTE: this is required in order to get the SIGINT-ignored
|
||||
# hang case documented in the module script section!
|
||||
await aio_sleep_forever()
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def trio_ctx(
|
||||
ctx: tractor.Context,
|
||||
bp_before_started: bool = False,
|
||||
):
|
||||
|
||||
# this will block until the ``asyncio`` task sends a "first"
|
||||
# message, see first line in above func.
|
||||
async with (
|
||||
|
||||
to_asyncio.open_channel_from(
|
||||
bp_then_error,
|
||||
raise_after_bp=not bp_before_started,
|
||||
) as (first, chan),
|
||||
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
|
||||
assert first == 'start'
|
||||
|
||||
if bp_before_started:
|
||||
await tractor.breakpoint()
|
||||
|
||||
await ctx.started(first)
|
||||
|
||||
n.start_soon(
|
||||
to_asyncio.run_task,
|
||||
aio_sleep_forever,
|
||||
)
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
async def main(
|
||||
bps_all_over: bool = False,
|
||||
|
||||
) -> None:
|
||||
|
||||
async with tractor.open_nursery() as n:
|
||||
|
||||
p = await n.start_actor(
|
||||
'aio_daemon',
|
||||
enable_modules=[__name__],
|
||||
infect_asyncio=True,
|
||||
debug_mode=True,
|
||||
loglevel='cancel',
|
||||
)
|
||||
|
||||
async with p.open_context(
|
||||
trio_ctx,
|
||||
bp_before_started=bps_all_over,
|
||||
) as (ctx, first):
|
||||
|
||||
assert first == 'start'
|
||||
|
||||
if bps_all_over:
|
||||
await tractor.breakpoint()
|
||||
|
||||
# await trio.sleep_forever()
|
||||
await ctx.cancel()
|
||||
assert 0
|
||||
|
||||
# TODO: case where we cancel from trio-side while asyncio task
|
||||
# has debugger lock?
|
||||
# await p.cancel_actor()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
# works fine B)
|
||||
trio.run(main)
|
||||
|
||||
# will hang and ignores SIGINT !!
|
||||
# NOTE: you'll need to send a SIGQUIT (via ctl-\) to kill it
|
||||
# manually..
|
||||
# trio.run(main, True)
|
|
@ -21,7 +21,6 @@ tractor: structured concurrent ``trio``-"actors".
|
|||
from exceptiongroup import BaseExceptionGroup
|
||||
|
||||
from ._clustering import open_actor_cluster
|
||||
from ._ipc import Channel
|
||||
from ._context import (
|
||||
Context, # the type
|
||||
context, # a func-decorator
|
||||
|
@ -48,6 +47,8 @@ from ._exceptions import (
|
|||
)
|
||||
from ._debug import (
|
||||
breakpoint,
|
||||
pause,
|
||||
pause_from_sync,
|
||||
post_mortem,
|
||||
)
|
||||
from . import msg
|
||||
|
@ -55,31 +56,35 @@ from ._root import (
|
|||
run_daemon,
|
||||
open_root_actor,
|
||||
)
|
||||
from ._ipc import Channel
|
||||
from ._portal import Portal
|
||||
from ._runtime import Actor
|
||||
|
||||
|
||||
__all__ = [
|
||||
'Actor',
|
||||
'BaseExceptionGroup',
|
||||
'Channel',
|
||||
'Context',
|
||||
'ContextCancelled',
|
||||
'ModuleNotExposed',
|
||||
'MsgStream',
|
||||
'BaseExceptionGroup',
|
||||
'Portal',
|
||||
'RemoteActorError',
|
||||
'breakpoint',
|
||||
'context',
|
||||
'current_actor',
|
||||
'find_actor',
|
||||
'query_actor',
|
||||
'get_arbiter',
|
||||
'is_root_process',
|
||||
'msg',
|
||||
'open_actor_cluster',
|
||||
'open_nursery',
|
||||
'open_root_actor',
|
||||
'pause',
|
||||
'post_mortem',
|
||||
'pause_from_sync',
|
||||
'query_actor',
|
||||
'run_daemon',
|
||||
'stream',
|
||||
|
|
|
@ -30,7 +30,6 @@ from functools import (
|
|||
from contextlib import asynccontextmanager as acm
|
||||
from typing import (
|
||||
Any,
|
||||
Optional,
|
||||
Callable,
|
||||
AsyncIterator,
|
||||
AsyncGenerator,
|
||||
|
@ -40,7 +39,10 @@ from types import FrameType
|
|||
import pdbp
|
||||
import tractor
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from trio_typing import (
|
||||
TaskStatus,
|
||||
# Task,
|
||||
)
|
||||
|
||||
from .log import get_logger
|
||||
from ._discovery import get_root
|
||||
|
@ -69,10 +71,10 @@ class Lock:
|
|||
'''
|
||||
repl: MultiActorPdb | None = None
|
||||
# placeholder for function to set a ``trio.Event`` on debugger exit
|
||||
# pdb_release_hook: Optional[Callable] = None
|
||||
# pdb_release_hook: Callable | None = None
|
||||
|
||||
_trio_handler: Callable[
|
||||
[int, Optional[FrameType]], Any
|
||||
[int, FrameType | None], Any
|
||||
] | int | None = None
|
||||
|
||||
# actor-wide variable pointing to current task name using debugger
|
||||
|
@ -83,23 +85,23 @@ class Lock:
|
|||
# and must be cancelled if this actor is cancelled via IPC
|
||||
# request-message otherwise deadlocks with the parent actor may
|
||||
# ensure
|
||||
_debugger_request_cs: Optional[trio.CancelScope] = None
|
||||
_debugger_request_cs: trio.CancelScope | None = None
|
||||
|
||||
# NOTE: set only in the root actor for the **local** root spawned task
|
||||
# which has acquired the lock (i.e. this is on the callee side of
|
||||
# the `lock_tty_for_child()` context entry).
|
||||
_root_local_task_cs_in_debug: Optional[trio.CancelScope] = None
|
||||
_root_local_task_cs_in_debug: trio.CancelScope | None = None
|
||||
|
||||
# actor tree-wide actor uid that supposedly has the tty lock
|
||||
global_actor_in_debug: Optional[tuple[str, str]] = None
|
||||
global_actor_in_debug: tuple[str, str] = None
|
||||
|
||||
local_pdb_complete: Optional[trio.Event] = None
|
||||
no_remote_has_tty: Optional[trio.Event] = None
|
||||
local_pdb_complete: trio.Event | None = None
|
||||
no_remote_has_tty: trio.Event | None = None
|
||||
|
||||
# lock in root actor preventing multi-access to local tty
|
||||
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
||||
|
||||
_orig_sigint_handler: Optional[Callable] = None
|
||||
_orig_sigint_handler: Callable | None = None
|
||||
_blocked: set[tuple[str, str]] = set()
|
||||
|
||||
@classmethod
|
||||
|
@ -110,6 +112,7 @@ class Lock:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
@pdbp.hideframe # XXX NOTE XXX see below in `.pause_from_sync()`
|
||||
def unshield_sigint(cls):
|
||||
# always restore ``trio``'s sigint handler. see notes below in
|
||||
# the pdb factory about the nightmare that is that code swapping
|
||||
|
@ -129,10 +132,6 @@ class Lock:
|
|||
if owner:
|
||||
raise
|
||||
|
||||
# actor-local state, irrelevant for non-root.
|
||||
cls.global_actor_in_debug = None
|
||||
cls.local_task_in_debug = None
|
||||
|
||||
try:
|
||||
# sometimes the ``trio`` might already be terminated in
|
||||
# which case this call will raise.
|
||||
|
@ -143,6 +142,11 @@ class Lock:
|
|||
cls.unshield_sigint()
|
||||
cls.repl = None
|
||||
|
||||
# actor-local state, irrelevant for non-root.
|
||||
cls.global_actor_in_debug = None
|
||||
cls.local_task_in_debug = None
|
||||
|
||||
|
||||
|
||||
class TractorConfig(pdbp.DefaultConfig):
|
||||
'''
|
||||
|
@ -151,7 +155,7 @@ class TractorConfig(pdbp.DefaultConfig):
|
|||
'''
|
||||
use_pygments: bool = True
|
||||
sticky_by_default: bool = False
|
||||
enable_hidden_frames: bool = False
|
||||
enable_hidden_frames: bool = True
|
||||
|
||||
# much thanks @mdmintz for the hot tip!
|
||||
# fixes line spacing issue when resizing terminal B)
|
||||
|
@ -228,26 +232,23 @@ async def _acquire_debug_lock_from_root_task(
|
|||
to the ``pdb`` repl.
|
||||
|
||||
'''
|
||||
task_name = trio.lowlevel.current_task().name
|
||||
task_name: str = trio.lowlevel.current_task().name
|
||||
we_acquired: bool = False
|
||||
|
||||
log.runtime(
|
||||
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
|
||||
)
|
||||
|
||||
we_acquired = False
|
||||
|
||||
try:
|
||||
log.runtime(
|
||||
f"entering lock checkpoint, remote task: {task_name}:{uid}"
|
||||
)
|
||||
we_acquired = True
|
||||
|
||||
# NOTE: if the surrounding cancel scope from the
|
||||
# `lock_tty_for_child()` caller is cancelled, this line should
|
||||
# unblock and NOT leave us in some kind of
|
||||
# a "child-locked-TTY-but-child-is-uncontactable-over-IPC"
|
||||
# condition.
|
||||
await Lock._debug_lock.acquire()
|
||||
we_acquired = True
|
||||
|
||||
if Lock.no_remote_has_tty is None:
|
||||
# mark the tty lock as being in use so that the runtime
|
||||
|
@ -374,7 +375,7 @@ async def wait_for_parent_stdin_hijack(
|
|||
|
||||
This function is used by any sub-actor to acquire mutex access to
|
||||
the ``pdb`` REPL and thus the root's TTY for interactive debugging
|
||||
(see below inside ``_breakpoint()``). It can be used to ensure that
|
||||
(see below inside ``_pause()``). It can be used to ensure that
|
||||
an intermediate nursery-owning actor does not clobber its children
|
||||
if they are in debug (see below inside
|
||||
``maybe_wait_for_debugger()``).
|
||||
|
@ -440,17 +441,29 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
|
|||
return pdb, Lock.unshield_sigint
|
||||
|
||||
|
||||
async def _breakpoint(
|
||||
async def _pause(
|
||||
|
||||
debug_func,
|
||||
debug_func: Callable | None = None,
|
||||
release_lock_signal: trio.Event | None = None,
|
||||
|
||||
# TODO:
|
||||
# shield: bool = False
|
||||
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Breakpoint entry for engaging debugger instance sync-interaction,
|
||||
from async code, executing in actor runtime (task).
|
||||
A pause point (more commonly known as a "breakpoint") interrupt
|
||||
instruction for engaging a blocking debugger instance to
|
||||
conduct manual console-based-REPL-interaction from within
|
||||
`tractor`'s async runtime, normally from some single-threaded
|
||||
and currently executing actor-hosted-`trio`-task in some
|
||||
(remote) process.
|
||||
|
||||
NOTE: we use the semantics "pause" since it better encompasses
|
||||
the entirety of the necessary global-runtime-state-mutation any
|
||||
actor-task must access and lock in order to get full isolated
|
||||
control over the process tree's root TTY:
|
||||
https://en.wikipedia.org/wiki/Breakpoint
|
||||
|
||||
'''
|
||||
__tracebackhide__ = True
|
||||
|
@ -559,10 +572,23 @@ async def _breakpoint(
|
|||
Lock.repl = pdb
|
||||
|
||||
try:
|
||||
# block here one (at the appropriate frame *up*) where
|
||||
# ``breakpoint()`` was awaited and begin handling stdio.
|
||||
log.debug("Entering the synchronous world of pdb")
|
||||
debug_func(actor, pdb)
|
||||
# breakpoint()
|
||||
if debug_func is None:
|
||||
# assert release_lock_signal, (
|
||||
# 'Must pass `release_lock_signal: trio.Event` if no '
|
||||
# 'trace func provided!'
|
||||
# )
|
||||
print(f"{actor.uid} ENTERING WAIT")
|
||||
task_status.started()
|
||||
|
||||
# with trio.CancelScope(shield=True):
|
||||
# await release_lock_signal.wait()
|
||||
|
||||
else:
|
||||
# block here one (at the appropriate frame *up*) where
|
||||
# ``breakpoint()`` was awaited and begin handling stdio.
|
||||
log.debug("Entering the synchronous world of pdb")
|
||||
debug_func(actor, pdb)
|
||||
|
||||
except bdb.BdbQuit:
|
||||
Lock.release()
|
||||
|
@ -583,7 +609,7 @@ async def _breakpoint(
|
|||
def shield_sigint_handler(
|
||||
signum: int,
|
||||
frame: 'frame', # type: ignore # noqa
|
||||
# pdb_obj: Optional[MultiActorPdb] = None,
|
||||
# pdb_obj: MultiActorPdb | None = None,
|
||||
*args,
|
||||
|
||||
) -> None:
|
||||
|
@ -597,7 +623,7 @@ def shield_sigint_handler(
|
|||
'''
|
||||
__tracebackhide__ = True
|
||||
|
||||
uid_in_debug = Lock.global_actor_in_debug
|
||||
uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug
|
||||
|
||||
actor = tractor.current_actor()
|
||||
# print(f'{actor.uid} in HANDLER with ')
|
||||
|
@ -615,14 +641,14 @@ def shield_sigint_handler(
|
|||
else:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
any_connected = False
|
||||
any_connected: bool = False
|
||||
|
||||
if uid_in_debug is not None:
|
||||
# try to see if the supposed (sub)actor in debug still
|
||||
# has an active connection to *this* actor, and if not
|
||||
# it's likely they aren't using the TTY lock / debugger
|
||||
# and we should propagate SIGINT normally.
|
||||
chans = actor._peers.get(tuple(uid_in_debug))
|
||||
chans: list[tractor.Channel] = actor._peers.get(tuple(uid_in_debug))
|
||||
if chans:
|
||||
any_connected = any(chan.connected() for chan in chans)
|
||||
if not any_connected:
|
||||
|
@ -635,7 +661,7 @@ def shield_sigint_handler(
|
|||
return do_cancel()
|
||||
|
||||
# only set in the actor actually running the REPL
|
||||
pdb_obj = Lock.repl
|
||||
pdb_obj: MultiActorPdb | None = Lock.repl
|
||||
|
||||
# root actor branch that reports whether or not a child
|
||||
# has locked debugger.
|
||||
|
@ -693,7 +719,7 @@ def shield_sigint_handler(
|
|||
)
|
||||
return do_cancel()
|
||||
|
||||
task = Lock.local_task_in_debug
|
||||
task: str | None = Lock.local_task_in_debug
|
||||
if (
|
||||
task
|
||||
and pdb_obj
|
||||
|
@ -708,8 +734,8 @@ def shield_sigint_handler(
|
|||
# elif debug_mode():
|
||||
|
||||
else: # XXX: shouldn't ever get here?
|
||||
print("WTFWTFWTF")
|
||||
raise KeyboardInterrupt
|
||||
raise RuntimeError("WTFWTFWTF")
|
||||
# raise KeyboardInterrupt("WTFWTFWTF")
|
||||
|
||||
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
|
||||
# it looks to be that the last command that was run (eg. ll)
|
||||
|
@ -737,21 +763,18 @@ def shield_sigint_handler(
|
|||
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
||||
|
||||
# XXX LEGACY: lol, see ``pdbpp`` issue:
|
||||
# https://github.com/pdbpp/pdbpp/issues/496
|
||||
|
||||
|
||||
def _set_trace(
|
||||
actor: tractor.Actor | None = None,
|
||||
pdb: MultiActorPdb | None = None,
|
||||
):
|
||||
__tracebackhide__ = True
|
||||
actor = actor or tractor.current_actor()
|
||||
actor: tractor.Actor = actor or tractor.current_actor()
|
||||
|
||||
# start 2 levels up in user code
|
||||
frame: Optional[FrameType] = sys._getframe()
|
||||
frame: FrameType | None = sys._getframe()
|
||||
if frame:
|
||||
frame = frame.f_back # type: ignore
|
||||
frame: FrameType = frame.f_back # type: ignore
|
||||
|
||||
if (
|
||||
frame
|
||||
|
@ -771,12 +794,76 @@ def _set_trace(
|
|||
Lock.local_task_in_debug = 'sync'
|
||||
|
||||
pdb.set_trace(frame=frame)
|
||||
# undo_
|
||||
|
||||
|
||||
breakpoint = partial(
|
||||
_breakpoint,
|
||||
# TODO: allow pausing from sync code, normally by remapping
|
||||
# python's builtin breakpoint() hook to this runtime aware version.
|
||||
def pause_from_sync() -> None:
|
||||
print("ENTER SYNC PAUSE")
|
||||
import greenback
|
||||
__tracebackhide__ = True
|
||||
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
# task_can_release_tty_lock = trio.Event()
|
||||
|
||||
# spawn bg task which will lock out the TTY, we poll
|
||||
# just below until the release event is reporting that task as
|
||||
# waiting.. not the most ideal but works for now ;)
|
||||
greenback.await_(
|
||||
actor._service_n.start(partial(
|
||||
_pause,
|
||||
debug_func=None,
|
||||
# release_lock_signal=task_can_release_tty_lock,
|
||||
))
|
||||
)
|
||||
|
||||
db, undo_sigint = mk_mpdb()
|
||||
Lock.local_task_in_debug = 'sync'
|
||||
# db.config.enable_hidden_frames = True
|
||||
|
||||
# we entered the global ``breakpoint()`` built-in from sync
|
||||
# code?
|
||||
frame: FrameType | None = sys._getframe()
|
||||
# print(f'FRAME: {str(frame)}')
|
||||
# assert not db._is_hidden(frame)
|
||||
|
||||
frame: FrameType = frame.f_back # type: ignore
|
||||
# print(f'FRAME: {str(frame)}')
|
||||
# if not db._is_hidden(frame):
|
||||
# pdbp.set_trace()
|
||||
# db._hidden_frames.append(
|
||||
# (frame, frame.f_lineno)
|
||||
# )
|
||||
db.set_trace(frame=frame)
|
||||
# NOTE XXX: see the `@pdbp.hideframe` decoration
|
||||
# on `Lock.unshield_sigint()`.. I have NO CLUE why
|
||||
# the next instruction's def frame is being shown
|
||||
# in the tb but it seems to be something wonky with
|
||||
# the way `pdb` core works?
|
||||
# undo_sigint()
|
||||
|
||||
# Lock.global_actor_in_debug = actor.uid
|
||||
# Lock.release()
|
||||
# task_can_release_tty_lock.set()
|
||||
|
||||
|
||||
# using the "pause" semantics instead since
|
||||
# that better covers actually somewhat "pausing the runtime"
|
||||
# for this particular paralell task to do debugging B)
|
||||
pause = partial(
|
||||
_pause,
|
||||
_set_trace,
|
||||
)
|
||||
pp = pause # short-hand for "pause point"
|
||||
|
||||
|
||||
async def breakpoint(**kwargs):
|
||||
log.warning(
|
||||
'`tractor.breakpoint()` is deprecated!\n'
|
||||
'Please use `tractor.pause()` instead!\n'
|
||||
)
|
||||
await pause(**kwargs)
|
||||
|
||||
|
||||
def _post_mortem(
|
||||
|
@ -801,7 +888,7 @@ def _post_mortem(
|
|||
|
||||
|
||||
post_mortem = partial(
|
||||
_breakpoint,
|
||||
_pause,
|
||||
_post_mortem,
|
||||
)
|
||||
|
||||
|
@ -883,8 +970,7 @@ async def maybe_wait_for_debugger(
|
|||
# will make the pdb repl unusable.
|
||||
# Instead try to wait for pdb to be released before
|
||||
# tearing down.
|
||||
|
||||
sub_in_debug = None
|
||||
sub_in_debug: tuple[str, str] | None = None
|
||||
|
||||
for _ in range(poll_steps):
|
||||
|
||||
|
@ -904,13 +990,15 @@ async def maybe_wait_for_debugger(
|
|||
|
||||
debug_complete = Lock.no_remote_has_tty
|
||||
if (
|
||||
(debug_complete and
|
||||
not debug_complete.is_set())
|
||||
debug_complete
|
||||
and sub_in_debug is not None
|
||||
and not debug_complete.is_set()
|
||||
):
|
||||
log.debug(
|
||||
log.pdb(
|
||||
'Root has errored but pdb is in use by '
|
||||
f'child {sub_in_debug}\n'
|
||||
'Waiting on tty lock to release..')
|
||||
'Waiting on tty lock to release..'
|
||||
)
|
||||
|
||||
await debug_complete.wait()
|
||||
|
||||
|
|
|
@ -90,7 +90,7 @@ async def open_root_actor(
|
|||
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
|
||||
builtin_bp_handler = sys.breakpointhook
|
||||
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
|
||||
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
|
||||
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.pause_from_sync'
|
||||
|
||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||
# on our debugger lock state.
|
||||
|
@ -237,10 +237,10 @@ async def open_root_actor(
|
|||
) as err:
|
||||
|
||||
entered = await _debug._maybe_enter_pm(err)
|
||||
|
||||
if (
|
||||
not entered
|
||||
and not is_multi_cancelled(err)
|
||||
and
|
||||
not is_multi_cancelled(err)
|
||||
):
|
||||
logger.exception('Root actor crashed:\n')
|
||||
|
||||
|
|
|
@ -632,7 +632,7 @@ class Actor:
|
|||
and not db_cs.cancel_called
|
||||
and uid == pdb_user_uid
|
||||
):
|
||||
log.warning(
|
||||
log.critical(
|
||||
f'STALE DEBUG LOCK DETECTED FOR {uid}'
|
||||
)
|
||||
# TODO: figure out why this breaks tests..
|
||||
|
@ -1723,4 +1723,6 @@ class Arbiter(Actor):
|
|||
|
||||
) -> None:
|
||||
uid = (str(uid[0]), str(uid[1]))
|
||||
self._registry.pop(uid)
|
||||
entry: tuple = self._registry.pop(uid, None)
|
||||
if entry is None:
|
||||
log.warning(f'Request to de-register {uid} failed?')
|
||||
|
|
|
@ -28,7 +28,6 @@ from typing import (
|
|||
Callable,
|
||||
AsyncIterator,
|
||||
Awaitable,
|
||||
Optional,
|
||||
)
|
||||
|
||||
import trio
|
||||
|
@ -65,9 +64,9 @@ class LinkedTaskChannel(trio.abc.Channel):
|
|||
_trio_exited: bool = False
|
||||
|
||||
# set after ``asyncio.create_task()``
|
||||
_aio_task: Optional[asyncio.Task] = None
|
||||
_aio_err: Optional[BaseException] = None
|
||||
_broadcaster: Optional[BroadcastReceiver] = None
|
||||
_aio_task: asyncio.Task | None = None
|
||||
_aio_err: BaseException | None = None
|
||||
_broadcaster: BroadcastReceiver | None = None
|
||||
|
||||
async def aclose(self) -> None:
|
||||
await self._from_aio.aclose()
|
||||
|
@ -188,7 +187,7 @@ def _run_asyncio_task(
|
|||
|
||||
cancel_scope = trio.CancelScope()
|
||||
aio_task_complete = trio.Event()
|
||||
aio_err: Optional[BaseException] = None
|
||||
aio_err: BaseException | None = None
|
||||
|
||||
chan = LinkedTaskChannel(
|
||||
aio_q, # asyncio.Queue
|
||||
|
@ -270,7 +269,7 @@ def _run_asyncio_task(
|
|||
'''
|
||||
nonlocal chan
|
||||
aio_err = chan._aio_err
|
||||
task_err: Optional[BaseException] = None
|
||||
task_err: BaseException | None = None
|
||||
|
||||
# only to avoid ``asyncio`` complaining about uncaptured
|
||||
# task exceptions
|
||||
|
@ -350,11 +349,11 @@ async def translate_aio_errors(
|
|||
'''
|
||||
trio_task = trio.lowlevel.current_task()
|
||||
|
||||
aio_err: Optional[BaseException] = None
|
||||
aio_err: BaseException | None = None
|
||||
|
||||
# TODO: make thisi a channel method?
|
||||
def maybe_raise_aio_err(
|
||||
err: Optional[Exception] = None
|
||||
err: Exception | None = None
|
||||
) -> None:
|
||||
aio_err = chan._aio_err
|
||||
if (
|
||||
|
|
Loading…
Reference in New Issue