Prevent asyncio from abandoning guest-runs, .pause_from_sync() support via .to_asyncio #2

Merged
goodboy merged 66 commits from aio_abandons into main 2025-03-27 17:37:57 +00:00
43 changed files with 4503 additions and 1103 deletions

View File

@ -1,8 +1,16 @@
'''

This is a pretty important step forward for the debugger REPL tooling since now you can definitely get multi-actor safe pausing from infected-asyncio actors including crash handling B)

This is a pretty important step forward for the debugger REPL tooling since now you can definitely get multi-actor safe pausing from infected-`asyncio` actors including crash handling B)
Examples of using the builtin `breakpoint()` from an `asyncio.Task`
running in a subactor spawned with `infect_asyncio=True`.
'''
import asyncio import asyncio
import trio import trio
import tractor import tractor
from tractor import to_asyncio from tractor import (
to_asyncio,
Portal,
)
async def aio_sleep_forever(): async def aio_sleep_forever():
@ -17,21 +25,21 @@ async def bp_then_error(
) -> None: ) -> None:
# sync with ``trio``-side (caller) task # sync with `trio`-side (caller) task
to_trio.send_nowait('start') to_trio.send_nowait('start')
# NOTE: what happens here inside the hook needs some refinement.. # NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `._debug._set_trace()` but # => seems like it's still `._debug._set_trace()` but
# we set `Lock.local_task_in_debug = 'sync'`, we probably want # we set `Lock.local_task_in_debug = 'sync'`, we probably want
# some further, at least, meta-data about the task/actoq in debug # some further, at least, meta-data about the task/actor in debug
# in terms of making it clear it's asyncio mucking about. # in terms of making it clear it's `asyncio` mucking about.
breakpoint() breakpoint() # asyncio-side
# short checkpoint / delay # short checkpoint / delay
await asyncio.sleep(0.5) await asyncio.sleep(0.5) # asyncio-side
if raise_after_bp: if raise_after_bp:
raise ValueError('blah') raise ValueError('asyncio side error!')
# TODO: test case with this so that it gets cancelled? # TODO: test case with this so that it gets cancelled?
else: else:
@ -49,23 +57,21 @@ async def trio_ctx(
# this will block until the ``asyncio`` task sends a "first" # this will block until the ``asyncio`` task sends a "first"
# message, see first line in above func. # message, see first line in above func.
async with ( async with (
to_asyncio.open_channel_from( to_asyncio.open_channel_from(
bp_then_error, bp_then_error,
raise_after_bp=not bp_before_started, # raise_after_bp=not bp_before_started,
) as (first, chan), ) as (first, chan),
trio.open_nursery() as n, trio.open_nursery() as tn,
): ):
assert first == 'start' assert first == 'start'
if bp_before_started: if bp_before_started:
await tractor.breakpoint() await tractor.pause() # trio-side
await ctx.started(first) await ctx.started(first) # trio-side
n.start_soon( tn.start_soon(
to_asyncio.run_task, to_asyncio.run_task,
aio_sleep_forever, aio_sleep_forever,
) )
@ -73,39 +79,50 @@ async def trio_ctx(
async def main( async def main(
bps_all_over: bool = False, bps_all_over: bool = True,
# TODO, WHICH OF THESE HAZ BUGZ?
cancel_from_root: bool = False,
err_from_root: bool = False,
) -> None: ) -> None:
async with tractor.open_nursery( async with tractor.open_nursery(
# debug_mode=True, debug_mode=True,
) as n: maybe_enable_greenback=True,
# loglevel='devx',
p = await n.start_actor( ) as an:
ptl: Portal = await an.start_actor(
'aio_daemon', 'aio_daemon',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
debug_mode=True, debug_mode=True,
loglevel='cancel', # loglevel='cancel',
) )
async with p.open_context( async with ptl.open_context(
trio_ctx, trio_ctx,
bp_before_started=bps_all_over, bp_before_started=bps_all_over,
) as (ctx, first): ) as (ctx, first):
assert first == 'start' assert first == 'start'
if bps_all_over: # pause in parent to ensure no cross-actor
await tractor.breakpoint() # locking problems exist!
await tractor.pause() # trio-root
# await trio.sleep_forever() if cancel_from_root:
await ctx.cancel() await ctx.cancel()
if err_from_root:
assert 0 assert 0
else:
await trio.sleep_forever()
# TODO: case where we cancel from trio-side while asyncio task # TODO: case where we cancel from trio-side while asyncio task
# has debugger lock? # has debugger lock?
# await p.cancel_actor() # await ptl.cancel_actor()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,5 +1,5 @@
''' '''
Fast fail test with a context. Fast fail test with a `Context`.
Ensure the partially initialized sub-actor process Ensure the partially initialized sub-actor process
doesn't cause a hang on error/cancel of the parent doesn't cause a hang on error/cancel of the parent

View File

@ -7,7 +7,7 @@ async def breakpoint_forever():
try: try:
while True: while True:
yield 'yo' yield 'yo'
await tractor.breakpoint() await tractor.pause()
except BaseException: except BaseException:
tractor.log.get_console_log().exception( tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!' 'Cancelled while trying to enter pause point!'
@ -25,7 +25,8 @@ async def main():
""" """
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='cancel', # loglevel='cancel',
# loglevel='devx',
) as n: ) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__]) p0 = await n.start_actor('bp_forever', enable_modules=[__name__])

View File

@ -10,7 +10,7 @@ async def name_error():
async def breakpoint_forever(): async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor." "Indefinitely re-enter debugger in child actor."
while True: while True:
await tractor.breakpoint() await tractor.pause()
# NOTE: if the test never sent 'q'/'quit' commands # NOTE: if the test never sent 'q'/'quit' commands
# on the pdb repl, without this checkpoint line the # on the pdb repl, without this checkpoint line the

View File

@ -6,7 +6,7 @@ async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor." "Indefinitely re-enter debugger in child actor."
while True: while True:
await trio.sleep(0.1) await trio.sleep(0.1)
await tractor.breakpoint() await tractor.pause()
async def name_error(): async def name_error():

View File

@ -6,19 +6,46 @@ import tractor
async def main() -> None: async def main() -> None:
async with tractor.open_nursery(debug_mode=True) as an:
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace' # intially unset, no entry.
orig_pybp_var: int = os.environ.get('PYTHONBREAKPOINT')
assert orig_pybp_var in {None, "0"}
async with tractor.open_nursery(
debug_mode=True,
) as an:
assert an
assert (
(pybp_var := os.environ['PYTHONBREAKPOINT'])
==
'tractor.devx._debug._sync_pause_from_builtin'
)
# TODO: an assert that verifies the hook has indeed been, hooked # TODO: an assert that verifies the hook has indeed been, hooked
# XD # XD
assert sys.breakpointhook is not tractor._debug._set_trace assert (
(pybp_hook := sys.breakpointhook)
is not tractor.devx._debug._set_trace
)
print(
f'$PYTHONOBREAKPOINT: {pybp_var!r}\n'
f'`sys.breakpointhook`: {pybp_hook!r}\n'
)
breakpoint() breakpoint()
pass # first bp, tractor hook set.
# TODO: an assert that verifies the hook is unhooked.. # XXX AFTER EXIT (of actor-runtime) verify the hook is unset..
#
# YES, this is weird but it's how stdlib docs say to do it..
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
assert os.environ.get('PYTHONBREAKPOINT') is orig_pybp_var
assert sys.breakpointhook assert sys.breakpointhook
# now ensure a regular builtin pause still works
breakpoint() breakpoint()
pass # last bp, stdlib hook restored
if __name__ == '__main__': if __name__ == '__main__':
trio.run(main) trio.run(main)

View File

@ -10,7 +10,7 @@ async def main():
await trio.sleep(0.1) await trio.sleep(0.1)
await tractor.breakpoint() await tractor.pause()
await trio.sleep(0.1) await trio.sleep(0.1)

View File

@ -11,7 +11,7 @@ async def main(
# loglevel='runtime', # loglevel='runtime',
): ):
while True: while True:
await tractor.breakpoint() await tractor.pause()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -0,0 +1,83 @@
'''
Verify we can dump a `stackscope` tree on a hang.
'''
import os
import signal
import trio
import tractor
@tractor.context
async def start_n_shield_hang(
ctx: tractor.Context,
):
# actor: tractor.Actor = tractor.current_actor()
# sync to parent-side task
await ctx.started(os.getpid())
print('Entering shield sleep..')
with trio.CancelScope(shield=True):
await trio.sleep_forever() # in subactor
# XXX NOTE ^^^ since this shields, we expect
# the zombie reaper (aka T800) to engage on
# SIGINT from the user and eventually hard-kill
# this subprocess!
async def main(
from_test: bool = False,
) -> None:
async with (
tractor.open_nursery(
debug_mode=True,
enable_stack_on_sig=True,
# maybe_enable_greenback=False,
loglevel='devx',
) as an,
):
ptl: tractor.Portal = await an.start_actor(
'hanger',
enable_modules=[__name__],
debug_mode=True,
)
async with ptl.open_context(
start_n_shield_hang,
) as (ctx, cpid):
_, proc, _ = an._children[ptl.chan.uid]
assert cpid == proc.pid
print(
'Yo my child hanging..?\n'
# "i'm a user who wants to see a `stackscope` tree!\n"
)
# XXX simulate the wrapping test's "user actions"
# (i.e. if a human didn't run this manually but wants to
# know what they should do to reproduce test behaviour)
if from_test:
print(
f'Sending SIGUSR1 to {cpid!r}!\n'
)
os.kill(
cpid,
signal.SIGUSR1,
)
# simulate user cancelling program
await trio.sleep(0.5)
os.kill(
os.getpid(),
signal.SIGINT,
)
else:
# actually let user send the ctl-c
await trio.sleep_forever() # in root
if __name__ == '__main__':
trio.run(main)

View File

@ -4,9 +4,9 @@ import trio
async def gen(): async def gen():
yield 'yo' yield 'yo'
await tractor.breakpoint() await tractor.pause()
yield 'yo' yield 'yo'
await tractor.breakpoint() await tractor.pause()
@tractor.context @tractor.context
@ -15,7 +15,7 @@ async def just_bp(
) -> None: ) -> None:
await ctx.started() await ctx.started()
await tractor.breakpoint() await tractor.pause()
# TODO: bps and errors in this call.. # TODO: bps and errors in this call..
async for val in gen(): async for val in gen():

View File

