forked from goodboy/tractor
Get entry points reorg without asyncio compat
This is an edit to factor out changes needed for the `asyncio` in guest mode integration (which currently isn't tested well) so that later more pertinent changes (which are tested well) can be rebased off of this branch and merged into mainline sooner. The *infect_asyncio* branch will need to be rebased onto this branch as well before merge to mainline.reorg_entry_points
parent
8054bc7c70
commit
8e32199509
|
@ -20,7 +20,6 @@ from ._state import current_actor
|
||||||
from ._exceptions import RemoteActorError, ModuleNotExposed
|
from ._exceptions import RemoteActorError, ModuleNotExposed
|
||||||
from . import msg
|
from . import msg
|
||||||
from . import _spawn
|
from . import _spawn
|
||||||
from . import to_asyncio
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
@ -36,7 +35,6 @@ __all__ = [
|
||||||
'RemoteActorError',
|
'RemoteActorError',
|
||||||
'ModuleNotExposed',
|
'ModuleNotExposed',
|
||||||
'msg'
|
'msg'
|
||||||
'to_asyncio'
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,8 @@
|
||||||
"""
|
"""
|
||||||
Process entry points.
|
Process entry points.
|
||||||
"""
|
"""
|
||||||
import asyncio
|
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from typing import Tuple, Any, Awaitable
|
from typing import Tuple, Any
|
||||||
|
|
||||||
import trio # type: ignore
|
import trio # type: ignore
|
||||||
|
|
||||||
|
@ -12,52 +11,9 @@ from .log import get_console_log, get_logger
|
||||||
from . import _state
|
from . import _state
|
||||||
|
|
||||||
|
|
||||||
__all__ = ('run',)
|
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _asyncio_main(
|
|
||||||
trio_main: Awaitable,
|
|
||||||
) -> None:
|
|
||||||
"""Entry for an "infected ``asyncio`` actor".
|
|
||||||
|
|
||||||
Uh, oh. :o
|
|
||||||
|
|
||||||
It looks like your event loop has caught a case of the ``trio``s.
|
|
||||||
|
|
||||||
:()
|
|
||||||
|
|
||||||
Don't worry, we've heard you'll barely notice. You might hallucinate
|
|
||||||
a few more propagating errors and feel like your digestion has
|
|
||||||
slowed but if anything get's too bad your parents will know about
|
|
||||||
it.
|
|
||||||
|
|
||||||
:)
|
|
||||||
"""
|
|
||||||
async def aio_main(trio_main):
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
|
|
||||||
trio_done_fut = asyncio.Future()
|
|
||||||
|
|
||||||
def trio_done_callback(main_outcome):
|
|
||||||
log.info(f"trio_main finished: {main_outcome!r}")
|
|
||||||
trio_done_fut.set_result(main_outcome)
|
|
||||||
|
|
||||||
# start the infection: run trio on the asyncio loop in "guest mode"
|
|
||||||
log.info(f"Infecting asyncio process with {trio_main}")
|
|
||||||
trio.lowlevel.start_guest_run(
|
|
||||||
trio_main,
|
|
||||||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
|
||||||
done_callback=trio_done_callback,
|
|
||||||
)
|
|
||||||
|
|
||||||
(await trio_done_fut).unwrap()
|
|
||||||
|
|
||||||
asyncio.run(aio_main(trio_main))
|
|
||||||
|
|
||||||
|
|
||||||
def _mp_main(
|
def _mp_main(
|
||||||
actor: 'Actor',
|
actor: 'Actor',
|
||||||
accept_addr: Tuple[str, int],
|
accept_addr: Tuple[str, int],
|
||||||
|
@ -90,11 +46,7 @@ def _mp_main(
|
||||||
parent_addr=parent_addr
|
parent_addr=parent_addr
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
if infect_asyncio:
|
trio.run(trio_main)
|
||||||
actor._infected_aio = True
|
|
||||||
_asyncio_main(trio_main)
|
|
||||||
else:
|
|
||||||
trio.run(trio_main)
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass # handle it the same way trio does?
|
pass # handle it the same way trio does?
|
||||||
log.info(f"Actor {actor.uid} terminated")
|
log.info(f"Actor {actor.uid} terminated")
|
||||||
|
|
|
@ -52,7 +52,6 @@ class ActorNursery:
|
||||||
rpc_module_paths: List[str] = None,
|
rpc_module_paths: List[str] = None,
|
||||||
loglevel: str = None, # set log level per subactor
|
loglevel: str = None, # set log level per subactor
|
||||||
nursery: trio.Nursery = None,
|
nursery: trio.Nursery = None,
|
||||||
infect_asyncio: bool = False,
|
|
||||||
) -> Portal:
|
) -> Portal:
|
||||||
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
||||||
|
|
||||||
|
@ -81,7 +80,6 @@ class ActorNursery:
|
||||||
self.errors,
|
self.errors,
|
||||||
bind_addr,
|
bind_addr,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
infect_asyncio=infect_asyncio,
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -93,7 +91,6 @@ class ActorNursery:
|
||||||
rpc_module_paths: Optional[List[str]] = None,
|
rpc_module_paths: Optional[List[str]] = None,
|
||||||
statespace: Dict[str, Any] = None,
|
statespace: Dict[str, Any] = None,
|
||||||
loglevel: str = None, # set log level per subactor
|
loglevel: str = None, # set log level per subactor
|
||||||
infect_asyncio: bool = False,
|
|
||||||
**kwargs, # explicit args to ``fn``
|
**kwargs, # explicit args to ``fn``
|
||||||
) -> Portal:
|
) -> Portal:
|
||||||
"""Spawn a new actor, run a lone task, then terminate the actor and
|
"""Spawn a new actor, run a lone task, then terminate the actor and
|
||||||
|
@ -112,7 +109,6 @@ class ActorNursery:
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
# use the run_in_actor nursery
|
# use the run_in_actor nursery
|
||||||
nursery=self._ria_nursery,
|
nursery=self._ria_nursery,
|
||||||
infect_asyncio=infect_asyncio,
|
|
||||||
)
|
)
|
||||||
# this marks the actor to be cancelled after its portal result
|
# this marks the actor to be cancelled after its portal result
|
||||||
# is retreived, see logic in `open_nursery()` below.
|
# is retreived, see logic in `open_nursery()` below.
|
||||||
|
|
|
@ -1,70 +0,0 @@
|
||||||
"""
|
|
||||||
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
|
|
||||||
"""
|
|
||||||
import asyncio
|
|
||||||
import inspect
|
|
||||||
from typing import (
|
|
||||||
Any,
|
|
||||||
Callable,
|
|
||||||
AsyncGenerator,
|
|
||||||
Awaitable,
|
|
||||||
Union,
|
|
||||||
)
|
|
||||||
|
|
||||||
import trio
|
|
||||||
|
|
||||||
|
|
||||||
async def _invoke(
|
|
||||||
from_trio,
|
|
||||||
to_trio,
|
|
||||||
coro
|
|
||||||
) -> Union[AsyncGenerator, Awaitable]:
|
|
||||||
"""Await or stream awaiable object based on type into
|
|
||||||
``trio`` memory channel.
|
|
||||||
"""
|
|
||||||
async def stream_from_gen(c):
|
|
||||||
async for item in c:
|
|
||||||
to_trio.put_nowait(item)
|
|
||||||
to_trio.put_nowait
|
|
||||||
|
|
||||||
async def just_return(c):
|
|
||||||
to_trio.put_nowait(await c)
|
|
||||||
|
|
||||||
if inspect.isasyncgen(coro):
|
|
||||||
return await stream_from_gen(coro)
|
|
||||||
elif inspect.iscoroutine(coro):
|
|
||||||
return await coro
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: make this some kind of tractor.to_asyncio.run()
|
|
||||||
async def run(
|
|
||||||
func: Callable,
|
|
||||||
qsize: int = 2**10,
|
|
||||||
**kwargs,
|
|
||||||
) -> Any:
|
|
||||||
"""Run an ``asyncio`` async function or generator in a task, return
|
|
||||||
or stream the result back to ``trio``.
|
|
||||||
"""
|
|
||||||
# ITC (inter task comms)
|
|
||||||
from_trio = asyncio.Queue(qsize)
|
|
||||||
to_trio, from_aio = trio.open_memory_channel(qsize)
|
|
||||||
|
|
||||||
# allow target func to accept/stream results manually
|
|
||||||
kwargs['to_trio'] = to_trio
|
|
||||||
kwargs['from_trio'] = to_trio
|
|
||||||
|
|
||||||
coro = func(**kwargs)
|
|
||||||
|
|
||||||
# start the asyncio task we submitted from trio
|
|
||||||
# TODO: try out ``anyio`` asyncio based tg here
|
|
||||||
asyncio.create_task(_invoke(from_trio, to_trio, coro))
|
|
||||||
|
|
||||||
# determine return type async func vs. gen
|
|
||||||
if inspect.isasyncgen(coro):
|
|
||||||
await from_aio.get()
|
|
||||||
elif inspect.iscoroutine(coro):
|
|
||||||
async def gen():
|
|
||||||
async for tick in from_aio:
|
|
||||||
yield tuple(tick)
|
|
||||||
|
|
||||||
return gen()
|
|
Loading…
Reference in New Issue