Compare commits

..

No commits in common. "9133f42b07e69cbae2d8c5da077397b946cf4525" and "a870df68c0c91d510b0ca52c8cc98ca3d3a9b30b" have entirely different histories.

3 changed files with 91 additions and 263 deletions

View File

@ -289,35 +289,23 @@ async def aio_cancel():
''' '''
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
task = asyncio.current_task()
# cancel and enter sleep # cancel and enter sleep
task = asyncio.current_task()
task.cancel() task.cancel()
await aio_sleep_forever() await aio_sleep_forever()
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
'''
When the `asyncio.Task` cancels itself the `trio` side cshould
also cancel and teardown and relay the cancellation cross-process
to the caller (parent).
'''
async def main(): async def main():
async with tractor.open_nursery() as n:
an: tractor.ActorNursery await n.run_in_actor(
async with tractor.open_nursery() as an:
p: tractor.Portal = await an.run_in_actor(
asyncio_actor, asyncio_actor,
target='aio_cancel', target='aio_cancel',
expect_err='tractor.to_asyncio.AsyncioCancelled', expect_err='tractor.to_asyncio.AsyncioCancelled',
infect_asyncio=True, infect_asyncio=True,
) )
# NOTE: normally the `an.__aexit__()` waits on the
# portal's result but we do it explicitly here
# to avoid indent levels.
with trio.fail_after(1):
await p.wait_for_result()
with pytest.raises( with pytest.raises(
expected_exception=(RemoteActorError, ExceptionGroup), expected_exception=(RemoteActorError, ExceptionGroup),
@ -325,7 +313,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
trio.run(main) trio.run(main)
# might get multiple `trio.Cancelled`s as well inside an inception # might get multiple `trio.Cancelled`s as well inside an inception
err: RemoteActorError|ExceptionGroup = excinfo.value err = excinfo.value
if isinstance(err, ExceptionGroup): if isinstance(err, ExceptionGroup):
err = next(itertools.dropwhile( err = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError), lambda exc: not isinstance(exc, tractor.RemoteActorError),
@ -333,8 +321,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
)) ))
assert err assert err
# relayed boxed error should be our `trio`-task's # ensure boxed error is correct
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
assert err.boxed_type == to_asyncio.AsyncioCancelled assert err.boxed_type == to_asyncio.AsyncioCancelled
@ -643,8 +630,6 @@ def test_echoserver_detailed_mechanics(
async def manage_file( async def manage_file(
ctx: tractor.Context, ctx: tractor.Context,
tmp_path_str: str, tmp_path_str: str,
send_sigint_to: str,
trio_side_is_shielded: bool = True,
bg_aio_task: bool = False, bg_aio_task: bool = False,
): ):
''' '''
@ -694,62 +679,33 @@ async def manage_file(
# => ????? honestly i'm lost but it seems to be some issue # => ????? honestly i'm lost but it seems to be some issue
# with `asyncio` and SIGINT.. # with `asyncio` and SIGINT..
# #
# XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and
# `run_as_asyncio_guest()` is written WITHOUT THE
# `.cancel_soon()` soln, both of these tests will pass ??
# so maybe it has something to do with `asyncio` loop init
# state?!?
# honestly, this REALLY reminds me why i haven't used # honestly, this REALLY reminds me why i haven't used
# `asyncio` by choice in years.. XD # `asyncio` by choice in years.. XD
# #
async with trio.open_nursery() as tn: # await tractor.to_asyncio.run_task(aio_sleep_forever)
if bg_aio_task: if bg_aio_task:
async with trio.open_nursery() as tn:
tn.start_soon( tn.start_soon(
tractor.to_asyncio.run_task, tractor.to_asyncio.run_task,
aio_sleep_forever, aio_sleep_forever,
) )
# XXX don't-need/doesn't-make-a-diff right await trio.sleep_forever()
# since we're already doing it from parent?
# if send_sigint_to == 'child':
# os.kill(
# os.getpid(),
# signal.SIGINT,
# )
# XXX spend a half sec doing shielded checkpointing to
# ensure that despite the `trio`-side task ignoring the
# SIGINT, the `asyncio` side won't abandon the guest-run!
if trio_side_is_shielded:
with trio.CancelScope(shield=True):
for i in range(5):
await trio.sleep(0.1)
await trio.sleep_forever()
# signalled manually at the OS level (aka KBI) by the parent actor. # signalled manually at the OS level (aka KBI) by the parent actor.
except KeyboardInterrupt: except KeyboardInterrupt:
print('child raised KBI..') print('child raised KBI..')
assert tmp_file.exists() assert tmp_file.exists()
raise raise
else:
raise RuntimeError('shoulda received a KBI?') raise RuntimeError('shoulda received a KBI?')
@pytest.mark.parametrize(
'trio_side_is_shielded',
[
False,
True,
],
ids=[
'trio_side_no_shielding',
'trio_side_does_shielded_work',
],
)
@pytest.mark.parametrize(
'send_sigint_to',
[
'child',
'parent',
],
ids='send_SIGINT_to={}'.format,
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'bg_aio_task', 'bg_aio_task',
[ [
@ -784,9 +740,6 @@ def test_sigint_closes_lifetime_stack(
tmp_path: Path, tmp_path: Path,
wait_for_ctx: bool, wait_for_ctx: bool,
bg_aio_task: bool, bg_aio_task: bool,
trio_side_is_shielded: bool,
debug_mode: bool,
send_sigint_to: str,
): ):
''' '''
Ensure that an infected child can use the `Actor.lifetime_stack` Ensure that an infected child can use the `Actor.lifetime_stack`
@ -796,11 +749,8 @@ def test_sigint_closes_lifetime_stack(
''' '''
async def main(): async def main():
try: try:
an: tractor.ActorNursery async with tractor.open_nursery() as n:
async with tractor.open_nursery( p = await n.start_actor(
debug_mode=debug_mode,
) as an:
p: tractor.Portal = await an.start_actor(
'file_mngr', 'file_mngr',
enable_modules=[__name__], enable_modules=[__name__],
infect_asyncio=True, infect_asyncio=True,
@ -808,9 +758,7 @@ def test_sigint_closes_lifetime_stack(
async with p.open_context( async with p.open_context(
manage_file, manage_file,
tmp_path_str=str(tmp_path), tmp_path_str=str(tmp_path),
send_sigint_to=send_sigint_to,
bg_aio_task=bg_aio_task, bg_aio_task=bg_aio_task,
trio_side_is_shielded=trio_side_is_shielded,
) as (ctx, first): ) as (ctx, first):
path_str, cpid = first path_str, cpid = first
@ -829,13 +777,10 @@ def test_sigint_closes_lifetime_stack(
# shm-buffer leaks in `piker`'s live quote stream # shm-buffer leaks in `piker`'s live quote stream
# susbys! # susbys!
# #
# await trio.sleep(.5)
await trio.sleep(.2) await trio.sleep(.2)
pid: int = (
cpid if send_sigint_to == 'child'
else os.getpid()
)
os.kill( os.kill(
pid, cpid,
signal.SIGINT, signal.SIGINT,
) )
@ -845,7 +790,7 @@ def test_sigint_closes_lifetime_stack(
if wait_for_ctx: if wait_for_ctx:
print('waiting for ctx outcome in parent..') print('waiting for ctx outcome in parent..')
try: try:
with trio.fail_after(1): with trio.fail_after(.7):
await ctx.wait_for_result() await ctx.wait_for_result()
except tractor.ContextCancelled as ctxc: except tractor.ContextCancelled as ctxc:
assert ctxc.canceller == ctx.chan.uid assert ctxc.canceller == ctx.chan.uid

View File

@ -929,17 +929,6 @@ class MessagingError(Exception):
''' '''
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
NOTE: this should NOT inherit from `asyncio.CancelledError` or
tests should break!
'''
def pack_error( def pack_error(
exc: BaseException|RemoteActorError, exc: BaseException|RemoteActorError,

View File

@ -33,12 +33,11 @@ from typing import (
) )
import tractor import tractor
from tractor._exceptions import AsyncioCancelled
from tractor._state import ( from tractor._state import (
debug_mode, debug_mode,
) )
from tractor.devx import _debug
from tractor.log import get_logger from tractor.log import get_logger
from tractor.devx import _debug
from tractor.trionics._broadcast import ( from tractor.trionics._broadcast import (
broadcast_receiver, broadcast_receiver,
BroadcastReceiver, BroadcastReceiver,
@ -52,10 +51,7 @@ from outcome import (
log = get_logger(__name__) log = get_logger(__name__)
__all__ = [ __all__ = ['run_task', 'run_as_asyncio_guest']
'run_task',
'run_as_asyncio_guest',
]
@dataclass @dataclass
@ -159,16 +155,15 @@ def _run_asyncio_task(
*, *,
qsize: int = 1, qsize: int = 1,
provide_channels: bool = False, provide_channels: bool = False,
hide_tb: bool = False,
**kwargs, **kwargs,
) -> LinkedTaskChannel: ) -> LinkedTaskChannel:
''' '''
Run an ``asyncio`` async function or generator in a task, return Run an ``asyncio`` async function or generator in a task, return
or stream the result back to the caller `trio.lowleve.Task`. or stream the result back to ``trio``.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__ = True
if not tractor.current_actor().is_infected_aio(): if not tractor.current_actor().is_infected_aio():
raise RuntimeError( raise RuntimeError(
"`infect_asyncio` mode is not enabled!?" "`infect_asyncio` mode is not enabled!?"
@ -229,7 +224,6 @@ def _run_asyncio_task(
try: try:
result = await coro result = await coro
except BaseException as aio_err: except BaseException as aio_err:
chan._aio_err = aio_err
if isinstance(aio_err, CancelledError): if isinstance(aio_err, CancelledError):
log.runtime( log.runtime(
'`asyncio` task was cancelled..\n' '`asyncio` task was cancelled..\n'
@ -238,6 +232,7 @@ def _run_asyncio_task(
log.exception( log.exception(
'`asyncio` task errored\n' '`asyncio` task errored\n'
) )
chan._aio_err = aio_err
raise raise
else: else:
@ -273,7 +268,7 @@ def _run_asyncio_task(
aio_task_complete aio_task_complete
) )
) )
chan._aio_task: asyncio.Task = task chan._aio_task = task
# XXX TODO XXX get this actually workin.. XD # XXX TODO XXX get this actually workin.. XD
# maybe setup `greenback` for `asyncio`-side task REPLing # maybe setup `greenback` for `asyncio`-side task REPLing
@ -289,19 +284,19 @@ def _run_asyncio_task(
def cancel_trio(task: asyncio.Task) -> None: def cancel_trio(task: asyncio.Task) -> None:
''' '''
Cancel the calling `trio` task on error. Cancel the calling ``trio`` task on error.
''' '''
nonlocal chan nonlocal chan
aio_err: BaseException|None = chan._aio_err aio_err = chan._aio_err
task_err: BaseException|None = None task_err: BaseException|None = None
# only to avoid `asyncio` complaining about uncaptured # only to avoid ``asyncio`` complaining about uncaptured
# task exceptions # task exceptions
try: try:
res: Any = task.result() task.exception()
except BaseException as terr: except BaseException as terr:
task_err: BaseException = terr task_err = terr
msg: str = ( msg: str = (
'Infected `asyncio` task {etype_str}\n' 'Infected `asyncio` task {etype_str}\n'
@ -333,49 +328,42 @@ def _run_asyncio_task(
if task_err is None: if task_err is None:
assert aio_err assert aio_err
# wait, wut? aio_err.with_traceback(aio_err.__traceback__)
# aio_err.with_traceback(aio_err.__traceback__) # log.error(
# 'infected task errorred'
# )
# TODO: show when cancellation originated # TODO: show that the cancellation originated
# from each side more pedantically? # from the ``trio`` side? right?
# elif ( # elif type(aio_err) is CancelledError:
# type(aio_err) is CancelledError
# and # trio was the cause?
# cancel_scope.cancel_called
# ):
# log.cancel( # log.cancel(
# 'infected task was cancelled by `trio`-side' # 'infected task was cancelled'
# ) # )
# raise aio_err from task_err
# XXX: if not already, alway cancel the scope # if cancel_scope.cancelled:
# on a task error in case the trio task is blocking on # raise aio_err from err
# a checkpoint.
# XXX: alway cancel the scope on error
# in case the trio task is blocking
# on a checkpoint.
cancel_scope.cancel() cancel_scope.cancel()
if (
task_err
and
aio_err is not task_err
):
raise aio_err from task_err
# raise any `asyncio` side error. # raise any `asyncio` side error.
raise aio_err raise aio_err
log.info(
'`trio` received final result from {task}\n'
f'|_{res}\n'
)
# TODO: do we need this?
# if task_err:
# cancel_scope.cancel()
# raise task_err
task.add_done_callback(cancel_trio) task.add_done_callback(cancel_trio)
return chan return chan
class AsyncioCancelled(CancelledError):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
'''
@acm @acm
async def translate_aio_errors( async def translate_aio_errors(
@ -398,9 +386,7 @@ async def translate_aio_errors(
) -> None: ) -> None:
aio_err = chan._aio_err aio_err = chan._aio_err
if ( if (
aio_err is not None aio_err is not None and
and
# not isinstance(aio_err, CancelledError)
type(aio_err) != CancelledError type(aio_err) != CancelledError
): ):
# always raise from any captured asyncio error # always raise from any captured asyncio error
@ -432,17 +418,13 @@ async def translate_aio_errors(
): ):
aio_err = chan._aio_err aio_err = chan._aio_err
if ( if (
task.cancelled() task.cancelled() and
and
type(aio_err) is CancelledError type(aio_err) is CancelledError
): ):
# if an underlying `asyncio.CancelledError` triggered this # if an underlying ``asyncio.CancelledError`` triggered this
# channel close, raise our (non-``BaseException``) wrapper # channel close, raise our (non-``BaseException``) wrapper
# error: ``AsyncioCancelled`` from that source error. # error: ``AsyncioCancelled`` from that source error.
raise AsyncioCancelled( raise AsyncioCancelled from aio_err
f'Task cancelled\n'
f'|_{task}\n'
) from aio_err
else: else:
raise raise
@ -485,8 +467,8 @@ async def run_task(
) -> Any: ) -> Any:
''' '''
Run an `asyncio` async function or generator in a task, return Run an ``asyncio`` async function or generator in a task, return
or stream the result back to `trio`. or stream the result back to ``trio``.
''' '''
# simple async func # simple async func
@ -544,27 +526,10 @@ async def open_channel_from(
chan._to_trio.close() chan._to_trio.close()
class AsyncioRuntimeTranslationError(RuntimeError):
'''
We failed to correctly relay runtime semantics and/or maintain SC
supervision rules cross-event-loop.
'''
def run_as_asyncio_guest( def run_as_asyncio_guest(
trio_main: Callable, trio_main: Callable,
# ^-NOTE-^ when spawned with `infected_aio=True` this func is
# normally `Actor._async_main()` as is passed by some boostrap
# entrypoint like `._entry._trio_main()`.
) -> None: ) -> None:
# ^-TODO-^ technically whatever `trio_main` returns.. we should
# try to use func-typevar-params at leaast by 3.13!
# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#callback-protocols
# -[ ] https://peps.python.org/pep-0646/#using-type-variable-tuples-in-functions
# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#unpack-for-keyword-arguments
# -[ ] https://peps.python.org/pep-0718/
''' '''
Entry for an "infected ``asyncio`` actor". Entry for an "infected ``asyncio`` actor".
@ -590,13 +555,7 @@ def run_as_asyncio_guest(
# :) # :)
async def aio_main(trio_main): async def aio_main(trio_main):
'''
Main `asyncio.Task` which calls
`trio.lowlevel.start_guest_run()` to "infect" the `asyncio`
event-loop by embedding the `trio` scheduler allowing us to
boot the `tractor` runtime and connect back to our parent.
'''
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future() trio_done_fut = asyncio.Future()
startup_msg: str = ( startup_msg: str = (
@ -605,22 +564,17 @@ def run_as_asyncio_guest(
'-> built a `trio`-done future\n' '-> built a `trio`-done future\n'
) )
# TODO: shoudn't this be done in the guest-run trio task? if debug_mode():
# if debug_mode(): # XXX make it obvi we know this isn't supported yet!
# # XXX make it obvi we know this isn't supported yet! log.error(
# log.error( 'Attempting to enter unsupported `greenback` init '
# 'Attempting to enter unsupported `greenback` init ' 'from `asyncio` task..'
# 'from `asyncio` task..' )
# ) await _debug.maybe_init_greenback(
# await _debug.maybe_init_greenback( force_reload=True,
# force_reload=True, )
# )
def trio_done_callback(main_outcome): def trio_done_callback(main_outcome):
log.info(
f'trio_main finished with\n'
f'|_{main_outcome!r}'
)
if isinstance(main_outcome, Error): if isinstance(main_outcome, Error):
error: BaseException = main_outcome.error error: BaseException = main_outcome.error
@ -640,6 +594,7 @@ def run_as_asyncio_guest(
else: else:
trio_done_fut.set_result(main_outcome) trio_done_fut.set_result(main_outcome)
log.runtime(f'trio_main finished: {main_outcome!r}')
startup_msg += ( startup_msg += (
f'-> created {trio_done_callback!r}\n' f'-> created {trio_done_callback!r}\n'
@ -658,48 +613,26 @@ def run_as_asyncio_guest(
run_sync_soon_threadsafe=loop.call_soon_threadsafe, run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback, done_callback=trio_done_callback,
) )
fute_err: BaseException|None = None
try: try:
# TODO: better SIGINT handling since shielding seems to
# make NO DIFFERENCE XD
# -[ ] maybe this is due to 3.11's recent SIGINT handling
# changes and we can better work with/around it?
# https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption
out: Outcome = await asyncio.shield(trio_done_fut) out: Outcome = await asyncio.shield(trio_done_fut)
# NOTE `Error.unwrap()` will raise
# NOTE will raise (via `Error.unwrap()`) from any
# exception packed into the guest-run's `main_outcome`.
return out.unwrap() return out.unwrap()
except ( except asyncio.CancelledError:
# XXX special SIGINT-handling is required since
# `asyncio.shield()`-ing seems to NOT handle that case as
# per recent changes in 3.11:
# https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption
#
# NOTE: further, apparently ONLY need to handle this
# special SIGINT case since all other `asyncio`-side
# errors can be processed via our `chan._aio_err`
# relaying (right?); SIGINT seems to be totally diff
# error path in `asyncio`'s runtime..?
asyncio.CancelledError,
) as fute_err:
err_message: str = (
'main `asyncio` task '
)
if isinstance(fute_err, asyncio.CancelledError):
err_message += 'was cancelled!\n'
else:
err_message += f'errored with {out.error!r}\n'
actor: tractor.Actor = tractor.current_actor() actor: tractor.Actor = tractor.current_actor()
log.exception( log.exception(
err_message '`asyncio`-side main task was cancelled!\n'
+ 'Cancelling actor-runtime..\n'
'Cancelling `trio`-side `tractor`-runtime..\n'
f'c)>\n' f'c)>\n'
f' |_{actor}.cancel_soon()\n' f' |_{actor}.cancel_soon()\n'
) )
# TODO: reduce this comment bloc since abandon issues are
# now solved?
#
# XXX NOTE XXX the next LOC is super important!!! # XXX NOTE XXX the next LOC is super important!!!
# => without it, we can get a guest-run abandonment case # => without it, we can get a guest-run abandonment case
# where asyncio will not trigger `trio` in a final event # where asyncio will not trigger `trio` in a final event
@ -748,55 +681,16 @@ def run_as_asyncio_guest(
# is apparently a working fix! # is apparently a working fix!
actor.cancel_soon() actor.cancel_soon()
# XXX NOTE XXX pump the `asyncio` event-loop to allow # XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to
# `trio`-side to `trio`-guest-run to complete and # `trio`-guest-run to complete and teardown !!
# teardown !!
# #
# *WITHOUT THIS* the guest-run can get race-conditionally abandoned!! # XXX WITHOUT THIS the guest-run gets race-conditionally
# XD # abandoned by `asyncio`!!
# # XD XD XD
await asyncio.sleep(.1) # `delay` can't be 0 either XD await asyncio.shield(
while not trio_done_fut.done(): asyncio.sleep(.1) # NOPE! it can't be 0 either XD
log.runtime( )
'Waiting on main guest-run `asyncio` task to complete..\n' raise
f'|_trio_done_fut: {trio_done_fut}\n'
)
await asyncio.sleep(.1)
# XXX: don't actually need the shield.. seems to
# make no difference (??) and we know it spawns an
# internal task..
# await asyncio.shield(asyncio.sleep(.1))
# XXX alt approach but can block indefinitely..
# so don't use?
# loop._run_once()
try:
return trio_done_fut.result()
except asyncio.exceptions.InvalidStateError as state_err:
# XXX be super dupere noisy about abandonment issues!
aio_task: asyncio.Task = asyncio.current_task()
message: str = (
'The `asyncio`-side task likely exited before the '
'`trio`-side guest-run completed!\n\n'
)
if fute_err:
message += (
f'The main {aio_task}\n'
f'STOPPED due to {type(fute_err)}\n\n'
)
message += (
f'Likely something inside our guest-run-as-task impl is '
f'not effectively waiting on the `trio`-side to complete ?!\n'
f'This code -> {aio_main!r}\n\n'
'Below you will likely see a '
'"RuntimeWarning: Trio guest run got abandoned.." !!\n'
)
raise AsyncioRuntimeTranslationError(message) from state_err
# might as well if it's installed. # might as well if it's installed.
try: try:
@ -804,7 +698,7 @@ def run_as_asyncio_guest(
loop = uvloop.new_event_loop() loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
except ImportError: except ImportError:
log.runtime('`uvloop` not available..') pass
return asyncio.run( return asyncio.run(
aio_main(trio_main), aio_main(trio_main),