Compare commits

...

9 Commits

Author SHA1 Message Date
Tyler Goodlet e646ce5c0d Mask ctlc borked REPL tests
Namely the `tractor.pause_from_sync()` examples using both bg threads
and `asyncio` which seem to go into bad states where SIGINT is ignored..

Deats,
- add `maybe_expect_timeout()` cm to ensure the EOF hangs get
  `.xfail()`ed instead.
- @pytest.mark.ctlcs_bish` `test_pause_from_sync` and don't expect the
  greenback prompt msg.
- also mark `test_sync_pause_from_aio_task`.
2025-03-23 01:02:27 -04:00
Tyler Goodlet b6d800954a Repair/update `stackscope` test
Seems that on 3.13 it's not showing our script code in the output now?
Gotta get an example for @oremanj to see what's up but really it'd be
nice to just custom format stuff above `trio`'s runtime by def..

Anyway, update the `.devx._stackscope`,
- log formatting to be a little more "sclangy" lookin.
- change the per-actor "delimiter" lines style.
- report the `signal.getsignal(SIGINT)` which i needed in the
  `sync_bp.py` with ctl-c causing a hang..
- mask the `_tree_dumped` duplicator log report as well as the "dumped
  fine" one.
- add an example `pkill --signal SIGUSR1` cmdline.

Tweak the test to cope with,
- not showing our script lines now.. which i've commented in the
  `assert_before()` patts..
- to expect the newly formatted delimiter (ascii) lines to separate the
  root vs. hanger sub-actor sections.
2025-03-23 01:02:27 -04:00
Tyler Goodlet beb7097ab4 Add a mark to `pytest.xfail()` questionably conc py stuff (ur mam `.xfail()`s bish!) 2025-03-23 01:02:27 -04:00
Tyler Goodlet 724c22d266 Be extra sure to re-raise EoCs from translator
That is whenever `trio.EndOfChannel` is raised (presumably from the
`._to_trio.receive()` call inside `LinkedTaskChannel.receive()`) we need
to be extra certain that we let it bubble upward transparently DESPITE
special exc-as-signal handling that is normally suppressed from the aio
side; REPEAT we want to ALWAYS bubble any `trio_err ==
trio.EndOfChannel` in the `finally:` handler of `translate_aio_errors()`
despite `chan._trio_to_raise == AsyncioTaskExited` such that the
caller's iterable machinery will operate as normal when the inter-task
stream is stopped (again, presumably by the aio side task terminating
the inter-task stream).

Main impl deats for this,
- in the EoC handler block ensure we assign both `chan._trio_err` and
  the local `trio_err` as well as continue to re-raise.
- add a case to the match block in the `finally:` handler which FOR SURE
  re-raises any `type(trio_err) is EndOfChannel`!

Additionally fix a bad bug,
- a ref bug where we were NOT using the
  `except BaseException as _trio_err` to assign to `chan._trio_err` (by
  accident was missing the leading `_`..)

Unrelated impl tweak,
- move all `maybe_raise_aio_side_err()` content back to inline with its
  parent func - makes it easier to use `tractor.pause()` mostly Bp
- go back to trying to use `aio_task.set_exception(aio_taskc)` for now
  even though i'm pretty sure we're going to move to a try-fute-first
  style helper for this in the future.

Adjust some tests to match/mk-them-green,
- break from `aio_echo_server()` recv loop on
  `to_asyncio.TrioTaskExited` much like how you'd expect to (implicitly
  with a `for`) with a `trio.EndOfChannel`.
- toss in a masked `value is None` pause point i needed for debugging
  inf looping caused by not re-raising EoCs per the main patch
  description.
- add a debug-mode sized delay to root-infected test.
2025-03-23 01:02:27 -04:00
Tyler Goodlet ecd61226d8 More `debug_mode` test support, better nursery var names 2025-03-23 01:02:27 -04:00
Tyler Goodlet 69fd46e1ce Add per-side graceful-exit/cancel excs-as-signals
Such that any combination of task terminations/exits can be explicitly
handled and "dual side independent" crash cases re-raised in egs.

The main error-or-exit impl changes include,

- use of new per-side "signaling exceptions":
  - TrioTaskExited|TrioCancelled for signalling aio.
  - AsyncioTaskExited|AsyncioCancelled for signalling trio.

- NOT overloading the `LinkedTaskChannel._trio/aio_err` fields for
  err-as-signal relay and instead add a new pair of
  `._trio/aio_to_raise` maybe-exc-attrs which allow each side's
  task to specify what it would want the other side to raise to signal
  its/a termination outcome:
  - `._trio_to_raise: AsyncioTaskExited|AsyncioCancelled` to signal,
    |_ the aio task having returned while the trio side was still reading
       from the `asyncio.Queue` or is just not `.done()`.
    |_ the aio task being self or trio-request cancelled where
       a `asyncio.CancelledError` is raised and caught but NOT relayed
       as is back to trio; instead signal a "more explicit" exc type.
  - `._aio_to_raise: TrioTaskExited|TrioCancelled` to signal,
    |_ the trio task having returned while the aio side was still reading
       from the mem chan and indicating that the trio side might not
       care any more about future streamed values (like the
       `Stop/EndOfChannel` equivs for ipc `Context`s).
    |_ when the trio task canceld we do
        a `asyncio.Future.set_exception(TrioTaskExited())` to indicate
        to the aio side verbosely that it should cancel due to the trio
        parent.
  - `_aio/trio_err` are now left to only capturing the **actual**
    per-side task excs for introspection / other side's handling logic.

- supporting "graceful exits" depending on API in use from
  `translate_aio_errors()` such that if either side exits but the other
  side isn't expect to consume the final `return`ed value, we just exit
  silently, which required:
  - adding a `suppress_graceful_exits: bool` flag.
  - adjusting the `maybe_raise_aio_side_err()` logic to use that flag
    and suppress only on certain combos of `._trio_to_raise/._trio_err`.
  - prefer to raise `._trio_to_raise` when the aio-side is the src and
    vice versa.

- filling out pedantic logging for cancellation cases indicating which
  side is the cause.

- add a `LinkedTaskChannel._aio_result` modelled after our
  `Context._result` a a similar `.wait_for_result()` interface which
  allows maybe accessing the aio task's final return value if desired
  when using the `open_channel_from()` API.

- rename `cancel_trio()` done handler -> `signal_trio_when_done()`

Also some fairly major test suite updates,
- add a `delay: int` producing fixture which delivers a much larger
  timeout whenever `debug_mode` is set so that the REPL can be used
  without a surrounding cancel firing.
- add a new `test_aio_exits_early_relays_AsyncioTaskExited` including
  a paired `exit_early: bool` flag to `push_from_aio_task()`.
- adjust `test_trio_closes_early_causes_aio_checkpoint_raise` to expect
  a `to_asyncio.TrioTaskExited`.
2025-03-23 01:02:27 -04:00
Tyler Goodlet af660c1019 Another `is` fix.. 2025-03-23 01:02:27 -04:00
Tyler Goodlet 34e9e529d2 Unset `$PYTHON_COLORS` for test debugger suite..
Since obvi all our `pexpect` patterns aren't going to match with
a heck-ton of terminal color escape sequences in the output XD
2025-03-23 01:02:27 -04:00
Tyler Goodlet 816b82f9fe Tweak some test asserts to better `is` style 2025-03-23 01:02:27 -04:00
11 changed files with 975 additions and 365 deletions

View File

@ -39,7 +39,6 @@ async def main(
loglevel='devx', loglevel='devx',
) as an, ) as an,
): ):
ptl: tractor.Portal = await an.start_actor( ptl: tractor.Portal = await an.start_actor(
'hanger', 'hanger',
enable_modules=[__name__], enable_modules=[__name__],
@ -54,13 +53,16 @@ async def main(
print( print(
'Yo my child hanging..?\n' 'Yo my child hanging..?\n'
'Sending SIGUSR1 to see a tree-trace!\n' # "i'm a user who wants to see a `stackscope` tree!\n"
) )
# XXX simulate the wrapping test's "user actions" # XXX simulate the wrapping test's "user actions"
# (i.e. if a human didn't run this manually but wants to # (i.e. if a human didn't run this manually but wants to
# know what they should do to reproduce test behaviour) # know what they should do to reproduce test behaviour)
if from_test: if from_test:
print(
f'Sending SIGUSR1 to {cpid!r}!\n'
)
os.kill( os.kill(
cpid, cpid,
signal.SIGUSR1, signal.SIGUSR1,

View File

@ -30,7 +30,7 @@ from ..conftest import (
@pytest.fixture @pytest.fixture
def spawn( def spawn(
start_method, start_method,
testdir: pytest.Testdir, testdir: pytest.Pytester,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
) -> Callable[[str], None]: ) -> Callable[[str], None]:
@ -44,16 +44,32 @@ def spawn(
'`pexpect` based tests only supported on `trio` backend' '`pexpect` based tests only supported on `trio` backend'
) )
def unset_colors():
'''
Python 3.13 introduced colored tracebacks that break patt
matching,
https://docs.python.org/3/using/cmdline.html#envvar-PYTHON_COLORS
https://docs.python.org/3/using/cmdline.html#using-on-controlling-color
'''
import os
os.environ['PYTHON_COLORS'] = '0'
def _spawn( def _spawn(
cmd: str, cmd: str,
**mkcmd_kwargs, **mkcmd_kwargs,
): ):
unset_colors()
return testdir.spawn( return testdir.spawn(
cmd=mk_cmd( cmd=mk_cmd(
cmd, cmd,
**mkcmd_kwargs, **mkcmd_kwargs,
), ),
expect_timeout=3, expect_timeout=3,
# preexec_fn=unset_colors,
# ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff?
) )
# such that test-dep can pass input script name. # such that test-dep can pass input script name.
@ -83,6 +99,14 @@ def ctlc(
'https://github.com/goodboy/tractor/issues/320' 'https://github.com/goodboy/tractor/issues/320'
) )
if mark.name == 'ctlcs_bish':
pytest.skip(
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
f'The test and/or underlying example script can *sometimes* run fine '
f'locally but more then likely until the cpython peeps get their sh#$ together, '
f'this test will definitely not behave like `trio` under SIGINT..\n'
)
if use_ctlc: if use_ctlc:
# XXX: disable pygments highlighting for auto-tests # XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle # since some envs (like actions CI) will struggle

View File

@ -6,6 +6,9 @@ All these tests can be understood (somewhat) by running the
equivalent `examples/debugging/` scripts manually. equivalent `examples/debugging/` scripts manually.
''' '''
from contextlib import (
contextmanager as cm,
)
# from functools import partial # from functools import partial
# import itertools # import itertools
import time import time
@ -15,7 +18,7 @@ import time
import pytest import pytest
from pexpect.exceptions import ( from pexpect.exceptions import (
# TIMEOUT, TIMEOUT,
EOF, EOF,
) )
@ -32,7 +35,23 @@ from .conftest import (
# _repl_fail_msg, # _repl_fail_msg,
) )
@cm
def maybe_expect_timeout(
ctlc: bool = False,
) -> None:
try:
yield
except TIMEOUT:
# breakpoint()
if ctlc:
pytest.xfail(
'Some kinda redic threading SIGINT bug i think?\n'
'See the notes in `examples/debugging/sync_bp.py`..\n'
)
raise
@pytest.mark.ctlcs_bish
def test_pause_from_sync( def test_pause_from_sync(
spawn, spawn,
ctlc: bool, ctlc: bool,
@ -67,10 +86,10 @@ def test_pause_from_sync(
child.expect(PROMPT) child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel! # XXX shouldn't see gb loaded message with PDB loglevel!
assert not in_prompt_msg( # assert not in_prompt_msg(
child, # child,
['`greenback` portal opened!'], # ['`greenback` portal opened!'],
) # )
# should be same root task # should be same root task
assert_before( assert_before(
child, child,
@ -162,7 +181,14 @@ def test_pause_from_sync(
) )
child.sendline('c') child.sendline('c')
child.expect(EOF)
# XXX TODO, weird threading bug it seems despite the
# `abandon_on_cancel: bool` setting to
# `trio.to_thread.run_sync()`..
with maybe_expect_timeout(
ctlc=ctlc,
):
child.expect(EOF)
def expect_any_of( def expect_any_of(
@ -220,8 +246,10 @@ def expect_any_of(
return expected_patts return expected_patts
@pytest.mark.ctlcs_bish
def test_sync_pause_from_aio_task( def test_sync_pause_from_aio_task(
spawn, spawn,
ctlc: bool ctlc: bool
# ^TODO, fix for `asyncio`!! # ^TODO, fix for `asyncio`!!
): ):
@ -270,10 +298,12 @@ def test_sync_pause_from_aio_task(
# error raised in `asyncio.Task` # error raised in `asyncio.Task`
"raise ValueError('asyncio side error!')": [ "raise ValueError('asyncio side error!')": [
_crash_msg, _crash_msg,
'return await chan.receive()', # `.to_asyncio` impl internals in tb
"<Task 'trio_ctx'", "<Task 'trio_ctx'",
"@ ('aio_daemon'", "@ ('aio_daemon'",
"ValueError: asyncio side error!", "ValueError: asyncio side error!",
# XXX, we no longer show this frame by default!
# 'return await chan.receive()', # `.to_asyncio` impl internals in tb
], ],
# parent-side propagation via actor-nursery/portal # parent-side propagation via actor-nursery/portal
@ -325,6 +355,7 @@ def test_sync_pause_from_aio_task(
) )
child.sendline('c') child.sendline('c')
# with maybe_expect_timeout():
child.expect(EOF) child.expect(EOF)

View File

@ -15,6 +15,7 @@ TODO:
''' '''
import os import os
import signal import signal
import time
from .conftest import ( from .conftest import (
expect, expect,
@ -53,41 +54,39 @@ def test_shield_pause(
] ]
) )
script_pid: int = child.pid
print( print(
'Sending SIGUSR1 to see a tree-trace!', f'Sending SIGUSR1 to {script_pid}\n'
f'(kill -s SIGUSR1 {script_pid})\n'
) )
os.kill( os.kill(
child.pid, script_pid,
signal.SIGUSR1, signal.SIGUSR1,
) )
time.sleep(0.2)
expect( expect(
child, child,
# end-of-tree delimiter # end-of-tree delimiter
"------ \('root', ", "end-of-\('root'",
) )
assert_before( assert_before(
child, child,
[ [
'Trying to dump `stackscope` tree..', # 'Srying to dump `stackscope` tree..',
'Dumping `stackscope` tree for actor', # 'Dumping `stackscope` tree for actor',
"('root'", # uid line "('root'", # uid line
# TODO!? this used to show?
# -[ ] mk reproducable for @oremanj?
#
# parent block point (non-shielded) # parent block point (non-shielded)
'await trio.sleep_forever() # in root', # 'await trio.sleep_forever() # in root',
] ]
) )
# expect(
# child,
# # relay to the sub should be reported
# 'Relaying `SIGUSR1`[10] to sub-actor',
# )
expect( expect(
child, child,
# end-of-tree delimiter # end-of-tree delimiter
"------ \('hanger', ", "end-of-\('hanger'",
) )
assert_before( assert_before(
child, child,
@ -97,11 +96,11 @@ def test_shield_pause(
"('hanger'", # uid line "('hanger'", # uid line
# TODO!? SEE ABOVE
# hanger LOC where it's shield-halted # hanger LOC where it's shield-halted
'await trio.sleep_forever() # in subactor', # 'await trio.sleep_forever() # in subactor',
] ]
) )
# breakpoint()
# simulate the user sending a ctl-c to the hanging program. # simulate the user sending a ctl-c to the hanging program.
# this should result in the terminator kicking in since # this should result in the terminator kicking in since

View File

@ -130,7 +130,7 @@ def test_multierror(
try: try:
await portal2.result() await portal2.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.boxed_type == AssertionError assert err.boxed_type is AssertionError
print("Look Maa that first actor failed hard, hehh") print("Look Maa that first actor failed hard, hehh")
raise raise
@ -182,7 +182,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
for exc in exceptions: for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError) assert isinstance(exc, tractor.RemoteActorError)
assert exc.boxed_type == AssertionError assert exc.boxed_type is AssertionError
async def do_nothing(): async def do_nothing():
@ -504,7 +504,9 @@ def test_cancel_via_SIGINT_other_task(
if is_win(): # smh if is_win(): # smh
timeout += 1 timeout += 1
async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED): async def spawn_and_sleep_forever(
task_status=trio.TASK_STATUS_IGNORED
):
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
for i in range(3): for i in range(3):
await tn.run_in_actor( await tn.run_in_actor(

View File

@ -32,6 +32,16 @@ from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc from tractor._testing import expect_ctxc
@pytest.fixture(
scope='module',
)
def delay(debug_mode: bool) -> int:
if debug_mode:
return 999
else:
return 1
async def sleep_and_err( async def sleep_and_err(
sleep_for: float = 0.1, sleep_for: float = 0.1,
@ -59,20 +69,26 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(aio_sleep_forever) await tractor.to_asyncio.run_task(aio_sleep_forever)
def test_trio_cancels_aio_on_actor_side(reg_addr): def test_trio_cancels_aio_on_actor_side(
reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
):
''' '''
Spawn an infected actor that is cancelled by the ``trio`` side Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis. task using std cancel scope apis.
''' '''
async def main(): async def main():
async with tractor.open_nursery( with trio.fail_after(1 + delay):
registry_addrs=[reg_addr] async with tractor.open_nursery(
) as n: registry_addrs=[reg_addr],
await n.run_in_actor( debug_mode=debug_mode,
trio_cancels_single_aio_task, ) as an:
infect_asyncio=True, await an.run_in_actor(
) trio_cancels_single_aio_task,
infect_asyncio=True,
)
trio.run(main) trio.run(main)
@ -116,7 +132,10 @@ async def asyncio_actor(
raise raise
def test_aio_simple_error(reg_addr): def test_aio_simple_error(
reg_addr: tuple[str, int],
debug_mode: bool,
):
''' '''
Verify a simple remote asyncio error propagates back through trio Verify a simple remote asyncio error propagates back through trio
to the parent actor. to the parent actor.
@ -125,9 +144,10 @@ def test_aio_simple_error(reg_addr):
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr] registry_addrs=[reg_addr],
) as n: debug_mode=debug_mode,
await n.run_in_actor( ) as an:
await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='sleep_and_err', target='sleep_and_err',
expect_err='AssertionError', expect_err='AssertionError',
@ -153,14 +173,19 @@ def test_aio_simple_error(reg_addr):
assert err.boxed_type is AssertionError assert err.boxed_type is AssertionError
def test_tractor_cancels_aio(reg_addr): def test_tractor_cancels_aio(
reg_addr: tuple[str, int],
debug_mode: bool,
):
''' '''
Verify we can cancel a spawned asyncio task gracefully. Verify we can cancel a spawned asyncio task gracefully.
''' '''
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
portal = await n.run_in_actor( debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='aio_sleep_forever', target='aio_sleep_forever',
expect_err='trio.Cancelled', expect_err='trio.Cancelled',
@ -172,7 +197,9 @@ def test_tractor_cancels_aio(reg_addr):
trio.run(main) trio.run(main)
def test_trio_cancels_aio(reg_addr): def test_trio_cancels_aio(
reg_addr: tuple[str, int],
):
''' '''
Much like the above test with ``tractor.Portal.cancel_actor()`` Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api. except we just use a standard ``trio`` cancellation api.
@ -203,7 +230,8 @@ async def trio_ctx(
# this will block until the ``asyncio`` task sends a "first" # this will block until the ``asyncio`` task sends a "first"
# message. # message.
with trio.fail_after(2): delay: int = 999 if tractor.debug_mode() else 1
with trio.fail_after(1 + delay):
try: try:
async with ( async with (
trio.open_nursery( trio.open_nursery(
@ -239,8 +267,10 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format ids='parent_actor_cancels_child={}'.format
) )
def test_context_spawns_aio_task_that_errors( def test_context_spawns_aio_task_that_errors(
reg_addr, reg_addr: tuple[str, int],
delay: int,
parent_cancels: bool, parent_cancels: bool,
debug_mode: bool,
): ):
''' '''
Verify that spawning a task via an intertask channel ctx mngr that Verify that spawning a task via an intertask channel ctx mngr that
@ -249,13 +279,13 @@ def test_context_spawns_aio_task_that_errors(
''' '''
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(1 + delay):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
p = await n.start_actor( p = await an.start_actor(
'aio_daemon', 'aio_daemon',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
# debug_mode=True, debug_mode=debug_mode,
loglevel='cancel', loglevel='cancel',
) )
async with ( async with (
@ -322,11 +352,12 @@ async def aio_cancel():
def test_aio_cancelled_from_aio_causes_trio_cancelled( def test_aio_cancelled_from_aio_causes_trio_cancelled(
reg_addr: tuple, reg_addr: tuple,
delay: int,
): ):
''' '''
When the `asyncio.Task` cancels itself the `trio` side cshould When the `asyncio.Task` cancels itself the `trio` side should
also cancel and teardown and relay the cancellation cross-process also cancel and teardown and relay the cancellation cross-process
to the caller (parent). to the parent caller.
''' '''
async def main(): async def main():
@ -342,7 +373,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# NOTE: normally the `an.__aexit__()` waits on the # NOTE: normally the `an.__aexit__()` waits on the
# portal's result but we do it explicitly here # portal's result but we do it explicitly here
# to avoid indent levels. # to avoid indent levels.
with trio.fail_after(1): with trio.fail_after(1 + delay):
await p.wait_for_result() await p.wait_for_result()
with pytest.raises( with pytest.raises(
@ -353,11 +384,10 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# might get multiple `trio.Cancelled`s as well inside an inception # might get multiple `trio.Cancelled`s as well inside an inception
err: RemoteActorError|ExceptionGroup = excinfo.value err: RemoteActorError|ExceptionGroup = excinfo.value
if isinstance(err, ExceptionGroup): if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile( excs = err.exceptions
lambda exc: not isinstance(exc, tractor.RemoteActorError), assert len(excs) == 1
err.exceptions final_exc = excs[0]
)) assert isinstance(final_exc, tractor.RemoteActorError)
assert err
# relayed boxed error should be our `trio`-task's # relayed boxed error should be our `trio`-task's
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`. # cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
@ -370,15 +400,18 @@ async def no_to_trio_in_args():
async def push_from_aio_task( async def push_from_aio_task(
sequence: Iterable, sequence: Iterable,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
expect_cancel: False, expect_cancel: False,
fail_early: bool, fail_early: bool,
exit_early: bool,
) -> None: ) -> None:
try: try:
# print('trying breakpoint')
# breakpoint()
# sync caller ctx manager # sync caller ctx manager
to_trio.send_nowait(True) to_trio.send_nowait(True)
@ -387,10 +420,27 @@ async def push_from_aio_task(
to_trio.send_nowait(i) to_trio.send_nowait(i)
await asyncio.sleep(0.001) await asyncio.sleep(0.001)
if i == 50 and fail_early: if (
raise Exception i == 50
):
if fail_early:
print('Raising exc from aio side!')
raise Exception
print('asyncio streamer complete!') if exit_early:
# TODO? really you could enforce the same
# SC-proto we use for actors here with asyncio
# such that a Return[None] msg would be
# implicitly delivered to the trio side?
#
# XXX => this might be the end-all soln for
# converting any-inter-task system (regardless
# of maybe-remote runtime or language) to be
# SC-compat no?
print(f'asyncio breaking early @ {i!r}')
break
print('asyncio streaming complete!')
except asyncio.CancelledError: except asyncio.CancelledError:
if not expect_cancel: if not expect_cancel:
@ -402,9 +452,10 @@ async def push_from_aio_task(
async def stream_from_aio( async def stream_from_aio(
exit_early: bool = False, trio_exit_early: bool = False,
raise_err: bool = False, trio_raise_err: bool = False,
aio_raise_err: bool = False, aio_raise_err: bool = False,
aio_exit_early: bool = False,
fan_out: bool = False, fan_out: bool = False,
) -> None: ) -> None:
@ -417,8 +468,17 @@ async def stream_from_aio(
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
push_from_aio_task, push_from_aio_task,
sequence=seq, sequence=seq,
expect_cancel=raise_err or exit_early, expect_cancel=trio_raise_err or trio_exit_early,
fail_early=aio_raise_err, fail_early=aio_raise_err,
exit_early=aio_exit_early,
# such that we can test exit early cases
# for each side explicitly.
suppress_graceful_exits=(not(
aio_exit_early
or
trio_exit_early
))
) as (first, chan): ) as (first, chan):
@ -431,13 +491,19 @@ async def stream_from_aio(
], ],
): ):
async for value in chan: async for value in chan:
print(f'trio received {value}') print(f'trio received: {value!r}')
# XXX, debugging EoC not being handled correctly
# in `transate_aio_errors()`..
# if value is None:
# await tractor.pause(shield=True)
pulled.append(value) pulled.append(value)
if value == 50: if value == 50:
if raise_err: if trio_raise_err:
raise Exception raise Exception
elif exit_early: elif trio_exit_early:
print('`consume()` breaking early!\n') print('`consume()` breaking early!\n')
break break
@ -454,11 +520,11 @@ async def stream_from_aio(
# tasks are joined.. # tasks are joined..
chan.subscribe() as br, chan.subscribe() as br,
trio.open_nursery() as n, trio.open_nursery() as tn,
): ):
# start 2nd task that get's broadcast the same # start 2nd task that get's broadcast the same
# value set. # value set.
n.start_soon(consume, br) tn.start_soon(consume, br)
await consume(chan) await consume(chan)
else: else:
@ -471,10 +537,14 @@ async def stream_from_aio(
finally: finally:
if ( if not (
not raise_err and trio_raise_err
not exit_early and or
not aio_raise_err trio_exit_early
or
aio_raise_err
or
aio_exit_early
): ):
if fan_out: if fan_out:
# we get double the pulled values in the # we get double the pulled values in the
@ -484,6 +554,7 @@ async def stream_from_aio(
assert list(sorted(pulled)) == expect assert list(sorted(pulled)) == expect
else: else:
# await tractor.pause()
assert pulled == expect assert pulled == expect
else: else:
assert not fan_out assert not fan_out
@ -497,10 +568,13 @@ async def stream_from_aio(
'fan_out', [False, True], 'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format ids='fan_out_w_chan_subscribe={}'.format
) )
def test_basic_interloop_channel_stream(reg_addr, fan_out): def test_basic_interloop_channel_stream(
reg_addr: tuple[str, int],
fan_out: bool,
):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
portal = await n.run_in_actor( portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
infect_asyncio=True, infect_asyncio=True,
fan_out=fan_out, fan_out=fan_out,
@ -514,10 +588,10 @@ def test_basic_interloop_channel_stream(reg_addr, fan_out):
# TODO: parametrize the above test and avoid the duplication here? # TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(reg_addr): def test_trio_error_cancels_intertask_chan(reg_addr):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
portal = await n.run_in_actor( portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
raise_err=True, trio_raise_err=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should trigger remote actor error # should trigger remote actor error
@ -530,43 +604,116 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
excinfo.value.boxed_type is Exception excinfo.value.boxed_type is Exception
def test_trio_closes_early_and_channel_exits( def test_trio_closes_early_causes_aio_checkpoint_raise(
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
): ):
''' '''
Check that if the `trio`-task "exits early" on `async for`ing the Check that if the `trio`-task "exits early and silently" (in this
inter-task-channel (via a `break`) we exit silently from the case during `async for`-ing the inter-task-channel via
`open_channel_from()` block and get a final `Return[None]` msg. a `break`-from-loop), we raise `TrioTaskExited` on the
`asyncio`-side which also then bubbles up through the
`open_channel_from()` block indicating that the `asyncio.Task`
hit a ran another checkpoint despite the `trio.Task` exit.
''' '''
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(1 + delay):
async with tractor.open_nursery( async with tractor.open_nursery(
# debug_mode=True, debug_mode=debug_mode,
# enable_stack_on_sig=True, # enable_stack_on_sig=True,
) as n: ) as an:
portal = await n.run_in_actor( portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
exit_early=True, trio_exit_early=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should raise RAE diectly # should raise RAE diectly
print('waiting on final infected subactor result..') print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result() res: None = await portal.wait_for_result()
assert res is None assert res is None
print('infected subactor returned result: {res!r}\n') print(f'infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit # should be a quiet exit on a simple channel exit
trio.run( with pytest.raises(RemoteActorError) as excinfo:
main, trio.run(main)
# strict_exception_groups=False,
) # ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
excinfo.value.boxed_type is to_asyncio.TrioTaskExited
def test_aio_errors_and_channel_propagates_and_closes(reg_addr): def test_aio_exits_early_relays_AsyncioTaskExited(
# TODO, parametrize the 3 possible trio side conditions:
# - trio blocking on receive, aio exits early
# - trio cancelled AND aio exits early on its next tick
# - trio errors AND aio exits early on its next tick
reg_addr: tuple[str, int],
debug_mode: bool,
delay: int,
):
'''
Check that if the `asyncio`-task "exits early and silently" (in this
case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel`
it `break`s from the loop), we raise `AsyncioTaskExited` on the
`trio`-side which then DOES NOT BUBBLE up through the
`open_channel_from()` block UNLESS,
- the trio.Task also errored/cancelled, in which case we wrap
both errors in an eg
- the trio.Task was blocking on rxing a value from the
`InterLoopTaskChannel`.
'''
async def main(): async def main():
async with tractor.open_nursery() as n: with trio.fail_after(1 + delay):
portal = await n.run_in_actor( async with tractor.open_nursery(
debug_mode=debug_mode,
# enable_stack_on_sig=True,
) as an:
portal = await an.run_in_actor(
stream_from_aio,
infect_asyncio=True,
trio_exit_early=False,
aio_exit_early=True,
)
# should raise RAE diectly
print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result()
assert res is None
print(f'infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
exc = excinfo.value
# TODO, wow bug!
# -[ ] bp handler not replaced!?!?
# breakpoint()
# import pdbp; pdbp.set_trace()
# ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
def test_aio_errors_and_channel_propagates_and_closes(
reg_addr: tuple[str, int],
debug_mode: bool,
):
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
aio_raise_err=True, aio_raise_err=True,
infect_asyncio=True, infect_asyncio=True,
@ -592,7 +739,13 @@ async def aio_echo_server(
to_trio.send_nowait('start') to_trio.send_nowait('start')
while True: while True:
msg = await from_trio.get() try:
msg = await from_trio.get()
except to_asyncio.TrioTaskExited:
print(
'breaking aio echo loop due to `trio` exit!'
)
break
# echo the msg back # echo the msg back
to_trio.send_nowait(msg) to_trio.send_nowait(msg)
@ -641,13 +794,15 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format, ids='raise_error={}'.format,
) )
def test_echoserver_detailed_mechanics( def test_echoserver_detailed_mechanics(
reg_addr, reg_addr: tuple[str, int],
debug_mode: bool,
raise_error_mid_stream, raise_error_mid_stream,
): ):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
p = await n.start_actor( debug_mode=debug_mode,
) as an:
p = await an.start_actor(
'aio_server', 'aio_server',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
@ -852,6 +1007,8 @@ def test_sigint_closes_lifetime_stack(
''' '''
async def main(): async def main():
delay = 999 if tractor.debug_mode() else 1
try: try:
an: tractor.ActorNursery an: tractor.ActorNursery
async with tractor.open_nursery( async with tractor.open_nursery(
@ -902,7 +1059,7 @@ def test_sigint_closes_lifetime_stack(
if wait_for_ctx: if wait_for_ctx:
print('waiting for ctx outcome in parent..') print('waiting for ctx outcome in parent..')
try: try:
with trio.fail_after(1): with trio.fail_after(1 + delay):
await ctx.wait_for_result() await ctx.wait_for_result()
except tractor.ContextCancelled as ctxc: except tractor.ContextCancelled as ctxc:
assert ctxc.canceller == ctx.chan.uid assert ctxc.canceller == ctx.chan.uid

View File

@ -170,7 +170,7 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
trio.run(main) trio.run(main)
rae = excinfo.value rae = excinfo.value
assert rae.boxed_type == TypeError assert rae.boxed_type is TypeError
@tractor.context @tractor.context

View File

@ -39,7 +39,7 @@ def test_infected_root_actor(
''' '''
async def _trio_main(): async def _trio_main():
with trio.fail_after(2): with trio.fail_after(2 if not debug_mode else 999):
first: str first: str
chan: to_asyncio.LinkedTaskChannel chan: to_asyncio.LinkedTaskChannel
async with ( async with (
@ -59,7 +59,11 @@ def test_infected_root_actor(
assert out == i assert out == i
print(f'asyncio echoing {i}') print(f'asyncio echoing {i}')
if raise_error_mid_stream and i == 500: if (
raise_error_mid_stream
and
i == 500
):
raise raise_error_mid_stream raise raise_error_mid_stream
if out is None: if out is None:

View File

@ -82,6 +82,39 @@ class InternalError(RuntimeError):
''' '''
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
NOTE: this should NOT inherit from `asyncio.CancelledError` or
tests should break!
'''
class AsyncioTaskExited(Exception):
'''
asyncio.Task "exited" translation error for use with the
`to_asyncio` APIs to be raised in the `trio` side task indicating
on `.run_task()`/`.open_channel_from()` exit that the aio side
exited early/silently.
'''
class TrioTaskExited(AsyncioCancelled):
'''
The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer.
This is very similar to how `trio.ClosedResource` acts as
a "clean shutdown" signal to the consumer side of a mem-chan,
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
'''
# NOTE: more or less should be close to these: # NOTE: more or less should be close to these:
# 'boxed_type', # 'boxed_type',
@ -127,8 +160,8 @@ _body_fields: list[str] = list(
def get_err_type(type_name: str) -> BaseException|None: def get_err_type(type_name: str) -> BaseException|None:
''' '''
Look up an exception type by name from the set of locally Look up an exception type by name from the set of locally known
known namespaces: namespaces:
- `builtins` - `builtins`
- `tractor._exceptions` - `tractor._exceptions`
@ -358,6 +391,13 @@ class RemoteActorError(Exception):
self._ipc_msg.src_type_str self._ipc_msg.src_type_str
) )
if not self._src_type:
raise TypeError(
f'Failed to lookup src error type with '
f'`tractor._exceptions.get_err_type()` :\n'
f'{self.src_type_str}'
)
return self._src_type return self._src_type
@property @property
@ -652,16 +692,10 @@ class RemoteActorError(Exception):
failing actor's remote env. failing actor's remote env.
''' '''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder # TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends: # metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611 # https://github.com/python-trio/trio/issues/611
src_type_ref: Type[BaseException] = self.src_type
return src_type_ref(self.tb_str) return src_type_ref(self.tb_str)
# TODO: local recontruction of nested inception for a given # TODO: local recontruction of nested inception for a given

View File

@ -35,6 +35,7 @@ from signal import (
signal, signal,
getsignal, getsignal,
SIGUSR1, SIGUSR1,
SIGINT,
) )
# import traceback # import traceback
from types import ModuleType from types import ModuleType
@ -48,6 +49,7 @@ from tractor import (
_state, _state,
log as logmod, log as logmod,
) )
from tractor.devx import _debug
log = logmod.get_logger(__name__) log = logmod.get_logger(__name__)
@ -76,22 +78,45 @@ def dump_task_tree() -> None:
) )
actor: Actor = _state.current_actor() actor: Actor = _state.current_actor()
thr: Thread = current_thread() thr: Thread = current_thread()
current_sigint_handler: Callable = getsignal(SIGINT)
if (
current_sigint_handler
is not
_debug.DebugStatus._trio_handler
):
sigint_handler_report: str = (
'The default `trio` SIGINT handler was replaced?!'
)
else:
sigint_handler_report: str = (
'The default `trio` SIGINT handler is in use?!'
)
# sclang symbology
# |_<object>
# |_(Task/Thread/Process/Actor
# |_{Supervisor/Scope
# |_[Storage/Memory/IPC-Stream/Data-Struct
log.devx( log.devx(
f'Dumping `stackscope` tree for actor\n' f'Dumping `stackscope` tree for actor\n'
f'{actor.uid}:\n' f'(>: {actor.uid!r}\n'
f'|_{mp.current_process()}\n' f' |_{mp.current_process()}\n'
f' |_{thr}\n' f' |_{thr}\n'
f' |_{actor}\n\n' f' |_{actor}\n'
# start-of-trace-tree delimiter (mostly for testing)
'------ - ------\n'
'\n'
+
f'{tree_str}\n'
+
# end-of-trace-tree delimiter (mostly for testing)
f'\n' f'\n'
f'------ {actor.uid!r} ------\n' f'{sigint_handler_report}\n'
f'signal.getsignal(SIGINT) -> {current_sigint_handler!r}\n'
# f'\n'
# start-of-trace-tree delimiter (mostly for testing)
# f'------ {actor.uid!r} ------\n'
f'\n'
f'------ start-of-{actor.uid!r} ------\n'
f'|\n'
f'{tree_str}'
# end-of-trace-tree delimiter (mostly for testing)
f'|\n'
f'|_____ end-of-{actor.uid!r} ______\n'
) )
# TODO: can remove this right? # TODO: can remove this right?
# -[ ] was original code from author # -[ ] was original code from author
@ -123,11 +148,11 @@ def dump_tree_on_sig(
) -> None: ) -> None:
global _tree_dumped, _handler_lock global _tree_dumped, _handler_lock
with _handler_lock: with _handler_lock:
if _tree_dumped: # if _tree_dumped:
log.warning( # log.warning(
'Already dumped for this actor...??' # 'Already dumped for this actor...??'
) # )
return # return
_tree_dumped = True _tree_dumped = True
@ -161,9 +186,9 @@ def dump_tree_on_sig(
) )
raise raise
log.devx( # log.devx(
'Supposedly we dumped just fine..?' # 'Supposedly we dumped just fine..?'
) # )
if not relay_to_subs: if not relay_to_subs:
return return
@ -202,11 +227,11 @@ def enable_stack_on_sig(
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution) (https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
you could use: you could use:
>> kill -SIGUSR1 $(pgrep -f '<cmd>') >> kill -SIGUSR1 $(pgrep -f <part-of-cmd: str>)
Or with with `xonsh` (which has diff capture-from-subproc syntax) OR without a sub-shell,
>> kill -SIGUSR1 @$(pgrep -f '<cmd>') >> pkill --signal SIGUSR1 -f <part-of-cmd: str>
''' '''
try: try:

File diff suppressed because it is too large Load Diff