Compare commits
No commits in common. "9133f42b07e69cbae2d8c5da077397b946cf4525" and "a870df68c0c91d510b0ca52c8cc98ca3d3a9b30b" have entirely different histories.
9133f42b07
...
a870df68c0
|
@ -289,35 +289,23 @@ async def aio_cancel():
|
||||||
|
|
||||||
'''
|
'''
|
||||||
await asyncio.sleep(0.5)
|
await asyncio.sleep(0.5)
|
||||||
|
task = asyncio.current_task()
|
||||||
|
|
||||||
# cancel and enter sleep
|
# cancel and enter sleep
|
||||||
task = asyncio.current_task()
|
|
||||||
task.cancel()
|
task.cancel()
|
||||||
await aio_sleep_forever()
|
await aio_sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||||
'''
|
|
||||||
When the `asyncio.Task` cancels itself the `trio` side cshould
|
|
||||||
also cancel and teardown and relay the cancellation cross-process
|
|
||||||
to the caller (parent).
|
|
||||||
|
|
||||||
'''
|
|
||||||
async def main():
|
async def main():
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
an: tractor.ActorNursery
|
await n.run_in_actor(
|
||||||
async with tractor.open_nursery() as an:
|
|
||||||
p: tractor.Portal = await an.run_in_actor(
|
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='aio_cancel',
|
target='aio_cancel',
|
||||||
expect_err='tractor.to_asyncio.AsyncioCancelled',
|
expect_err='tractor.to_asyncio.AsyncioCancelled',
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
# NOTE: normally the `an.__aexit__()` waits on the
|
|
||||||
# portal's result but we do it explicitly here
|
|
||||||
# to avoid indent levels.
|
|
||||||
with trio.fail_after(1):
|
|
||||||
await p.wait_for_result()
|
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
expected_exception=(RemoteActorError, ExceptionGroup),
|
expected_exception=(RemoteActorError, ExceptionGroup),
|
||||||
|
@ -325,7 +313,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# might get multiple `trio.Cancelled`s as well inside an inception
|
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||||
err: RemoteActorError|ExceptionGroup = excinfo.value
|
err = excinfo.value
|
||||||
if isinstance(err, ExceptionGroup):
|
if isinstance(err, ExceptionGroup):
|
||||||
err = next(itertools.dropwhile(
|
err = next(itertools.dropwhile(
|
||||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
||||||
|
@ -333,8 +321,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||||
))
|
))
|
||||||
assert err
|
assert err
|
||||||
|
|
||||||
# relayed boxed error should be our `trio`-task's
|
# ensure boxed error is correct
|
||||||
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
|
|
||||||
assert err.boxed_type == to_asyncio.AsyncioCancelled
|
assert err.boxed_type == to_asyncio.AsyncioCancelled
|
||||||
|
|
||||||
|
|
||||||
|
@ -643,8 +630,6 @@ def test_echoserver_detailed_mechanics(
|
||||||
async def manage_file(
|
async def manage_file(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
tmp_path_str: str,
|
tmp_path_str: str,
|
||||||
send_sigint_to: str,
|
|
||||||
trio_side_is_shielded: bool = True,
|
|
||||||
bg_aio_task: bool = False,
|
bg_aio_task: bool = False,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -694,62 +679,33 @@ async def manage_file(
|
||||||
# => ????? honestly i'm lost but it seems to be some issue
|
# => ????? honestly i'm lost but it seems to be some issue
|
||||||
# with `asyncio` and SIGINT..
|
# with `asyncio` and SIGINT..
|
||||||
#
|
#
|
||||||
|
# XXX NOTE XXX SO, if this LINE IS UNCOMMENTED and
|
||||||
|
# `run_as_asyncio_guest()` is written WITHOUT THE
|
||||||
|
# `.cancel_soon()` soln, both of these tests will pass ??
|
||||||
|
# so maybe it has something to do with `asyncio` loop init
|
||||||
|
# state?!?
|
||||||
# honestly, this REALLY reminds me why i haven't used
|
# honestly, this REALLY reminds me why i haven't used
|
||||||
# `asyncio` by choice in years.. XD
|
# `asyncio` by choice in years.. XD
|
||||||
#
|
#
|
||||||
async with trio.open_nursery() as tn:
|
# await tractor.to_asyncio.run_task(aio_sleep_forever)
|
||||||
if bg_aio_task:
|
if bg_aio_task:
|
||||||
|
async with trio.open_nursery() as tn:
|
||||||
tn.start_soon(
|
tn.start_soon(
|
||||||
tractor.to_asyncio.run_task,
|
tractor.to_asyncio.run_task,
|
||||||
aio_sleep_forever,
|
aio_sleep_forever,
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX don't-need/doesn't-make-a-diff right
|
await trio.sleep_forever()
|
||||||
# since we're already doing it from parent?
|
|
||||||
# if send_sigint_to == 'child':
|
|
||||||
# os.kill(
|
|
||||||
# os.getpid(),
|
|
||||||
# signal.SIGINT,
|
|
||||||
# )
|
|
||||||
|
|
||||||
# XXX spend a half sec doing shielded checkpointing to
|
|
||||||
# ensure that despite the `trio`-side task ignoring the
|
|
||||||
# SIGINT, the `asyncio` side won't abandon the guest-run!
|
|
||||||
if trio_side_is_shielded:
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
for i in range(5):
|
|
||||||
await trio.sleep(0.1)
|
|
||||||
|
|
||||||
await trio.sleep_forever()
|
|
||||||
|
|
||||||
# signalled manually at the OS level (aka KBI) by the parent actor.
|
# signalled manually at the OS level (aka KBI) by the parent actor.
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print('child raised KBI..')
|
print('child raised KBI..')
|
||||||
assert tmp_file.exists()
|
assert tmp_file.exists()
|
||||||
raise
|
raise
|
||||||
|
else:
|
||||||
raise RuntimeError('shoulda received a KBI?')
|
raise RuntimeError('shoulda received a KBI?')
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'trio_side_is_shielded',
|
|
||||||
[
|
|
||||||
False,
|
|
||||||
True,
|
|
||||||
],
|
|
||||||
ids=[
|
|
||||||
'trio_side_no_shielding',
|
|
||||||
'trio_side_does_shielded_work',
|
|
||||||
],
|
|
||||||
)
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'send_sigint_to',
|
|
||||||
[
|
|
||||||
'child',
|
|
||||||
'parent',
|
|
||||||
],
|
|
||||||
ids='send_SIGINT_to={}'.format,
|
|
||||||
)
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'bg_aio_task',
|
'bg_aio_task',
|
||||||
[
|
[
|
||||||
|
@ -784,9 +740,6 @@ def test_sigint_closes_lifetime_stack(
|
||||||
tmp_path: Path,
|
tmp_path: Path,
|
||||||
wait_for_ctx: bool,
|
wait_for_ctx: bool,
|
||||||
bg_aio_task: bool,
|
bg_aio_task: bool,
|
||||||
trio_side_is_shielded: bool,
|
|
||||||
debug_mode: bool,
|
|
||||||
send_sigint_to: str,
|
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Ensure that an infected child can use the `Actor.lifetime_stack`
|
Ensure that an infected child can use the `Actor.lifetime_stack`
|
||||||
|
@ -796,11 +749,8 @@ def test_sigint_closes_lifetime_stack(
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
try:
|
try:
|
||||||
an: tractor.ActorNursery
|
async with tractor.open_nursery() as n:
|
||||||
async with tractor.open_nursery(
|
p = await n.start_actor(
|
||||||
debug_mode=debug_mode,
|
|
||||||
) as an:
|
|
||||||
p: tractor.Portal = await an.start_actor(
|
|
||||||
'file_mngr',
|
'file_mngr',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
|
@ -808,9 +758,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
async with p.open_context(
|
async with p.open_context(
|
||||||
manage_file,
|
manage_file,
|
||||||
tmp_path_str=str(tmp_path),
|
tmp_path_str=str(tmp_path),
|
||||||
send_sigint_to=send_sigint_to,
|
|
||||||
bg_aio_task=bg_aio_task,
|
bg_aio_task=bg_aio_task,
|
||||||
trio_side_is_shielded=trio_side_is_shielded,
|
|
||||||
) as (ctx, first):
|
) as (ctx, first):
|
||||||
|
|
||||||
path_str, cpid = first
|
path_str, cpid = first
|
||||||
|
@ -829,13 +777,10 @@ def test_sigint_closes_lifetime_stack(
|
||||||
# shm-buffer leaks in `piker`'s live quote stream
|
# shm-buffer leaks in `piker`'s live quote stream
|
||||||
# susbys!
|
# susbys!
|
||||||
#
|
#
|
||||||
|
# await trio.sleep(.5)
|
||||||
await trio.sleep(.2)
|
await trio.sleep(.2)
|
||||||
pid: int = (
|
|
||||||
cpid if send_sigint_to == 'child'
|
|
||||||
else os.getpid()
|
|
||||||
)
|
|
||||||
os.kill(
|
os.kill(
|
||||||
pid,
|
cpid,
|
||||||
signal.SIGINT,
|
signal.SIGINT,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -845,7 +790,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
if wait_for_ctx:
|
if wait_for_ctx:
|
||||||
print('waiting for ctx outcome in parent..')
|
print('waiting for ctx outcome in parent..')
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(.7):
|
||||||
await ctx.wait_for_result()
|
await ctx.wait_for_result()
|
||||||
except tractor.ContextCancelled as ctxc:
|
except tractor.ContextCancelled as ctxc:
|
||||||
assert ctxc.canceller == ctx.chan.uid
|
assert ctxc.canceller == ctx.chan.uid
|
||||||
|
|
|
@ -929,17 +929,6 @@ class MessagingError(Exception):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
class AsyncioCancelled(Exception):
|
|
||||||
'''
|
|
||||||
Asyncio cancelled translation (non-base) error
|
|
||||||
for use with the ``to_asyncio`` module
|
|
||||||
to be raised in the ``trio`` side task
|
|
||||||
|
|
||||||
NOTE: this should NOT inherit from `asyncio.CancelledError` or
|
|
||||||
tests should break!
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
|
|
||||||
def pack_error(
|
def pack_error(
|
||||||
exc: BaseException|RemoteActorError,
|
exc: BaseException|RemoteActorError,
|
||||||
|
|
|
@ -33,12 +33,11 @@ from typing import (
|
||||||
)
|
)
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
from tractor._exceptions import AsyncioCancelled
|
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
debug_mode,
|
debug_mode,
|
||||||
)
|
)
|
||||||
from tractor.devx import _debug
|
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
from tractor.devx import _debug
|
||||||
from tractor.trionics._broadcast import (
|
from tractor.trionics._broadcast import (
|
||||||
broadcast_receiver,
|
broadcast_receiver,
|
||||||
BroadcastReceiver,
|
BroadcastReceiver,
|
||||||
|
@ -52,10 +51,7 @@ from outcome import (
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = ['run_task', 'run_as_asyncio_guest']
|
||||||
'run_task',
|
|
||||||
'run_as_asyncio_guest',
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
@ -159,16 +155,15 @@ def _run_asyncio_task(
|
||||||
*,
|
*,
|
||||||
qsize: int = 1,
|
qsize: int = 1,
|
||||||
provide_channels: bool = False,
|
provide_channels: bool = False,
|
||||||
hide_tb: bool = False,
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> LinkedTaskChannel:
|
) -> LinkedTaskChannel:
|
||||||
'''
|
'''
|
||||||
Run an ``asyncio`` async function or generator in a task, return
|
Run an ``asyncio`` async function or generator in a task, return
|
||||||
or stream the result back to the caller `trio.lowleve.Task`.
|
or stream the result back to ``trio``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__ = True
|
||||||
if not tractor.current_actor().is_infected_aio():
|
if not tractor.current_actor().is_infected_aio():
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"`infect_asyncio` mode is not enabled!?"
|
"`infect_asyncio` mode is not enabled!?"
|
||||||
|
@ -229,7 +224,6 @@ def _run_asyncio_task(
|
||||||
try:
|
try:
|
||||||
result = await coro
|
result = await coro
|
||||||
except BaseException as aio_err:
|
except BaseException as aio_err:
|
||||||
chan._aio_err = aio_err
|
|
||||||
if isinstance(aio_err, CancelledError):
|
if isinstance(aio_err, CancelledError):
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'`asyncio` task was cancelled..\n'
|
'`asyncio` task was cancelled..\n'
|
||||||
|
@ -238,6 +232,7 @@ def _run_asyncio_task(
|
||||||
log.exception(
|
log.exception(
|
||||||
'`asyncio` task errored\n'
|
'`asyncio` task errored\n'
|
||||||
)
|
)
|
||||||
|
chan._aio_err = aio_err
|
||||||
raise
|
raise
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -273,7 +268,7 @@ def _run_asyncio_task(
|
||||||
aio_task_complete
|
aio_task_complete
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
chan._aio_task: asyncio.Task = task
|
chan._aio_task = task
|
||||||
|
|
||||||
# XXX TODO XXX get this actually workin.. XD
|
# XXX TODO XXX get this actually workin.. XD
|
||||||
# maybe setup `greenback` for `asyncio`-side task REPLing
|
# maybe setup `greenback` for `asyncio`-side task REPLing
|
||||||
|
@ -289,19 +284,19 @@ def _run_asyncio_task(
|
||||||
|
|
||||||
def cancel_trio(task: asyncio.Task) -> None:
|
def cancel_trio(task: asyncio.Task) -> None:
|
||||||
'''
|
'''
|
||||||
Cancel the calling `trio` task on error.
|
Cancel the calling ``trio`` task on error.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
nonlocal chan
|
nonlocal chan
|
||||||
aio_err: BaseException|None = chan._aio_err
|
aio_err = chan._aio_err
|
||||||
task_err: BaseException|None = None
|
task_err: BaseException|None = None
|
||||||
|
|
||||||
# only to avoid `asyncio` complaining about uncaptured
|
# only to avoid ``asyncio`` complaining about uncaptured
|
||||||
# task exceptions
|
# task exceptions
|
||||||
try:
|
try:
|
||||||
res: Any = task.result()
|
task.exception()
|
||||||
except BaseException as terr:
|
except BaseException as terr:
|
||||||
task_err: BaseException = terr
|
task_err = terr
|
||||||
|
|
||||||
msg: str = (
|
msg: str = (
|
||||||
'Infected `asyncio` task {etype_str}\n'
|
'Infected `asyncio` task {etype_str}\n'
|
||||||
|
@ -333,49 +328,42 @@ def _run_asyncio_task(
|
||||||
|
|
||||||
if task_err is None:
|
if task_err is None:
|
||||||
assert aio_err
|
assert aio_err
|
||||||
# wait, wut?
|
aio_err.with_traceback(aio_err.__traceback__)
|
||||||
# aio_err.with_traceback(aio_err.__traceback__)
|
# log.error(
|
||||||
|
# 'infected task errorred'
|
||||||
|
# )
|
||||||
|
|
||||||
# TODO: show when cancellation originated
|
# TODO: show that the cancellation originated
|
||||||
# from each side more pedantically?
|
# from the ``trio`` side? right?
|
||||||
# elif (
|
# elif type(aio_err) is CancelledError:
|
||||||
# type(aio_err) is CancelledError
|
|
||||||
# and # trio was the cause?
|
|
||||||
# cancel_scope.cancel_called
|
|
||||||
# ):
|
|
||||||
# log.cancel(
|
# log.cancel(
|
||||||
# 'infected task was cancelled by `trio`-side'
|
# 'infected task was cancelled'
|
||||||
# )
|
# )
|
||||||
# raise aio_err from task_err
|
|
||||||
|
|
||||||
# XXX: if not already, alway cancel the scope
|
# if cancel_scope.cancelled:
|
||||||
# on a task error in case the trio task is blocking on
|
# raise aio_err from err
|
||||||
# a checkpoint.
|
|
||||||
|
# XXX: alway cancel the scope on error
|
||||||
|
# in case the trio task is blocking
|
||||||
|
# on a checkpoint.
|
||||||
cancel_scope.cancel()
|
cancel_scope.cancel()
|
||||||
|
|
||||||
if (
|
|
||||||
task_err
|
|
||||||
and
|
|
||||||
aio_err is not task_err
|
|
||||||
):
|
|
||||||
raise aio_err from task_err
|
|
||||||
|
|
||||||
# raise any `asyncio` side error.
|
# raise any `asyncio` side error.
|
||||||
raise aio_err
|
raise aio_err
|
||||||
|
|
||||||
log.info(
|
|
||||||
'`trio` received final result from {task}\n'
|
|
||||||
f'|_{res}\n'
|
|
||||||
)
|
|
||||||
# TODO: do we need this?
|
|
||||||
# if task_err:
|
|
||||||
# cancel_scope.cancel()
|
|
||||||
# raise task_err
|
|
||||||
|
|
||||||
task.add_done_callback(cancel_trio)
|
task.add_done_callback(cancel_trio)
|
||||||
return chan
|
return chan
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncioCancelled(CancelledError):
|
||||||
|
'''
|
||||||
|
Asyncio cancelled translation (non-base) error
|
||||||
|
for use with the ``to_asyncio`` module
|
||||||
|
to be raised in the ``trio`` side task
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def translate_aio_errors(
|
async def translate_aio_errors(
|
||||||
|
|
||||||
|
@ -398,9 +386,7 @@ async def translate_aio_errors(
|
||||||
) -> None:
|
) -> None:
|
||||||
aio_err = chan._aio_err
|
aio_err = chan._aio_err
|
||||||
if (
|
if (
|
||||||
aio_err is not None
|
aio_err is not None and
|
||||||
and
|
|
||||||
# not isinstance(aio_err, CancelledError)
|
|
||||||
type(aio_err) != CancelledError
|
type(aio_err) != CancelledError
|
||||||
):
|
):
|
||||||
# always raise from any captured asyncio error
|
# always raise from any captured asyncio error
|
||||||
|
@ -432,17 +418,13 @@ async def translate_aio_errors(
|
||||||
):
|
):
|
||||||
aio_err = chan._aio_err
|
aio_err = chan._aio_err
|
||||||
if (
|
if (
|
||||||
task.cancelled()
|
task.cancelled() and
|
||||||
and
|
|
||||||
type(aio_err) is CancelledError
|
type(aio_err) is CancelledError
|
||||||
):
|
):
|
||||||
# if an underlying `asyncio.CancelledError` triggered this
|
# if an underlying ``asyncio.CancelledError`` triggered this
|
||||||
# channel close, raise our (non-``BaseException``) wrapper
|
# channel close, raise our (non-``BaseException``) wrapper
|
||||||
# error: ``AsyncioCancelled`` from that source error.
|
# error: ``AsyncioCancelled`` from that source error.
|
||||||
raise AsyncioCancelled(
|
raise AsyncioCancelled from aio_err
|
||||||
f'Task cancelled\n'
|
|
||||||
f'|_{task}\n'
|
|
||||||
) from aio_err
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
@ -485,8 +467,8 @@ async def run_task(
|
||||||
|
|
||||||
) -> Any:
|
) -> Any:
|
||||||
'''
|
'''
|
||||||
Run an `asyncio` async function or generator in a task, return
|
Run an ``asyncio`` async function or generator in a task, return
|
||||||
or stream the result back to `trio`.
|
or stream the result back to ``trio``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# simple async func
|
# simple async func
|
||||||
|
@ -544,27 +526,10 @@ async def open_channel_from(
|
||||||
chan._to_trio.close()
|
chan._to_trio.close()
|
||||||
|
|
||||||
|
|
||||||
class AsyncioRuntimeTranslationError(RuntimeError):
|
|
||||||
'''
|
|
||||||
We failed to correctly relay runtime semantics and/or maintain SC
|
|
||||||
supervision rules cross-event-loop.
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
|
|
||||||
def run_as_asyncio_guest(
|
def run_as_asyncio_guest(
|
||||||
trio_main: Callable,
|
trio_main: Callable,
|
||||||
# ^-NOTE-^ when spawned with `infected_aio=True` this func is
|
|
||||||
# normally `Actor._async_main()` as is passed by some boostrap
|
|
||||||
# entrypoint like `._entry._trio_main()`.
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# ^-TODO-^ technically whatever `trio_main` returns.. we should
|
|
||||||
# try to use func-typevar-params at leaast by 3.13!
|
|
||||||
# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#callback-protocols
|
|
||||||
# -[ ] https://peps.python.org/pep-0646/#using-type-variable-tuples-in-functions
|
|
||||||
# -[ ] https://typing.readthedocs.io/en/latest/spec/callables.html#unpack-for-keyword-arguments
|
|
||||||
# -[ ] https://peps.python.org/pep-0718/
|
|
||||||
'''
|
'''
|
||||||
Entry for an "infected ``asyncio`` actor".
|
Entry for an "infected ``asyncio`` actor".
|
||||||
|
|
||||||
|
@ -590,13 +555,7 @@ def run_as_asyncio_guest(
|
||||||
# :)
|
# :)
|
||||||
|
|
||||||
async def aio_main(trio_main):
|
async def aio_main(trio_main):
|
||||||
'''
|
|
||||||
Main `asyncio.Task` which calls
|
|
||||||
`trio.lowlevel.start_guest_run()` to "infect" the `asyncio`
|
|
||||||
event-loop by embedding the `trio` scheduler allowing us to
|
|
||||||
boot the `tractor` runtime and connect back to our parent.
|
|
||||||
|
|
||||||
'''
|
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
trio_done_fut = asyncio.Future()
|
trio_done_fut = asyncio.Future()
|
||||||
startup_msg: str = (
|
startup_msg: str = (
|
||||||
|
@ -605,22 +564,17 @@ def run_as_asyncio_guest(
|
||||||
'-> built a `trio`-done future\n'
|
'-> built a `trio`-done future\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: shoudn't this be done in the guest-run trio task?
|
if debug_mode():
|
||||||
# if debug_mode():
|
# XXX make it obvi we know this isn't supported yet!
|
||||||
# # XXX make it obvi we know this isn't supported yet!
|
log.error(
|
||||||
# log.error(
|
'Attempting to enter unsupported `greenback` init '
|
||||||
# 'Attempting to enter unsupported `greenback` init '
|
'from `asyncio` task..'
|
||||||
# 'from `asyncio` task..'
|
)
|
||||||
# )
|
await _debug.maybe_init_greenback(
|
||||||
# await _debug.maybe_init_greenback(
|
force_reload=True,
|
||||||
# force_reload=True,
|
)
|
||||||
# )
|
|
||||||
|
|
||||||
def trio_done_callback(main_outcome):
|
def trio_done_callback(main_outcome):
|
||||||
log.info(
|
|
||||||
f'trio_main finished with\n'
|
|
||||||
f'|_{main_outcome!r}'
|
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(main_outcome, Error):
|
if isinstance(main_outcome, Error):
|
||||||
error: BaseException = main_outcome.error
|
error: BaseException = main_outcome.error
|
||||||
|
@ -640,6 +594,7 @@ def run_as_asyncio_guest(
|
||||||
|
|
||||||
else:
|
else:
|
||||||
trio_done_fut.set_result(main_outcome)
|
trio_done_fut.set_result(main_outcome)
|
||||||
|
log.runtime(f'trio_main finished: {main_outcome!r}')
|
||||||
|
|
||||||
startup_msg += (
|
startup_msg += (
|
||||||
f'-> created {trio_done_callback!r}\n'
|
f'-> created {trio_done_callback!r}\n'
|
||||||
|
@ -658,48 +613,26 @@ def run_as_asyncio_guest(
|
||||||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||||
done_callback=trio_done_callback,
|
done_callback=trio_done_callback,
|
||||||
)
|
)
|
||||||
fute_err: BaseException|None = None
|
|
||||||
try:
|
try:
|
||||||
|
# TODO: better SIGINT handling since shielding seems to
|
||||||
|
# make NO DIFFERENCE XD
|
||||||
|
# -[ ] maybe this is due to 3.11's recent SIGINT handling
|
||||||
|
# changes and we can better work with/around it?
|
||||||
|
# https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption
|
||||||
out: Outcome = await asyncio.shield(trio_done_fut)
|
out: Outcome = await asyncio.shield(trio_done_fut)
|
||||||
|
# NOTE `Error.unwrap()` will raise
|
||||||
# NOTE will raise (via `Error.unwrap()`) from any
|
|
||||||
# exception packed into the guest-run's `main_outcome`.
|
|
||||||
return out.unwrap()
|
return out.unwrap()
|
||||||
|
|
||||||
except (
|
except asyncio.CancelledError:
|
||||||
# XXX special SIGINT-handling is required since
|
|
||||||
# `asyncio.shield()`-ing seems to NOT handle that case as
|
|
||||||
# per recent changes in 3.11:
|
|
||||||
# https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption
|
|
||||||
#
|
|
||||||
# NOTE: further, apparently ONLY need to handle this
|
|
||||||
# special SIGINT case since all other `asyncio`-side
|
|
||||||
# errors can be processed via our `chan._aio_err`
|
|
||||||
# relaying (right?); SIGINT seems to be totally diff
|
|
||||||
# error path in `asyncio`'s runtime..?
|
|
||||||
asyncio.CancelledError,
|
|
||||||
|
|
||||||
) as fute_err:
|
|
||||||
err_message: str = (
|
|
||||||
'main `asyncio` task '
|
|
||||||
)
|
|
||||||
if isinstance(fute_err, asyncio.CancelledError):
|
|
||||||
err_message += 'was cancelled!\n'
|
|
||||||
else:
|
|
||||||
err_message += f'errored with {out.error!r}\n'
|
|
||||||
|
|
||||||
actor: tractor.Actor = tractor.current_actor()
|
actor: tractor.Actor = tractor.current_actor()
|
||||||
log.exception(
|
log.exception(
|
||||||
err_message
|
'`asyncio`-side main task was cancelled!\n'
|
||||||
+
|
'Cancelling actor-runtime..\n'
|
||||||
'Cancelling `trio`-side `tractor`-runtime..\n'
|
|
||||||
f'c)>\n'
|
f'c)>\n'
|
||||||
f' |_{actor}.cancel_soon()\n'
|
f' |_{actor}.cancel_soon()\n'
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: reduce this comment bloc since abandon issues are
|
|
||||||
# now solved?
|
|
||||||
#
|
|
||||||
# XXX NOTE XXX the next LOC is super important!!!
|
# XXX NOTE XXX the next LOC is super important!!!
|
||||||
# => without it, we can get a guest-run abandonment case
|
# => without it, we can get a guest-run abandonment case
|
||||||
# where asyncio will not trigger `trio` in a final event
|
# where asyncio will not trigger `trio` in a final event
|
||||||
|
@ -748,55 +681,16 @@ def run_as_asyncio_guest(
|
||||||
# is apparently a working fix!
|
# is apparently a working fix!
|
||||||
actor.cancel_soon()
|
actor.cancel_soon()
|
||||||
|
|
||||||
# XXX NOTE XXX pump the `asyncio` event-loop to allow
|
# XXX NOTE XXX PUMP the asyncio event loop to allow `trio`-side to
|
||||||
# `trio`-side to `trio`-guest-run to complete and
|
# `trio`-guest-run to complete and teardown !!
|
||||||
# teardown !!
|
|
||||||
#
|
#
|
||||||
# *WITHOUT THIS* the guest-run can get race-conditionally abandoned!!
|
# XXX WITHOUT THIS the guest-run gets race-conditionally
|
||||||
# XD
|
# abandoned by `asyncio`!!
|
||||||
#
|
# XD XD XD
|
||||||
await asyncio.sleep(.1) # `delay` can't be 0 either XD
|
await asyncio.shield(
|
||||||
while not trio_done_fut.done():
|
asyncio.sleep(.1) # NOPE! it can't be 0 either XD
|
||||||
log.runtime(
|
)
|
||||||
'Waiting on main guest-run `asyncio` task to complete..\n'
|
raise
|
||||||
f'|_trio_done_fut: {trio_done_fut}\n'
|
|
||||||
)
|
|
||||||
await asyncio.sleep(.1)
|
|
||||||
|
|
||||||
# XXX: don't actually need the shield.. seems to
|
|
||||||
# make no difference (??) and we know it spawns an
|
|
||||||
# internal task..
|
|
||||||
# await asyncio.shield(asyncio.sleep(.1))
|
|
||||||
|
|
||||||
# XXX alt approach but can block indefinitely..
|
|
||||||
# so don't use?
|
|
||||||
# loop._run_once()
|
|
||||||
|
|
||||||
try:
|
|
||||||
return trio_done_fut.result()
|
|
||||||
except asyncio.exceptions.InvalidStateError as state_err:
|
|
||||||
|
|
||||||
# XXX be super dupere noisy about abandonment issues!
|
|
||||||
aio_task: asyncio.Task = asyncio.current_task()
|
|
||||||
message: str = (
|
|
||||||
'The `asyncio`-side task likely exited before the '
|
|
||||||
'`trio`-side guest-run completed!\n\n'
|
|
||||||
)
|
|
||||||
if fute_err:
|
|
||||||
message += (
|
|
||||||
f'The main {aio_task}\n'
|
|
||||||
f'STOPPED due to {type(fute_err)}\n\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
message += (
|
|
||||||
f'Likely something inside our guest-run-as-task impl is '
|
|
||||||
f'not effectively waiting on the `trio`-side to complete ?!\n'
|
|
||||||
f'This code -> {aio_main!r}\n\n'
|
|
||||||
|
|
||||||
'Below you will likely see a '
|
|
||||||
'"RuntimeWarning: Trio guest run got abandoned.." !!\n'
|
|
||||||
)
|
|
||||||
raise AsyncioRuntimeTranslationError(message) from state_err
|
|
||||||
|
|
||||||
# might as well if it's installed.
|
# might as well if it's installed.
|
||||||
try:
|
try:
|
||||||
|
@ -804,7 +698,7 @@ def run_as_asyncio_guest(
|
||||||
loop = uvloop.new_event_loop()
|
loop = uvloop.new_event_loop()
|
||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(loop)
|
||||||
except ImportError:
|
except ImportError:
|
||||||
log.runtime('`uvloop` not available..')
|
pass
|
||||||
|
|
||||||
return asyncio.run(
|
return asyncio.run(
|
||||||
aio_main(trio_main),
|
aio_main(trio_main),
|
||||||
|
|
Loading…
Reference in New Issue