Compare commits

..

No commits in common. "e646ce5c0d2735b996af82053322978fae5d6795" and "e8111e40f9c50c5a3a4dc64bdab0815f0cba6a80" have entirely different histories.

11 changed files with 365 additions and 975 deletions

View File

@ -39,6 +39,7 @@ async def main(
loglevel='devx', loglevel='devx',
) as an, ) as an,
): ):
ptl: tractor.Portal = await an.start_actor( ptl: tractor.Portal = await an.start_actor(
'hanger', 'hanger',
enable_modules=[__name__], enable_modules=[__name__],
@ -53,16 +54,13 @@ async def main(
print( print(
'Yo my child hanging..?\n' 'Yo my child hanging..?\n'
# "i'm a user who wants to see a `stackscope` tree!\n" 'Sending SIGUSR1 to see a tree-trace!\n'
) )
# XXX simulate the wrapping test's "user actions" # XXX simulate the wrapping test's "user actions"
# (i.e. if a human didn't run this manually but wants to # (i.e. if a human didn't run this manually but wants to
# know what they should do to reproduce test behaviour) # know what they should do to reproduce test behaviour)
if from_test: if from_test:
print(
f'Sending SIGUSR1 to {cpid!r}!\n'
)
os.kill( os.kill(
cpid, cpid,
signal.SIGUSR1, signal.SIGUSR1,

View File

@ -30,7 +30,7 @@ from ..conftest import (
@pytest.fixture @pytest.fixture
def spawn( def spawn(
start_method, start_method,
testdir: pytest.Pytester, testdir: pytest.Testdir,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
) -> Callable[[str], None]: ) -> Callable[[str], None]:
@ -44,32 +44,16 @@ def spawn(
'`pexpect` based tests only supported on `trio` backend' '`pexpect` based tests only supported on `trio` backend'
) )
def unset_colors():
'''
Python 3.13 introduced colored tracebacks that break patt
matching,
https://docs.python.org/3/using/cmdline.html#envvar-PYTHON_COLORS
https://docs.python.org/3/using/cmdline.html#using-on-controlling-color
'''
import os
os.environ['PYTHON_COLORS'] = '0'
def _spawn( def _spawn(
cmd: str, cmd: str,
**mkcmd_kwargs, **mkcmd_kwargs,
): ):
unset_colors()
return testdir.spawn( return testdir.spawn(
cmd=mk_cmd( cmd=mk_cmd(
cmd, cmd,
**mkcmd_kwargs, **mkcmd_kwargs,
), ),
expect_timeout=3, expect_timeout=3,
# preexec_fn=unset_colors,
# ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff?
) )
# such that test-dep can pass input script name. # such that test-dep can pass input script name.
@ -99,14 +83,6 @@ def ctlc(
'https://github.com/goodboy/tractor/issues/320' 'https://github.com/goodboy/tractor/issues/320'
) )
if mark.name == 'ctlcs_bish':
pytest.skip(
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
f'The test and/or underlying example script can *sometimes* run fine '
f'locally but more then likely until the cpython peeps get their sh#$ together, '
f'this test will definitely not behave like `trio` under SIGINT..\n'
)
if use_ctlc: if use_ctlc:
# XXX: disable pygments highlighting for auto-tests # XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle # since some envs (like actions CI) will struggle

View File

@ -6,9 +6,6 @@ All these tests can be understood (somewhat) by running the
equivalent `examples/debugging/` scripts manually. equivalent `examples/debugging/` scripts manually.
''' '''
from contextlib import (
contextmanager as cm,
)
# from functools import partial # from functools import partial
# import itertools # import itertools
import time import time
@ -18,7 +15,7 @@ import time
import pytest import pytest
from pexpect.exceptions import ( from pexpect.exceptions import (
TIMEOUT, # TIMEOUT,
EOF, EOF,
) )
@ -35,23 +32,7 @@ from .conftest import (
# _repl_fail_msg, # _repl_fail_msg,
) )
@cm
def maybe_expect_timeout(
ctlc: bool = False,
) -> None:
try:
yield
except TIMEOUT:
# breakpoint()
if ctlc:
pytest.xfail(
'Some kinda redic threading SIGINT bug i think?\n'
'See the notes in `examples/debugging/sync_bp.py`..\n'
)
raise
@pytest.mark.ctlcs_bish
def test_pause_from_sync( def test_pause_from_sync(
spawn, spawn,
ctlc: bool, ctlc: bool,
@ -86,10 +67,10 @@ def test_pause_from_sync(
child.expect(PROMPT) child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel! # XXX shouldn't see gb loaded message with PDB loglevel!
# assert not in_prompt_msg( assert not in_prompt_msg(
# child, child,
# ['`greenback` portal opened!'], ['`greenback` portal opened!'],
# ) )
# should be same root task # should be same root task
assert_before( assert_before(
child, child,
@ -181,13 +162,6 @@ def test_pause_from_sync(
) )
child.sendline('c') child.sendline('c')
# XXX TODO, weird threading bug it seems despite the
# `abandon_on_cancel: bool` setting to
# `trio.to_thread.run_sync()`..
with maybe_expect_timeout(
ctlc=ctlc,
):
child.expect(EOF) child.expect(EOF)
@ -246,10 +220,8 @@ def expect_any_of(
return expected_patts return expected_patts
@pytest.mark.ctlcs_bish
def test_sync_pause_from_aio_task( def test_sync_pause_from_aio_task(
spawn, spawn,
ctlc: bool ctlc: bool
# ^TODO, fix for `asyncio`!! # ^TODO, fix for `asyncio`!!
): ):
@ -298,12 +270,10 @@ def test_sync_pause_from_aio_task(
# error raised in `asyncio.Task` # error raised in `asyncio.Task`
"raise ValueError('asyncio side error!')": [ "raise ValueError('asyncio side error!')": [
_crash_msg, _crash_msg,
'return await chan.receive()', # `.to_asyncio` impl internals in tb
"<Task 'trio_ctx'", "<Task 'trio_ctx'",
"@ ('aio_daemon'", "@ ('aio_daemon'",
"ValueError: asyncio side error!", "ValueError: asyncio side error!",
# XXX, we no longer show this frame by default!
# 'return await chan.receive()', # `.to_asyncio` impl internals in tb
], ],
# parent-side propagation via actor-nursery/portal # parent-side propagation via actor-nursery/portal
@ -355,7 +325,6 @@ def test_sync_pause_from_aio_task(
) )
child.sendline('c') child.sendline('c')
# with maybe_expect_timeout():
child.expect(EOF) child.expect(EOF)

View File

@ -15,7 +15,6 @@ TODO:
''' '''
import os import os
import signal import signal
import time
from .conftest import ( from .conftest import (
expect, expect,
@ -54,39 +53,41 @@ def test_shield_pause(
] ]
) )
script_pid: int = child.pid
print( print(
f'Sending SIGUSR1 to {script_pid}\n' 'Sending SIGUSR1 to see a tree-trace!',
f'(kill -s SIGUSR1 {script_pid})\n'
) )
os.kill( os.kill(
script_pid, child.pid,
signal.SIGUSR1, signal.SIGUSR1,
) )
time.sleep(0.2)
expect( expect(
child, child,
# end-of-tree delimiter # end-of-tree delimiter
"end-of-\('root'", "------ \('root', ",
) )
assert_before( assert_before(
child, child,
[ [
# 'Srying to dump `stackscope` tree..', 'Trying to dump `stackscope` tree..',
# 'Dumping `stackscope` tree for actor', 'Dumping `stackscope` tree for actor',
"('root'", # uid line "('root'", # uid line
# TODO!? this used to show?
# -[ ] mk reproducable for @oremanj?
#
# parent block point (non-shielded) # parent block point (non-shielded)
# 'await trio.sleep_forever() # in root', 'await trio.sleep_forever() # in root',
] ]
) )
# expect(
# child,
# # relay to the sub should be reported
# 'Relaying `SIGUSR1`[10] to sub-actor',
# )
expect( expect(
child, child,
# end-of-tree delimiter # end-of-tree delimiter
"end-of-\('hanger'", "------ \('hanger', ",
) )
assert_before( assert_before(
child, child,
@ -96,11 +97,11 @@ def test_shield_pause(
"('hanger'", # uid line "('hanger'", # uid line
# TODO!? SEE ABOVE
# hanger LOC where it's shield-halted # hanger LOC where it's shield-halted
# 'await trio.sleep_forever() # in subactor', 'await trio.sleep_forever() # in subactor',
] ]
) )
# breakpoint()
# simulate the user sending a ctl-c to the hanging program. # simulate the user sending a ctl-c to the hanging program.
# this should result in the terminator kicking in since # this should result in the terminator kicking in since

View File

@ -130,7 +130,7 @@ def test_multierror(
try: try:
await portal2.result() await portal2.result()
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.boxed_type is AssertionError assert err.boxed_type == AssertionError
print("Look Maa that first actor failed hard, hehh") print("Look Maa that first actor failed hard, hehh")
raise raise
@ -182,7 +182,7 @@ def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
for exc in exceptions: for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError) assert isinstance(exc, tractor.RemoteActorError)
assert exc.boxed_type is AssertionError assert exc.boxed_type == AssertionError
async def do_nothing(): async def do_nothing():
@ -504,9 +504,7 @@ def test_cancel_via_SIGINT_other_task(
if is_win(): # smh if is_win(): # smh
timeout += 1 timeout += 1
async def spawn_and_sleep_forever( async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED):
task_status=trio.TASK_STATUS_IGNORED
):
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
for i in range(3): for i in range(3):
await tn.run_in_actor( await tn.run_in_actor(

View File

@ -32,16 +32,6 @@ from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc from tractor._testing import expect_ctxc
@pytest.fixture(
scope='module',
)
def delay(debug_mode: bool) -> int:
if debug_mode:
return 999
else:
return 1
async def sleep_and_err( async def sleep_and_err(
sleep_for: float = 0.1, sleep_for: float = 0.1,
@ -69,23 +59,17 @@ async def trio_cancels_single_aio_task():
await tractor.to_asyncio.run_task(aio_sleep_forever) await tractor.to_asyncio.run_task(aio_sleep_forever)
def test_trio_cancels_aio_on_actor_side( def test_trio_cancels_aio_on_actor_side(reg_addr):
reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
):
''' '''
Spawn an infected actor that is cancelled by the ``trio`` side Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis. task using std cancel scope apis.
''' '''
async def main(): async def main():
with trio.fail_after(1 + delay):
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr], registry_addrs=[reg_addr]
debug_mode=debug_mode, ) as n:
) as an: await n.run_in_actor(
await an.run_in_actor(
trio_cancels_single_aio_task, trio_cancels_single_aio_task,
infect_asyncio=True, infect_asyncio=True,
) )
@ -132,10 +116,7 @@ async def asyncio_actor(
raise raise
def test_aio_simple_error( def test_aio_simple_error(reg_addr):
reg_addr: tuple[str, int],
debug_mode: bool,
):
''' '''
Verify a simple remote asyncio error propagates back through trio Verify a simple remote asyncio error propagates back through trio
to the parent actor. to the parent actor.
@ -144,10 +125,9 @@ def test_aio_simple_error(
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr], registry_addrs=[reg_addr]
debug_mode=debug_mode, ) as n:
) as an: await n.run_in_actor(
await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='sleep_and_err', target='sleep_and_err',
expect_err='AssertionError', expect_err='AssertionError',
@ -173,19 +153,14 @@ def test_aio_simple_error(
assert err.boxed_type is AssertionError assert err.boxed_type is AssertionError
def test_tractor_cancels_aio( def test_tractor_cancels_aio(reg_addr):
reg_addr: tuple[str, int],
debug_mode: bool,
):
''' '''
Verify we can cancel a spawned asyncio task gracefully. Verify we can cancel a spawned asyncio task gracefully.
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery() as n:
debug_mode=debug_mode, portal = await n.run_in_actor(
) as an:
portal = await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='aio_sleep_forever', target='aio_sleep_forever',
expect_err='trio.Cancelled', expect_err='trio.Cancelled',
@ -197,9 +172,7 @@ def test_tractor_cancels_aio(
trio.run(main) trio.run(main)
def test_trio_cancels_aio( def test_trio_cancels_aio(reg_addr):
reg_addr: tuple[str, int],
):
''' '''
Much like the above test with ``tractor.Portal.cancel_actor()`` Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api. except we just use a standard ``trio`` cancellation api.
@ -230,8 +203,7 @@ async def trio_ctx(
# this will block until the ``asyncio`` task sends a "first" # this will block until the ``asyncio`` task sends a "first"
# message. # message.
delay: int = 999 if tractor.debug_mode() else 1 with trio.fail_after(2):
with trio.fail_after(1 + delay):
try: try:
async with ( async with (
trio.open_nursery( trio.open_nursery(
@ -267,10 +239,8 @@ async def trio_ctx(
ids='parent_actor_cancels_child={}'.format ids='parent_actor_cancels_child={}'.format
) )
def test_context_spawns_aio_task_that_errors( def test_context_spawns_aio_task_that_errors(
reg_addr: tuple[str, int], reg_addr,
delay: int,
parent_cancels: bool, parent_cancels: bool,
debug_mode: bool,
): ):
''' '''
Verify that spawning a task via an intertask channel ctx mngr that Verify that spawning a task via an intertask channel ctx mngr that
@ -279,13 +249,13 @@ def test_context_spawns_aio_task_that_errors(
''' '''
async def main(): async def main():
with trio.fail_after(1 + delay): with trio.fail_after(2):
async with tractor.open_nursery() as an: async with tractor.open_nursery() as n:
p = await an.start_actor( p = await n.start_actor(
'aio_daemon', 'aio_daemon',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
debug_mode=debug_mode, # debug_mode=True,
loglevel='cancel', loglevel='cancel',
) )
async with ( async with (
@ -352,12 +322,11 @@ async def aio_cancel():
def test_aio_cancelled_from_aio_causes_trio_cancelled( def test_aio_cancelled_from_aio_causes_trio_cancelled(
reg_addr: tuple, reg_addr: tuple,
delay: int,
): ):
''' '''
When the `asyncio.Task` cancels itself the `trio` side should When the `asyncio.Task` cancels itself the `trio` side cshould
also cancel and teardown and relay the cancellation cross-process also cancel and teardown and relay the cancellation cross-process
to the parent caller. to the caller (parent).
''' '''
async def main(): async def main():
@ -373,7 +342,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# NOTE: normally the `an.__aexit__()` waits on the # NOTE: normally the `an.__aexit__()` waits on the
# portal's result but we do it explicitly here # portal's result but we do it explicitly here
# to avoid indent levels. # to avoid indent levels.
with trio.fail_after(1 + delay): with trio.fail_after(1):
await p.wait_for_result() await p.wait_for_result()
with pytest.raises( with pytest.raises(
@ -384,10 +353,11 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(
# might get multiple `trio.Cancelled`s as well inside an inception # might get multiple `trio.Cancelled`s as well inside an inception
err: RemoteActorError|ExceptionGroup = excinfo.value err: RemoteActorError|ExceptionGroup = excinfo.value
if isinstance(err, ExceptionGroup): if isinstance(err, ExceptionGroup):
excs = err.exceptions err = next(itertools.dropwhile(
assert len(excs) == 1 lambda exc: not isinstance(exc, tractor.RemoteActorError),
final_exc = excs[0] err.exceptions
assert isinstance(final_exc, tractor.RemoteActorError) ))
assert err
# relayed boxed error should be our `trio`-task's # relayed boxed error should be our `trio`-task's
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`. # cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
@ -400,18 +370,15 @@ async def no_to_trio_in_args():
async def push_from_aio_task( async def push_from_aio_task(
sequence: Iterable, sequence: Iterable,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
expect_cancel: False, expect_cancel: False,
fail_early: bool, fail_early: bool,
exit_early: bool,
) -> None: ) -> None:
try: try:
# print('trying breakpoint')
# breakpoint()
# sync caller ctx manager # sync caller ctx manager
to_trio.send_nowait(True) to_trio.send_nowait(True)
@ -420,27 +387,10 @@ async def push_from_aio_task(
to_trio.send_nowait(i) to_trio.send_nowait(i)
await asyncio.sleep(0.001) await asyncio.sleep(0.001)
if ( if i == 50 and fail_early:
i == 50
):
if fail_early:
print('Raising exc from aio side!')
raise Exception raise Exception
if exit_early: print('asyncio streamer complete!')
# TODO? really you could enforce the same
# SC-proto we use for actors here with asyncio
# such that a Return[None] msg would be
# implicitly delivered to the trio side?
#
# XXX => this might be the end-all soln for
# converting any-inter-task system (regardless
# of maybe-remote runtime or language) to be
# SC-compat no?
print(f'asyncio breaking early @ {i!r}')
break
print('asyncio streaming complete!')
except asyncio.CancelledError: except asyncio.CancelledError:
if not expect_cancel: if not expect_cancel:
@ -452,10 +402,9 @@ async def push_from_aio_task(
async def stream_from_aio( async def stream_from_aio(
trio_exit_early: bool = False, exit_early: bool = False,
trio_raise_err: bool = False, raise_err: bool = False,
aio_raise_err: bool = False, aio_raise_err: bool = False,
aio_exit_early: bool = False,
fan_out: bool = False, fan_out: bool = False,
) -> None: ) -> None:
@ -468,17 +417,8 @@ async def stream_from_aio(
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
push_from_aio_task, push_from_aio_task,
sequence=seq, sequence=seq,
expect_cancel=trio_raise_err or trio_exit_early, expect_cancel=raise_err or exit_early,
fail_early=aio_raise_err, fail_early=aio_raise_err,
exit_early=aio_exit_early,
# such that we can test exit early cases
# for each side explicitly.
suppress_graceful_exits=(not(
aio_exit_early
or
trio_exit_early
))
) as (first, chan): ) as (first, chan):
@ -491,19 +431,13 @@ async def stream_from_aio(
], ],
): ):
async for value in chan: async for value in chan:
print(f'trio received: {value!r}') print(f'trio received {value}')
# XXX, debugging EoC not being handled correctly
# in `transate_aio_errors()`..
# if value is None:
# await tractor.pause(shield=True)
pulled.append(value) pulled.append(value)
if value == 50: if value == 50:
if trio_raise_err: if raise_err:
raise Exception raise Exception
elif trio_exit_early: elif exit_early:
print('`consume()` breaking early!\n') print('`consume()` breaking early!\n')
break break
@ -520,11 +454,11 @@ async def stream_from_aio(
# tasks are joined.. # tasks are joined..
chan.subscribe() as br, chan.subscribe() as br,
trio.open_nursery() as tn, trio.open_nursery() as n,
): ):
# start 2nd task that get's broadcast the same # start 2nd task that get's broadcast the same
# value set. # value set.
tn.start_soon(consume, br) n.start_soon(consume, br)
await consume(chan) await consume(chan)
else: else:
@ -537,14 +471,10 @@ async def stream_from_aio(
finally: finally:
if not ( if (
trio_raise_err not raise_err and
or not exit_early and
trio_exit_early not aio_raise_err
or
aio_raise_err
or
aio_exit_early
): ):
if fan_out: if fan_out:
# we get double the pulled values in the # we get double the pulled values in the
@ -554,7 +484,6 @@ async def stream_from_aio(
assert list(sorted(pulled)) == expect assert list(sorted(pulled)) == expect
else: else:
# await tractor.pause()
assert pulled == expect assert pulled == expect
else: else:
assert not fan_out assert not fan_out
@ -568,13 +497,10 @@ async def stream_from_aio(
'fan_out', [False, True], 'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format ids='fan_out_w_chan_subscribe={}'.format
) )
def test_basic_interloop_channel_stream( def test_basic_interloop_channel_stream(reg_addr, fan_out):
reg_addr: tuple[str, int],
fan_out: bool,
):
async def main(): async def main():
async with tractor.open_nursery() as an: async with tractor.open_nursery() as n:
portal = await an.run_in_actor( portal = await n.run_in_actor(
stream_from_aio, stream_from_aio,
infect_asyncio=True, infect_asyncio=True,
fan_out=fan_out, fan_out=fan_out,
@ -588,10 +514,10 @@ def test_basic_interloop_channel_stream(
# TODO: parametrize the above test and avoid the duplication here? # TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(reg_addr): def test_trio_error_cancels_intertask_chan(reg_addr):
async def main(): async def main():
async with tractor.open_nursery() as an: async with tractor.open_nursery() as n:
portal = await an.run_in_actor( portal = await n.run_in_actor(
stream_from_aio, stream_from_aio,
trio_raise_err=True, raise_err=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should trigger remote actor error # should trigger remote actor error
@ -604,116 +530,43 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
excinfo.value.boxed_type is Exception excinfo.value.boxed_type is Exception
def test_trio_closes_early_causes_aio_checkpoint_raise( def test_trio_closes_early_and_channel_exits(
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
delay: int,
debug_mode: bool,
): ):
''' '''
Check that if the `trio`-task "exits early and silently" (in this Check that if the `trio`-task "exits early" on `async for`ing the
case during `async for`-ing the inter-task-channel via inter-task-channel (via a `break`) we exit silently from the
a `break`-from-loop), we raise `TrioTaskExited` on the `open_channel_from()` block and get a final `Return[None]` msg.
`asyncio`-side which also then bubbles up through the
`open_channel_from()` block indicating that the `asyncio.Task`
hit a ran another checkpoint despite the `trio.Task` exit.
''' '''
async def main(): async def main():
with trio.fail_after(1 + delay): with trio.fail_after(2):
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, # debug_mode=True,
# enable_stack_on_sig=True, # enable_stack_on_sig=True,
) as an: ) as n:
portal = await an.run_in_actor( portal = await n.run_in_actor(
stream_from_aio, stream_from_aio,
trio_exit_early=True, exit_early=True,
infect_asyncio=True, infect_asyncio=True,
) )
# should raise RAE diectly # should raise RAE diectly
print('waiting on final infected subactor result..') print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result() res: None = await portal.wait_for_result()
assert res is None assert res is None
print(f'infected subactor returned result: {res!r}\n') print('infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit # should be a quiet exit on a simple channel exit
with pytest.raises(RemoteActorError) as excinfo: trio.run(
trio.run(main) main,
# strict_exception_groups=False,
# ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
excinfo.value.boxed_type is to_asyncio.TrioTaskExited
def test_aio_exits_early_relays_AsyncioTaskExited(
# TODO, parametrize the 3 possible trio side conditions:
# - trio blocking on receive, aio exits early
# - trio cancelled AND aio exits early on its next tick
# - trio errors AND aio exits early on its next tick
reg_addr: tuple[str, int],
debug_mode: bool,
delay: int,
):
'''
Check that if the `asyncio`-task "exits early and silently" (in this
case during `push_from_aio_task()` pushing to the `InterLoopTaskChannel`
it `break`s from the loop), we raise `AsyncioTaskExited` on the
`trio`-side which then DOES NOT BUBBLE up through the
`open_channel_from()` block UNLESS,
- the trio.Task also errored/cancelled, in which case we wrap
both errors in an eg
- the trio.Task was blocking on rxing a value from the
`InterLoopTaskChannel`.
'''
async def main():
with trio.fail_after(1 + delay):
async with tractor.open_nursery(
debug_mode=debug_mode,
# enable_stack_on_sig=True,
) as an:
portal = await an.run_in_actor(
stream_from_aio,
infect_asyncio=True,
trio_exit_early=False,
aio_exit_early=True,
) )
# should raise RAE diectly
print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result()
assert res is None
print(f'infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
exc = excinfo.value
# TODO, wow bug!
# -[ ] bp handler not replaced!?!?
# breakpoint()
# import pdbp; pdbp.set_trace()
# ensure remote error is an explicit `AsyncioCancelled` sub-type
# which indicates to the aio task that the trio side exited
# silently WITHOUT raising a `trio.Cancelled` (which would
# normally be raised instead as a `AsyncioCancelled`).
assert exc.boxed_type is to_asyncio.AsyncioTaskExited
def test_aio_errors_and_channel_propagates_and_closes( def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
reg_addr: tuple[str, int],
debug_mode: bool,
):
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery() as n:
debug_mode=debug_mode, portal = await n.run_in_actor(
) as an:
portal = await an.run_in_actor(
stream_from_aio, stream_from_aio,
aio_raise_err=True, aio_raise_err=True,
infect_asyncio=True, infect_asyncio=True,
@ -739,13 +592,7 @@ async def aio_echo_server(
to_trio.send_nowait('start') to_trio.send_nowait('start')
while True: while True:
try:
msg = await from_trio.get() msg = await from_trio.get()
except to_asyncio.TrioTaskExited:
print(
'breaking aio echo loop due to `trio` exit!'
)
break
# echo the msg back # echo the msg back
to_trio.send_nowait(msg) to_trio.send_nowait(msg)
@ -794,15 +641,13 @@ async def trio_to_aio_echo_server(
ids='raise_error={}'.format, ids='raise_error={}'.format,
) )
def test_echoserver_detailed_mechanics( def test_echoserver_detailed_mechanics(
reg_addr: tuple[str, int], reg_addr,
debug_mode: bool,
raise_error_mid_stream, raise_error_mid_stream,
): ):
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery() as n:
debug_mode=debug_mode, p = await n.start_actor(
) as an:
p = await an.start_actor(
'aio_server', 'aio_server',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
@ -1007,8 +852,6 @@ def test_sigint_closes_lifetime_stack(
''' '''
async def main(): async def main():
delay = 999 if tractor.debug_mode() else 1
try: try:
an: tractor.ActorNursery an: tractor.ActorNursery
async with tractor.open_nursery( async with tractor.open_nursery(
@ -1059,7 +902,7 @@ def test_sigint_closes_lifetime_stack(
if wait_for_ctx: if wait_for_ctx:
print('waiting for ctx outcome in parent..') print('waiting for ctx outcome in parent..')
try: try:
with trio.fail_after(1 + delay): with trio.fail_after(1):
await ctx.wait_for_result() await ctx.wait_for_result()
except tractor.ContextCancelled as ctxc: except tractor.ContextCancelled as ctxc:
assert ctxc.canceller == ctx.chan.uid assert ctxc.canceller == ctx.chan.uid

View File

@ -170,7 +170,7 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
trio.run(main) trio.run(main)
rae = excinfo.value rae = excinfo.value
assert rae.boxed_type is TypeError assert rae.boxed_type == TypeError
@tractor.context @tractor.context

View File

@ -39,7 +39,7 @@ def test_infected_root_actor(
''' '''
async def _trio_main(): async def _trio_main():
with trio.fail_after(2 if not debug_mode else 999): with trio.fail_after(2):
first: str first: str
chan: to_asyncio.LinkedTaskChannel chan: to_asyncio.LinkedTaskChannel
async with ( async with (
@ -59,11 +59,7 @@ def test_infected_root_actor(
assert out == i assert out == i
print(f'asyncio echoing {i}') print(f'asyncio echoing {i}')
if ( if raise_error_mid_stream and i == 500:
raise_error_mid_stream
and
i == 500
):
raise raise_error_mid_stream raise raise_error_mid_stream
if out is None: if out is None:

View File

@ -82,39 +82,6 @@ class InternalError(RuntimeError):
''' '''
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
NOTE: this should NOT inherit from `asyncio.CancelledError` or
tests should break!
'''
class AsyncioTaskExited(Exception):
'''
asyncio.Task "exited" translation error for use with the
`to_asyncio` APIs to be raised in the `trio` side task indicating
on `.run_task()`/`.open_channel_from()` exit that the aio side
exited early/silently.
'''
class TrioTaskExited(AsyncioCancelled):
'''
The `trio`-side task exited without explicitly cancelling the
`asyncio.Task` peer.
This is very similar to how `trio.ClosedResource` acts as
a "clean shutdown" signal to the consumer side of a mem-chan,
https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
'''
# NOTE: more or less should be close to these: # NOTE: more or less should be close to these:
# 'boxed_type', # 'boxed_type',
@ -160,8 +127,8 @@ _body_fields: list[str] = list(
def get_err_type(type_name: str) -> BaseException|None: def get_err_type(type_name: str) -> BaseException|None:
''' '''
Look up an exception type by name from the set of locally known Look up an exception type by name from the set of locally
namespaces: known namespaces:
- `builtins` - `builtins`
- `tractor._exceptions` - `tractor._exceptions`
@ -391,13 +358,6 @@ class RemoteActorError(Exception):
self._ipc_msg.src_type_str self._ipc_msg.src_type_str
) )
if not self._src_type:
raise TypeError(
f'Failed to lookup src error type with '
f'`tractor._exceptions.get_err_type()` :\n'
f'{self.src_type_str}'
)
return self._src_type return self._src_type
@property @property
@ -692,10 +652,16 @@ class RemoteActorError(Exception):
failing actor's remote env. failing actor's remote env.
''' '''
src_type_ref: Type[BaseException] = self.src_type
if not src_type_ref:
raise TypeError(
'Failed to lookup src error type:\n'
f'{self.src_type_str}'
)
# TODO: better tb insertion and all the fancier dunder # TODO: better tb insertion and all the fancier dunder
# metadata stuff as per `.__context__` etc. and friends: # metadata stuff as per `.__context__` etc. and friends:
# https://github.com/python-trio/trio/issues/611 # https://github.com/python-trio/trio/issues/611
src_type_ref: Type[BaseException] = self.src_type
return src_type_ref(self.tb_str) return src_type_ref(self.tb_str)
# TODO: local recontruction of nested inception for a given # TODO: local recontruction of nested inception for a given

View File

@ -35,7 +35,6 @@ from signal import (
signal, signal,
getsignal, getsignal,
SIGUSR1, SIGUSR1,
SIGINT,
) )
# import traceback # import traceback
from types import ModuleType from types import ModuleType
@ -49,7 +48,6 @@ from tractor import (
_state, _state,
log as logmod, log as logmod,
) )
from tractor.devx import _debug
log = logmod.get_logger(__name__) log = logmod.get_logger(__name__)
@ -78,45 +76,22 @@ def dump_task_tree() -> None:
) )
actor: Actor = _state.current_actor() actor: Actor = _state.current_actor()
thr: Thread = current_thread() thr: Thread = current_thread()
current_sigint_handler: Callable = getsignal(SIGINT)
if (
current_sigint_handler
is not
_debug.DebugStatus._trio_handler
):
sigint_handler_report: str = (
'The default `trio` SIGINT handler was replaced?!'
)
else:
sigint_handler_report: str = (
'The default `trio` SIGINT handler is in use?!'
)
# sclang symbology
# |_<object>
# |_(Task/Thread/Process/Actor
# |_{Supervisor/Scope
# |_[Storage/Memory/IPC-Stream/Data-Struct
log.devx( log.devx(
f'Dumping `stackscope` tree for actor\n' f'Dumping `stackscope` tree for actor\n'
f'(>: {actor.uid!r}\n' f'{actor.uid}:\n'
f' |_{mp.current_process()}\n' f'|_{mp.current_process()}\n'
f' |_{thr}\n' f' |_{thr}\n'
f' |_{actor}\n' f' |_{actor}\n\n'
f'\n'
f'{sigint_handler_report}\n'
f'signal.getsignal(SIGINT) -> {current_sigint_handler!r}\n'
# f'\n'
# start-of-trace-tree delimiter (mostly for testing) # start-of-trace-tree delimiter (mostly for testing)
# f'------ {actor.uid!r} ------\n' '------ - ------\n'
f'\n' '\n'
f'------ start-of-{actor.uid!r} ------\n' +
f'|\n' f'{tree_str}\n'
f'{tree_str}' +
# end-of-trace-tree delimiter (mostly for testing) # end-of-trace-tree delimiter (mostly for testing)
f'|\n' f'\n'
f'|_____ end-of-{actor.uid!r} ______\n' f'------ {actor.uid!r} ------\n'
) )
# TODO: can remove this right? # TODO: can remove this right?
# -[ ] was original code from author # -[ ] was original code from author
@ -148,11 +123,11 @@ def dump_tree_on_sig(
) -> None: ) -> None:
global _tree_dumped, _handler_lock global _tree_dumped, _handler_lock
with _handler_lock: with _handler_lock:
# if _tree_dumped: if _tree_dumped:
# log.warning( log.warning(
# 'Already dumped for this actor...??' 'Already dumped for this actor...??'
# ) )
# return return
_tree_dumped = True _tree_dumped = True
@ -186,9 +161,9 @@ def dump_tree_on_sig(
) )
raise raise
# log.devx( log.devx(
# 'Supposedly we dumped just fine..?' 'Supposedly we dumped just fine..?'
# ) )
if not relay_to_subs: if not relay_to_subs:
return return
@ -227,11 +202,11 @@ def enable_stack_on_sig(
(https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution) (https://www.gnu.org/software/bash/manual/bash.html#Command-Substitution)
you could use: you could use:
>> kill -SIGUSR1 $(pgrep -f <part-of-cmd: str>) >> kill -SIGUSR1 $(pgrep -f '<cmd>')
OR without a sub-shell, Or with with `xonsh` (which has diff capture-from-subproc syntax)
>> pkill --signal SIGUSR1 -f <part-of-cmd: str> >> kill -SIGUSR1 @$(pgrep -f '<cmd>')
''' '''
try: try:

File diff suppressed because it is too large Load Diff