forked from goodboy/tractor

Compare commits

18 Commits

Author SHA1 Message Date
Tyler Goodlet c37eee6011 Add breakpoint in parent example 2020-12-09 11:03:42 -05:00
Tyler Goodlet e637d9d23d Drop duplicate project-package name in msg header 2020-11-16 08:44:41 -05:00
Tyler Goodlet d345ba4f2f Raise from asyncio error; fixes mypy 2020-11-16 08:44:41 -05:00
Tyler Goodlet 0c3cd553d3 Tweak log msg 2020-11-16 08:44:41 -05:00
Tyler Goodlet ce5ed631e0 Log error 2020-11-16 08:44:41 -05:00
Tyler Goodlet f174aaf7d8 Drop unneeded parent cs cancel 2020-11-16 08:44:41 -05:00
Tyler Goodlet e830862181 Support asyncio actors with the trio spawner backend 2020-11-16 08:44:41 -05:00
Tyler Goodlet 8d9f0a12a4 Revert removal of `infect_asyncio` in nursery start methods 2020-11-16 08:44:41 -05:00
Tyler Goodlet a6963b2c6a Attempt to make mypy happy.. 2020-11-16 08:44:41 -05:00
Tyler Goodlet f0861b9d79 Add an obnoxious error message on internal failures 2020-11-16 08:44:41 -05:00
Tyler Goodlet a045ba489a Allow marking `asyncio` funcs declaring `to_trio` channel 2020-11-16 08:44:41 -05:00
Tyler Goodlet 0adcefcfd4 Wow, fix all the broken async func invoking code..
Clearly this wasn't developed against a task that spawned just an async
func in `asyncio`.. Fix all that and remove a bunch of unnecessary func
layers. Add provisional support for the target receiving the `to_trio`
and `from_trio` channels and for the @tractor.stream marker.
2020-11-16 08:44:41 -05:00
Tyler Goodlet 270061508e Drop entrypoints from `Actor` 2020-11-16 08:44:41 -05:00
Tyler Goodlet 04794a1d8e Move asyncio guest mode entrypoint to `to_asyncio`
The function is useful if you want to run the "main process" under
`asyncio`. Until `trio` core wraps this better we'll keep our own copy
in the interim (there's a new "inside-out-guest" mode almost on
mainline so hang tight).
2020-11-16 08:44:41 -05:00
Tyler Goodlet d5079558f7 Propagate any spawned `asyncio` task error upwards
This should mostly maintain top-level SC principles for any task spawned
using `tractor.to_asyncio.run()`. When the `asyncio` task completes, make
sure to cancel the corresponding `trio` cancel scope and raise any error
that may have resulted (see the usage sketch after the commit list).

Resolves #120
2020-11-16 08:44:41 -05:00
Tyler Goodlet 3e47795305 Try not masking SIGINT in child processes 2020-11-16 00:07:43 -05:00
Tyler Goodlet 02b20dd97c Make SIGINT handler kill the process tree
The std lib's `pdb` internals override SIGINT handling whenever one
enters the debugger repl. Force a handler that kills the tree if SIGINT
is triggered from the root actor, otherwise ignore it since supervised
children should be managed already. This resolves an issue with guest
mode where `pdb` causes SIGINTs to be swallowed resulting in the host
loop never terminating the process tree.
2020-11-16 00:01:21 -05:00
Tyler Goodlet 5f55c7ca00 Add `Actor.cancel_soon()` for sync self destruct
Add a sync method that can be used to cancel the current actor from
a synchronous context. This is useful in debugging situations where
sync debugger code may need to kill the process tree (a usage sketch
follows the commit list).

Also, make the internal "lifetime stack" a global var; it's easier to
manage from client code that may want to add callbacks prior to the
actor runtime being fully set up.
2020-11-15 23:54:42 -05:00
11 changed files with 333 additions and 22 deletions
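
A hypothetical usage sketch of the error propagation described in commit
d5079558f7 above. None of this appears in the diff verbatim: the helper
names ``fail_in_asyncio`` / ``trio_entry``, the actor name, and the
parent-side retrieval via ``portal.result()`` raising a
``RemoteActorError`` are illustrative assumptions about how an
``asyncio``-side failure is expected to surface back in ``trio``.

import asyncio

import tractor


async def fail_in_asyncio():
    # runs on the ``asyncio`` side of the infected actor
    await asyncio.sleep(0.1)
    raise RuntimeError("boom from asyncio")


async def trio_entry():
    # runs on the ``trio`` side; per the commit's intent the asyncio
    # error should cancel the enclosing cancel scope and be re-raised
    return await tractor.to_asyncio.run_task(fail_in_asyncio)


async def main():
    async with tractor.open_nursery() as n:
        # the child actor hosts ``trio`` as a guest on an ``asyncio`` loop
        portal = await n.run_in_actor(
            'aio_failer',
            trio_entry,
            infect_asyncio=True,
        )
        try:
            await portal.result()
        except tractor.RemoteActorError as err:
            print(f"asyncio error propagated upwards: {err}")


if __name__ == '__main__':
    tractor.run(main)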
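
A second sketch, for ``Actor.cancel_soon()`` from commit 5f55c7ca00: the
signal-handler framing below is made up (the real use in this changeset
is inside ``_debug.py``'s SIGINT handler), but it shows the intended
pattern of tearing down the actor tree from a purely synchronous
callback.

import signal

import tractor


def sync_teardown(signum, frame):
    # no ``await`` is possible in a signal handler, so schedule the
    # async ``.cancel()`` on the actor's service nursery instead;
    # assumes the actor runtime is already up
    tractor.current_actor().cancel_soon()


def install_sync_teardown():
    # e.g. from sync debugger or embedding code
    signal.signal(signal.SIGINT, sync_teardown)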

View File

@@ -0,0 +1,61 @@
from functools import partial
import asyncio
import trio
import tractor
async def asyncio_loop_forever():
while True:
await asyncio.sleep(0.1)
# print('yo')
async def sleep_forever(asyncio=False):
if asyncio:
await tractor.to_asyncio.run_task(asyncio_loop_forever)
else:
await trio.sleep_forever()
async def stream_forever():
while True:
yield 'doggy'
async def main():
"""Test breakpoint in a streaming actor.
"""
await tractor.breakpoint()
async with tractor.open_nursery() as n:
p0 = await n.start_actor('streamer',
rpc_module_paths=[__name__])
p1 = await n.start_actor(
'sleep_forever',
rpc_module_paths=[__name__], infect_asyncio=True
)
# await n.run_in_actor('sleeper', sleep_forever)
# await n.run_in_actor('sleeper', sleep_forever)
async with trio.open_nursery() as ln:
ln.start_soon(
partial(
p1.run,
__name__,
'sleep_forever',
asyncio=True,
)
)
# ln.start_soon(p0.run, __name__, 'sleep_forever')
async for val in await p0.run(__name__, 'stream_forever'):
print(val)
await trio.sleep(1)
if __name__ == '__main__':
tractor.run(main, debug_mode=True, loglevel='trace')

View File

@@ -22,6 +22,7 @@ from ._exceptions import RemoteActorError, ModuleNotExposed
from ._debug import breakpoint, post_mortem
from . import _spawn
from . import msg
from . import to_asyncio
__all__ = [
@@ -38,7 +39,8 @@ __all__ = [
'MultiError',
'RemoteActorError',
'ModuleNotExposed',
'msg'
'msg',
'to_asyncio',
]
@@ -110,7 +112,10 @@ async def _main(
else:
# start this local actor as the arbiter
actor = Arbiter(
name or 'arbiter', arbiter_addr=arbiter_addr, **kwargs)
name or 'arbiter',
arbiter_addr=arbiter_addr,
**kwargs
)
# ``Actor._async_main()`` creates an internal nursery if one is not
# provided and thus blocks here until its main task completes.

View File

@@ -162,6 +162,10 @@ def _get_mod_abspath(module):
return os.path.abspath(module.__file__)
# process-global stack closed at end on actor runtime teardown
_lifetime_stack: ExitStack = ExitStack()
class Actor:
"""The fundamental concurrency primitive.
@@ -176,12 +180,14 @@ class Actor:
_root_n: Optional[trio.Nursery] = None
_service_n: Optional[trio.Nursery] = None
_server_n: Optional[trio.Nursery] = None
_lifetime_stack: ExitStack = ExitStack()
# Information about `__main__` from parent
_parent_main_data: Dict[str, str]
_parent_chan_cs: Optional[trio.CancelScope] = None
# if started on ``asyncio`` running ``trio`` in guest mode
_infected_aio: bool = False
def __init__(
self,
name: str,
@@ -531,8 +537,9 @@ class Actor:
# deadlock and other weird behaviour)
if func != self.cancel:
if isinstance(cs, Exception):
log.warning(f"Task for RPC func {func} failed with"
f"{cs}")
log.warning(
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event()
@@ -770,7 +777,7 @@ class Actor:
# tear down all lifetime contexts
# api idea: ``tractor.open_context()``
log.warning("Closing all actor lifetime contexts")
self._lifetime_stack.close()
_lifetime_stack.close()
# Unregister actor from the arbiter
if registered_with_arbiter and (
@@ -840,6 +847,14 @@ class Actor:
# signal the server is down since nursery above terminated
self._server_down.set()
def cancel_soon(self) -> None:
"""Cancel this actor asap; can be called from a sync context.
Schedules `.cancel()` to be run immediately just like when
cancelled by the parent.
"""
self._service_n.start_soon(self.cancel)
async def cancel(self) -> bool:
"""Cancel this actor.
@@ -988,6 +1003,9 @@ class Actor:
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid
def is_infected_aio(self) -> bool:
return self._infected_aio
class Arbiter(Actor):
"""A special actor who knows all the other actors and always has

View File

@@ -19,12 +19,15 @@ def parse_ipaddr(arg):
return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid)
parser.add_argument("--loglevel", type=str)
parser.add_argument("--parent_addr", type=parse_ipaddr)
parser.add_argument("--asyncio", action='store_true')
args = parser.parse_args()
subactor = Actor(
@@ -36,5 +39,6 @@ if __name__ == "__main__":
_trio_main(
subactor,
parent_addr=args.parent_addr
)
parent_addr=args.parent_addr,
infect_asyncio=args.asyncio,
)

View File

@@ -11,7 +11,6 @@ from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
from async_generator import aclosing
import tractor
import trio
from trio.testing import wait_all_tasks_blocked
from .log import get_logger
from . import _state
@@ -132,13 +131,20 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
log.error(f"TTY lock released by {task_name}:{uid}")
def handler(signum, frame):
"""Block SIGINT while in debug to avoid deadlocks with cancellation.
def handler(signum, frame, *args):
"""Specialized debugger compatible SIGINT handler.
In childred we always ignore to avoid deadlocks since cancellation
should always be managed by the parent supervising actor. The root
is always cancelled on ctrl-c.
"""
print(
"tractor ignores SIGINT while in debug mode\n"
"If you have a special need for it please open an issue.\n"
)
if is_root_process():
tractor.current_actor().cancel_soon()
else:
print(
"tractor ignores SIGINT while in debug mode\n"
"If you have a special need for it please open an issue.\n"
)
# don't allow those stdlib mofos to mess with sigint handler
@@ -261,6 +267,7 @@ def _breakpoint(debug_func) -> Awaitable[None]:
# may have the tty locked prior
if _debug_lock.locked(): # root process already has it; ignore
return
await _debug_lock.acquire()
_pdb_release_hook = _debug_lock.release
@@ -269,7 +276,6 @@ def _breakpoint(debug_func) -> Awaitable[None]:
log.debug("Entering the synchronous world of pdb")
debug_func(actor)
# user code **must** await this!
return _bp()

View File

@@ -10,6 +10,7 @@ import trio # type: ignore
from ._actor import Actor
from .log import get_console_log, get_logger
from . import _state
from .to_asyncio import run_as_asyncio_guest
log = get_logger(__name__)
@@ -21,6 +22,7 @@ def _mp_main(
forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run``
"""
@@ -46,7 +48,11 @@ def _mp_main(
parent_addr=parent_addr
)
try:
trio.run(trio_main)
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
pass # handle it the same way trio does?
log.info(f"Actor {actor.uid} terminated")
@@ -54,17 +60,21 @@ def _mp_main(
def _trio_main(
actor: 'Actor',
parent_addr: Tuple[str, int] = None
*,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
"""
# Disable sigint handling in children;
# we don't need it thanks to our cancellation machinery.
signal.signal(signal.SIGINT, signal.SIG_IGN)
# signal.signal(signal.SIGINT, signal.SIG_IGN)
# TODO: make a global func to set this or is it too hacky?
# os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
log.info(f"Started new trio process for {actor.uid}")
if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
@@ -82,7 +92,11 @@ def _trio_main(
)
try:
trio.run(trio_main)
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI")

View File

@@ -157,6 +157,7 @@ async def cancel_on_completion(
async def spawn_subactor(
subactor: 'Actor',
parent_addr: Tuple[str, int],
infect_asyncio: bool,
):
spawn_cmd = [
sys.executable,
@@ -181,6 +182,10 @@ async def spawn_subactor(
subactor.loglevel
]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
proc = await trio.open_process(spawn_cmd)
try:
yield proc
@@ -217,6 +222,7 @@ async def new_proc(
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
@@ -232,6 +238,7 @@ async def new_proc(
async with spawn_subactor(
subactor,
parent_addr,
infect_asyncio=infect_asyncio
) as proc:
log.info(f"Started {proc}")
@@ -321,6 +328,7 @@ async def new_proc(
fs_info,
start_method,
parent_addr,
infect_asyncio,
),
# daemon=True,
name=name,

View File

@@ -41,9 +41,11 @@ def stream(func):
"""
func._tractor_stream_function = True
sig = inspect.signature(func)
if 'ctx' not in sig.parameters:
params = sig.parameters
if 'ctx' not in params and 'to_trio' not in params:
raise TypeError(
"The first argument to the stream function "
f"{func.__name__} must be `ctx: tractor.Context`"
f"{func.__name__} must be `ctx: tractor.Context` "
"(Or ``to_trio`` if using ``asyncio`` in guest mode)."
)
return func
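
To make the relaxed check concrete, here is a hedged illustration of the
two shapes of function the decorator now accepts; the bodies are
placeholders and only the first parameter matters for the check.

import trio
import tractor


@tractor.stream
async def trio_streamer(ctx: tractor.Context, limit: int):
    # classic trio-side streaming function taking ``ctx`` first
    for i in range(limit):
        yield i


@tractor.stream
async def aio_streamer(to_trio: trio.MemorySendChannel):
    # asyncio-guest-mode target which pushes values to trio by hand
    to_trio.send_nowait('doggy')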

View File

@@ -56,6 +56,7 @@ class ActorNursery:
rpc_module_paths: List[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
infect_asyncio: bool = False,
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()
@@ -89,6 +90,7 @@ class ActorNursery:
bind_addr,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
)
)
@@ -101,6 +103,7 @@ class ActorNursery:
rpc_module_paths: Optional[List[str]] = None,
statespace: Dict[str, Any] = None,
loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn``
) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and
@@ -119,6 +122,7 @@ class ActorNursery:
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,
infect_asyncio=infect_asyncio,
)
# this marks the actor to be cancelled after its portal result
# is retrieved, see logic in `open_nursery()` below.

View File

@@ -55,7 +55,15 @@ def get_logger(
'''Return the package log or a sub-log for `name` if provided.
'''
log = rlog = logging.getLogger(_root_name)
if name and name != _proj_name:
# handling for modules that use ``get_logger(__name__)`` to
# avoid duplicate project-package token in msg output
rname, _, tail = name.partition('.')
if rname == _root_name:
name = tail
log = rlog.getChild(name)
log.level = rlog.level
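
A small sketch of what this tweak buys, assuming the project/root logger
name is 'tractor' (the surrounding module suggests as much but it is not
shown in this hunk):

from tractor.log import get_logger

# a module calling ``get_logger(__name__)`` from within the package,
# e.g. ``__name__ == 'tractor._spawn'``
log = get_logger('tractor._spawn')

# the partition on the first '.' strips the duplicate package token, so
# the child logger is 'tractor._spawn' rather than 'tractor.tractor._spawn'
print(log.name)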

View File

@@ -0,0 +1,181 @@
"""
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
"""
import asyncio
import inspect
from typing import (
Any,
Callable,
AsyncIterator,
Awaitable,
Union,
)
import trio
from .log import get_logger
from ._state import current_actor
log = get_logger(__name__)
__all__ = ['run_task', 'run_as_asyncio_guest']
async def run_coro(
to_trio: trio.MemorySendChannel,
coro: Awaitable,
) -> None:
"""Await ``coro`` and relay result back to ``trio``.
"""
to_trio.send_nowait(await coro)
async def consume_asyncgen(
to_trio: trio.MemorySendChannel,
coro: AsyncIterator,
) -> None:
"""Stream async generator results back to ``trio``.
``from_trio`` might eventually be used here for
bidirectional streaming.
"""
async for item in coro:
to_trio.send_nowait(item)
async def run_task(
func: Callable,
*,
qsize: int = 2**10,
_treat_as_stream: bool = False,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
assert current_actor().is_infected_aio()
# ITC (inter task comms)
from_trio = asyncio.Queue(qsize) # type: ignore
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_stream_function', None):
# the assumption is that if the target async routine accepts the
# send channel then it intends to yield more than one return
# value, otherwise it would just return ;P
_treat_as_stream = True
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
coro = func(**kwargs)
cancel_scope = trio.CancelScope()
# start the asyncio task we submitted from trio
if inspect.isawaitable(coro):
task = asyncio.create_task(run_coro(to_trio, coro))
elif inspect.isasyncgen(coro):
task = asyncio.create_task(consume_asyncgen(to_trio, coro))
else:
raise TypeError(f"No support for invoking {coro}")
aio_err = None
def cancel_trio(task):
"""Cancel the calling ``trio`` task on error.
"""
nonlocal aio_err
aio_err = task.exception()
if aio_err:
log.exception(f"asyncio task errored:\n{aio_err}")
cancel_scope.cancel()
task.add_done_callback(cancel_trio)
# async iterator
if inspect.isasyncgen(coro) or _treat_as_stream:
async def stream_results():
try:
with cancel_scope:
# stream values upward
async with from_aio:
async for item in from_aio:
yield item
except BaseException as err:
if aio_err is not None:
# always raise from any captured asyncio error
raise err from aio_err
else:
raise
return stream_results()
# simple async func
try:
with cancel_scope:
# return single value
return await from_aio.receive()
# Do we need this?
except BaseException as err:
if aio_err is not None:
# always raise from any captured asyncio error
raise err from aio_err
else:
raise
def run_as_asyncio_guest(
trio_main: Callable,
) -> None:
"""Entry for an "infected ``asyncio`` actor".
Uh, oh. :o
It looks like your event loop has caught a case of the ``trio``s.
:()
Don't worry, we've heard you'll barely notice. You might hallucinate
a few more propagating errors and feel like your digestion has
slowed, but if anything gets too bad your parents will know about
it.
:)
"""
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
log.info(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
# use ``uvloop`` for the host loop if it's installed; might as well.
try:
import uvloop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
except ImportError:
pass
asyncio.run(aio_main(trio_main))
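
Finally, a hedged usage sketch for ``run_as_asyncio_guest()`` on its own:
hand it any zero-argument async function you would otherwise pass to
``trio.run()`` and it drives that function as a guest of an ``asyncio``
(optionally ``uvloop``-backed) host loop. The ``amain`` function is
illustrative.

import trio

from tractor.to_asyncio import run_as_asyncio_guest


async def amain():
    # ordinary trio code; it is unaware it is riding on an asyncio loop
    await trio.sleep(0.5)
    print("trio ran to completion inside an asyncio host loop")


if __name__ == '__main__':
    run_as_asyncio_guest(amain)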