Merge pull request #372 from goodboy/devx_subpkg

Start ` tractor.devx` sub-pkg
sc_super_proto_dgrams
goodboy 2025-03-20 19:48:42 -04:00 committed by GitHub
commit 3ccbfd7e54
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1266 additions and 400 deletions

View File

@ -3,8 +3,8 @@
|gh_actions| |gh_actions|
|docs| |docs|
``tractor`` is a `structured concurrent`_, multi-processing_ runtime ``tractor`` is a `structured concurrent`_, (optionally
built on trio_. distributed_) multi-processing_ runtime built on trio_.
Fundamentally, ``tractor`` gives you parallelism via Fundamentally, ``tractor`` gives you parallelism via
``trio``-"*actors*": independent Python processes (aka ``trio``-"*actors*": independent Python processes (aka
@ -17,11 +17,20 @@ protocol" constructed on top of multiple Pythons each running a ``trio``
scheduled runtime - a call to ``trio.run()``. scheduled runtime - a call to ``trio.run()``.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_" We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
but likely *does not* look like what *you* probably think an "actor but likely **does not** look like what **you** probably *think* an "actor
model" looks like, and that's *intentional*. model" looks like, and that's **intentional**.
The first step to grok ``tractor`` is to get the basics of ``trio`` down.
A great place to start is the `trio docs`_ and this `blog post`_. Where do i start!?
------------------
The first step to grok ``tractor`` is to get an intermediate
knowledge of ``trio`` and **structured concurrency** B)
Some great places to start are,
- the seminal `blog post`_
- obviously the `trio docs`_
- wikipedia's nascent SC_ page
- the fancy diagrams @ libdill-docs_
Features Features
@ -593,6 +602,7 @@ matrix seems too hip, we're also mostly all in the the `trio gitter
channel`_! channel`_!
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228 .. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _distributed: https://en.wikipedia.org/wiki/Distributed_computing
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing .. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trio: https://github.com/python-trio/trio .. _trio: https://github.com/python-trio/trio
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements .. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
@ -611,8 +621,9 @@ channel`_!
.. _trio docs: https://trio.readthedocs.io/en/latest/ .. _trio docs: https://trio.readthedocs.io/en/latest/
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency .. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _SC: https://en.wikipedia.org/wiki/Structured_concurrency
.. _libdill-docs: https://sustrik.github.io/libdill/structured-concurrency.html
.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency .. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
.. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _async generators: https://www.python.org/dev/peps/pep-0525/
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel .. _trio-parallel: https://github.com/richardsheridan/trio-parallel

View File

@ -0,0 +1,117 @@
import asyncio
import trio
import tractor
from tractor import to_asyncio
async def aio_sleep_forever():
await asyncio.sleep(float('inf'))
async def bp_then_error(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
raise_after_bp: bool = True,
) -> None:
# sync with ``trio``-side (caller) task
to_trio.send_nowait('start')
# NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `._debug._set_trace()` but
# we set `Lock.local_task_in_debug = 'sync'`, we probably want
# some further, at least, meta-data about the task/actoq in debug
# in terms of making it clear it's asyncio mucking about.
breakpoint()
# short checkpoint / delay
await asyncio.sleep(0.5)
if raise_after_bp:
raise ValueError('blah')
# TODO: test case with this so that it gets cancelled?
else:
# XXX NOTE: this is required in order to get the SIGINT-ignored
# hang case documented in the module script section!
await aio_sleep_forever()
@tractor.context
async def trio_ctx(
ctx: tractor.Context,
bp_before_started: bool = False,
):
# this will block until the ``asyncio`` task sends a "first"
# message, see first line in above func.
async with (
to_asyncio.open_channel_from(
bp_then_error,
raise_after_bp=not bp_before_started,
) as (first, chan),
trio.open_nursery() as n,
):
assert first == 'start'
if bp_before_started:
await tractor.breakpoint()
await ctx.started(first)
n.start_soon(
to_asyncio.run_task,
aio_sleep_forever,
)
await trio.sleep_forever()
async def main(
bps_all_over: bool = False,
) -> None:
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
debug_mode=True,
loglevel='cancel',
)
async with p.open_context(
trio_ctx,
bp_before_started=bps_all_over,
) as (ctx, first):
assert first == 'start'
if bps_all_over:
await tractor.breakpoint()
# await trio.sleep_forever()
await ctx.cancel()
assert 0
# TODO: case where we cancel from trio-side while asyncio task
# has debugger lock?
# await p.cancel_actor()
if __name__ == '__main__':
# works fine B)
trio.run(main)
# will hang and ignores SIGINT !!
# NOTE: you'll need to send a SIGQUIT (via ctl-\) to kill it
# manually..
# trio.run(main, True)

View File

@ -26,7 +26,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup( setup(
name="tractor", name="tractor",
version='0.1.0a6dev0', # alpha zone version='0.1.0a6dev0', # alpha zone
description='structured concurrrent `trio`-"actors"', description='structured concurrent `trio`-"actors"',
long_description=readme, long_description=readme,
license='AGPLv3', license='AGPLv3',
author='Tyler Goodlet', author='Tyler Goodlet',
@ -40,6 +40,7 @@ setup(
'tractor.trionics', # trio extensions 'tractor.trionics', # trio extensions
'tractor.msg', # lowlevel data types 'tractor.msg', # lowlevel data types
'tractor._testing', # internal cross-subsys suite utils 'tractor._testing', # internal cross-subsys suite utils
'tractor.devx', # "dev-experience"
], ],
install_requires=[ install_requires=[
@ -53,6 +54,7 @@ setup(
# 'exceptiongroup', # in stdlib as of 3.11! # 'exceptiongroup', # in stdlib as of 3.11!
# tooling # tooling
'stackscope',
'tricycle', 'tricycle',
'trio_typing', 'trio_typing',
'colorlog', 'colorlog',
@ -64,16 +66,15 @@ setup(
# debug mode REPL # debug mode REPL
'pdbp', 'pdbp',
# TODO: distributed transport using
# linux kernel networking
# 'pyroute2',
# pip ref docs on these specs: # pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
# and pep: # and pep:
# https://peps.python.org/pep-0440/#version-specifiers # https://peps.python.org/pep-0440/#version-specifiers
# windows deps workaround for ``pdbpp``
# https://github.com/pdbpp/pdbpp/issues/498
# https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"',
], ],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.11", python_requires=">=3.11",

View File

@ -245,10 +245,10 @@ def test_simple_context(
trio.run(main) trio.run(main)
except error_parent: except error_parent:
pass pass
except trio.MultiError as me: except BaseExceptionGroup as beg:
# XXX: on windows it seems we may have to expect the group error # XXX: on windows it seems we may have to expect the group error
from tractor._exceptions import is_multi_cancelled from tractor._exceptions import is_multi_cancelled
assert is_multi_cancelled(me) assert is_multi_cancelled(beg)
else: else:
trio.run(main) trio.run(main)

View File

@ -10,6 +10,7 @@ TODO:
- wonder if any of it'll work on OS X? - wonder if any of it'll work on OS X?
""" """
from functools import partial
import itertools import itertools
from typing import Optional from typing import Optional
import platform import platform
@ -26,6 +27,10 @@ from pexpect.exceptions import (
from tractor._testing import ( from tractor._testing import (
examples_dir, examples_dir,
) )
from tractor.devx._debug import (
_pause_msg,
_crash_msg,
)
from .conftest import ( from .conftest import (
_ci_env, _ci_env,
) )
@ -123,20 +128,52 @@ def expect(
raise raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
return False
return True
def assert_before( def assert_before(
child, child,
patts: list[str], patts: list[str],
**kwargs,
) -> None: ) -> None:
before = str(child.before.decode()) # as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
for patt in patts: **kwargs
try: )
assert patt in before
except AssertionError:
print(before)
raise
@pytest.fixture( @pytest.fixture(
@ -195,7 +232,10 @@ def test_root_actor_error(spawn, user_in_out):
before = str(child.before.decode()) before = str(child.before.decode())
# make sure expected logging and error arrives # make sure expected logging and error arrives
assert "Attaching to pdb in crashed actor: ('root'" in before assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
)
assert 'AssertionError' in before assert 'AssertionError' in before
# send user command # send user command
@ -332,7 +372,10 @@ def test_subactor_error(
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
)
if do_next: if do_next:
child.sendline('n') child.sendline('n')
@ -353,9 +396,15 @@ def test_subactor_error(
before = str(child.before.decode()) before = str(child.before.decode())
# root actor gets debugger engaged # root actor gets debugger engaged
assert "Attaching to pdb in crashed actor: ('root'" in before assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
)
# error is a remote error propagated from the subactor # error is a remote error propagated from the subactor
assert "RemoteActorError: ('name_error'" in before assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
)
# another round # another round
if ctlc: if ctlc:
@ -380,7 +429,10 @@ def test_subactor_breakpoint(
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
# do some "next" commands to demonstrate recurrent breakpoint # do some "next" commands to demonstrate recurrent breakpoint
# entries # entries
@ -396,7 +448,10 @@ def test_subactor_breakpoint(
child.sendline('continue') child.sendline('continue')
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -441,7 +496,10 @@ def test_multi_subactors(
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -461,7 +519,10 @@ def test_multi_subactors(
# first name_error failure # first name_error failure
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
)
assert "NameError" in before assert "NameError" in before
if ctlc: if ctlc:
@ -487,7 +548,10 @@ def test_multi_subactors(
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -527,17 +591,21 @@ def test_multi_subactors(
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
assert_before(child, [ assert_before(
# debugger attaches to root child, [
"Attaching to pdb in crashed actor: ('root'", # debugger attaches to root
# "Attaching to pdb in crashed actor: ('root'",
_crash_msg,
"('root'",
# expect a multierror with exceptions for each sub-actor # expect a multierror with exceptions for each sub-actor
"RemoteActorError: ('breakpoint_forever'", "RemoteActorError: ('breakpoint_forever'",
"RemoteActorError: ('name_error'", "RemoteActorError: ('name_error'",
"RemoteActorError: ('spawn_error'", "RemoteActorError: ('spawn_error'",
"RemoteActorError: ('name_error_1'", "RemoteActorError: ('name_error_1'",
'bdb.BdbQuit', 'bdb.BdbQuit',
]) ]
)
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -574,15 +642,22 @@ def test_multi_daemon_subactors(
# the root's tty lock first so anticipate either crash # the root's tty lock first so anticipate either crash
# message on the first entry. # message on the first entry.
bp_forever_msg = "Attaching pdb to actor: ('bp_forever'" bp_forev_parts = [_pause_msg, "('bp_forever'"]
bp_forev_in_msg = partial(
in_prompt_msg,
parts=bp_forev_parts,
)
name_error_msg = "NameError: name 'doggypants' is not defined" name_error_msg = "NameError: name 'doggypants' is not defined"
name_error_parts = [name_error_msg]
before = str(child.before.decode()) before = str(child.before.decode())
if bp_forever_msg in before:
next_msg = name_error_msg if bp_forev_in_msg(prompt=before):
next_parts = name_error_parts
elif name_error_msg in before: elif name_error_msg in before:
next_msg = bp_forever_msg next_parts = bp_forev_parts
else: else:
raise ValueError("Neither log msg was found !?") raise ValueError("Neither log msg was found !?")
@ -599,7 +674,10 @@ def test_multi_daemon_subactors(
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(PROMPT)
assert_before(child, [next_msg]) assert_before(
child,
next_parts,
)
# XXX: hooray the root clobbering the child here was fixed! # XXX: hooray the root clobbering the child here was fixed!
# IMO, this demonstrates the true power of SC system design. # IMO, this demonstrates the true power of SC system design.
@ -623,9 +701,15 @@ def test_multi_daemon_subactors(
child.expect(PROMPT) child.expect(PROMPT)
try: try:
assert_before(child, [bp_forever_msg]) assert_before(
child,
bp_forev_parts,
)
except AssertionError: except AssertionError:
assert_before(child, [name_error_msg]) assert_before(
child,
name_error_parts,
)
else: else:
if ctlc: if ctlc:
@ -637,7 +721,10 @@ def test_multi_daemon_subactors(
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(PROMPT)
assert_before(child, [name_error_msg]) assert_before(
child,
name_error_parts,
)
# wait for final error in root # wait for final error in root
# where it crashs with boxed error # where it crashs with boxed error
@ -647,7 +734,7 @@ def test_multi_daemon_subactors(
child.expect(PROMPT) child.expect(PROMPT)
assert_before( assert_before(
child, child,
[bp_forever_msg] bp_forev_parts
) )
except AssertionError: except AssertionError:
break break
@ -656,7 +743,9 @@ def test_multi_daemon_subactors(
child, child,
[ [
# boxed error raised in root task # boxed error raised in root task
"Attaching to pdb in crashed actor: ('root'", # "Attaching to pdb in crashed actor: ('root'",
_crash_msg,
"('root'",
"_exceptions.RemoteActorError: ('name_error'", "_exceptions.RemoteActorError: ('name_error'",
] ]
) )
@ -770,7 +859,7 @@ def test_multi_nested_subactors_error_through_nurseries(
child = spawn('multi_nested_subactors_error_up_through_nurseries') child = spawn('multi_nested_subactors_error_up_through_nurseries')
timed_out_early: bool = False # timed_out_early: bool = False
for send_char in itertools.cycle(['c', 'q']): for send_char in itertools.cycle(['c', 'q']):
try: try:
@ -871,11 +960,14 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
if not timed_out_early: if not timed_out_early:
before = str(child.before.decode()) before = str(child.before.decode())
assert_before(child, [ assert_before(
"tractor._exceptions.RemoteActorError: ('spawner0'", child,
"tractor._exceptions.RemoteActorError: ('name_error'", [
"NameError: name 'doggypants' is not defined", "tractor._exceptions.RemoteActorError: ('spawner0'",
]) "tractor._exceptions.RemoteActorError: ('name_error'",
"NameError: name 'doggypants' is not defined",
],
)
def test_root_cancels_child_context_during_startup( def test_root_cancels_child_context_during_startup(
@ -909,8 +1001,10 @@ def test_different_debug_mode_per_actor(
# only one actor should enter the debugger # only one actor should enter the debugger
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('debugged_boi'" in before assert in_prompt_msg(
assert "RuntimeError" in before before,
[_crash_msg, "('debugged_boi'", "RuntimeError"],
)
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)

View File

@ -38,10 +38,13 @@ async def async_gen_stream(sequence):
assert cs.cancelled_caught assert cs.cancelled_caught
# TODO: deprecated either remove entirely
# or re-impl in terms of `MsgStream` one-sides
# wrapper, but at least remove `Portal.open_stream_from()`
@tractor.stream @tractor.stream
async def context_stream( async def context_stream(
ctx: tractor.Context, ctx: tractor.Context,
sequence sequence: list[int],
): ):
for i in sequence: for i in sequence:
await ctx.send_yield(i) await ctx.send_yield(i)

View File

@ -19,7 +19,6 @@ tractor: structured concurrent ``trio``-"actors".
""" """
from ._clustering import open_actor_cluster from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._context import ( from ._context import (
Context, # the type Context, # the type
context, # a func-decorator context, # a func-decorator
@ -44,8 +43,10 @@ from ._exceptions import (
ModuleNotExposed, ModuleNotExposed,
ContextCancelled, ContextCancelled,
) )
from ._debug import ( from .devx import (
breakpoint, breakpoint,
pause,
pause_from_sync,
post_mortem, post_mortem,
) )
from . import msg from . import msg
@ -53,31 +54,35 @@ from ._root import (
run_daemon, run_daemon,
open_root_actor, open_root_actor,
) )
from ._ipc import Channel
from ._portal import Portal from ._portal import Portal
from ._runtime import Actor from ._runtime import Actor
__all__ = [ __all__ = [
'Actor', 'Actor',
'BaseExceptionGroup',
'Channel', 'Channel',
'Context', 'Context',
'ContextCancelled', 'ContextCancelled',
'ModuleNotExposed', 'ModuleNotExposed',
'MsgStream', 'MsgStream',
'BaseExceptionGroup',
'Portal', 'Portal',
'RemoteActorError', 'RemoteActorError',
'breakpoint', 'breakpoint',
'context', 'context',
'current_actor', 'current_actor',
'find_actor', 'find_actor',
'query_actor',
'get_arbiter', 'get_arbiter',
'is_root_process', 'is_root_process',
'msg', 'msg',
'open_actor_cluster', 'open_actor_cluster',
'open_nursery', 'open_nursery',
'open_root_actor', 'open_root_actor',
'pause',
'post_mortem', 'post_mortem',
'pause_from_sync',
'query_actor', 'query_actor',
'run_daemon', 'run_daemon',
'stream', 'stream',

View File

@ -313,7 +313,7 @@ async def _drain_to_final_msg(
log.critical('SHOULD NEVER GET HERE!?') log.critical('SHOULD NEVER GET HERE!?')
assert msg is ctx._cancel_msg assert msg is ctx._cancel_msg
assert error.msgdata == ctx._remote_error.msgdata assert error.msgdata == ctx._remote_error.msgdata
from ._debug import pause from .devx._debug import pause
await pause() await pause()
ctx._maybe_cancel_and_set_remote_error(error) ctx._maybe_cancel_and_set_remote_error(error)
ctx._maybe_raise_remote_err(error) ctx._maybe_raise_remote_err(error)
@ -868,6 +868,9 @@ class Context:
# TODO: maybe we should also call `._res_scope.cancel()` if it # TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs? # exists to support cancelling any drain loop hangs?
# NOTE: this usage actually works here B)
# from .devx._debug import breakpoint
# await breakpoint()
# TODO: add to `Channel`? # TODO: add to `Channel`?
@property @property
@ -2199,7 +2202,7 @@ async def open_context_from_portal(
# pass # pass
# TODO: factor ^ into below for non-root cases? # TODO: factor ^ into below for non-root cases?
# #
from ._debug import maybe_wait_for_debugger from .devx._debug import maybe_wait_for_debugger
was_acquired: bool = await maybe_wait_for_debugger( was_acquired: bool = await maybe_wait_for_debugger(
# header_msg=( # header_msg=(
# 'Delaying `ctx.cancel()` until debug lock ' # 'Delaying `ctx.cancel()` until debug lock '
@ -2310,7 +2313,7 @@ async def open_context_from_portal(
# where the root is waiting on the lock to clear but the # where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC. # child has already cleared it and clobbered IPC.
if debug_mode(): if debug_mode():
from ._debug import maybe_wait_for_debugger from .devx._debug import maybe_wait_for_debugger
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
# though it should be impossible for any tasks # though it should be impossible for any tasks

View File

@ -38,7 +38,7 @@ from ._runtime import (
# Arbiter as Registry, # Arbiter as Registry,
async_main, async_main,
) )
from . import _debug from .devx import _debug
from . import _spawn from . import _spawn
from . import _state from . import _state
from . import log from . import log
@ -90,7 +90,7 @@ async def open_root_actor(
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None) orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' os.environ['PYTHONBREAKPOINT'] = 'tractor.devx._debug.pause_from_sync'
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state. # on our debugger lock state.
@ -131,14 +131,20 @@ async def open_root_actor(
) )
) )
loglevel = (loglevel or log._default_loglevel).upper() loglevel = (
loglevel
or log._default_loglevel
).upper()
if debug_mode and _spawn._spawn_method == 'trio': if (
debug_mode
and _spawn._spawn_method == 'trio'
):
_state._runtime_vars['_debug_mode'] = True _state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing # expose internal debug module to every actor allowing for
# for use of ``await tractor.breakpoint()`` # use of ``await tractor.pause()``
enable_modules.append('tractor._debug') enable_modules.append('tractor.devx._debug')
# if debug mode get's enabled *at least* use that level of # if debug mode get's enabled *at least* use that level of
# logging for some informative console prompts. # logging for some informative console prompts.
@ -156,7 +162,20 @@ async def open_root_actor(
"Debug mode is only supported for the `trio` backend!" "Debug mode is only supported for the `trio` backend!"
) )
log.get_console_log(loglevel) assert loglevel
_log = log.get_console_log(loglevel)
assert _log
# TODO: factor this into `.devx._stackscope`!!
if debug_mode:
try:
logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
logger.warning(
'`stackscope` not installed for use in debug mode!'
)
try: try:
# make a temporary connection to see if an arbiter exists, # make a temporary connection to see if an arbiter exists,
@ -237,10 +256,10 @@ async def open_root_actor(
) as err: ) as err:
entered = await _debug._maybe_enter_pm(err) entered = await _debug._maybe_enter_pm(err)
if ( if (
not entered not entered
and not is_multi_cancelled(err) and
not is_multi_cancelled(err)
): ):
logger.exception('Root actor crashed:\n') logger.exception('Root actor crashed:\n')

View File

@ -55,7 +55,7 @@ from ._exceptions import (
unpack_error, unpack_error,
TransportClosed, TransportClosed,
) )
from . import _debug from .devx import _debug
from . import _state from . import _state
from .log import get_logger from .log import get_logger

View File

@ -78,7 +78,7 @@ from ._exceptions import (
TransportClosed, TransportClosed,
) )
from ._discovery import get_arbiter from ._discovery import get_arbiter
from . import _debug from .devx import _debug
from ._portal import Portal from ._portal import Portal
from . import _state from . import _state
from . import _mp_fixup_main from . import _mp_fixup_main
@ -187,7 +187,7 @@ class Actor:
self._parent_main_data = _mp_fixup_main._mp_figure_out_main() self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
# always include debugging tools module # always include debugging tools module
enable_modules.append('tractor._debug') enable_modules.append('tractor.devx._debug')
mods = {} mods = {}
for name in enable_modules: for name in enable_modules:
@ -632,7 +632,7 @@ class Actor:
and not db_cs.cancel_called and not db_cs.cancel_called
and uid == pdb_user_uid and uid == pdb_user_uid
): ):
log.warning( log.critical(
f'STALE DEBUG LOCK DETECTED FOR {uid}' f'STALE DEBUG LOCK DETECTED FOR {uid}'
) )
# TODO: figure out why this breaks tests.. # TODO: figure out why this breaks tests..
@ -1722,4 +1722,6 @@ class Arbiter(Actor):
) -> None: ) -> None:
uid = (str(uid[0]), str(uid[1])) uid = (str(uid[0]), str(uid[1]))
self._registry.pop(uid) entry: tuple = self._registry.pop(uid, None)
if entry is None:
log.warning(f'Request to de-register {uid} failed?')

View File

@ -34,7 +34,7 @@ from typing import (
import trio import trio
from trio import TaskStatus from trio import TaskStatus
from ._debug import ( from .devx._debug import (
maybe_wait_for_debugger, maybe_wait_for_debugger,
acquire_debug_lock, acquire_debug_lock,
) )
@ -551,13 +551,14 @@ async def trio_proc(
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
log.pdb(
'Delaying subproc reaper while debugger locked..'
)
await maybe_wait_for_debugger( await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get( child_in_debug=_runtime_vars.get(
'_debug_mode', False '_debug_mode', False
), ),
header_msg=(
'Delaying subproc reaper while debugger locked..\n'
),
# TODO: need a diff value then default? # TODO: need a diff value then default?
# poll_steps=9999999, # poll_steps=9999999,
) )

View File

@ -31,7 +31,7 @@ import warnings
import trio import trio
from ._debug import maybe_wait_for_debugger from .devx._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._runtime import Actor from ._runtime import Actor

View File

@ -0,0 +1,37 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Runtime "developer experience" utils and addons to aid our
(advanced) users and core devs in building distributed applications
and working with/on the actor runtime.
"""
from ._debug import (
maybe_wait_for_debugger as maybe_wait_for_debugger,
acquire_debug_lock as acquire_debug_lock,
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler,
MultiActorPdb as MultiActorPdb,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
post_mortem as post_mortem,
)
from ._stackscope import (
enable_stack_on_sig as enable_stack_on_sig,
)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,84 @@
# tractor: structured concurrent "actors".
# Copyright eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The fundamental cross process SC abstraction: an inter-actor,
cancel-scope linked task "context".
A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.
'''
from signal import (
signal,
SIGUSR1,
)
import trio
@trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None:
import stackscope
from tractor.log import get_console_log
tree_str: str = str(
stackscope.extract(
trio.lowlevel.current_root_task(),
recurse_child_tasks=True
)
)
log = get_console_log('cancel')
log.pdb(
f'Dumping `stackscope` tree:\n\n'
f'{tree_str}\n'
)
# import logging
# try:
# with open("/dev/tty", "w") as tty:
# tty.write(tree_str)
# except BaseException:
# logging.getLogger(
# "task_tree"
# ).exception("Error printing task tree")
def signal_handler(sig: int, frame: object) -> None:
import traceback
try:
trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree)
except RuntimeError:
# not in async context -- print a normal traceback
traceback.print_stack()
def enable_stack_on_sig(
sig: int = SIGUSR1
) -> None:
'''
Enable `stackscope` tracing on reception of a signal; by
default this is SIGUSR1.
'''
signal(
sig,
signal_handler,
)
# NOTE: not the above can be triggered from
# a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>')

129
tractor/devx/cli.py 100644
View File

@ -0,0 +1,129 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
CLI framework extensions for hacking on the actor runtime.
Currently popular frameworks supported are:
- `typer` via the `@callback` API
"""
from __future__ import annotations
from typing import (
Any,
Callable,
)
from typing_extensions import Annotated
import typer
_runtime_vars: dict[str, Any] = {}
def load_runtime_vars(
ctx: typer.Context,
callback: Callable,
pdb: bool = False, # --pdb
ll: Annotated[
str,
typer.Option(
'--loglevel',
'-l',
help='BigD logging level',
),
] = 'cancel', # -l info
):
'''
Maybe engage crash handling with `pdbp` when code inside
a `typer` CLI endpoint cmd raises.
To use this callback simply take your `app = typer.Typer()` instance
and decorate this function with it like so:
.. code:: python
from tractor.devx import cli
app = typer.Typer()
# manual decoration to hook into `click`'s context system!
cli.load_runtime_vars = app.callback(
invoke_without_command=True,
)
And then you can use the now augmented `click` CLI context as so,
.. code:: python
@app.command(
context_settings={
"allow_extra_args": True,
"ignore_unknown_options": True,
}
)
def my_cli_cmd(
ctx: typer.Context,
):
rtvars: dict = ctx.runtime_vars
pdb: bool = rtvars['pdb']
with tractor.devx.cli.maybe_open_crash_handler(pdb=pdb):
trio.run(
partial(
my_tractor_main_task_func,
debug_mode=pdb,
loglevel=rtvars['ll'],
)
)
which will enable log level and debug mode globally for the entire
`tractor` + `trio` runtime thereafter!
Bo
'''
global _runtime_vars
_runtime_vars |= {
'pdb': pdb,
'll': ll,
}
ctx.runtime_vars: dict[str, Any] = _runtime_vars
print(
f'`typer` sub-cmd: {ctx.invoked_subcommand}\n'
f'`tractor` runtime vars: {_runtime_vars}'
)
# XXX NOTE XXX: hackzone.. if no sub-cmd is specified (the
# default if the user just invokes `bigd`) then we simply
# invoke the sole `_bigd()` cmd passing in the "parent"
# typer.Context directly to that call since we're treating it
# as a "non sub-command" or wtv..
# TODO: ideally typer would have some kinda built-in way to get
# this behaviour without having to construct and manually
# invoke our own cmd..
if (
ctx.invoked_subcommand is None
or ctx.invoked_subcommand == callback.__name__
):
cmd: typer.core.TyperCommand = typer.core.TyperCommand(
name='bigd',
callback=callback,
)
ctx.params = {'ctx': ctx}
cmd.invoke(ctx)

View File

@ -28,7 +28,6 @@ from typing import (
Callable, Callable,
AsyncIterator, AsyncIterator,
Awaitable, Awaitable,
Optional,
) )
import trio import trio
@ -65,9 +64,9 @@ class LinkedTaskChannel(trio.abc.Channel):
_trio_exited: bool = False _trio_exited: bool = False
# set after ``asyncio.create_task()`` # set after ``asyncio.create_task()``
_aio_task: Optional[asyncio.Task] = None _aio_task: asyncio.Task | None = None
_aio_err: Optional[BaseException] = None _aio_err: BaseException | None = None
_broadcaster: Optional[BroadcastReceiver] = None _broadcaster: BroadcastReceiver | None = None
async def aclose(self) -> None: async def aclose(self) -> None:
await self._from_aio.aclose() await self._from_aio.aclose()
@ -188,7 +187,7 @@ def _run_asyncio_task(
cancel_scope = trio.CancelScope() cancel_scope = trio.CancelScope()
aio_task_complete = trio.Event() aio_task_complete = trio.Event()
aio_err: Optional[BaseException] = None aio_err: BaseException | None = None
chan = LinkedTaskChannel( chan = LinkedTaskChannel(
aio_q, # asyncio.Queue aio_q, # asyncio.Queue
@ -270,7 +269,7 @@ def _run_asyncio_task(
''' '''
nonlocal chan nonlocal chan
aio_err = chan._aio_err aio_err = chan._aio_err
task_err: Optional[BaseException] = None task_err: BaseException | None = None
# only to avoid ``asyncio`` complaining about uncaptured # only to avoid ``asyncio`` complaining about uncaptured
# task exceptions # task exceptions
@ -350,11 +349,11 @@ async def translate_aio_errors(
''' '''
trio_task = trio.lowlevel.current_task() trio_task = trio.lowlevel.current_task()
aio_err: Optional[BaseException] = None aio_err: BaseException | None = None
# TODO: make thisi a channel method? # TODO: make thisi a channel method?
def maybe_raise_aio_err( def maybe_raise_aio_err(
err: Optional[Exception] = None err: Exception | None = None
) -> None: ) -> None:
aio_err = chan._aio_err aio_err = chan._aio_err
if ( if (