@ -4,6 +4,13 @@ import time
import trio import trio
import tractor import tractor
# TODO: only import these when not running from test harness?
# can we detect `pexpect` usage maybe?
# from tractor.devx._debug import (
# get_lock,
# get_debug_req,
# )
def sync_pause( def sync_pause(
use_builtin: bool = False, use_builtin: bool = False,
@ -18,7 +25,13 @@ def sync_pause(
breakpoint(hide_tb=hide_tb) breakpoint(hide_tb=hide_tb)
else: else:
# TODO: maybe for testing some kind of cm style interface
# where the `._set_trace()` call doesn't happen until block
# exit?
# assert get_lock().ctx_in_debug is None
# assert get_debug_req().repl is None
tractor.pause_from_sync() tractor.pause_from_sync()
# assert get_debug_req().repl is None
if error: if error:
raise RuntimeError('yoyo sync code error') raise RuntimeError('yoyo sync code error')
@ -41,10 +54,11 @@ async def start_n_sync_pause(
async def main() -> None: async def main() -> None:
async with ( async with (
tractor.open_nursery( tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True, debug_mode=True,
# loglevel='cancel', maybe_enable_greenback=True,
enable_stack_on_sig=True,
# loglevel='warning',
# loglevel='devx',
) as an, ) as an,
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
@ -138,7 +152,9 @@ async def main() -> None:
# the case 2. from above still exists! # the case 2. from above still exists!
use_builtin=True, use_builtin=True,
), ),
abandon_on_cancel=False, # TODO: with this `False` we can hang!??!
# abandon_on_cancel=False,
abandon_on_cancel=True,
thread_name='inline_root_bg_thread', thread_name='inline_root_bg_thread',
) )

View File

@ -0,0 +1,18 @@
First generate a built disti:
```
python -m pip install --upgrade build
python -m build --sdist --outdir dist/alpha5/
```
Then try a test ``pypi`` upload:
```
python -m twine upload --repository testpypi dist/alpha5/*
```
The push to `pypi` for realz.
```
python -m twine upload --repository testpypi dist/alpha5/*
```

View File

@ -150,6 +150,18 @@ def pytest_generate_tests(metafunc):
metafunc.parametrize("start_method", [spawn_backend], scope='module') metafunc.parametrize("start_method", [spawn_backend], scope='module')
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't registry addr collide!
# @pytest.fixture
# def open_test_runtime(
# reg_addr: tuple,
# ) -> AsyncContextManager:
# return partial(
# tractor.open_nursery,
# registry_addrs=[reg_addr],
# )
def sig_prog(proc, sig): def sig_prog(proc, sig):
"Kill the actor-process with ``sig``." "Kill the actor-process with ``sig``."
proc.send_signal(sig) proc.send_signal(sig)

View File

View File

@ -0,0 +1,243 @@
'''
`tractor.devx.*` tooling sub-pkg test space.
'''
import time
from typing import (
Callable,
)
import pytest
from pexpect.exceptions import (
TIMEOUT,
)
from pexpect.spawnbase import SpawnBase
from tractor._testing import (
mk_cmd,
)
from tractor.devx._debug import (
_pause_msg as _pause_msg,
_crash_msg as _crash_msg,
_repl_fail_msg as _repl_fail_msg,
_ctlc_ignore_header as _ctlc_ignore_header,
)
from ..conftest import (
_ci_env,
)
@pytest.fixture
def spawn(
start_method,
testdir: pytest.Pytester,
reg_addr: tuple[str, int],
) -> Callable[[str], None]:
'''
Use the `pexpect` module shipped via `testdir.spawn()` to
run an `./examples/..` script by name.
'''
if start_method != 'trio':
pytest.skip(
'`pexpect` based tests only supported on `trio` backend'
)
def unset_colors():
'''
Python 3.13 introduced colored tracebacks that break patt
matching,
https://docs.python.org/3/using/cmdline.html#envvar-PYTHON_COLORS
https://docs.python.org/3/using/cmdline.html#using-on-controlling-color
'''
import os
os.environ['PYTHON_COLORS'] = '0'
def _spawn(
cmd: str,
**mkcmd_kwargs,
):
unset_colors()
return testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
),
expect_timeout=3,
# preexec_fn=unset_colors,
# ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff?
)
# such that test-dep can pass input script name.
return _spawn
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if mark.name == 'ctlcs_bish':
pytest.skip(
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
f'The test and/or underlying example script can *sometimes* run fine '
f'locally but more then likely until the cpython peeps get their sh#$ together, '
f'this test will definitely not behave like `trio` under SIGINT..\n'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
def expect(
child,
# normally a `pdb` prompt by default
patt: str,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
PROMPT = r"\(Pdb\+\)"
def in_prompt_msg(
child: SpawnBase,
parts: list[str],
pause_on_false: bool = False,
err_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
before: str = str(child.before.decode())
for part in parts:
if part not in before:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(before)
if err_on_false:
raise ValueError(
f'Could not find pattern in `before` output?\n'
f'part: {part!r}\n'
)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child: SpawnBase,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
assert in_prompt_msg(
child=child,
parts=patts,
# since this is an "assert" helper ;)
err_on_false=True,
**kwargs
)
def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
patt: str|None = None,
# expect repl UX to reprint the prompt after every
# ctrl-c send.
# XXX: no idea but, in CI this never seems to work even on 3.10 so
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> str|None:
before: str|None = None
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
time.sleep(delay)
child.sendcontrol('c')
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
time.sleep(delay)
child.expect(PROMPT)
before = str(child.before.decode())
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
# return the console content up to the final prompt
return before

View File

@ -13,26 +13,25 @@ TODO:
from functools import partial from functools import partial
import itertools import itertools
import platform import platform
import pathlib
import time import time
import pytest import pytest
import pexpect
from pexpect.exceptions import ( from pexpect.exceptions import (
TIMEOUT, TIMEOUT,
EOF, EOF,
) )
from tractor._testing import ( from .conftest import (
examples_dir, do_ctlc,
) PROMPT,
from tractor.devx._debug import (
_pause_msg, _pause_msg,
_crash_msg, _crash_msg,
_repl_fail_msg, _repl_fail_msg,
) )
from .conftest import ( from .conftest import (
_ci_env, expect,
in_prompt_msg,
assert_before,
) )
# TODO: The next great debugger audit could be done by you! # TODO: The next great debugger audit could be done by you!
@ -52,15 +51,6 @@ if platform.system() == 'Windows':
) )
def mk_cmd(ex_name: str) -> str:
'''
Generate a command suitable to pass to ``pexpect.spawn()``.
'''
script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
return ' '.join(['python', str(script_path)])
# TODO: was trying to this xfail style but some weird bug i see in CI # TODO: was trying to this xfail style but some weird bug i see in CI
# that's happening at collect time.. pretty soon gonna dump actions i'm # that's happening at collect time.. pretty soon gonna dump actions i'm
# thinkin... # thinkin...
@ -79,142 +69,6 @@ has_nested_actors = pytest.mark.has_nested_actors
# ) # )
@pytest.fixture
def spawn(
start_method,
testdir,
reg_addr,
) -> 'pexpect.spawn':
if start_method != 'trio':
pytest.skip(
"Debugger tests are only supported on the trio backend"
)
def _spawn(cmd):
return testdir.spawn(
cmd=mk_cmd(cmd),
expect_timeout=3,
)
return _spawn
PROMPT = r"\(Pdb\+\)"
def expect(
child,
# prompt by default
patt: str = PROMPT,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
__tracebackhide__: bool = False
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
return False
return True
# TODO: todo support terminal color-chars stripping so we can match
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
**kwargs
)
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor.devx._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
@pytest.mark.parametrize( @pytest.mark.parametrize(
'user_in_out', 'user_in_out',
[ [
@ -238,14 +92,15 @@ def test_root_actor_error(
# scan for the prompt # scan for the prompt
expect(child, PROMPT) expect(child, PROMPT)
before = str(child.before.decode())
# make sure expected logging and error arrives # make sure expected logging and error arrives
assert in_prompt_msg( assert in_prompt_msg(
before, child,
[_crash_msg, "('root'"] [
_crash_msg,
"('root'",
'AssertionError',
]
) )
assert 'AssertionError' in before
# send user command # send user command
child.sendline(user_input) child.sendline(user_input)
@ -264,8 +119,10 @@ def test_root_actor_error(
ids=lambda item: f'{item[0]} -> {item[1]}', ids=lambda item: f'{item[0]} -> {item[1]}',
) )
def test_root_actor_bp(spawn, user_in_out): def test_root_actor_bp(spawn, user_in_out):
"""Demonstrate breakpoint from in root actor. '''
""" Demonstrate breakpoint from in root actor.
'''
user_input, expect_err_str = user_in_out user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint') child = spawn('root_actor_breakpoint')
@ -279,7 +136,7 @@ def test_root_actor_bp(spawn, user_in_out):
child.expect('\r\n') child.expect('\r\n')
# process should exit # process should exit
child.expect(pexpect.EOF) child.expect(EOF)
if expect_err_str is None: if expect_err_str is None:
assert 'Error' not in str(child.before) assert 'Error' not in str(child.before)
@ -287,38 +144,6 @@ def test_root_actor_bp(spawn, user_in_out):
assert expect_err_str in str(child.before) assert expect_err_str in str(child.before)
def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
patt: str|None = None,
# expect repl UX to reprint the prompt after every
# ctrl-c send.
# XXX: no idea but, in CI this never seems to work even on 3.10 so
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> None:
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
time.sleep(delay)
child.sendcontrol('c')
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
before = str(child.before.decode())
time.sleep(delay)
child.expect(PROMPT)
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
def test_root_actor_bp_forever( def test_root_actor_bp_forever(
spawn, spawn,
ctlc: bool, ctlc: bool,
@ -358,7 +183,7 @@ def test_root_actor_bp_forever(
# quit out of the loop # quit out of the loop
child.sendline('q') child.sendline('q')
child.expect(pexpect.EOF) child.expect(EOF)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -380,10 +205,12 @@ def test_subactor_error(
# scan for the prompt # scan for the prompt
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
before, child,
[_crash_msg, "('name_error'"] [
_crash_msg,
"('name_error'",
]
) )
if do_next: if do_next:
@ -402,17 +229,15 @@ def test_subactor_error(
child.sendline('continue') child.sendline('continue')
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode()) assert in_prompt_msg(
child,
[
_crash_msg,
# root actor gets debugger engaged # root actor gets debugger engaged
assert in_prompt_msg( "('root'",
before,
[_crash_msg, "('root'"]
)
# error is a remote error propagated from the subactor # error is a remote error propagated from the subactor
assert in_prompt_msg( "('name_error'",
before, ]
[_crash_msg, "('name_error'"]
) )
# another round # another round
@ -423,7 +248,7 @@ def test_subactor_error(
child.expect('\r\n') child.expect('\r\n')
# process should exit # process should exit
child.expect(pexpect.EOF) child.expect(EOF)
def test_subactor_breakpoint( def test_subactor_breakpoint(
@ -433,14 +258,11 @@ def test_subactor_breakpoint(
"Single subactor with an infinite breakpoint loop" "Single subactor with an infinite breakpoint loop"
child = spawn('subactor_breakpoint') child = spawn('subactor_breakpoint')
# scan for the prompt
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
before, child,
[_pause_msg, "('breakpoint_forever'"] [_pause_msg,
"('breakpoint_forever'",]
) )
# do some "next" commands to demonstrate recurrent breakpoint # do some "next" commands to demonstrate recurrent breakpoint
@ -456,9 +278,8 @@ def test_subactor_breakpoint(
for _ in range(5): for _ in range(5):
child.sendline('continue') child.sendline('continue')
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
before, child,
[_pause_msg, "('breakpoint_forever'"] [_pause_msg, "('breakpoint_forever'"]
) )
@ -471,9 +292,8 @@ def test_subactor_breakpoint(
# child process should exit but parent will capture pdb.BdbQuit # child process should exit but parent will capture pdb.BdbQuit
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
before, child,
['RemoteActorError:', ['RemoteActorError:',
"('breakpoint_forever'", "('breakpoint_forever'",
'bdb.BdbQuit',] 'bdb.BdbQuit',]
@ -486,11 +306,10 @@ def test_subactor_breakpoint(
child.sendline('c') child.sendline('c')
# process should exit # process should exit
child.expect(pexpect.EOF) child.expect(EOF)
before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
before, child,
['RemoteActorError:', ['RemoteActorError:',
"('breakpoint_forever'", "('breakpoint_forever'",
'bdb.BdbQuit',] 'bdb.BdbQuit',]
@ -514,7 +333,7 @@ def test_multi_subactors(
before = str(child.before.decode()) before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
before, child,
[_pause_msg, "('breakpoint_forever'"] [_pause_msg, "('breakpoint_forever'"]
) )
@ -535,12 +354,14 @@ def test_multi_subactors(
# first name_error failure # first name_error failure
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
before, child,
[_crash_msg, "('name_error'"] [
_crash_msg,
"('name_error'",
"NameError",
]
) )
assert "NameError" in before
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -564,9 +385,8 @@ def test_multi_subactors(
# breakpoint loop should re-engage # breakpoint loop should re-engage
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
before, child,
[_pause_msg, "('breakpoint_forever'"] [_pause_msg, "('breakpoint_forever'"]
) )
@ -629,7 +449,7 @@ def test_multi_subactors(
# process should exit # process should exit
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
# repeat of previous multierror for final output # repeat of previous multierror for final output
assert_before(child, [ assert_before(child, [
@ -659,25 +479,28 @@ def test_multi_daemon_subactors(
# the root's tty lock first so anticipate either crash # the root's tty lock first so anticipate either crash
# message on the first entry. # message on the first entry.
bp_forev_parts = [_pause_msg, "('bp_forever'"] bp_forev_parts = [
_pause_msg,
"('bp_forever'",
]
bp_forev_in_msg = partial( bp_forev_in_msg = partial(
in_prompt_msg, in_prompt_msg,
parts=bp_forev_parts, parts=bp_forev_parts,
) )
name_error_msg = "NameError: name 'doggypants' is not defined" name_error_msg: str = "NameError: name 'doggypants' is not defined"
name_error_parts = [name_error_msg] name_error_parts: list[str] = [name_error_msg]
before = str(child.before.decode()) before = str(child.before.decode())
if bp_forev_in_msg(prompt=before): if bp_forev_in_msg(child=child):
next_parts = name_error_parts next_parts = name_error_parts
elif name_error_msg in before: elif name_error_msg in before:
next_parts = bp_forev_parts next_parts = bp_forev_parts
else: else:
raise ValueError("Neither log msg was found !?") raise ValueError('Neither log msg was found !?')
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -746,14 +569,12 @@ def test_multi_daemon_subactors(
# wait for final error in root # wait for final error in root
# where it crashs with boxed error # where it crashs with boxed error
while True: while True:
try:
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(PROMPT)
assert_before( if not in_prompt_msg(
child, child,
bp_forev_parts bp_forev_parts
) ):
except AssertionError:
break break
assert_before( assert_before(
@ -769,7 +590,7 @@ def test_multi_daemon_subactors(
) )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
@has_nested_actors @has_nested_actors
@ -845,7 +666,7 @@ def test_multi_subactors_root_errors(
]) ])
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
assert_before(child, [ assert_before(child, [
# "Attaching to pdb in crashed actor: ('root'", # "Attaching to pdb in crashed actor: ('root'",
@ -934,10 +755,13 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child = spawn('root_cancelled_but_child_is_in_tty_lock') child = spawn('root_cancelled_but_child_is_in_tty_lock')
child.expect(PROMPT) child.expect(PROMPT)
assert_before(
before = str(child.before.decode()) child,
assert "NameError: name 'doggypants' is not defined" in before [
assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before "NameError: name 'doggypants' is not defined",
"tractor._exceptions.RemoteActorError: ('name_error'",
],
)
time.sleep(0.5) time.sleep(0.5)
if ctlc: if ctlc:
@ -975,7 +799,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(3): for i in range(3):
try: try:
child.expect(pexpect.EOF, timeout=0.5) child.expect(EOF, timeout=0.5)
break break
except TIMEOUT: except TIMEOUT:
child.sendline('c') child.sendline('c')
@ -1017,7 +841,7 @@ def test_root_cancels_child_context_during_startup(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
def test_different_debug_mode_per_actor( def test_different_debug_mode_per_actor(
@ -1028,9 +852,8 @@ def test_different_debug_mode_per_actor(
child.expect(PROMPT) child.expect(PROMPT)
# only one actor should enter the debugger # only one actor should enter the debugger
before = str(child.before.decode())
assert in_prompt_msg( assert in_prompt_msg(
before, child,
[_crash_msg, "('debugged_boi'", "RuntimeError"], [_crash_msg, "('debugged_boi'", "RuntimeError"],
) )
@ -1038,9 +861,7 @@ def test_different_debug_mode_per_actor(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
before = str(child.before.decode())
# NOTE: this debugged actor error currently WON'T show up since the # NOTE: this debugged actor error currently WON'T show up since the
# root will actually cancel and terminate the nursery before the error # root will actually cancel and terminate the nursery before the error
@ -1059,103 +880,6 @@ def test_different_debug_mode_per_actor(
) )
def test_pause_from_sync(
spawn,
ctlc: bool
):
'''
Verify we can use the `pdbp` REPL from sync functions AND from
any thread spawned with `trio.to_thread.run_sync()`.
`examples/debugging/sync_bp.py`
'''
child = spawn('sync_bp')
# first `sync_pause()` after nurseries open
child.expect(PROMPT)
assert_before(
child,
[
# pre-prompt line
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel!
before = str(child.before.decode())
assert not in_prompt_msg(
before,
['`greenback` portal opened!'],
)
# should be same root task
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
# (NOT both, which will result in REPL clobbering!)
attach_patts: dict[str, list[str]] = {
'subactor': [
"'start_n_sync_pause'",
"('subactor'",
],
'inline_root_bg_thread': [
"<Thread(inline_root_bg_thread",
"('root'",
],
'start_soon_root_bg_thread': [
"<Thread(start_soon_root_bg_thread",
"('root'",
],
}
while attach_patts:
child.sendline('c')
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts.copy():
if key in before:
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg] + expected_patts
)
break
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.items():
assert not in_prompt_msg(
before,
other_patts,
)
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(pexpect.EOF)
def test_post_mortem_api( def test_post_mortem_api(
spawn, spawn,
ctlc: bool, ctlc: bool,
@ -1258,7 +982,7 @@ def test_post_mortem_api(
# ) # )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
def test_shield_pause( def test_shield_pause(
@ -1333,9 +1057,26 @@ def test_shield_pause(
] ]
) )
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(EOF)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead
# (in terms of `greenback` and/or extra threads) and if it's from
# a sync scope suggest that usage must first call
# `ensure_portal()` in the (eventual parent) async calling scope?
def test_sync_pause_from_bg_task_in_root_actor_():
'''
When used from the root actor, normally we can only implicitly
support `.pause_from_sync()` from the main-parent-task (that
opens the runtime via `open_root_actor()`) since `greenback`
requires a `.ensure_portal()` call per `trio.Task` where it is
used.
'''
...
# TODO: needs ANSI code stripping tho, see `assert_before()` # above! # TODO: needs ANSI code stripping tho, see `assert_before()` # above!
def test_correct_frames_below_hidden(): def test_correct_frames_below_hidden():
''' '''

View File

@ -0,0 +1,381 @@
'''
That "foreign loop/thread" debug REPL support better ALSO WORK!
Same as `test_native_pause.py`.
All these tests can be understood (somewhat) by running the
equivalent `examples/debugging/` scripts manually.
'''
from contextlib import (
contextmanager as cm,
)
# from functools import partial
# import itertools
import time
# from typing import (
# Iterator,
# )
import pytest
from pexpect.exceptions import (
TIMEOUT,
EOF,
)
from .conftest import (
# _ci_env,
do_ctlc,
PROMPT,
# expect,
in_prompt_msg,
assert_before,
_pause_msg,
_crash_msg,
_ctlc_ignore_header,
# _repl_fail_msg,
)
@cm
def maybe_expect_timeout(
ctlc: bool = False,
) -> None:
try:
yield
except TIMEOUT:
# breakpoint()
if ctlc:
pytest.xfail(
'Some kinda redic threading SIGINT bug i think?\n'
'See the notes in `examples/debugging/sync_bp.py`..\n'
)
raise
@pytest.mark.ctlcs_bish
def test_pause_from_sync(
spawn,
ctlc: bool,
):
'''
Verify we can use the `pdbp` REPL from sync functions AND from
any thread spawned with `trio.to_thread.run_sync()`.
`examples/debugging/sync_bp.py`
'''
child = spawn('sync_bp')
# first `sync_pause()` after nurseries open
child.expect(PROMPT)
assert_before(
child,
[
# pre-prompt line
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(child)
# ^NOTE^ subactor not spawned yet; don't need extra delay.
child.sendline('c')
# first `await tractor.pause()` inside `p.open_context()` body
child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel!
# assert not in_prompt_msg(
# child,
# ['`greenback` portal opened!'],
# )
# should be same root task
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
# XXX, fwiw without a brief sleep here the SIGINT might actually
# trigger "subactor" cancellation by its parent before the
# shield-handler is engaged.
#
# => similar to the `delay` input to `do_ctlc()` below, setting
# this too low can cause the test to fail since the `subactor`
# suffers a race where the root/parent sends an actor-cancel
# prior to the context task hitting its pause point (and thus
# engaging the `sigint_shield()` handler in time); this value
# seems be good enuf?
time.sleep(0.6)
# one of the bg thread or subactor should have
# `Lock.acquire()`-ed
# (NOT both, which will result in REPL clobbering!)
attach_patts: dict[str, list[str]] = {
'subactor': [
"'start_n_sync_pause'",
"('subactor'",
],
'inline_root_bg_thread': [
"<Thread(inline_root_bg_thread",
"('root'",
],
'start_soon_root_bg_thread': [
"<Thread(start_soon_root_bg_thread",
"('root'",
],
}
conts: int = 0 # for debugging below matching logic on failure
while attach_patts:
child.sendline('c')
conts += 1
child.expect(PROMPT)
before = str(child.before.decode())
for key in attach_patts:
if key in before:
attach_key: str = key
expected_patts: str = attach_patts.pop(key)
assert_before(
child,
[_pause_msg]
+
expected_patts
)
break
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=attach_key,
# NOTE same as comment above
delay=0.4,
)
child.sendline('c')
# XXX TODO, weird threading bug it seems despite the
# `abandon_on_cancel: bool` setting to
# `trio.to_thread.run_sync()`..
with maybe_expect_timeout(
ctlc=ctlc,
):
child.expect(EOF)
def expect_any_of(
attach_patts: dict[str, list[str]],
child, # what type?
ctlc: bool = False,
prompt: str = _ctlc_ignore_header,
ctlc_delay: float = .4,
) -> list[str]:
'''
Receive any of a `list[str]` of patterns provided in
`attach_patts`.
Used to test racing prompts from multiple actors and/or
tasks using a common root process' `pdbp` REPL.
'''
assert attach_patts
child.expect(PROMPT)
before = str(child.before.decode())
for attach_key in attach_patts:
if attach_key in before:
expected_patts: str = attach_patts.pop(attach_key)
assert_before(
child,
expected_patts
)
break # from for
else:
pytest.fail(
f'No keys found?\n\n'
f'{attach_patts.keys()}\n\n'
f'{before}\n'
)
# ensure no other task/threads engaged a REPL
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
child,
other_patts,
)
if ctlc:
do_ctlc(
child,
patt=prompt,
# NOTE same as comment above
delay=ctlc_delay,
)
return expected_patts
@pytest.mark.ctlcs_bish
def test_sync_pause_from_aio_task(
spawn,
ctlc: bool
# ^TODO, fix for `asyncio`!!
):
'''
Verify we can use the `pdbp` REPL from an `asyncio.Task` spawned using
APIs in `.to_asyncio`.
`examples/debugging/asycio_bp.py`
'''
child = spawn('asyncio_bp')
# RACE on whether trio/asyncio task bps first
attach_patts: dict[str, list[str]] = {
# first pause in guest-mode (aka "infecting")
# `trio.Task`.
'trio-side': [
_pause_msg,
"<Task 'trio_ctx'",
"('aio_daemon'",
],
# `breakpoint()` from `asyncio.Task`.
'asyncio-side': [
_pause_msg,
"<Task pending name='Task-2' coro=<greenback_shim()",
"('aio_daemon'",
],
}
while attach_patts:
expect_any_of(
attach_patts=attach_patts,
child=child,
ctlc=ctlc,
)
child.sendline('c')
# NOW in race order,
# - the asyncio-task will error
# - the root-actor parent task will pause
#
attach_patts: dict[str, list[str]] = {
# error raised in `asyncio.Task`
"raise ValueError('asyncio side error!')": [
_crash_msg,
"<Task 'trio_ctx'",
"@ ('aio_daemon'",
"ValueError: asyncio side error!",
# XXX, we no longer show this frame by default!
# 'return await chan.receive()', # `.to_asyncio` impl internals in tb
],
# parent-side propagation via actor-nursery/portal
# "tractor._exceptions.RemoteActorError: remote task raised a 'ValueError'": [
"remote task raised a 'ValueError'": [
_crash_msg,
"src_uid=('aio_daemon'",
"('aio_daemon'",
],
# a final pause in root-actor
"<Task '__main__.main'": [
_pause_msg,
"<Task '__main__.main'",
"('root'",
],
}
while attach_patts:
expect_any_of(
attach_patts=attach_patts,
child=child,
ctlc=ctlc,
)
child.sendline('c')
assert not attach_patts
# final boxed error propagates to root
assert_before(
child,
[
_crash_msg,
"<Task '__main__.main'",
"('root'",
"remote task raised a 'ValueError'",
"ValueError: asyncio side error!",
]
)
if ctlc:
do_ctlc(
child,
# NOTE: setting this to 0 (or some other sufficient
# small val) can cause the test to fail since the
# `subactor` suffers a race where the root/parent
# sends an actor-cancel prior to it hitting its pause
# point; by def the value is 0.1
delay=0.4,
)
child.sendline('c')
# with maybe_expect_timeout():
child.expect(EOF)
def test_sync_pause_from_non_greenbacked_aio_task():
'''
Where the `breakpoint()` caller task is NOT spawned by
`tractor.to_asyncio` and thus never activates
a `greenback.ensure_portal()` beforehand, presumably bc the task
was started by some lib/dep as in often seen in the field.
Ensure sync pausing works when the pause is in,
- the root actor running in infected-mode?
|_ since we don't need any IPC to acquire the debug lock?
|_ is there some way to handle this like the non-main-thread case?
All other cases need to error out appropriately right?
- for any subactor we can't avoid needing the repl lock..
|_ is there a way to hook into `asyncio.ensure_future(obj)`?
'''
pass

View File

@ -0,0 +1,172 @@
'''
That "native" runtime-hackin toolset better be dang useful!
Verify the funtion of a variety of "developer-experience" tools we
offer from the `.devx` sub-pkg:
- use of the lovely `stackscope` for dumping actor `trio`-task trees
during operation and hangs.
TODO:
- demonstration of `CallerInfo` call stack frame filtering such that
for logging and REPL purposes a user sees exactly the layers needed
when debugging a problem inside the stack vs. in their app.
'''
import os
import signal
import time
from .conftest import (
expect,
assert_before,
in_prompt_msg,
PROMPT,
_pause_msg,
)
from pexpect.exceptions import (
# TIMEOUT,
EOF,
)
def test_shield_pause(
spawn,
):
'''
Verify the `tractor.pause()/.post_mortem()` API works inside an
already cancelled `trio.CancelScope` and that you can step to the
next checkpoint wherein the cancelled will get raised.
'''
child = spawn(
'shield_hang_in_sub'
)
expect(
child,
'Yo my child hanging..?',
)
assert_before(
child,
[
'Entering shield sleep..',
'Enabling trace-trees on `SIGUSR1` since `stackscope` is installed @',
]
)
script_pid: int = child.pid
print(
f'Sending SIGUSR1 to {script_pid}\n'
f'(kill -s SIGUSR1 {script_pid})\n'
)
os.kill(
script_pid,
signal.SIGUSR1,
)
time.sleep(0.2)
expect(
child,
# end-of-tree delimiter
"end-of-\('root'",
)
assert_before(
child,
[
# 'Srying to dump `stackscope` tree..',
# 'Dumping `stackscope` tree for actor',
"('root'", # uid line
# TODO!? this used to show?
# -[ ] mk reproducable for @oremanj?
#
# parent block point (non-shielded)
# 'await trio.sleep_forever() # in root',
]
)
expect(
child,
# end-of-tree delimiter
"end-of-\('hanger'",
)
assert_before(
child,
[
# relay to the sub should be reported
'Relaying `SIGUSR1`[10] to sub-actor',
"('hanger'", # uid line
# TODO!? SEE ABOVE
# hanger LOC where it's shield-halted
# 'await trio.sleep_forever() # in subactor',
]
)
# simulate the user sending a ctl-c to the hanging program.
# this should result in the terminator kicking in since
# the sub is shield blocking and can't respond to SIGINT.
os.kill(
child.pid,
signal.SIGINT,
)
expect(
child,
'Shutting down actor runtime',
timeout=6,
)
assert_before(
child,
[
'raise KeyboardInterrupt',
# 'Shutting down actor runtime',
'#T-800 deployed to collect zombie B0',
"'--uid', \"('hanger',",
]
)
def test_breakpoint_hook_restored(
spawn,
):
'''
Ensures our actor runtime sets a custom `breakpoint()` hook
on open then restores the stdlib's default on close.
The hook state validation is done via `assert`s inside the
invoked script with only `breakpoint()` (not `tractor.pause()`)
calls used.
'''
child = spawn('restore_builtin_breakpoint')
child.expect(PROMPT)
assert_before(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
"first bp, tractor hook set",
]
)
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[
"last bp, stdlib hook restored",
]
)
# since the stdlib hook was already restored there should be NO
# `tractor` `log.pdb()` content from console!
assert not in_prompt_msg(
child,
[
_pause_msg,
"<Task '__main__.main'",
"('root'",
],
)
child.sendline('c')
child.expect(EOF)

View File

@ -130,7 +130,7 @@ def test_multierror(
try: try:
await portal2.result() await portal2.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.boxed_type == AssertionError assert err.boxed_type is AssertionError
print("Look Maa that first actor failed hard, hehh") print("Look Maa that first actor failed hard, hehh")
raise raise
@ -182,7 +182,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
for exc in exceptions: for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError) assert isinstance(exc, tractor.RemoteActorError)
assert exc.boxed_type == AssertionError assert exc.boxed_type is AssertionError
async def do_nothing(): async def do_nothing():
@ -504,7 +504,9 @@ def test_cancel_via_SIGINT_other_task(
if is_win(): # smh if is_win(): # smh
timeout += 1 timeout += 1
async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED): async def spawn_and_sleep_forever(
task_status=trio.TASK_STATUS_IGNORED
):
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
for i in range(3): for i in range(3):
await tn.run_in_actor( await tn.run_in_actor(

View File

@ -955,7 +955,7 @@ async def echo_back_sequence(
) )
await ctx.started() await ctx.started()
# await tractor.breakpoint() # await tractor.pause()
async with ctx.open_stream( async with ctx.open_stream(
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,

View File

@ -2,31 +2,53 @@
The hipster way to force SC onto the stdlib's "async": 'infection mode'. The hipster way to force SC onto the stdlib's "async": 'infection mode'.
''' '''
from typing import Optional, Iterable, Union
import asyncio import asyncio
import builtins import builtins
from contextlib import ExitStack
# from functools import partial
import itertools import itertools
import importlib import importlib
import os
from pathlib import Path
import signal
from typing import (
Callable,
Iterable,
Union,
)
import pytest import pytest
import trio import trio
import tractor import tractor
from tractor import ( from tractor import (
current_actor,
Actor,
to_asyncio, to_asyncio,
RemoteActorError, RemoteActorError,
ContextCancelled, ContextCancelled,
_state,
) )
from tractor.trionics import BroadcastReceiver from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc from tractor._testing import expect_ctxc
@pytest.fixture(
scope='module',
)
def delay(debug_mode: bool) -> int:
if debug_mode:
return 999
else:
return 1
async def sleep_and_err( async def sleep_and_err(
sleep_for: float = 0.1, sleep_for: float = 0.1,
# just signature placeholders for compat with # just signature placeholders for compat with
# ``to_asyncio.open_channel_from()`` # ``to_asyncio.open_channel_from()``
to_trio: Optional[trio.MemorySendChannel] = None, to_trio: trio.MemorySendChannel|None = None,
from_trio: Optional[asyncio.Queue] = None, from_trio: asyncio.Queue|None = None,
): ):
if to_trio: if to_trio:
@ -36,7 +58,7 @@ async def sleep_and_err(
assert 0 assert 0
async def sleep_forever(): async def aio_sleep_forever():
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@ -44,20 +66,26 @@ async def trio_cancels_single_aio_task():
# spawn an ``asyncio`` task to run a func and return result # spawn an ``asyncio`` task to run a func and return result
with trio.move_on_after(.2): with trio.move_on_after(.2):
await tractor.to_asyncio.run_task(sleep_forever) await tractor.to_asyncio.run_task(aio_sleep_forever)
def test_trio_cancels_aio_on_actor_side(reg_addr): def test_trio_cancels_aio_on_actor_side(
reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
):
''' '''
Spawn an infected actor that is cancelled by the ``trio`` side Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis. task using std cancel scope apis.
''' '''
async def main(): async def main():
with trio.fail_after(1 + delay):
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr] registry_addrs=[reg_addr],
) as n: debug_mode=debug_mode,
await n.run_in_actor( ) as an:
await an.run_in_actor(
trio_cancels_single_aio_task, trio_cancels_single_aio_task,
infect_asyncio=True, infect_asyncio=True,
) )
@ -66,14 +94,22 @@ def test_trio_cancels_aio_on_actor_side(reg_addr):
async def asyncio_actor( async def asyncio_actor(
target: str, target: str,
expect_err: Exception|None = None expect_err: Exception|None = None
) -> None: ) -> None:
assert tractor.current_actor().is_infected_aio() # ensure internal runtime state is consistent
target = globals()[target] actor: Actor = tractor.current_actor()
assert (
actor.is_infected_aio()
and
actor._infected_aio
and
_state._runtime_vars['_is_infected_aio']
)
target: Callable = globals()[target]
if '.' in expect_err: if '.' in expect_err:
modpath, _, name = expect_err.rpartition('.') modpath, _, name = expect_err.rpartition('.')
@ -89,12 +125,17 @@ async def asyncio_actor(
except BaseException as err: except BaseException as err:
if expect_err: if expect_err:
assert isinstance(err, error_type) assert isinstance(err, error_type), (
f'{type(err)} is not {error_type}?'
)
raise raise
def test_aio_simple_error(reg_addr): def test_aio_simple_error(
reg_addr: tuple[str, int],
debug_mode: bool,
):
''' '''
Verify a simple remote asyncio error propagates back through trio Verify a simple remote asyncio error propagates back through trio
to the parent actor. to the parent actor.
@ -103,9 +144,10 @@ def test_aio_simple_error(reg_addr):
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr] registry_addrs=[reg_addr],
) as n: debug_mode=debug_mode,
await n.run_in_actor( ) as an:
await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='sleep_and_err', target='sleep_and_err',
expect_err='AssertionError', expect_err='AssertionError',
@ -128,19 +170,24 @@ def test_aio_simple_error(reg_addr):
assert err assert err
assert isinstance(err, RemoteActorError) assert isinstance(err, RemoteActorError)
assert err.boxed_type == AssertionError assert err.boxed_type is AssertionError
def test_tractor_cancels_aio(reg_addr): def test_tractor_cancels_aio(
reg_addr: tuple[str, int],
debug_mode: bool,
):
''' '''
Verify we can cancel a spawned asyncio task gracefully. Verify we can cancel a spawned asyncio task gracefully.
''' '''
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
portal = await n.run_in_actor( debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='sleep_forever', target='aio_sleep_forever',
expect_err='trio.Cancelled', expect_err='trio.Cancelled',
infect_asyncio=True, infect_asyncio=True,
) )
@ -150,7 +197,9 @@ def test_tractor_cancels_aio(reg_addr):
trio.run(main) trio.run(main)
def test_trio_cancels_aio(reg_addr): def test_trio_cancels_aio(
reg_addr: tuple[str, int],
):
''' '''
Much like the above test with ``tractor.Portal.cancel_actor()`` Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api. except we just use a standard ``trio`` cancellation api.
@ -161,10 +210,10 @@ def test_trio_cancels_aio(reg_addr):
with trio.move_on_after(1): with trio.move_on_after(1):
# cancel the nursery shortly after boot # cancel the nursery shortly after boot
async with tractor.open_nursery() as n: async with tractor.open_nursery() as tn:
await n.run_in_actor( await tn.run_in_actor(
asyncio_actor, asyncio_actor,
target='sleep_forever', target='aio_sleep_forever',
expect_err='trio.Cancelled', expect_err='trio.Cancelled',
infect_asyncio=True, infect_asyncio=True,
) )
@ -181,10 +230,14 @@ async def trio_ctx(
# this will block until the ``asyncio`` task sends a "first" # this will block until the ``asyncio`` task sends a "first"
# message. # message.
with trio.fail_after(2): delay: int = 999 if tractor.debug_mode() else 1
with trio.fail_after(1 + delay):
try:
async with ( async with (
trio.open_nursery() as n, trio.open_nursery(
# TODO, for new `trio` / py3.13
# strict_exception_groups=False,
) as tn,
tractor.to_asyncio.open_channel_from( tractor.to_asyncio.open_channel_from(
sleep_and_err, sleep_and_err,
) as (first, chan), ) as (first, chan),
@ -193,12 +246,20 @@ async def trio_ctx(
assert first == 'start' assert first == 'start'
# spawn another asyncio task for the cuck of it. # spawn another asyncio task for the cuck of it.
n.start_soon( tn.start_soon(
tractor.to_asyncio.run_task, tractor.to_asyncio.run_task,
sleep_forever, aio_sleep_forever,
) )
await trio.sleep_forever() await trio.sleep_forever()
# TODO, factor this into a `trionics.collapse()`?
except* BaseException as beg:
# await tractor.pause(shield=True)
if len(excs := beg.exceptions) == 1:
raise excs[0]
else:
raise
@pytest.mark.parametrize( @pytest.mark.parametrize(
'parent_cancels', 'parent_cancels',
@ -206,8 +267,10 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format ids='parent_actor_cancels_child={}'.format
) )
def test_context_spawns_aio_task_that_errors( def test_context_spawns_aio_task_that_errors(
reg_addr, reg_addr: tuple[str, int],
delay: int,
parent_cancels: bool, parent_cancels: bool,
debug_mode: bool,
): ):
''' '''
Verify that spawning a task via an intertask channel ctx mngr that Verify that spawning a task via an intertask channel ctx mngr that
@ -216,14 +279,13 @@ def test_context_spawns_aio_task_that_errors(
''' '''
async def main(): async def main():
with trio.fail_after(1 + delay):
with trio.fail_after(2): async with tractor.open_nursery() as an:
async with tractor.open_nursery() as n: p = await an.start_actor(
p = await n.start_actor(
'aio_daemon', 'aio_daemon',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
# debug_mode=True, debug_mode=debug_mode,
loglevel='cancel', loglevel='cancel',
) )
async with ( async with (
@ -272,7 +334,7 @@ def test_context_spawns_aio_task_that_errors(
err = excinfo.value err = excinfo.value
assert isinstance(err, expect) assert isinstance(err, expect)
assert err.boxed_type == AssertionError assert err.boxed_type is AssertionError
async def aio_cancel(): async def aio_cancel():
@ -281,23 +343,38 @@ async def aio_cancel():
''' '''
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
task = asyncio.current_task()
# cancel and enter sleep # cancel and enter sleep
task = asyncio.current_task()
task.cancel() task.cancel()
await sleep_forever() await aio_sleep_forever()
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): def test_aio_cancelled_from_aio_causes_trio_cancelled(
reg_addr: tuple,
delay: int,
):
'''
When the `asyncio.Task` cancels itself the `trio` side should
also cancel and teardown and relay the cancellation cross-process
to the parent caller.
'''
async def main(): async def main():
async with tractor.open_nursery() as n:
await n.run_in_actor( an: tractor.ActorNursery
async with tractor.open_nursery() as an:
p: tractor.Portal = await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='aio_cancel', target='aio_cancel',
expect_err='tractor.to_asyncio.AsyncioCancelled', expect_err='tractor.to_asyncio.AsyncioCancelled',
infect_asyncio=True, infect_asyncio=True,
) )
# NOTE: normally the `an.__aexit__()` waits on the
# portal's result but we do it explicitly here
# to avoid indent levels.
with trio.fail_after(1 + delay):
await p.wait_for_result()
with pytest.raises( with pytest.raises(
expected_exception=(RemoteActorError, ExceptionGroup), expected_exception=(RemoteActorError, ExceptionGroup),
@ -305,15 +382,15 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
trio.run(main) trio.run(main)
# might get multiple `trio.Cancelled`s as well inside an inception # might get multiple `trio.Cancelled`s as well inside an inception
err = excinfo.value err: RemoteActorError|ExceptionGroup = excinfo.value
if isinstance(err, ExceptionGroup): if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile( excs = err.exceptions
lambda exc: not isinstance(exc, tractor.RemoteActorError), assert len(excs) == 1
err.exceptions final_exc = excs[0]
)) assert isinstance(final_exc, tractor.RemoteActorError)
assert err
# ensure boxed error is correct # relayed boxed error should be our `trio`-task's
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
assert err.boxed_type == to_asyncio.AsyncioCancelled assert err.boxed_type == to_asyncio.AsyncioCancelled
@ -323,15 +400,18 @@ async def no_to_trio_in_args():
async def push_from_aio_task( async def push_from_aio_task(
sequence: Iterable, sequence: Iterable,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
expect_cancel: False, expect_cancel: False,
fail_early: bool, fail_early: bool,
exit_early: bool,
) -> None: ) -> None:
try: try:
# print('trying breakpoint')
# breakpoint()
# sync caller ctx manager # sync caller ctx manager
to_trio.send_nowait(True) to_trio.send_nowait(True)
@ -340,10 +420,27 @@ async def push_from_aio_task(
to_trio.send_nowait(i) to_trio.send_nowait(i)
await asyncio.sleep(0.001) await asyncio.sleep(0.001)
if i == 50 and fail_early: if (
i == 50
):
if fail_early:
print('Raising exc from aio side!')
raise Exception raise Exception
print('asyncio streamer complete!') if exit_early:
# TODO? really you could enforce the same
# SC-proto we use for actors here with asyncio
# such that a Return[None] msg would be
# implicitly delivered to the trio side?
#
# XXX => this might be the end-all soln for
# converting any-inter-task system (regardless
# of maybe-remote runtime or language) to be
# SC-compat no?
print(f'asyncio breaking early @ {i!r}')
break
print('asyncio streaming complete!')
except asyncio.CancelledError: except asyncio.CancelledError:
if not expect_cancel: if not expect_cancel:
@ -355,10 +452,10 @@ async def push_from_aio_task(
async def stream_from_aio( async def stream_from_aio(
trio_exit_early: bool = False,
exit_early: bool = False, trio_raise_err: bool = False,
raise_err: bool = False,
aio_raise_err: bool = False, aio_raise_err: bool = False,
aio_exit_early: bool = False,
fan_out: bool = False, fan_out: bool = False,
) -> None: ) -> None:
@ -371,8 +468,18 @@ async def stream_from_aio(
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
push_from_aio_task, push_from_aio_task,
sequence=seq, sequence=seq,
expect_cancel=raise_err or exit_early, expect_cancel=trio_raise_err or trio_exit_early,
fail_early=aio_raise_err, fail_early=aio_raise_err,
exit_early=aio_exit_early,
# such that we can test exit early cases
# for each side explicitly.
suppress_graceful_exits=(not(
aio_exit_early
or
trio_exit_early
))
) as (first, chan): ) as (first, chan):
assert first is True assert first is True
@ -384,17 +491,28 @@ async def stream_from_aio(
], ],
): ):
async for value in chan: async for value in chan:
print(f'trio received {value}') print(f'trio received: {value!r}')
# XXX, debugging EoC not being handled correctly
# in `transate_aio_errors()`..
# if value is None:
# await tractor.pause(shield=True)
pulled.append(value) pulled.append(value)
if value == 50: if value == 50:
if raise_err: if trio_raise_err:
raise Exception raise Exception
elif exit_early: elif trio_exit_early:
print('`consume()` breaking early!\n')
break break
print('returning from `consume()`..\n')
# run 2 tasks each pulling from
# the inter-task-channel with the 2nd
# using a fan-out `BroadcastReceiver`.
if fan_out: if fan_out:
# start second task that get's the same stream value set.
async with ( async with (
# NOTE: this has to come first to avoid # NOTE: this has to come first to avoid
@ -402,19 +520,31 @@ async def stream_from_aio(
# tasks are joined.. # tasks are joined..
chan.subscribe() as br, chan.subscribe() as br,
trio.open_nursery() as n, trio.open_nursery() as tn,
): ):
n.start_soon(consume, br) # start 2nd task that get's broadcast the same
# value set.
tn.start_soon(consume, br)
await consume(chan) await consume(chan)
else: else:
await consume(chan) await consume(chan)
except BaseException as err:
import logging
log = logging.getLogger()
log.exception('aio-subactor errored!\n')
raise err
finally: finally:
if ( if not (
not raise_err and trio_raise_err
not exit_early and or
not aio_raise_err trio_exit_early
or
aio_raise_err
or
aio_exit_early
): ):
if fan_out: if fan_out:
# we get double the pulled values in the # we get double the pulled values in the
@ -424,22 +554,27 @@ async def stream_from_aio(
assert list(sorted(pulled)) == expect assert list(sorted(pulled)) == expect
else: else:
# await tractor.pause()
assert pulled == expect assert pulled == expect
else: else:
assert not fan_out assert not fan_out
assert pulled == expect[:51] assert pulled == expect[:51]
print('trio guest mode task completed!') print('trio guest-mode task completed!')
assert chan._aio_task.done()
@pytest.mark.parametrize( @pytest.mark.parametrize(
'fan_out', [False, True], 'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format ids='fan_out_w_chan_subscribe={}'.format
) )
def test_basic_interloop_channel_stream(reg_addr, fan_out): def test_basic_interloop_channel_stream(
reg_addr: tuple[str, int],
fan_out: bool,
):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
portal = await n.run_in_actor( portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
infect_asyncio=True, infect_asyncio=True,
fan_out=fan_out, fan_out=fan_out,
@ -453,10 +588,10 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
# TODO: parametrize the above test and avoid the duplication here? # TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(reg_addr): def test_trio_error_cancels_intertask_chan(reg_addr):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
portal = await n.run_in_actor( portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
raise_err=True, trio_raise_err=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should trigger remote actor error # should trigger remote actor error
@ -466,28 +601,119 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
trio.run(main) trio.run(main)
# ensure boxed error type # ensure boxed error type
excinfo.value.boxed_type == Exception excinfo.value.boxed_type is Exception
def test_trio_closes_early_and_channel_exits(reg_addr): def test_trio_closes_early_causes_aio_checkpoint_raise(
reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
):
'''
Check that if the `trio`-task "exits early and silently" (in this
case during `async for`-ing the inter-task-channel via
a `break`-from-loop), we raise `TrioTaskExited` on the
`asyncio`-side which also then bubbles up through the
`open_channel_from()` block indicating that the `asyncio.Task`
hit a ran another checkpoint despite the `trio.Task` exit.
'''
async def main(): async def main():
async with tractor.open_nursery() as n: with trio.fail_after(1 + delay):
portal = await n.run_in_actor( async with tractor.open_nursery(
debug_mode=debug_mode,
# enable_stack_on_sig=True,
) as an:
portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
exit_early=True, trio_exit_early=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should raise RAE diectly # should raise RAE diectly
await portal.result() print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result()
assert res is None
print(f'infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit # should be a quiet exit on a simple channel exit
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main) trio.run(main)
# ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
excinfo.value.boxed_type is to_asyncio.TrioTaskExited
def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
def test_aio_exits_early_relays_AsyncioTaskExited(
# TODO, parametrize the 3 possible trio side conditions:
# - trio blocking on receive, aio exits early
# - trio cancelled AND aio exits early on its next tick
# - trio errors AND aio exits early on its next tick
reg_addr: tuple[str, int],
debug_mode: bool,
delay: int,
):
'''
Check that if the `asyncio`-task "exits early and silently" (in this
case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel`
it `break`s from the loop), we raise `AsyncioTaskExited` on the
`trio`-side which then DOES NOT BUBBLE up through the
`open_channel_from()` block UNLESS,
- the trio.Task also errored/cancelled, in which case we wrap
both errors in an eg
- the trio.Task was blocking on rxing a value from the
`InterLoopTaskChannel`.
'''
async def main(): async def main():
async with tractor.open_nursery() as n: with trio.fail_after(1 + delay):
portal = await n.run_in_actor( async with tractor.open_nursery(
debug_mode=debug_mode,
# enable_stack_on_sig=True,
) as an:
portal = await an.run_in_actor(
stream_from_aio,
infect_asyncio=True,
trio_exit_early=False,
aio_exit_early=True,
)
# should raise RAE diectly
print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result()
assert res is None
print(f'infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
exc = excinfo.value
# TODO, wow bug!
# -[ ] bp handler not replaced!?!?
# breakpoint()
# import pdbp; pdbp.set_trace()
# ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
def test_aio_errors_and_channel_propagates_and_closes(
reg_addr: tuple[str, int],
debug_mode: bool,
):
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
aio_raise_err=True, aio_raise_err=True,
infect_asyncio=True, infect_asyncio=True,
@ -502,23 +728,24 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
) as excinfo: ) as excinfo:
trio.run(main) trio.run(main)
excinfo.value.boxed_type == Exception excinfo.value.boxed_type is Exception
@tractor.context async def aio_echo_server(
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
async def aio_echo_server(
to_trio: trio.MemorySendChannel, to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
) -> None: ) -> None:
to_trio.send_nowait('start') to_trio.send_nowait('start')
while True: while True:
try:
msg = await from_trio.get() msg = await from_trio.get()
except to_asyncio.TrioTaskExited:
print(
'breaking aio echo loop due to `trio` exit!'
)
break
# echo the msg back # echo the msg back
to_trio.send_nowait(msg) to_trio.send_nowait(msg)
@ -531,15 +758,19 @@ async def trio_to_aio_echo_server(
print('exiting asyncio task') print('exiting asyncio task')
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context|None,
):
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
aio_echo_server, aio_echo_server,
) as (first, chan): ) as (first, chan):
assert first == 'start' assert first == 'start'
await ctx.started(first) await ctx.started(first)
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for msg in stream: async for msg in stream:
print(f'asyncio echoing {msg}') print(f'asyncio echoing {msg}')
await chan.send(msg) await chan.send(msg)
@ -563,13 +794,15 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format, ids='raise_error={}'.format,
) )
def test_echoserver_detailed_mechanics( def test_echoserver_detailed_mechanics(
reg_addr, reg_addr: tuple[str, int],
debug_mode: bool,
raise_error_mid_stream, raise_error_mid_stream,
): ):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
p = await n.start_actor( debug_mode=debug_mode,
) as an:
p = await an.start_actor(
'aio_server', 'aio_server',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
@ -618,6 +851,243 @@ def test_echoserver_detailed_mechanics(
trio.run(main) trio.run(main)
@tractor.context
async def manage_file(
ctx: tractor.Context,
tmp_path_str: str,
send_sigint_to: str,
trio_side_is_shielded: bool = True,
bg_aio_task: bool = False,
):
'''
Start an `asyncio` task that just sleeps after registering a context
with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree
and ensure the stack is closed in the infected mode child.
To verify the teardown state just write a tmpfile to the `testdir`
and delete it on actor close.
'''
tmp_path: Path = Path(tmp_path_str)
tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file'
# create a the tmp file and tell the parent where it's at
assert not tmp_file.is_file()
tmp_file.touch()
stack: ExitStack = current_actor().lifetime_stack
stack.callback(tmp_file.unlink)
await ctx.started((
str(tmp_file),
os.getpid(),
))
# expect to be cancelled from here!
try:
# NOTE: turns out you don't even need to sched an aio task
# since the original issue, even though seemingly was due to
# the guest-run being abandoned + a `._debug.pause()` inside
# `._runtime._async_main()` (which was originally trying to
# debug the `.lifetime_stack` not closing), IS NOT actually
# the core issue?
#
# further notes:
#
# - `trio` only issues the " RuntimeWarning: Trio guest run
# got abandoned without properly finishing... weird stuff
# might happen" IFF you DO run a asyncio task here, BUT
# - the original issue of the `.lifetime_stack` not closing
# will still happen even if you don't run an `asyncio` task
# here even though the "abandon" messgage won't be shown..
#
# => ????? honestly i'm lost but it seems to be some issue
# with `asyncio` and SIGINT..
#
# honestly, this REALLY reminds me why i haven't used
# `asyncio` by choice in years.. XD
#
async with trio.open_nursery() as tn:
if bg_aio_task:
tn.start_soon(
tractor.to_asyncio.run_task,
aio_sleep_forever,
)
# XXX don't-need/doesn't-make-a-diff right
# since we're already doing it from parent?
# if send_sigint_to == 'child':
# os.kill(
# os.getpid(),
# signal.SIGINT,
# )
# XXX spend a half sec doing shielded checkpointing to
# ensure that despite the `trio`-side task ignoring the
# SIGINT, the `asyncio` side won't abandon the guest-run!
if trio_side_is_shielded:
with trio.CancelScope(shield=True):
for i in range(5):
await trio.sleep(0.1)
await trio.sleep_forever()
# signalled manually at the OS level (aka KBI) by the parent actor.
except KeyboardInterrupt:
print('child raised KBI..')
assert tmp_file.exists()
raise
raise RuntimeError('shoulda received a KBI?')
@pytest.mark.parametrize(
'trio_side_is_shielded',
[
False,
True,
],
ids=[
'trio_side_no_shielding',
'trio_side_does_shielded_work',
],
)
@pytest.mark.parametrize(
'send_sigint_to',
[
'child',
'parent',
],
ids='send_SIGINT_to={}'.format,
)
@pytest.mark.parametrize(
'bg_aio_task',
[
False,
# NOTE: (and see notes in `manage_file()` above as well) if
# we FOR SURE SPAWN AN AIO TASK in the child it seems the
# "silent-abandon" case (as is described in detail in
# `to_asyncio.run_as_asyncio_guest()`) does not happen and
# `asyncio`'s loop will at least abandon the `trio` side
# loudly? .. prolly the state-spot to start looking for
# a soln that results in NO ABANDONMENT.. XD
True,
],
ids=[
'bg_aio_task',
'just_trio_slee',
],
)
@pytest.mark.parametrize(
'wait_for_ctx',
[
False,
True,
],
ids=[
'raise_KBI_in_rent',
'wait_for_ctx',
],
)
def test_sigint_closes_lifetime_stack(
tmp_path: Path,
wait_for_ctx: bool,
bg_aio_task: bool,
trio_side_is_shielded: bool,
debug_mode: bool,
send_sigint_to: str,
):
'''
Ensure that an infected child can use the `Actor.lifetime_stack`
to make a file on boot and it's automatically cleaned up by the
actor-lifetime-linked exit stack closure.
'''
async def main():
delay = 999 if tractor.debug_mode() else 1
try:
an: tractor.ActorNursery
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
p: tractor.Portal = await an.start_actor(
'file_mngr',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
manage_file,
tmp_path_str=str(tmp_path),
send_sigint_to=send_sigint_to,
bg_aio_task=bg_aio_task,
trio_side_is_shielded=trio_side_is_shielded,
) as (ctx, first):
path_str, cpid = first
tmp_file: Path = Path(path_str)
assert tmp_file.exists()
# XXX originally to simulate what (hopefully)
# the below now triggers.. had to manually
# trigger a SIGINT from a ctl-c in the root.
# await trio.sleep_forever()
# XXX NOTE XXX signal infected-`asyncio` child to
# OS-cancel with SIGINT; this should trigger the
# bad `asyncio` cancel behaviour that can cause
# a guest-run abandon as was seen causing
# shm-buffer leaks in `piker`'s live quote stream
# susbys!
#
await trio.sleep(.2)
pid: int = (
cpid if send_sigint_to == 'child'
else os.getpid()
)
os.kill(
pid,
signal.SIGINT,
)
# XXX CASE 1: without the bug fixed, in
# the non-KBI-raised-in-parent case, this
# timeout should trigger!
if wait_for_ctx:
print('waiting for ctx outcome in parent..')
try:
with trio.fail_after(1 + delay):
await ctx.wait_for_result()
except tractor.ContextCancelled as ctxc:
assert ctxc.canceller == ctx.chan.uid
raise
# XXX CASE 2: this seems to be the source of the
# original issue which exhibited BEFORE we put
# a `Actor.cancel_soon()` inside
# `run_as_asyncio_guest()`..
else:
raise KeyboardInterrupt
pytest.fail('should have raised some kinda error?!?')
except (
KeyboardInterrupt,
ContextCancelled,
):
# XXX CASE 2: without the bug fixed, in the
# KBI-raised-in-parent case, the actor teardown should
# never get run (silently abaondoned by `asyncio`..) and
# thus the file should leak!
assert not tmp_file.exists()
assert ctx.maybe_error
trio.run(main)
# TODO: debug_mode tests once we get support for `asyncio`! # TODO: debug_mode tests once we get support for `asyncio`!
# #
# -[ ] need tests to wrap both scripts: # -[ ] need tests to wrap both scripts:

View File

@ -170,7 +170,7 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
trio.run(main) trio.run(main)
rae = excinfo.value rae = excinfo.value
assert rae.boxed_type == TypeError assert rae.boxed_type is TypeError
@tractor.context @tractor.context

View File

@ -0,0 +1,248 @@
'''
Special attention cases for using "infect `asyncio`" mode from a root
actor; i.e. not using a std `trio.run()` bootstrap.
'''
import asyncio
from functools import partial
import pytest
import trio
import tractor
from tractor import (
to_asyncio,
)
from tests.test_infected_asyncio import (
aio_echo_server,
)
@pytest.mark.parametrize(
'raise_error_mid_stream',
[
False,
Exception,
KeyboardInterrupt,
],
ids='raise_error={}'.format,
)
def test_infected_root_actor(
raise_error_mid_stream: bool|Exception,
# conftest wide
loglevel: str,
debug_mode: bool,
):
'''
Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True`
in the root actor.
'''
async def _trio_main():
with trio.fail_after(2 if not debug_mode else 999):
first: str
chan: to_asyncio.LinkedTaskChannel
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan),
):
assert first == 'start'
for i in range(1000):
await chan.send(i)
out = await chan.receive()
assert out == i
print(f'asyncio echoing {i}')
if (
raise_error_mid_stream
and
i == 500
):
raise raise_error_mid_stream
if out is None:
try:
out = await chan.receive()
except trio.EndOfChannel:
break
else:
raise RuntimeError(
'aio channel never stopped?'
)
if raise_error_mid_stream:
with pytest.raises(raise_error_mid_stream):
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
else:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
async def sync_and_err(
# just signature placeholders for compat with
# ``to_asyncio.open_channel_from()``
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
ev: asyncio.Event,
):
if to_trio:
to_trio.send_nowait('start')
await ev.wait()
raise RuntimeError('asyncio-side')
@pytest.mark.parametrize(
'aio_err_trigger',
[
'before_start_point',
'after_trio_task_starts',
'after_start_point',
],
ids='aio_err_triggered={}'.format
)
def test_trio_prestarted_task_bubbles(
aio_err_trigger: str,
# conftest wide
loglevel: str,
debug_mode: bool,
):
async def pre_started_err(
raise_err: bool = False,
pre_sleep: float|None = None,
aio_trigger: asyncio.Event|None = None,
task_status=trio.TASK_STATUS_IGNORED,
):
'''
Maybe pre-started error then sleep.
'''
if pre_sleep is not None:
print(f'Sleeping from trio for {pre_sleep!r}s !')
await trio.sleep(pre_sleep)
# signal aio-task to raise JUST AFTER this task
# starts but has not yet `.started()`
if aio_trigger:
print('Signalling aio-task to raise from `trio`!!')
aio_trigger.set()
if raise_err:
print('Raising from trio!')
raise TypeError('trio-side')
task_status.started()
await trio.sleep_forever()
async def _trio_main():
# with trio.fail_after(2):
with trio.fail_after(999):
first: str
chan: to_asyncio.LinkedTaskChannel
aio_ev = asyncio.Event()
async with (
tractor.open_root_actor(
debug_mode=False,
loglevel=loglevel,
),
):
# TODO, tests for this with 3.13 egs?
# from tractor.devx import open_crash_handler
# with open_crash_handler():
async with (
# where we'll start a sub-task that errors BEFORE
# calling `.started()` such that the error should
# bubble before the guest run terminates!
trio.open_nursery() as tn,
# THEN start an infect task which should error just
# after the trio-side's task does.
to_asyncio.open_channel_from(
partial(
sync_and_err,
ev=aio_ev,
)
) as (first, chan),
):
for i in range(5):
pre_sleep: float|None = None
last_iter: bool = (i == 4)
# TODO, missing cases?
# -[ ] error as well on
# 'after_start_point' case as well for
# another case?
raise_err: bool = False
if last_iter:
raise_err: bool = True
# trigger aio task to error on next loop
# tick/checkpoint
if aio_err_trigger == 'before_start_point':
aio_ev.set()
pre_sleep: float = 0
await tn.start(
pre_started_err,
raise_err,
pre_sleep,
(aio_ev if (
aio_err_trigger == 'after_trio_task_starts'
and
last_iter
) else None
),
)
if (
aio_err_trigger == 'after_start_point'
and
last_iter
):
aio_ev.set()
with pytest.raises(
expected_exception=ExceptionGroup,
) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
eg = excinfo.value
rte_eg, rest_eg = eg.split(RuntimeError)
# ensure the trio-task's error bubbled despite the aio-side
# having (maybe) errored first.
if aio_err_trigger in (
'after_trio_task_starts',
'after_start_point',
):
assert len(errs := rest_eg.exceptions) == 1
typerr = errs[0]
assert (
type(typerr) is TypeError
and
'trio-side' in typerr.args
)
# when aio errors BEFORE (last) trio task is scheduled, we should
# never see anythinb but the aio-side.
else:
assert len(rtes := rte_eg.exceptions) == 1
assert 'asyncio-side' in rtes[0].args[0]

View File

@ -271,7 +271,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
# the faster subtask was cancelled # the faster subtask was cancelled
break break
# await tractor.breakpoint() # await tractor.pause()
# await stream.receive() # await stream.receive()
print(f'final value: {value}') print(f'final value: {value}')

View File

@ -3,6 +3,10 @@ Reminders for oddities in `trio` that we need to stay aware of and/or
want to see changed. want to see changed.
''' '''
from contextlib import (
asynccontextmanager as acm,
)
import pytest import pytest
import trio import trio
from trio import TaskStatus from trio import TaskStatus
@ -80,3 +84,115 @@ def test_stashed_child_nursery(use_start_soon):
with pytest.raises(NameError): with pytest.raises(NameError):
trio.run(main) trio.run(main)
@pytest.mark.parametrize(
('unmask_from_canc', 'canc_from_finally'),
[
(True, False),
(True, True),
pytest.param(False, True,
marks=pytest.mark.xfail(reason="never raises!")
),
],
# TODO, ask ronny how to impl this .. XD
# ids='unmask_from_canc={0}, canc_from_finally={1}',#.format,
)
def test_acm_embedded_nursery_propagates_enter_err(
canc_from_finally: bool,
unmask_from_canc: bool,
):
'''
Demo how a masking `trio.Cancelled` could be handled by unmasking from the
`.__context__` field when a user (by accident) re-raises from a `finally:`.
'''
import tractor
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery,
unmask_from: BaseException|None = trio.Cancelled
# TODO, maybe offer a collection?
# unmask_from: set[BaseException] = {
# trio.Cancelled,
# },
):
if not unmask_from:
yield
return
try:
yield
except* unmask_from as be_eg:
# TODO, if we offer `unmask_from: set`
# for masker_exc_type in unmask_from:
matches, rest = be_eg.split(unmask_from)
if not matches:
raise
for exc_match in be_eg.exceptions:
if (
(exc_ctx := exc_match.__context__)
and
type(exc_ctx) not in {
# trio.Cancelled, # always by default?
unmask_from,
}
):
exc_ctx.add_note(
f'\n'
f'WARNING: the above error was masked by a {unmask_from!r} !?!\n'
f'Are you always cancelling? Say from a `finally:` ?\n\n'
f'{tn!r}'
)
raise exc_ctx from exc_match
@acm
async def wraps_tn_that_always_cancels():
async with (
trio.open_nursery() as tn,
maybe_raise_from_masking_exc(
tn=tn,
unmask_from=(
trio.Cancelled
if unmask_from_canc
else None
),
)
):
try:
yield tn
finally:
if canc_from_finally:
tn.cancel_scope.cancel()
await trio.lowlevel.checkpoint()
async def _main():
with tractor.devx.open_crash_handler() as bxerr:
assert not bxerr.value
async with (
wraps_tn_that_always_cancels() as tn,
):
assert not tn.cancel_scope.cancel_called
assert 0
assert (
(err := bxerr.value)
and
type(err) is AssertionError
)
with pytest.raises(ExceptionGroup) as excinfo:
trio.run(_main)
eg: ExceptionGroup = excinfo.value
assert_eg, rest_eg = eg.split(AssertionError)
assert len(assert_eg.exceptions) == 1

View File

@ -1703,15 +1703,28 @@ class Context:
# TODO: expose as mod func instead! # TODO: expose as mod func instead!
structfmt = pretty_struct.Struct.pformat structfmt = pretty_struct.Struct.pformat
if self._in_overrun: if self._in_overrun:
log.warning( report: str = (
f'Queueing OVERRUN msg on caller task:\n\n'
f'{flow_body}' f'{flow_body}'
f'{structfmt(msg)}\n' f'{structfmt(msg)}\n'
) )
over_q: deque = self._overflow_q
self._overflow_q.append(msg) self._overflow_q.append(msg)
if len(over_q) == over_q.maxlen:
report = (
'FAILED to queue OVERRUN msg, OVERAN the OVERRUN QUEUE !!\n\n'
+ report
)
# log.error(report)
log.debug(report)
else:
report = (
'Queueing OVERRUN msg on caller task:\n\n'
+ report
)
log.debug(report)
# XXX NOTE XXX # XXX NOTE XXX
# overrun is the ONLY case where returning early is fine! # overrun is the ONLY case where returning early is fine!
return False return False

View File

@ -20,6 +20,7 @@ Sub-process entry points.
""" """
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
import multiprocessing as mp
import os import os
import textwrap import textwrap
from typing import ( from typing import (
@ -64,20 +65,22 @@ def _mp_main(
''' '''
actor._forkserver_info = forkserver_info actor._forkserver_info = forkserver_info
from ._spawn import try_set_start_method from ._spawn import try_set_start_method
spawn_ctx = try_set_start_method(start_method) spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method)
assert spawn_ctx
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f'Setting loglevel for {actor.uid} to {actor.loglevel}'
)
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
assert spawn_ctx # TODO: use scops headers like for `trio` below!
# (well after we libify it maybe..)
log.info( log.info(
f"Started new {spawn_ctx.current_process()} for {actor.uid}") f'Started new {spawn_ctx.current_process()} for {actor.uid}'
# f"parent_addr is {parent_addr}"
_state._current_actor = actor )
_state._current_actor: Actor = actor
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial( trio_main = partial(
async_main, async_main,
actor=actor, actor=actor,
@ -94,7 +97,9 @@ def _mp_main(
pass # handle it the same way trio does? pass # handle it the same way trio does?
finally: finally:
log.info(f"Subactor {actor.uid} terminated") log.info(
f'`mp`-subactor {actor.uid} exited'
)
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually # TODO: move this func to some kinda `.devx._conc_lang.py` eventually

View File

@ -82,6 +82,48 @@ class InternalError(RuntimeError):
''' '''
class AsyncioCancelled(Exception):

This is the main error-translation-semantics that changed, more or less being more pedantic about which side errored/cancelled/exited-gracefully and whether it was independent of the other side.

This is the main error-translation-semantics that changed, more or less being more pedantic about which side errored/cancelled/exited-gracefully and whether it was independent of the other side.
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
NOTE: this should NOT inherit from `asyncio.CancelledError` or
tests should break!
'''
class AsyncioTaskExited(Exception):
'''
asyncio.Task "exited" translation error for use with the
`to_asyncio` APIs to be raised in the `trio` side task indicating
on `.run_task()`/`.open_channel_from()` exit that the aio side
exited early/silently.
'''
class TrioCancelled(Exception):
'''
Trio cancelled translation (non-base) error
for use with the `to_asyncio` module
to be raised in the `asyncio.Task` to indicate
that the `trio` side raised `Cancelled` or an error.
'''
class TrioTaskExited(Exception):
'''
The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer.
This is very similar to how `trio.ClosedResource` acts as
a "clean shutdown" signal to the consumer side of a mem-chan,
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
'''
# NOTE: more or less should be close to these: # NOTE: more or less should be close to these:
# 'boxed_type', # 'boxed_type',
@ -127,8 +169,8 @@ _body_fields: list[str] = list(
def get_err_type(type_name: str) -> BaseException|None: def get_err_type(type_name: str) -> BaseException|None:
''' '''
Look up an exception type by name from the set of locally Look up an exception type by name from the set of locally known
known namespaces: namespaces:
- `builtins` - `builtins`
- `tractor._exceptions` - `tractor._exceptions`
@ -358,6 +400,13 @@ class RemoteActorError(Exception):
self._ipc_msg.src_type_str self._ipc_msg.src_type_str
) )
if not self._src_type:
raise TypeError(
f'Failed to lookup src error type with '
f'`tractor._exceptions.get_err_type()` :\n'
f'{self.src_type_str}'
)
return self._src_type return self._src_type
@property @property
@ -366,6 +415,9 @@ class RemoteActorError(Exception):
String-name of the (last hop's) boxed error type. String-name of the (last hop's) boxed error type.
''' '''
# TODO, maybe support also serializing the
# `ExceptionGroup.exeptions: list[BaseException]` set under
# certain conditions?
bt: Type[BaseException] = self.boxed_type bt: Type[BaseException] = self.boxed_type
if bt: if bt:
return str(bt.__name__) return str(bt.__name__)
@ -609,6 +661,7 @@ class RemoteActorError(Exception):
# just after <Type( # just after <Type(
# |___ .. # |___ ..
tb_body_indent=1, tb_body_indent=1,
boxer_header=self.relay_uid,
) )
tail = '' tail = ''
@ -651,16 +704,10 @@ class RemoteActorError(Exception):
failing actor's remote env. failing actor's remote env.
''' '''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder # TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends: # metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611 # https://github.com/python-trio/trio/issues/611
src_type_ref: Type[BaseException] = self.src_type
return src_type_ref(self.tb_str) return src_type_ref(self.tb_str)
# TODO: local recontruction of nested inception for a given # TODO: local recontruction of nested inception for a given
@ -786,8 +833,11 @@ class MsgTypeError(
''' '''
if ( if (
(_bad_msg := self.msgdata.get('_bad_msg')) (_bad_msg := self.msgdata.get('_bad_msg'))
and and (
isinstance(_bad_msg, PayloadMsg) isinstance(_bad_msg, PayloadMsg)
or
isinstance(_bad_msg, msgtypes.Start)
)
): ):
return _bad_msg return _bad_msg
@ -973,15 +1023,6 @@ class NoRuntime(RuntimeError):
"The root actor has not been initialized yet" "The root actor has not been initialized yet"
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
'''
class MessagingError(Exception): class MessagingError(Exception):
''' '''
IPC related msg (typing), transaction (ordering) or dialog IPC related msg (typing), transaction (ordering) or dialog
@ -989,7 +1030,6 @@ class MessagingError(Exception):
''' '''
def pack_error( def pack_error(
exc: BaseException|RemoteActorError, exc: BaseException|RemoteActorError,
@ -1143,19 +1183,51 @@ def unpack_error(
def is_multi_cancelled( def is_multi_cancelled(
exc: BaseException|BaseExceptionGroup exc: BaseException|BaseExceptionGroup,
) -> bool:
ignore_nested: set[BaseException] = set(),
) -> bool|BaseExceptionGroup:
''' '''
Predicate to determine if a possible ``BaseExceptionGroup`` contains Predicate to determine if an `BaseExceptionGroup` only contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of some (maybe nested) set of sub-grouped exceptions (like only
cancelling a collection of subtasks. `trio.Cancelled`s which get swallowed silently by default) and is
thus the result of "gracefully cancelling" a collection of
sub-tasks (or other conc primitives) and receiving a "cancelled
ACK" from each after termination.
Docs:
----
- https://docs.python.org/3/library/exceptions.html#exception-groups
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
''' '''
if (
not ignore_nested
or
trio.Cancelled in ignore_nested
# XXX always count-in `trio`'s native signal
):
ignore_nested.update({trio.Cancelled})
if isinstance(exc, BaseExceptionGroup): if isinstance(exc, BaseExceptionGroup):
return exc.subgroup( matched_exc: BaseExceptionGroup|None = exc.subgroup(
lambda exc: isinstance(exc, trio.Cancelled) tuple(ignore_nested),
) is not None
# TODO, complain about why not allowed XD
# condition=tuple(ignore_nested),
)
if matched_exc is not None:
return matched_exc
# NOTE, IFF no excs types match (throughout the error-tree)
# -> return `False`, OW return the matched sub-eg.
#
# IOW, for the inverse of ^ for the purpose of
# maybe-enter-REPL--logic: "only debug when the err-tree contains
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
# we fallthrough and return `False` here.
return False return False
@ -1375,7 +1447,9 @@ def _mk_recv_mte(
any_pld: Any = msgpack.decode(msg.pld) any_pld: Any = msgpack.decode(msg.pld)
message: str = ( message: str = (
f'invalid `{msg_type.__qualname__}` msg payload\n\n' f'invalid `{msg_type.__qualname__}` msg payload\n\n'
f'value: `{any_pld!r}` does not match type-spec: ' f'{any_pld!r}\n\n'
f'has type {type(any_pld)!r}\n\n'
f'and does not match type-spec '
f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`' f'`{type(msg).__qualname__}.pld: {codec.pld_spec_str}`'
) )
bad_msg = msg bad_msg = msg

View File

@ -80,7 +80,7 @@ async def open_root_actor(
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
enable_stack_on_sig: bool = False, enable_stack_on_sig: bool = False,
# internal logging # internal logging
@ -95,6 +95,17 @@ async def open_root_actor(
hide_tb: bool = True, hide_tb: bool = True,
# XXX, proxied directly to `.devx._debug._maybe_enter_pm()`
# for REPL-entry logic.
debug_filter: Callable[
[BaseException|BaseExceptionGroup],
bool,
] = lambda err: not is_multi_cancelled(err),
# TODO, a way for actors to augment passing derived
# read-only state to sublayers?
# extra_rt_vars: dict|None = None,
) -> Actor: ) -> Actor:
''' '''
Runtime init entry point for ``tractor``. Runtime init entry point for ``tractor``.
@ -233,14 +244,8 @@ async def open_root_actor(
and and
enable_stack_on_sig enable_stack_on_sig
): ):
try: from .devx._stackscope import enable_stack_on_sig
logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig
enable_stack_on_sig() enable_stack_on_sig()
except ImportError:
logger.warning(
'`stackscope` not installed for use in debug mode!'
)
# closed into below ping task-func # closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = [] ponged_addrs: list[tuple[str, int]] = []
@ -336,6 +341,10 @@ async def open_root_actor(
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
) )
# XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# Start up main task set via core actor-runtime nurseries. # Start up main task set via core actor-runtime nurseries.
try: try:
@ -377,6 +386,7 @@ async def open_root_actor(
Exception, Exception,
BaseExceptionGroup, BaseExceptionGroup,
) as err: ) as err:
# XXX NOTE XXX see equiv note inside # XXX NOTE XXX see equiv note inside
# `._runtime.Actor._stream_handler()` where in the # `._runtime.Actor._stream_handler()` where in the
# non-root or root-that-opened-this-mahually case we # non-root or root-that-opened-this-mahually case we
@ -385,11 +395,15 @@ async def open_root_actor(
entered: bool = await _debug._maybe_enter_pm( entered: bool = await _debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
debug_filter=debug_filter,
) )
if ( if (
not entered not entered
and and
not is_multi_cancelled(err) not is_multi_cancelled(
err,
)
): ):
logger.exception('Root actor crashed\n') logger.exception('Root actor crashed\n')

View File

@ -59,6 +59,7 @@ from types import ModuleType
import warnings import warnings
import trio import trio
from trio._core import _run as trio_runtime
from trio import ( from trio import (
CancelScope, CancelScope,
Nursery, Nursery,
@ -80,6 +81,7 @@ from ._context import (
from .log import get_logger from .log import get_logger
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
InternalError,
ModuleNotExposed, ModuleNotExposed,
MsgTypeError, MsgTypeError,
unpack_error, unpack_error,
@ -98,6 +100,7 @@ from ._rpc import (
if TYPE_CHECKING: if TYPE_CHECKING:
from ._supervise import ActorNursery from ._supervise import ActorNursery
from trio._channel import MemoryChannelState
log = get_logger('tractor') log = get_logger('tractor')
@ -896,11 +899,15 @@ class Actor:
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'cid:{cid}\n' f'cid:{cid}\n'
) )
ctx._allow_overruns = allow_overruns ctx._allow_overruns: bool = allow_overruns
# adjust buffer size if specified # adjust buffer size if specified
state = ctx._send_chan._state # type: ignore state: MemoryChannelState = ctx._send_chan._state # type: ignore
if msg_buffer_size and state.max_buffer_size != msg_buffer_size: if (
msg_buffer_size
and
state.max_buffer_size != msg_buffer_size
):
state.max_buffer_size = msg_buffer_size state.max_buffer_size = msg_buffer_size
except KeyError: except KeyError:
@ -1094,7 +1101,36 @@ class Actor:
'`tractor.pause_from_sync()` not available!' '`tractor.pause_from_sync()` not available!'
) )
rvs['_is_root'] = False # XXX ensure the "infected `asyncio` mode" setting
# passed down from our spawning parent is consistent
# with `trio`-runtime initialization:
# - during sub-proc boot, the entrypoint func
# (`._entry.<spawn_backend>_main()`) should set
# `._infected_aio = True` before calling
# `run_as_asyncio_guest()`,
# - the value of `infect_asyncio: bool = True` as
# passed to `ActorNursery.start_actor()` must be
# the same as `_runtime_vars['_is_infected_aio']`
if (
(aio_rtv := rvs['_is_infected_aio'])
!=
(aio_attr := self._infected_aio)
):
raise InternalError(
'Parent sent runtime-vars that mismatch for the '
'"infected `asyncio` mode" settings ?!?\n\n'
f'rvs["_is_infected_aio"] = {aio_rtv}\n'
f'self._infected_aio = {aio_attr}\n'
)
if aio_rtv:
assert trio_runtime.GLOBAL_RUN_CONTEXT.runner.is_guest
# ^TODO^ possibly add a `sniffio` or
# `trio` pub-API for `is_guest_mode()`?
rvs['_is_root'] = False # obvi XD
# update process-wide globals
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
# XXX: ``msgspec`` doesn't support serializing tuples # XXX: ``msgspec`` doesn't support serializing tuples

View File

@ -44,6 +44,8 @@ _runtime_vars: dict[str, Any] = {
'_root_mailbox': (None, None), '_root_mailbox': (None, None),
'_registry_addrs': [], '_registry_addrs': [],
'_is_infected_aio': False,
# for `tractor.pause_from_sync()` & `breakpoint()` support # for `tractor.pause_from_sync()` & `breakpoint()` support
'use_greenback': False, 'use_greenback': False,
} }
@ -70,7 +72,8 @@ def current_actor(
''' '''
if ( if (
err_on_no_runtime err_on_no_runtime
and _current_actor is None and
_current_actor is None
): ):
msg: str = 'No local actor has been initialized yet?\n' msg: str = 'No local actor has been initialized yet?\n'
from ._exceptions import NoRuntime from ._exceptions import NoRuntime

View File

@ -158,6 +158,7 @@ class ActorNursery:
# configure and pass runtime state # configure and pass runtime state
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False _rtv['_is_root'] = False
_rtv['_is_infected_aio'] = infect_asyncio
# allow setting debug policy per actor # allow setting debug policy per actor
if debug_mode is not None: if debug_mode is not None:

View File

@ -54,6 +54,25 @@ def examples_dir() -> pathlib.Path:
return repodir() / 'examples' return repodir() / 'examples'
def mk_cmd(
ex_name: str,
exs_subpath: str = 'debugging',
) -> str:
'''
Generate a shell command suitable to pass to ``pexpect.spawn()``.
'''
script_path: pathlib.Path = (
examples_dir()
/ exs_subpath
/ f'{ex_name}.py'
)
return ' '.join([
'python',
str(script_path)
])
@acm @acm
async def expect_ctxc( async def expect_ctxc(
yay: bool, yay: bool,

View File

@ -26,7 +26,7 @@ from ._debug import (
breakpoint as breakpoint, breakpoint as breakpoint,
pause as pause, pause as pause,
pause_from_sync as pause_from_sync, pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler, sigint_shield as sigint_shield,
open_crash_handler as open_crash_handler, open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler, maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback, maybe_init_greenback as maybe_init_greenback,

File diff suppressed because it is too large Load Diff

View File

@ -234,7 +234,7 @@ def find_caller_info(
_frame2callerinfo_cache: dict[FrameType, CallerInfo] = {} _frame2callerinfo_cache: dict[FrameType, CallerInfo] = {}
# TODO: -[x] move all this into new `.devx._code`! # TODO: -[x] move all this into new `.devx._frame_stack`!
# -[ ] consider rename to _callstack? # -[ ] consider rename to _callstack?
# -[ ] prolly create a `@runtime_api` dec? # -[ ] prolly create a `@runtime_api` dec?
# |_ @api_frame seems better? # |_ @api_frame seems better?
@ -286,3 +286,18 @@ def api_frame(
wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache wrapped._call_infos: dict[FrameType, CallerInfo] = _frame2callerinfo_cache
wrapped.__api_func__: bool = True wrapped.__api_func__: bool = True
return wrapper(wrapped) return wrapper(wrapped)
# TODO: something like this instead of the adhoc frame-unhiding
# blocks all over the runtime!! XD
# -[ ] ideally we can expect a certain error (set) and if something
# else is raised then all frames below the wrapped one will be
# un-hidden via `__tracebackhide__: bool = False`.
# |_ might need to dynamically mutate the code objs like
# `pdbp.hideframe()` does?
# -[ ] use this as a `@acm` decorator as introed in 3.10?
# @acm
# async def unhide_frame_when_not(
# error_set: set[BaseException],
# ) -> TracebackType:
# ...

View File

@ -24,19 +24,32 @@ disjoint, parallel executing tasks in separate actors.
''' '''
from __future__ import annotations from __future__ import annotations
# from functools import partial
from threading import (
current_thread,
Thread,
RLock,
)
import multiprocessing as mp import multiprocessing as mp
from signal import ( from signal import (
signal, signal,
getsignal,
SIGUSR1, SIGUSR1,
SIGINT,
)
# import traceback
from types import ModuleType
from typing import (
Callable,
TYPE_CHECKING,
) )
import traceback
from typing import TYPE_CHECKING
import trio import trio
from tractor import ( from tractor import (
_state, _state,
log as logmod, log as logmod,
) )
from tractor.devx import _debug
log = logmod.get_logger(__name__) log = logmod.get_logger(__name__)
@ -51,26 +64,68 @@ if TYPE_CHECKING:
@trio.lowlevel.disable_ki_protection @trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None: def dump_task_tree() -> None:
import stackscope '''
from tractor.log import get_console_log Do a classic `stackscope.extract()` task-tree dump to console at
`.devx()` level.
'''
import stackscope
tree_str: str = str( tree_str: str = str(
stackscope.extract( stackscope.extract(
trio.lowlevel.current_root_task(), trio.lowlevel.current_root_task(),
recurse_child_tasks=True recurse_child_tasks=True
) )
) )
log = get_console_log(
name=__name__,
level='cancel',
)
actor: Actor = _state.current_actor() actor: Actor = _state.current_actor()
thr: Thread = current_thread()
current_sigint_handler: Callable = getsignal(SIGINT)
if (
current_sigint_handler
is not
_debug.DebugStatus._trio_handler
):
sigint_handler_report: str = (
'The default `trio` SIGINT handler was replaced?!'
)
else:
sigint_handler_report: str = (
'The default `trio` SIGINT handler is in use?!'
)
# sclang symbology
# |_<object>
# |_(Task/Thread/Process/Actor
# |_{Supervisor/Scope
# |_[Storage/Memory/IPC-Stream/Data-Struct
log.devx( log.devx(
f'Dumping `stackscope` tree for actor\n' f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n' f'(>: {actor.uid!r}\n'
f' |_{mp.current_process()}\n\n' f' |_{mp.current_process()}\n'
f'{tree_str}\n' f' |_{thr}\n'
f' |_{actor}\n'
f'\n'
f'{sigint_handler_report}\n'
f'signal.getsignal(SIGINT) -> {current_sigint_handler!r}\n'
# f'\n'
# start-of-trace-tree delimiter (mostly for testing)
# f'------ {actor.uid!r} ------\n'
f'\n'
f'------ start-of-{actor.uid!r} ------\n'
f'|\n'
f'{tree_str}'
# end-of-trace-tree delimiter (mostly for testing)
f'|\n'
f'|_____ end-of-{actor.uid!r} ______\n'
) )
# TODO: can remove this right?
# -[ ] was original code from author
#
# print(
# 'DUMPING FROM PRINT\n'
# +
# content
# )
# import logging # import logging
# try: # try:
# with open("/dev/tty", "w") as tty: # with open("/dev/tty", "w") as tty:
@ -80,58 +135,130 @@ def dump_task_tree() -> None:
# "task_tree" # "task_tree"
# ).exception("Error printing task tree") # ).exception("Error printing task tree")
_handler_lock = RLock()
_tree_dumped: bool = False
def signal_handler(
def dump_tree_on_sig(
sig: int, sig: int,
frame: object, frame: object,
relay_to_subs: bool = True, relay_to_subs: bool = True,
) -> None: ) -> None:
global _tree_dumped, _handler_lock
with _handler_lock:
# if _tree_dumped:
# log.warning(
# 'Already dumped for this actor...??'
# )
# return
_tree_dumped = True
# actor: Actor = _state.current_actor()
log.devx(
'Trying to dump `stackscope` tree..\n'
)
try: try:
trio.lowlevel.current_trio_token( dump_task_tree()
).run_sync_soon(dump_task_tree) # await actor._service_n.start_soon(
# partial(
# trio.to_thread.run_sync,
# dump_task_tree,
# )
# )
# trio.lowlevel.current_trio_token().run_sync_soon(
# dump_task_tree
# )
except RuntimeError: except RuntimeError:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
# not in async context -- print a normal traceback # not in async context -- print a normal traceback
traceback.print_stack() # traceback.print_stack()
raise
except BaseException:
log.exception(
'Failed to dump `stackscope` tree..\n'
)
raise
# log.devx(
# 'Supposedly we dumped just fine..?'
# )
if not relay_to_subs: if not relay_to_subs:
return return
an: ActorNursery an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values(): for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType subproc: ProcessType
subactor: Actor subactor: Actor
for subactor, subproc, _ in an._children.values(): for subactor, subproc, _ in an._children.values():
log.devx( log.warning(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n' f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n' f'{subactor}\n'
f' |_{subproc}\n' f' |_{subproc}\n'
) )
if isinstance(subproc, trio.Process): # bc of course stdlib can't have a std API.. XD
match subproc:
case trio.Process():
subproc.send_signal(sig) subproc.send_signal(sig)
elif isinstance(subproc, mp.Process): case mp.Process():
subproc._send_signal(sig) subproc._send_signal(sig)
def enable_stack_on_sig( def enable_stack_on_sig(
sig: int = SIGUSR1 sig: int = SIGUSR1,
) -> None: ) -> ModuleType:
''' '''
Enable `stackscope` tracing on reception of a signal; by Enable `stackscope` tracing on reception of a signal; by
default this is SIGUSR1. default this is SIGUSR1.
HOT TIP: a task/ctx-tree dump can be triggered from a shell with
fancy cmds.
For ex. from `bash` using `pgrep` and cmd-sustitution
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
you could use:
>> kill -SIGUSR1 $(pgrep -f <part-of-cmd: str>)
OR without a sub-shell,
>> pkill --signal SIGUSR1 -f <part-of-cmd: str>
''' '''
try:
import stackscope
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
return None
handler: Callable|int = getsignal(sig)
if handler is dump_tree_on_sig:
log.devx(
'A `SIGUSR1` handler already exists?\n'
f'|_ {handler!r}\n'
)
return
signal( signal(
sig, sig,
signal_handler, dump_tree_on_sig,
) )
# NOTE: not the above can be triggered from log.devx(
# a (xonsh) shell using: 'Enabling trace-trees on `SIGUSR1` '
# kill -SIGUSR1 @$(pgrep -f '<cmd>') 'since `stackscope` is installed @ \n'
# f'{stackscope!r}\n\n'
# for example if you were looking to trace a `pytest` run f'With `SIGUSR1` handler\n'
# kill -SIGUSR1 @$(pgrep -f 'pytest') f'|_{dump_tree_on_sig}\n'
)
return stackscope

View File

@ -53,6 +53,7 @@ def pformat_boxed_tb(
tb_box_indent: int|None = None, tb_box_indent: int|None = None,
tb_body_indent: int = 1, tb_body_indent: int = 1,
boxer_header: str = '-'
) -> str: ) -> str:
''' '''
@ -88,10 +89,10 @@ def pformat_boxed_tb(
tb_box: str = ( tb_box: str = (
f'|\n' f'|\n'
f' ------ - ------\n' f' ------ {boxer_header} ------\n'
f'{tb_body}' f'{tb_body}'
f' ------ - ------\n' f' ------ {boxer_header}- ------\n'
f'_|\n' f'_|'
) )
tb_box_indent: str = ( tb_box_indent: str = (
tb_box_indent tb_box_indent

View File

@ -258,20 +258,28 @@ class ActorContextInfo(Mapping):
def get_logger( def get_logger(
name: str|None = None,
name: str | None = None,
_root_name: str = _proj_name, _root_name: str = _proj_name,
logger: Logger|None = None,
# TODO, using `.config.dictConfig()` api?
# -[ ] SO answer with docs links
# |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
subsys_spec: str|None = None,
) -> StackLevelAdapter: ) -> StackLevelAdapter:
'''Return the package log or a sub-logger for ``name`` if provided. '''Return the package log or a sub-logger for ``name`` if provided.
''' '''
log: Logger log: Logger
log = rlog = logging.getLogger(_root_name) log = rlog = logger or logging.getLogger(_root_name)
if ( if (
name name
and name != _proj_name and
name != _proj_name
): ):
# NOTE: for handling for modules that use ``get_logger(__name__)`` # NOTE: for handling for modules that use ``get_logger(__name__)``
@ -283,7 +291,7 @@ def get_logger(
# since in python the {filename} is always this same # since in python the {filename} is always this same
# module-file. # module-file.
sub_name: None | str = None sub_name: None|str = None
rname, _, sub_name = name.partition('.') rname, _, sub_name = name.partition('.')
pkgpath, _, modfilename = sub_name.rpartition('.') pkgpath, _, modfilename = sub_name.rpartition('.')
@ -306,7 +314,10 @@ def get_logger(
# add our actor-task aware adapter which will dynamically look up # add our actor-task aware adapter which will dynamically look up
# the actor and task names at each log emit # the actor and task names at each log emit
logger = StackLevelAdapter(log, ActorContextInfo()) logger = StackLevelAdapter(
log,
ActorContextInfo(),
)
# additional levels # additional levels
for name, val in CUSTOM_LEVELS.items(): for name, val in CUSTOM_LEVELS.items():
@ -319,15 +330,25 @@ def get_logger(
def get_console_log( def get_console_log(
level: str | None = None, level: str|None = None,
logger: Logger|None = None,
**kwargs, **kwargs,
) -> LoggerAdapter:
'''Get the package logger and enable a handler which writes to stderr.
Yeah yeah, i know we can use ``DictConfig``. You do it. ) -> LoggerAdapter:
''' '''
log = get_logger(**kwargs) # our root logger Get a `tractor`-style logging instance: a `Logger` wrapped in
logger = log.logger a `StackLevelAdapter` which injects various concurrency-primitive
(process, thread, task) fields and enables a `StreamHandler` that
writes on stderr using `colorlog` formatting.
Yeah yeah, i know we can use `logging.config.dictConfig()`. You do it.
'''
log = get_logger(
logger=logger,
**kwargs
) # set a root logger
logger: Logger = log.logger
if not level: if not level:
return log return log
@ -346,9 +367,13 @@ def get_console_log(
None, None,
) )
): ):
fmt = LOG_FORMAT
# if logger:
# fmt = None
handler = StreamHandler() handler = StreamHandler()
formatter = colorlog.ColoredFormatter( formatter = colorlog.ColoredFormatter(
LOG_FORMAT, fmt=fmt,
datefmt=DATE_FORMAT, datefmt=DATE_FORMAT,
log_colors=STD_PALETTE, log_colors=STD_PALETTE,
secondary_log_colors=BOLD_PALETTE, secondary_log_colors=BOLD_PALETTE,
@ -365,7 +390,7 @@ def get_loglevel() -> str:
# global module logger for tractor itself # global module logger for tractor itself
log = get_logger('tractor') log: StackLevelAdapter = get_logger('tractor')
def at_least_level( def at_least_level(

View File

@ -41,8 +41,10 @@ import textwrap
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Protocol,
Type, Type,
TYPE_CHECKING, TYPE_CHECKING,
TypeVar,
Union, Union,
) )
from types import ModuleType from types import ModuleType
@ -181,7 +183,11 @@ def mk_dec(
dec_hook: Callable|None = None, dec_hook: Callable|None = None,
) -> MsgDec: ) -> MsgDec:
'''
Create an IPC msg decoder, normally used as the
`PayloadMsg.pld: PayloadT` field decoder inside a `PldRx`.
'''
return MsgDec( return MsgDec(
_dec=msgpack.Decoder( _dec=msgpack.Decoder(
type=spec, # like `MsgType[Any]` type=spec, # like `MsgType[Any]`
@ -227,6 +233,13 @@ def pformat_msgspec(
join_char: str = '\n', join_char: str = '\n',
) -> str: ) -> str:
'''
Pretty `str` format the `msgspec.msgpack.Decoder.type` attribute
for display in (console) log messages as a nice (maybe multiline)
presentation of all supported `Struct`s (subtypes) available for
typed decoding.
'''
dec: msgpack.Decoder = getattr(codec, 'dec', codec) dec: msgpack.Decoder = getattr(codec, 'dec', codec)
return join_char.join( return join_char.join(
mk_msgspec_table( mk_msgspec_table(
@ -630,31 +643,57 @@ def limit_msg_spec(
# # import pdbp; pdbp.set_trace() # # import pdbp; pdbp.set_trace()
# assert ext_codec.pld_spec == extended_spec # assert ext_codec.pld_spec == extended_spec
# yield ext_codec # yield ext_codec
#
# ^-TODO-^ is it impossible to make something like this orr!?
# TODO: make an auto-custom hook generator from a set of input custom
# types?
# -[ ] below is a proto design using a `TypeCodec` idea?
#
# type var for the expected interchange-lib's
# IPC-transport type when not available as a built-in
# serialization output.
WireT = TypeVar('WireT')
# TODO: make something similar to this inside `._codec` such that # TODO: some kinda (decorator) API for built-in subtypes
# user can just pass a type table of some sort? # that builds this implicitly by inspecting the `mro()`?
# -[ ] we would need to decode all msgs to `pretty_struct.Struct` class TypeCodec(Protocol):
# and then call `.to_dict()` on them? '''
# -[x] we're going to need to re-impl all the stuff changed in the A per-custom-type wire-transport serialization translator
# runtime port such that it can handle dicts or `Msg`s? description type.
#
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]: '''
# ''' src_type: Type
# Deliver a `enc_hook()`/`dec_hook()` pair which does wire_type: WireT
# manual convertion from our above native `Msg` set
# to `dict` equivalent (wire msgs) in order to keep legacy compat def encode(obj: Type) -> WireT:
# with the original runtime implementation. ...
#
# Note: this is is/was primarly used while moving the core def decode(
# runtime over to using native `Msg`-struct types wherein we obj_type: Type[WireT],
# start with the send side emitting without loading obj: WireT,
# a typed-decoder and then later flipping the switch over to ) -> Type:
# load to the native struct types once all runtime usage has ...
# been adjusted appropriately.
#
# ''' class MsgpackTypeCodec(TypeCodec):
# return ( ...
# # enc_to_dict,
# dec_from_dict,
# ) def mk_codec_hooks(
type_codecs: list[TypeCodec],
) -> tuple[Callable, Callable]:
'''
Deliver a `enc_hook()`/`dec_hook()` pair which handle
manual convertion from an input `Type` set such that whenever
the `TypeCodec.filter()` predicate matches the
`TypeCodec.decode()` is called on the input native object by
the `dec_hook()` and whenever the
`isiinstance(obj, TypeCodec.type)` matches against an
`enc_hook(obj=obj)` the return value is taken from a
`TypeCodec.encode(obj)` callback.
'''
...

View File

@ -30,9 +30,9 @@ from msgspec import (
Struct as _Struct, Struct as _Struct,
structs, structs,
) )
from pprint import ( # from pprint import (
saferepr, # saferepr,
) # )
from tractor.log import get_logger from tractor.log import get_logger
@ -75,8 +75,8 @@ class DiffDump(UserList):
for k, left, right in self: for k, left, right in self:
repstr += ( repstr += (
f'({k},\n' f'({k},\n'
f'\t{repr(left)},\n' f' |_{repr(left)},\n'
f'\t{repr(right)},\n' f' |_{repr(right)},\n'
')\n' ')\n'
) )
repstr += ']\n' repstr += ']\n'
@ -144,15 +144,22 @@ def pformat(
field_indent=indent + field_indent, field_indent=indent + field_indent,
) )
else: # the `pprint` recursion-safe format: else:
val_str: str = repr(v)
# XXX LOL, below just seems to be f#$%in causing
# recursion errs..
#
# the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
try: # try:
val_str: str = saferepr(v) # val_str: str = saferepr(v)
except Exception: # except Exception:
log.exception( # log.exception(
'Failed to `saferepr({type(struct)})` !?\n' # 'Failed to `saferepr({type(struct)})` !?\n'
) # )
return _Struct.__repr__(struct) # raise
# return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@ -203,12 +210,7 @@ class Struct(
return sin_props return sin_props
pformat = pformat pformat = pformat
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def __repr__(self) -> str: def __repr__(self) -> str:
try: try:
return pformat(self) return pformat(self)
@ -218,6 +220,13 @@ class Struct(
) )
return _Struct.__repr__(self) return _Struct.__repr__(self)
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def copy( def copy(
self, self,
update: dict | None = None, update: dict | None = None,
@ -267,13 +276,15 @@ class Struct(
fi.type(getattr(self, fi.name)), fi.type(getattr(self, fi.name)),
) )
# TODO: make a mod func instead and just point to it here for
# method impl?
def __sub__( def __sub__(
self, self,
other: Struct, other: Struct,
) -> DiffDump[tuple[str, Any, Any]]: ) -> DiffDump[tuple[str, Any, Any]]:
''' '''
Compare fields/items key-wise and return a ``DiffDump`` Compare fields/items key-wise and return a `DiffDump`
for easy visual REPL comparison B) for easy visual REPL comparison B)
''' '''
@ -290,3 +301,42 @@ class Struct(
)) ))
return diffs return diffs
@classmethod
def fields_diff(
cls,
other: dict|Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Very similar to `PrettyStruct.__sub__()` except accepts an
input `other: dict` (presumably that would normally be called
like `Struct(**other)`) which returns a `DiffDump` of the
fields of the struct and the `dict`'s fields.
'''
nullish = object()
consumed: dict = other.copy()
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(cls):
field_name: str = fi.name
# ours: Any = getattr(self, field_name)
theirs: Any = consumed.pop(field_name, nullish)
if theirs is nullish:
diffs.append((
field_name,
f'{fi.type!r}',
'NOT-DEFINED in `other: dict`',
))
# when there are lingering fields in `other` that this struct
# DOES NOT define we also append those.
if consumed:
for k, v in consumed.items():
diffs.append((
k,
f'NOT-DEFINED for `{cls.__name__}`',
f'`other: dict` has value = {v!r}',
))
return diffs

File diff suppressed because it is too large Load Diff

View File

@ -382,7 +382,7 @@ class BroadcastReceiver(ReceiveChannel):
# likely it makes sense to unwind back to the # likely it makes sense to unwind back to the
# underlying? # underlying?
# import tractor # import tractor
# await tractor.breakpoint() # await tractor.pause()
log.warning( log.warning(
f'Only one sub left for {self}?\n' f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?' 'We can probably unwind from breceiver?'