forked from goodboy/tractor
1
0
Fork 0

Move asyncio guest mode entrypoint to `to_asyncio`

The function is useful if you want to run the "main process" under
`asyncio`. Until `trio` core wraps this better we'll keep our own copy
in the interim (there's a new "inside-out-guest" mode almost on
mainline so hang tight).
wip_fix_asyncio_gen_streaming
Tyler Goodlet 2020-07-01 13:38:40 -04:00
parent 9d4dba23a6
commit db15162f04
2 changed files with 59 additions and 3 deletions

View File

@ -8,6 +8,7 @@ import trio # type: ignore
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
from . import _state from . import _state
from .to_asyncio import run_as_asyncio_guest
log = get_logger(__name__) log = get_logger(__name__)
@ -44,6 +45,10 @@ def _mp_main(
parent_addr=parent_addr parent_addr=parent_addr
) )
try: try:
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main) trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
pass # handle it the same way trio does? pass # handle it the same way trio does?

View File

@ -13,10 +13,13 @@ from typing import (
import trio import trio
from .log import get_logger
from ._state import current_actor from ._state import current_actor
log = get_logger(__name__)
__all__ = ['run']
__all__ = ['run_task']
async def _invoke( async def _invoke(
@ -40,7 +43,7 @@ async def _invoke(
return await coro return await coro
async def run( async def run_task(
func: Callable, func: Callable,
qsize: int = 2**10, qsize: int = 2**10,
**kwargs, **kwargs,
@ -97,3 +100,51 @@ async def run(
raise err raise err
return result() return result()
def run_as_asyncio_guest(
trio_main: Awaitable,
) -> None:
"""Entry for an "infected ``asyncio`` actor".
Uh, oh. :o
It looks like your event loop has caught a case of the ``trio``s.
:()
Don't worry, we've heard you'll barely notice. You might hallucinate
a few more propagating errors and feel like your digestion has
slowed but if anything get's too bad your parents will know about
it.
:)
"""
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
log.info(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
# might as well if it's installed.
try:
import uvloop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
except ImportError:
pass
asyncio.run(aio_main(trio_main))