Compare commits
9 Commits
main
...
asyncio_de
Author | SHA1 | Date |
---|---|---|
|
697900deb1 | |
|
2e55c124b1 | |
|
0f21c8ba6a | |
|
7b7410bc0f | |
|
b59cba74cd | |
|
7e39ef7ed1 | |
|
c8ea0fdf53 | |
|
885319e9ae | |
|
b815b61707 |
|
@ -3,8 +3,8 @@
|
||||||
|gh_actions|
|
|gh_actions|
|
||||||
|docs|
|
|docs|
|
||||||
|
|
||||||
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
|
``tractor`` is a `structured concurrent`_, (optionally
|
||||||
built on trio_.
|
distributed_) multi-processing_ runtime built on trio_.
|
||||||
|
|
||||||
Fundamentally, ``tractor`` gives you parallelism via
|
Fundamentally, ``tractor`` gives you parallelism via
|
||||||
``trio``-"*actors*": independent Python processes (aka
|
``trio``-"*actors*": independent Python processes (aka
|
||||||
|
@ -17,11 +17,20 @@ protocol" constructed on top of multiple Pythons each running a ``trio``
|
||||||
scheduled runtime - a call to ``trio.run()``.
|
scheduled runtime - a call to ``trio.run()``.
|
||||||
|
|
||||||
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
|
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
|
||||||
but likely *does not* look like what *you* probably think an "actor
|
but likely **does not** look like what **you** probably *think* an "actor
|
||||||
model" looks like, and that's *intentional*.
|
model" looks like, and that's **intentional**.
|
||||||
|
|
||||||
The first step to grok ``tractor`` is to get the basics of ``trio`` down.
|
|
||||||
A great place to start is the `trio docs`_ and this `blog post`_.
|
Where do i start!?
|
||||||
|
------------------
|
||||||
|
The first step to grok ``tractor`` is to get an intermediate
|
||||||
|
knowledge of ``trio`` and **structured concurrency** B)
|
||||||
|
|
||||||
|
Some great places to start are,
|
||||||
|
- the seminal `blog post`_
|
||||||
|
- obviously the `trio docs`_
|
||||||
|
- wikipedia's nascent SC_ page
|
||||||
|
- the fancy diagrams @ libdill-docs_
|
||||||
|
|
||||||
|
|
||||||
Features
|
Features
|
||||||
|
@ -593,6 +602,7 @@ matrix seems too hip, we're also mostly all in the the `trio gitter
|
||||||
channel`_!
|
channel`_!
|
||||||
|
|
||||||
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
|
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
|
||||||
|
.. _distributed: https://en.wikipedia.org/wiki/Distributed_computing
|
||||||
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
|
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
|
||||||
.. _trio: https://github.com/python-trio/trio
|
.. _trio: https://github.com/python-trio/trio
|
||||||
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
|
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
|
||||||
|
@ -611,8 +621,9 @@ channel`_!
|
||||||
.. _trio docs: https://trio.readthedocs.io/en/latest/
|
.. _trio docs: https://trio.readthedocs.io/en/latest/
|
||||||
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
|
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
|
||||||
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
|
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
|
||||||
|
.. _SC: https://en.wikipedia.org/wiki/Structured_concurrency
|
||||||
|
.. _libdill-docs: https://sustrik.github.io/libdill/structured-concurrency.html
|
||||||
.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency
|
.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency
|
||||||
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
|
|
||||||
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
|
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
|
||||||
.. _async generators: https://www.python.org/dev/peps/pep-0525/
|
.. _async generators: https://www.python.org/dev/peps/pep-0525/
|
||||||
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
|
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
|
||||||
|
|
|
@ -0,0 +1,117 @@
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
from tractor import to_asyncio
|
||||||
|
|
||||||
|
|
||||||
|
async def aio_sleep_forever():
|
||||||
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
||||||
|
async def bp_then_error(
|
||||||
|
to_trio: trio.MemorySendChannel,
|
||||||
|
from_trio: asyncio.Queue,
|
||||||
|
|
||||||
|
raise_after_bp: bool = True,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
# sync with ``trio``-side (caller) task
|
||||||
|
to_trio.send_nowait('start')
|
||||||
|
|
||||||
|
# NOTE: what happens here inside the hook needs some refinement..
|
||||||
|
# => seems like it's still `._debug._set_trace()` but
|
||||||
|
# we set `Lock.local_task_in_debug = 'sync'`, we probably want
|
||||||
|
# some further, at least, meta-data about the task/actoq in debug
|
||||||
|
# in terms of making it clear it's asyncio mucking about.
|
||||||
|
breakpoint()
|
||||||
|
|
||||||
|
# short checkpoint / delay
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
if raise_after_bp:
|
||||||
|
raise ValueError('blah')
|
||||||
|
|
||||||
|
# TODO: test case with this so that it gets cancelled?
|
||||||
|
else:
|
||||||
|
# XXX NOTE: this is required in order to get the SIGINT-ignored
|
||||||
|
# hang case documented in the module script section!
|
||||||
|
await aio_sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def trio_ctx(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
bp_before_started: bool = False,
|
||||||
|
):
|
||||||
|
|
||||||
|
# this will block until the ``asyncio`` task sends a "first"
|
||||||
|
# message, see first line in above func.
|
||||||
|
async with (
|
||||||
|
|
||||||
|
to_asyncio.open_channel_from(
|
||||||
|
bp_then_error,
|
||||||
|
raise_after_bp=not bp_before_started,
|
||||||
|
) as (first, chan),
|
||||||
|
|
||||||
|
trio.open_nursery() as n,
|
||||||
|
):
|
||||||
|
|
||||||
|
assert first == 'start'
|
||||||
|
|
||||||
|
if bp_before_started:
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
|
await ctx.started(first)
|
||||||
|
|
||||||
|
n.start_soon(
|
||||||
|
to_asyncio.run_task,
|
||||||
|
aio_sleep_forever,
|
||||||
|
)
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
async def main(
|
||||||
|
bps_all_over: bool = False,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
|
p = await n.start_actor(
|
||||||
|
'aio_daemon',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
infect_asyncio=True,
|
||||||
|
debug_mode=True,
|
||||||
|
loglevel='cancel',
|
||||||
|
)
|
||||||
|
|
||||||
|
async with p.open_context(
|
||||||
|
trio_ctx,
|
||||||
|
bp_before_started=bps_all_over,
|
||||||
|
) as (ctx, first):
|
||||||
|
|
||||||
|
assert first == 'start'
|
||||||
|
|
||||||
|
if bps_all_over:
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
|
# await trio.sleep_forever()
|
||||||
|
await ctx.cancel()
|
||||||
|
assert 0
|
||||||
|
|
||||||
|
# TODO: case where we cancel from trio-side while asyncio task
|
||||||
|
# has debugger lock?
|
||||||
|
# await p.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
# works fine B)
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
# will hang and ignores SIGINT !!
|
||||||
|
# NOTE: you'll need to send a SIGQUIT (via ctl-\) to kill it
|
||||||
|
# manually..
|
||||||
|
# trio.run(main, True)
|
|
@ -21,7 +21,6 @@ tractor: structured concurrent ``trio``-"actors".
|
||||||
from exceptiongroup import BaseExceptionGroup
|
from exceptiongroup import BaseExceptionGroup
|
||||||
|
|
||||||
from ._clustering import open_actor_cluster
|
from ._clustering import open_actor_cluster
|
||||||
from ._ipc import Channel
|
|
||||||
from ._context import (
|
from ._context import (
|
||||||
Context, # the type
|
Context, # the type
|
||||||
context, # a func-decorator
|
context, # a func-decorator
|
||||||
|
@ -48,6 +47,8 @@ from ._exceptions import (
|
||||||
)
|
)
|
||||||
from ._debug import (
|
from ._debug import (
|
||||||
breakpoint,
|
breakpoint,
|
||||||
|
pause,
|
||||||
|
pause_from_sync,
|
||||||
post_mortem,
|
post_mortem,
|
||||||
)
|
)
|
||||||
from . import msg
|
from . import msg
|
||||||
|
@ -55,31 +56,35 @@ from ._root import (
|
||||||
run_daemon,
|
run_daemon,
|
||||||
open_root_actor,
|
open_root_actor,
|
||||||
)
|
)
|
||||||
|
from ._ipc import Channel
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'Actor',
|
'Actor',
|
||||||
|
'BaseExceptionGroup',
|
||||||
'Channel',
|
'Channel',
|
||||||
'Context',
|
'Context',
|
||||||
'ContextCancelled',
|
'ContextCancelled',
|
||||||
'ModuleNotExposed',
|
'ModuleNotExposed',
|
||||||
'MsgStream',
|
'MsgStream',
|
||||||
'BaseExceptionGroup',
|
|
||||||
'Portal',
|
'Portal',
|
||||||
'RemoteActorError',
|
'RemoteActorError',
|
||||||
'breakpoint',
|
'breakpoint',
|
||||||
'context',
|
'context',
|
||||||
'current_actor',
|
'current_actor',
|
||||||
'find_actor',
|
'find_actor',
|
||||||
|
'query_actor',
|
||||||
'get_arbiter',
|
'get_arbiter',
|
||||||
'is_root_process',
|
'is_root_process',
|
||||||
'msg',
|
'msg',
|
||||||
'open_actor_cluster',
|
'open_actor_cluster',
|
||||||
'open_nursery',
|
'open_nursery',
|
||||||
'open_root_actor',
|
'open_root_actor',
|
||||||
|
'pause',
|
||||||
'post_mortem',
|
'post_mortem',
|
||||||
|
'pause_from_sync',
|
||||||
'query_actor',
|
'query_actor',
|
||||||
'run_daemon',
|
'run_daemon',
|
||||||
'stream',
|
'stream',
|
||||||
|
|
|
@ -30,7 +30,6 @@ from functools import (
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Optional,
|
|
||||||
Callable,
|
Callable,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
|
@ -40,7 +39,10 @@ from types import FrameType
|
||||||
import pdbp
|
import pdbp
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import (
|
||||||
|
TaskStatus,
|
||||||
|
# Task,
|
||||||
|
)
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from ._discovery import get_root
|
from ._discovery import get_root
|
||||||
|
@ -69,10 +71,10 @@ class Lock:
|
||||||
'''
|
'''
|
||||||
repl: MultiActorPdb | None = None
|
repl: MultiActorPdb | None = None
|
||||||
# placeholder for function to set a ``trio.Event`` on debugger exit
|
# placeholder for function to set a ``trio.Event`` on debugger exit
|
||||||
# pdb_release_hook: Optional[Callable] = None
|
# pdb_release_hook: Callable | None = None
|
||||||
|
|
||||||
_trio_handler: Callable[
|
_trio_handler: Callable[
|
||||||
[int, Optional[FrameType]], Any
|
[int, FrameType | None], Any
|
||||||
] | int | None = None
|
] | int | None = None
|
||||||
|
|
||||||
# actor-wide variable pointing to current task name using debugger
|
# actor-wide variable pointing to current task name using debugger
|
||||||
|
@ -83,23 +85,23 @@ class Lock:
|
||||||
# and must be cancelled if this actor is cancelled via IPC
|
# and must be cancelled if this actor is cancelled via IPC
|
||||||
# request-message otherwise deadlocks with the parent actor may
|
# request-message otherwise deadlocks with the parent actor may
|
||||||
# ensure
|
# ensure
|
||||||
_debugger_request_cs: Optional[trio.CancelScope] = None
|
_debugger_request_cs: trio.CancelScope | None = None
|
||||||
|
|
||||||
# NOTE: set only in the root actor for the **local** root spawned task
|
# NOTE: set only in the root actor for the **local** root spawned task
|
||||||
# which has acquired the lock (i.e. this is on the callee side of
|
# which has acquired the lock (i.e. this is on the callee side of
|
||||||
# the `lock_tty_for_child()` context entry).
|
# the `lock_tty_for_child()` context entry).
|
||||||
_root_local_task_cs_in_debug: Optional[trio.CancelScope] = None
|
_root_local_task_cs_in_debug: trio.CancelScope | None = None
|
||||||
|
|
||||||
# actor tree-wide actor uid that supposedly has the tty lock
|
# actor tree-wide actor uid that supposedly has the tty lock
|
||||||
global_actor_in_debug: Optional[tuple[str, str]] = None
|
global_actor_in_debug: tuple[str, str] = None
|
||||||
|
|
||||||
local_pdb_complete: Optional[trio.Event] = None
|
local_pdb_complete: trio.Event | None = None
|
||||||
no_remote_has_tty: Optional[trio.Event] = None
|
no_remote_has_tty: trio.Event | None = None
|
||||||
|
|
||||||
# lock in root actor preventing multi-access to local tty
|
# lock in root actor preventing multi-access to local tty
|
||||||
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
||||||
|
|
||||||
_orig_sigint_handler: Optional[Callable] = None
|
_orig_sigint_handler: Callable | None = None
|
||||||
_blocked: set[tuple[str, str]] = set()
|
_blocked: set[tuple[str, str]] = set()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -110,6 +112,7 @@ class Lock:
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@pdbp.hideframe # XXX NOTE XXX see below in `.pause_from_sync()`
|
||||||
def unshield_sigint(cls):
|
def unshield_sigint(cls):
|
||||||
# always restore ``trio``'s sigint handler. see notes below in
|
# always restore ``trio``'s sigint handler. see notes below in
|
||||||
# the pdb factory about the nightmare that is that code swapping
|
# the pdb factory about the nightmare that is that code swapping
|
||||||
|
@ -129,10 +132,6 @@ class Lock:
|
||||||
if owner:
|
if owner:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# actor-local state, irrelevant for non-root.
|
|
||||||
cls.global_actor_in_debug = None
|
|
||||||
cls.local_task_in_debug = None
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# sometimes the ``trio`` might already be terminated in
|
# sometimes the ``trio`` might already be terminated in
|
||||||
# which case this call will raise.
|
# which case this call will raise.
|
||||||
|
@ -143,6 +142,11 @@ class Lock:
|
||||||
cls.unshield_sigint()
|
cls.unshield_sigint()
|
||||||
cls.repl = None
|
cls.repl = None
|
||||||
|
|
||||||
|
# actor-local state, irrelevant for non-root.
|
||||||
|
cls.global_actor_in_debug = None
|
||||||
|
cls.local_task_in_debug = None
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TractorConfig(pdbp.DefaultConfig):
|
class TractorConfig(pdbp.DefaultConfig):
|
||||||
'''
|
'''
|
||||||
|
@ -151,7 +155,7 @@ class TractorConfig(pdbp.DefaultConfig):
|
||||||
'''
|
'''
|
||||||
use_pygments: bool = True
|
use_pygments: bool = True
|
||||||
sticky_by_default: bool = False
|
sticky_by_default: bool = False
|
||||||
enable_hidden_frames: bool = False
|
enable_hidden_frames: bool = True
|
||||||
|
|
||||||
# much thanks @mdmintz for the hot tip!
|
# much thanks @mdmintz for the hot tip!
|
||||||
# fixes line spacing issue when resizing terminal B)
|
# fixes line spacing issue when resizing terminal B)
|
||||||
|
@ -228,26 +232,23 @@ async def _acquire_debug_lock_from_root_task(
|
||||||
to the ``pdb`` repl.
|
to the ``pdb`` repl.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
task_name = trio.lowlevel.current_task().name
|
task_name: str = trio.lowlevel.current_task().name
|
||||||
|
we_acquired: bool = False
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
|
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
|
||||||
)
|
)
|
||||||
|
|
||||||
we_acquired = False
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"entering lock checkpoint, remote task: {task_name}:{uid}"
|
f"entering lock checkpoint, remote task: {task_name}:{uid}"
|
||||||
)
|
)
|
||||||
we_acquired = True
|
|
||||||
|
|
||||||
# NOTE: if the surrounding cancel scope from the
|
# NOTE: if the surrounding cancel scope from the
|
||||||
# `lock_tty_for_child()` caller is cancelled, this line should
|
# `lock_tty_for_child()` caller is cancelled, this line should
|
||||||
# unblock and NOT leave us in some kind of
|
# unblock and NOT leave us in some kind of
|
||||||
# a "child-locked-TTY-but-child-is-uncontactable-over-IPC"
|
# a "child-locked-TTY-but-child-is-uncontactable-over-IPC"
|
||||||
# condition.
|
# condition.
|
||||||
await Lock._debug_lock.acquire()
|
await Lock._debug_lock.acquire()
|
||||||
|
we_acquired = True
|
||||||
|
|
||||||
if Lock.no_remote_has_tty is None:
|
if Lock.no_remote_has_tty is None:
|
||||||
# mark the tty lock as being in use so that the runtime
|
# mark the tty lock as being in use so that the runtime
|
||||||
|
@ -374,7 +375,7 @@ async def wait_for_parent_stdin_hijack(
|
||||||
|
|
||||||
This function is used by any sub-actor to acquire mutex access to
|
This function is used by any sub-actor to acquire mutex access to
|
||||||
the ``pdb`` REPL and thus the root's TTY for interactive debugging
|
the ``pdb`` REPL and thus the root's TTY for interactive debugging
|
||||||
(see below inside ``_breakpoint()``). It can be used to ensure that
|
(see below inside ``_pause()``). It can be used to ensure that
|
||||||
an intermediate nursery-owning actor does not clobber its children
|
an intermediate nursery-owning actor does not clobber its children
|
||||||
if they are in debug (see below inside
|
if they are in debug (see below inside
|
||||||
``maybe_wait_for_debugger()``).
|
``maybe_wait_for_debugger()``).
|
||||||
|
@ -440,17 +441,29 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
|
||||||
return pdb, Lock.unshield_sigint
|
return pdb, Lock.unshield_sigint
|
||||||
|
|
||||||
|
|
||||||
async def _breakpoint(
|
async def _pause(
|
||||||
|
|
||||||
debug_func,
|
debug_func: Callable | None = None,
|
||||||
|
release_lock_signal: trio.Event | None = None,
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
# shield: bool = False
|
# shield: bool = False
|
||||||
|
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Breakpoint entry for engaging debugger instance sync-interaction,
|
A pause point (more commonly known as a "breakpoint") interrupt
|
||||||
from async code, executing in actor runtime (task).
|
instruction for engaging a blocking debugger instance to
|
||||||
|
conduct manual console-based-REPL-interaction from within
|
||||||
|
`tractor`'s async runtime, normally from some single-threaded
|
||||||
|
and currently executing actor-hosted-`trio`-task in some
|
||||||
|
(remote) process.
|
||||||
|
|
||||||
|
NOTE: we use the semantics "pause" since it better encompasses
|
||||||
|
the entirety of the necessary global-runtime-state-mutation any
|
||||||
|
actor-task must access and lock in order to get full isolated
|
||||||
|
control over the process tree's root TTY:
|
||||||
|
https://en.wikipedia.org/wiki/Breakpoint
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
|
@ -559,10 +572,23 @@ async def _breakpoint(
|
||||||
Lock.repl = pdb
|
Lock.repl = pdb
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# block here one (at the appropriate frame *up*) where
|
# breakpoint()
|
||||||
# ``breakpoint()`` was awaited and begin handling stdio.
|
if debug_func is None:
|
||||||
log.debug("Entering the synchronous world of pdb")
|
# assert release_lock_signal, (
|
||||||
debug_func(actor, pdb)
|
# 'Must pass `release_lock_signal: trio.Event` if no '
|
||||||
|
# 'trace func provided!'
|
||||||
|
# )
|
||||||
|
print(f"{actor.uid} ENTERING WAIT")
|
||||||
|
task_status.started()
|
||||||
|
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await release_lock_signal.wait()
|
||||||
|
|
||||||
|
else:
|
||||||
|
# block here one (at the appropriate frame *up*) where
|
||||||
|
# ``breakpoint()`` was awaited and begin handling stdio.
|
||||||
|
log.debug("Entering the synchronous world of pdb")
|
||||||
|
debug_func(actor, pdb)
|
||||||
|
|
||||||
except bdb.BdbQuit:
|
except bdb.BdbQuit:
|
||||||
Lock.release()
|
Lock.release()
|
||||||
|
@ -583,7 +609,7 @@ async def _breakpoint(
|
||||||
def shield_sigint_handler(
|
def shield_sigint_handler(
|
||||||
signum: int,
|
signum: int,
|
||||||
frame: 'frame', # type: ignore # noqa
|
frame: 'frame', # type: ignore # noqa
|
||||||
# pdb_obj: Optional[MultiActorPdb] = None,
|
# pdb_obj: MultiActorPdb | None = None,
|
||||||
*args,
|
*args,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -597,7 +623,7 @@ def shield_sigint_handler(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
|
|
||||||
uid_in_debug = Lock.global_actor_in_debug
|
uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug
|
||||||
|
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
# print(f'{actor.uid} in HANDLER with ')
|
# print(f'{actor.uid} in HANDLER with ')
|
||||||
|
@ -615,14 +641,14 @@ def shield_sigint_handler(
|
||||||
else:
|
else:
|
||||||
raise KeyboardInterrupt
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
any_connected = False
|
any_connected: bool = False
|
||||||
|
|
||||||
if uid_in_debug is not None:
|
if uid_in_debug is not None:
|
||||||
# try to see if the supposed (sub)actor in debug still
|
# try to see if the supposed (sub)actor in debug still
|
||||||
# has an active connection to *this* actor, and if not
|
# has an active connection to *this* actor, and if not
|
||||||
# it's likely they aren't using the TTY lock / debugger
|
# it's likely they aren't using the TTY lock / debugger
|
||||||
# and we should propagate SIGINT normally.
|
# and we should propagate SIGINT normally.
|
||||||
chans = actor._peers.get(tuple(uid_in_debug))
|
chans: list[tractor.Channel] = actor._peers.get(tuple(uid_in_debug))
|
||||||
if chans:
|
if chans:
|
||||||
any_connected = any(chan.connected() for chan in chans)
|
any_connected = any(chan.connected() for chan in chans)
|
||||||
if not any_connected:
|
if not any_connected:
|
||||||
|
@ -635,7 +661,7 @@ def shield_sigint_handler(
|
||||||
return do_cancel()
|
return do_cancel()
|
||||||
|
|
||||||
# only set in the actor actually running the REPL
|
# only set in the actor actually running the REPL
|
||||||
pdb_obj = Lock.repl
|
pdb_obj: MultiActorPdb | None = Lock.repl
|
||||||
|
|
||||||
# root actor branch that reports whether or not a child
|
# root actor branch that reports whether or not a child
|
||||||
# has locked debugger.
|
# has locked debugger.
|
||||||
|
@ -693,7 +719,7 @@ def shield_sigint_handler(
|
||||||
)
|
)
|
||||||
return do_cancel()
|
return do_cancel()
|
||||||
|
|
||||||
task = Lock.local_task_in_debug
|
task: str | None = Lock.local_task_in_debug
|
||||||
if (
|
if (
|
||||||
task
|
task
|
||||||
and pdb_obj
|
and pdb_obj
|
||||||
|
@ -708,8 +734,8 @@ def shield_sigint_handler(
|
||||||
# elif debug_mode():
|
# elif debug_mode():
|
||||||
|
|
||||||
else: # XXX: shouldn't ever get here?
|
else: # XXX: shouldn't ever get here?
|
||||||
print("WTFWTFWTF")
|
raise RuntimeError("WTFWTFWTF")
|
||||||
raise KeyboardInterrupt
|
# raise KeyboardInterrupt("WTFWTFWTF")
|
||||||
|
|
||||||
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
|
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
|
||||||
# it looks to be that the last command that was run (eg. ll)
|
# it looks to be that the last command that was run (eg. ll)
|
||||||
|
@ -737,21 +763,18 @@ def shield_sigint_handler(
|
||||||
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
||||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
||||||
|
|
||||||
# XXX LEGACY: lol, see ``pdbpp`` issue:
|
|
||||||
# https://github.com/pdbpp/pdbpp/issues/496
|
|
||||||
|
|
||||||
|
|
||||||
def _set_trace(
|
def _set_trace(
|
||||||
actor: tractor.Actor | None = None,
|
actor: tractor.Actor | None = None,
|
||||||
pdb: MultiActorPdb | None = None,
|
pdb: MultiActorPdb | None = None,
|
||||||
):
|
):
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
actor = actor or tractor.current_actor()
|
actor: tractor.Actor = actor or tractor.current_actor()
|
||||||
|
|
||||||
# start 2 levels up in user code
|
# start 2 levels up in user code
|
||||||
frame: Optional[FrameType] = sys._getframe()
|
frame: FrameType | None = sys._getframe()
|
||||||
if frame:
|
if frame:
|
||||||
frame = frame.f_back # type: ignore
|
frame: FrameType = frame.f_back # type: ignore
|
||||||
|
|
||||||
if (
|
if (
|
||||||
frame
|
frame
|
||||||
|
@ -771,12 +794,76 @@ def _set_trace(
|
||||||
Lock.local_task_in_debug = 'sync'
|
Lock.local_task_in_debug = 'sync'
|
||||||
|
|
||||||
pdb.set_trace(frame=frame)
|
pdb.set_trace(frame=frame)
|
||||||
|
# undo_
|
||||||
|
|
||||||
|
|
||||||
breakpoint = partial(
|
# TODO: allow pausing from sync code, normally by remapping
|
||||||
_breakpoint,
|
# python's builtin breakpoint() hook to this runtime aware version.
|
||||||
|
def pause_from_sync() -> None:
|
||||||
|
print("ENTER SYNC PAUSE")
|
||||||
|
import greenback
|
||||||
|
__tracebackhide__ = True
|
||||||
|
|
||||||
|
actor: tractor.Actor = tractor.current_actor()
|
||||||
|
# task_can_release_tty_lock = trio.Event()
|
||||||
|
|
||||||
|
# spawn bg task which will lock out the TTY, we poll
|
||||||
|
# just below until the release event is reporting that task as
|
||||||
|
# waiting.. not the most ideal but works for now ;)
|
||||||
|
greenback.await_(
|
||||||
|
actor._service_n.start(partial(
|
||||||
|
_pause,
|
||||||
|
debug_func=None,
|
||||||
|
# release_lock_signal=task_can_release_tty_lock,
|
||||||
|
))
|
||||||
|
)
|
||||||
|
|
||||||
|
db, undo_sigint = mk_mpdb()
|
||||||
|
Lock.local_task_in_debug = 'sync'
|
||||||
|
# db.config.enable_hidden_frames = True
|
||||||
|
|
||||||
|
# we entered the global ``breakpoint()`` built-in from sync
|
||||||
|
# code?
|
||||||
|
frame: FrameType | None = sys._getframe()
|
||||||
|
# print(f'FRAME: {str(frame)}')
|
||||||
|
# assert not db._is_hidden(frame)
|
||||||
|
|
||||||
|
frame: FrameType = frame.f_back # type: ignore
|
||||||
|
# print(f'FRAME: {str(frame)}')
|
||||||
|
# if not db._is_hidden(frame):
|
||||||
|
# pdbp.set_trace()
|
||||||
|
# db._hidden_frames.append(
|
||||||
|
# (frame, frame.f_lineno)
|
||||||
|
# )
|
||||||
|
db.set_trace(frame=frame)
|
||||||
|
# NOTE XXX: see the `@pdbp.hideframe` decoration
|
||||||
|
# on `Lock.unshield_sigint()`.. I have NO CLUE why
|
||||||
|
# the next instruction's def frame is being shown
|
||||||
|
# in the tb but it seems to be something wonky with
|
||||||
|
# the way `pdb` core works?
|
||||||
|
# undo_sigint()
|
||||||
|
|
||||||
|
# Lock.global_actor_in_debug = actor.uid
|
||||||
|
# Lock.release()
|
||||||
|
# task_can_release_tty_lock.set()
|
||||||
|
|
||||||
|
|
||||||
|
# using the "pause" semantics instead since
|
||||||
|
# that better covers actually somewhat "pausing the runtime"
|
||||||
|
# for this particular paralell task to do debugging B)
|
||||||
|
pause = partial(
|
||||||
|
_pause,
|
||||||
_set_trace,
|
_set_trace,
|
||||||
)
|
)
|
||||||
|
pp = pause # short-hand for "pause point"
|
||||||
|
|
||||||
|
|
||||||
|
async def breakpoint(**kwargs):
|
||||||
|
log.warning(
|
||||||
|
'`tractor.breakpoint()` is deprecated!\n'
|
||||||
|
'Please use `tractor.pause()` instead!\n'
|
||||||
|
)
|
||||||
|
await pause(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
def _post_mortem(
|
def _post_mortem(
|
||||||
|
@ -801,7 +888,7 @@ def _post_mortem(
|
||||||
|
|
||||||
|
|
||||||
post_mortem = partial(
|
post_mortem = partial(
|
||||||
_breakpoint,
|
_pause,
|
||||||
_post_mortem,
|
_post_mortem,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -883,8 +970,7 @@ async def maybe_wait_for_debugger(
|
||||||
# will make the pdb repl unusable.
|
# will make the pdb repl unusable.
|
||||||
# Instead try to wait for pdb to be released before
|
# Instead try to wait for pdb to be released before
|
||||||
# tearing down.
|
# tearing down.
|
||||||
|
sub_in_debug: tuple[str, str] | None = None
|
||||||
sub_in_debug = None
|
|
||||||
|
|
||||||
for _ in range(poll_steps):
|
for _ in range(poll_steps):
|
||||||
|
|
||||||
|
@ -904,13 +990,15 @@ async def maybe_wait_for_debugger(
|
||||||
|
|
||||||
debug_complete = Lock.no_remote_has_tty
|
debug_complete = Lock.no_remote_has_tty
|
||||||
if (
|
if (
|
||||||
(debug_complete and
|
debug_complete
|
||||||
not debug_complete.is_set())
|
and sub_in_debug is not None
|
||||||
|
and not debug_complete.is_set()
|
||||||
):
|
):
|
||||||
log.debug(
|
log.pdb(
|
||||||
'Root has errored but pdb is in use by '
|
'Root has errored but pdb is in use by '
|
||||||
f'child {sub_in_debug}\n'
|
f'child {sub_in_debug}\n'
|
||||||
'Waiting on tty lock to release..')
|
'Waiting on tty lock to release..'
|
||||||
|
)
|
||||||
|
|
||||||
await debug_complete.wait()
|
await debug_complete.wait()
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ async def open_root_actor(
|
||||||
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
|
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
|
||||||
builtin_bp_handler = sys.breakpointhook
|
builtin_bp_handler = sys.breakpointhook
|
||||||
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
|
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
|
||||||
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
|
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.pause_from_sync'
|
||||||
|
|
||||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||||
# on our debugger lock state.
|
# on our debugger lock state.
|
||||||
|
@ -237,10 +237,10 @@ async def open_root_actor(
|
||||||
) as err:
|
) as err:
|
||||||
|
|
||||||
entered = await _debug._maybe_enter_pm(err)
|
entered = await _debug._maybe_enter_pm(err)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
not entered
|
not entered
|
||||||
and not is_multi_cancelled(err)
|
and
|
||||||
|
not is_multi_cancelled(err)
|
||||||
):
|
):
|
||||||
logger.exception('Root actor crashed:\n')
|
logger.exception('Root actor crashed:\n')
|
||||||
|
|
||||||
|
|
|
@ -632,7 +632,7 @@ class Actor:
|
||||||
and not db_cs.cancel_called
|
and not db_cs.cancel_called
|
||||||
and uid == pdb_user_uid
|
and uid == pdb_user_uid
|
||||||
):
|
):
|
||||||
log.warning(
|
log.critical(
|
||||||
f'STALE DEBUG LOCK DETECTED FOR {uid}'
|
f'STALE DEBUG LOCK DETECTED FOR {uid}'
|
||||||
)
|
)
|
||||||
# TODO: figure out why this breaks tests..
|
# TODO: figure out why this breaks tests..
|
||||||
|
@ -1723,4 +1723,6 @@ class Arbiter(Actor):
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
uid = (str(uid[0]), str(uid[1]))
|
uid = (str(uid[0]), str(uid[1]))
|
||||||
self._registry.pop(uid)
|
entry: tuple = self._registry.pop(uid, None)
|
||||||
|
if entry is None:
|
||||||
|
log.warning(f'Request to de-register {uid} failed?')
|
||||||
|
|
|
@ -28,7 +28,6 @@ from typing import (
|
||||||
Callable,
|
Callable,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
Awaitable,
|
Awaitable,
|
||||||
Optional,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
@ -65,9 +64,9 @@ class LinkedTaskChannel(trio.abc.Channel):
|
||||||
_trio_exited: bool = False
|
_trio_exited: bool = False
|
||||||
|
|
||||||
# set after ``asyncio.create_task()``
|
# set after ``asyncio.create_task()``
|
||||||
_aio_task: Optional[asyncio.Task] = None
|
_aio_task: asyncio.Task | None = None
|
||||||
_aio_err: Optional[BaseException] = None
|
_aio_err: BaseException | None = None
|
||||||
_broadcaster: Optional[BroadcastReceiver] = None
|
_broadcaster: BroadcastReceiver | None = None
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
await self._from_aio.aclose()
|
await self._from_aio.aclose()
|
||||||
|
@ -188,7 +187,7 @@ def _run_asyncio_task(
|
||||||
|
|
||||||
cancel_scope = trio.CancelScope()
|
cancel_scope = trio.CancelScope()
|
||||||
aio_task_complete = trio.Event()
|
aio_task_complete = trio.Event()
|
||||||
aio_err: Optional[BaseException] = None
|
aio_err: BaseException | None = None
|
||||||
|
|
||||||
chan = LinkedTaskChannel(
|
chan = LinkedTaskChannel(
|
||||||
aio_q, # asyncio.Queue
|
aio_q, # asyncio.Queue
|
||||||
|
@ -270,7 +269,7 @@ def _run_asyncio_task(
|
||||||
'''
|
'''
|
||||||
nonlocal chan
|
nonlocal chan
|
||||||
aio_err = chan._aio_err
|
aio_err = chan._aio_err
|
||||||
task_err: Optional[BaseException] = None
|
task_err: BaseException | None = None
|
||||||
|
|
||||||
# only to avoid ``asyncio`` complaining about uncaptured
|
# only to avoid ``asyncio`` complaining about uncaptured
|
||||||
# task exceptions
|
# task exceptions
|
||||||
|
@ -350,11 +349,11 @@ async def translate_aio_errors(
|
||||||
'''
|
'''
|
||||||
trio_task = trio.lowlevel.current_task()
|
trio_task = trio.lowlevel.current_task()
|
||||||
|
|
||||||
aio_err: Optional[BaseException] = None
|
aio_err: BaseException | None = None
|
||||||
|
|
||||||
# TODO: make thisi a channel method?
|
# TODO: make thisi a channel method?
|
||||||
def maybe_raise_aio_err(
|
def maybe_raise_aio_err(
|
||||||
err: Optional[Exception] = None
|
err: Exception | None = None
|
||||||
) -> None:
|
) -> None:
|
||||||
aio_err = chan._aio_err
|
aio_err = chan._aio_err
|
||||||
if (
|
if (
|
||||||
|
|
Loading…
Reference in New Issue