More trip WIP stuff working.. kinda

Get a few more things working:
- fail reliably when remote module loading goes awry
- do a real hacky job of module loading using `sys.path` stuffsies
- we're still totally borked when trying to spin up and quickly cancel
a bunch of subactors...

It's a small move forward I guess.
try_trip^2
Tyler Goodlet 2020-01-11 21:37:30 -05:00
parent 1b7cdfe512
commit afa640dcab
3 changed files with 77 additions and 18 deletions

View File

@ -5,10 +5,14 @@ from collections import defaultdict
from functools import partial
from itertools import chain
import importlib
import importlib.util
import inspect
import uuid
import typing
from typing import Dict, List, Tuple, Any, Optional
from types import ModuleType
import sys
import os
import trio # type: ignore
from trio_typing import TaskStatus
@ -165,7 +169,7 @@ class Actor:
def __init__(
self,
name: str,
rpc_module_paths: List[str] = [],
rpc_module_paths: Dict[str, ModuleType] = {},
statespace: Optional[Dict[str, Any]] = None,
uid: str = None,
loglevel: str = None,
@ -226,9 +230,21 @@ class Actor:
code (if it exists).
"""
try:
for path in self.rpc_module_paths:
for path, absfilepath in self.rpc_module_paths.items():
log.debug(f"Attempting to import {path}")
self._mods[path] = importlib.import_module(path)
# spec = importlib.util.spec_from_file_location(
# path, absfilepath)
# mod = importlib.util.module_from_spec(spec)
# XXX append the allowed module to the python path
# which should allow for relative (at least downward)
# imports. Seems to be the only that will work currently
# to get `trio-run-in-process` to import modules we "send
# it".
sys.path.append(os.path.dirname(absfilepath))
# spec.loader.exec_module(mod)
mod = importlib.import_module(path)
self._mods[path] = mod
# if self.name != 'arbiter':
# importlib.import_module('doggy')

View File

@ -5,6 +5,9 @@ Mostly just wrapping around ``multiprocessing``.
"""
import multiprocessing as mp
# from . import log
import trio
import trio_run_in_process
try:
@ -63,6 +66,7 @@ async def new_proc(
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
nursery: trio.Nursery = None,
use_trip: bool = True,
) -> mp.Process:
"""Create a new ``multiprocessing.Process`` using the
@ -72,8 +76,14 @@ async def new_proc(
mng = trio_run_in_process.open_in_process(
actor._trip_main,
bind_addr,
parent_addr
parent_addr,
nursery=nursery,
)
# XXX playing with trip logging
# l = log.get_console_log(level='debug', name=None, _root_name='trio-run-in-process')
# import logging
# logger = logging.getLogger("trio-run-in-process")
# logger.setLevel('DEBUG')
proc = await mng.__aenter__()
proc.mng = mng
return proc

View File

@ -2,6 +2,7 @@
``trio`` inspired apis and helpers
"""
import inspect
import importlib
import platform
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any
@ -32,9 +33,10 @@ log = get_logger('tractor')
class ActorNursery:
"""Spawn scoped subprocess actors.
"""
def __init__(self, actor: Actor) -> None:
def __init__(self, actor: Actor, nursery: trio.Nursery) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
self._nursery = nursery
self._children: Dict[
Tuple[str, str],
Tuple[Actor, mp.Process, Optional[Portal]]
@ -43,6 +45,7 @@ class ActorNursery:
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
# self._aexitstack = contextlib.AsyncExitStack()
async def __aenter__(self):
return self
@ -56,10 +59,16 @@ class ActorNursery:
loglevel: str = None, # set log level per subactor
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()
mods = {}
for path in rpc_module_paths or ():
mod = importlib.import_module(path)
mods[path] = mod.__file__
actor = Actor(
name,
# modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths or [],
rpc_module_paths=mods,
statespace=statespace, # global proc state vars
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
@ -71,6 +80,7 @@ class ActorNursery:
actor,
bind_addr,
parent_addr,
self._nursery,
)
# `multiprocessing` only (since no async interface):
# register the process before start in case we get a cancel
@ -98,7 +108,7 @@ class ActorNursery:
name: str,
fn: typing.Callable,
bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
rpc_module_paths: List[str] = [],
rpc_module_paths: Optional[List[str]] = None,
statespace: Dict[str, Any] = None,
loglevel: str = None, # set log level per subactor
**kwargs, # explicit args to ``fn``
@ -113,7 +123,7 @@ class ActorNursery:
mod_path = fn.__module__
portal = await self.start_actor(
name,
rpc_module_paths=[mod_path] + rpc_module_paths,
rpc_module_paths=[mod_path] + (rpc_module_paths or []),
bind_addr=bind_addr,
statespace=statespace,
loglevel=loglevel,
@ -195,7 +205,8 @@ class ActorNursery:
async def wait_for_proc(
proc: mp.Process,
actor: Actor,
cancel_scope: Optional[trio.CancelScope] = None,
portal: Portal,
cancel_scope: Optional[trio._core._run.CancelScope] = None,
) -> None:
# please god don't hang
if not isinstance(proc, trio_run_in_process.process.Process):
@ -205,6 +216,20 @@ class ActorNursery:
proc.join()
else:
# trio_run_in_process blocking wait
if errors:
multierror = trio.MultiError(errors)
# import pdb; pdb.set_trace()
# try:
# with trio.CancelScope(shield=True):
# await proc.mng.__aexit__(
# type(multierror),
# multierror,
# multierror.__traceback__,
# )
# except BaseException as err:
# import pdb; pdb.set_trace()
# pass
# else:
await proc.mng.__aexit__(None, None, None)
# proc.nursery.cancel_scope.cancel()
@ -214,7 +239,7 @@ class ActorNursery:
# proc terminated, cancel result waiter that may have
# been spawned in tandem if not done already
if cancel_scope:
if cancel_scope: # and not portal._cancelled:
log.warning(
f"Cancelling existing result waiter task for {actor.uid}")
cancel_scope.cancel()
@ -226,18 +251,19 @@ class ActorNursery:
errors: List[Exception] = []
# wait on run_in_actor() tasks, unblocks when all complete
async with trio.open_nursery() as nursery:
# async with self._nursery as nursery:
for subactor, proc, portal in children.values():
cs = None
# portal from ``run_in_actor()``
if portal in self._cancel_after_result_on_exit:
assert portal
cs = await nursery.start(
cancel_on_completion, portal, subactor)
# TODO: how do we handle remote host spawned actors?
nursery.start_soon(
wait_for_proc, proc, subactor, cs)
wait_for_proc, proc, subactor, portal, cs)
if errors:
multierror = trio.MultiError(errors)
if not self.cancelled:
# bubble up error(s) here and expect to be called again
# once the nursery has been cancelled externally (ex.
@ -255,8 +281,7 @@ class ActorNursery:
async with trio.open_nursery() as nursery:
for subactor, proc, portal in children.values():
# TODO: how do we handle remote host spawned actors?
assert portal
nursery.start_soon(wait_for_proc, proc, subactor, cs)
nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
log.debug(f"All subactors for {self} have terminated")
if errors:
@ -364,10 +389,18 @@ class ActorNursery:
async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
"""Create and yield a new ``ActorNursery``.
"""
# TODO: figure out supervisors from erlang
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
# TODO: figure out supervisors from erlang
async with ActorNursery(actor) as nursery:
yield nursery
# XXX we need this nursery because TRIP is doing all its stuff with
# an `@asynccontextmanager` which has an internal nursery *and* the
# task that opens a nursery must also close it - so we need a path
# in TRIP to make this all kinda work as well. Note I'm basically
# giving up for now - it's probably equivalent amounts of work to
# make TRIP vs. `multiprocessing` work here.
async with trio.open_nursery() as nursery:
async with ActorNursery(actor, nursery) as anursery:
yield anursery