diff --git a/tractor/_actor.py b/tractor/_actor.py
index e962115..b652e9f 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -5,10 +5,14 @@ from collections import defaultdict
 from functools import partial
 from itertools import chain
 import importlib
+import importlib.util
 import inspect
 import uuid
 import typing
 from typing import Dict, List, Tuple, Any, Optional
+from types import ModuleType
+import sys
+import os
 
 import trio  # type: ignore
 from trio_typing import TaskStatus
@@ -165,7 +169,7 @@ class Actor:
     def __init__(
         self,
         name: str,
-        rpc_module_paths: List[str] = [],
+        rpc_module_paths: Dict[str, ModuleType] = {},
         statespace: Optional[Dict[str, Any]] = None,
         uid: str = None,
         loglevel: str = None,
@@ -226,9 +230,21 @@ class Actor:
         code (if it exists).
         """
         try:
-            for path in self.rpc_module_paths:
+            for path, absfilepath in self.rpc_module_paths.items():
                 log.debug(f"Attempting to import {path}")
-                self._mods[path] = importlib.import_module(path)
+                # spec = importlib.util.spec_from_file_location(
+                #     path, absfilepath)
+                # mod = importlib.util.module_from_spec(spec)
+
+                # XXX append the allowed module to the python path
+                # which should allow for relative (at least downward)
+                # imports. Seems to be the only that will work currently
+                # to get `trio-run-in-process` to import modules we "send
+                # it".
+                sys.path.append(os.path.dirname(absfilepath))
+                # spec.loader.exec_module(mod)
+                mod = importlib.import_module(path)
+                self._mods[path] = mod
 
                 # if self.name != 'arbiter':
                 #     importlib.import_module('doggy')
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 311ab0b..026e257 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -5,6 +5,9 @@ Mostly just wrapping around ``multiprocessing``.
 """
 import multiprocessing as mp
 
+# from . import log
+
+import trio
 import trio_run_in_process
 
 try:
@@ -63,6 +66,7 @@ async def new_proc(
     # passed through to actor main
     bind_addr: Tuple[str, int],
     parent_addr: Tuple[str, int],
+    nursery: trio.Nursery = None,
     use_trip: bool = True,
 ) -> mp.Process:
     """Create a new ``multiprocessing.Process`` using the
@@ -72,8 +76,14 @@ async def new_proc(
         mng = trio_run_in_process.open_in_process(
             actor._trip_main,
             bind_addr,
-            parent_addr
+            parent_addr,
+            nursery=nursery,
         )
+        # XXX playing with trip logging
+        # l = log.get_console_log(level='debug', name=None, _root_name='trio-run-in-process')
+        # import logging
+        # logger = logging.getLogger("trio-run-in-process")
+        # logger.setLevel('DEBUG')
         proc = await mng.__aenter__()
         proc.mng = mng
         return proc
diff --git a/tractor/_trionics.py b/tractor/_trionics.py
index cfed406..9808935 100644
--- a/tractor/_trionics.py
+++ b/tractor/_trionics.py
@@ -2,6 +2,7 @@
 ``trio`` inspired apis and helpers
 """
 import inspect
+import importlib
 import platform
 import multiprocessing as mp
 from typing import Tuple, List, Dict, Optional, Any
@@ -32,9 +33,10 @@ log = get_logger('tractor')
 class ActorNursery:
     """Spawn scoped subprocess actors.
     """
-    def __init__(self, actor: Actor) -> None:
+    def __init__(self, actor: Actor, nursery: trio.Nursery) -> None:
         # self.supervisor = supervisor  # TODO
         self._actor: Actor = actor
+        self._nursery = nursery
         self._children: Dict[
             Tuple[str, str],
             Tuple[Actor, mp.Process, Optional[Portal]]
@@ -43,6 +45,7 @@ class ActorNursery:
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
+        # self._aexitstack = contextlib.AsyncExitStack()
 
     async def __aenter__(self):
         return self
@@ -56,10 +59,16 @@ class ActorNursery:
         loglevel: str = None,  # set log level per subactor
     ) -> Portal:
         loglevel = loglevel or self._actor.loglevel or get_loglevel()
+
+        mods = {}
+        for path in rpc_module_paths or ():
+            mod = importlib.import_module(path)
+            mods[path] = mod.__file__
+
         actor = Actor(
             name,
             # modules allowed to invoked funcs from
-            rpc_module_paths=rpc_module_paths or [],
+            rpc_module_paths=mods,
             statespace=statespace,  # global proc state vars
             loglevel=loglevel,
             arbiter_addr=current_actor()._arb_addr,
@@ -71,6 +80,7 @@ class ActorNursery:
             actor,
             bind_addr,
             parent_addr,
+            self._nursery,
         )
         # `multiprocessing` only (since no async interface):
         # register the process before start in case we get a cancel
@@ -98,7 +108,7 @@ class ActorNursery:
         name: str,
         fn: typing.Callable,
         bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
-        rpc_module_paths: List[str] = [],
+        rpc_module_paths: Optional[List[str]] = None,
         statespace: Dict[str, Any] = None,
         loglevel: str = None,  # set log level per subactor
         **kwargs,  # explicit args to ``fn``
@@ -113,7 +123,7 @@ class ActorNursery:
         mod_path = fn.__module__
         portal = await self.start_actor(
             name,
-            rpc_module_paths=[mod_path] + rpc_module_paths,
+            rpc_module_paths=[mod_path] + (rpc_module_paths or []),
            bind_addr=bind_addr,
             statespace=statespace,
             loglevel=loglevel,
@@ -195,7 +205,8 @@ class ActorNursery:
         async def wait_for_proc(
             proc: mp.Process,
             actor: Actor,
-            cancel_scope: Optional[trio.CancelScope] = None,
+            portal: Portal,
+            cancel_scope: Optional[trio._core._run.CancelScope] = None,
         ) -> None:
             # please god don't hang
             if not isinstance(proc, trio_run_in_process.process.Process):
@@ -205,7 +216,21 @@ class ActorNursery:
                 proc.join()
             else:
                 # trio_run_in_process blocking wait
-                await proc.mng.__aexit__(None, None, None)
+                if errors:
+                    multierror = trio.MultiError(errors)
+                    # import pdb; pdb.set_trace()
+                    # try:
+                    #     with trio.CancelScope(shield=True):
+                    #         await proc.mng.__aexit__(
+                    #             type(multierror),
+                    #             multierror,
+                    #             multierror.__traceback__,
+                    #         )
+                    # except BaseException as err:
+                    #     import pdb; pdb.set_trace()
+                    #     pass
+                    # else:
+                await proc.mng.__aexit__(None, None, None)
                 # proc.nursery.cancel_scope.cancel()
 
             log.debug(f"Joined {proc}")
@@ -214,7 +239,7 @@ class ActorNursery:
 
             # proc terminated, cancel result waiter that may have
             # been spawned in tandem if not done already
-            if cancel_scope:
+            if cancel_scope:  # and not portal._cancelled:
                 log.warning(
                     f"Cancelling existing result waiter task for {actor.uid}")
                 cancel_scope.cancel()
@@ -226,18 +251,19 @@ class ActorNursery:
         errors: List[Exception] = []
         # wait on run_in_actor() tasks, unblocks when all complete
         async with trio.open_nursery() as nursery:
+        # async with self._nursery as nursery:
             for subactor, proc, portal in children.values():
                 cs = None
                 # portal from ``run_in_actor()``
                 if portal in self._cancel_after_result_on_exit:
-                    assert portal
                     cs = await nursery.start(
                         cancel_on_completion, portal, subactor)
                 # TODO: how do we handle remote host spawned actors?
                 nursery.start_soon(
-                    wait_for_proc, proc, subactor, cs)
+                    wait_for_proc, proc, subactor, portal, cs)
 
         if errors:
+            multierror = trio.MultiError(errors)
             if not self.cancelled:
                 # bubble up error(s) here and expect to be called again
                 # once the nursery has been cancelled externally (ex.
@@ -255,8 +281,7 @@ class ActorNursery:
             async with trio.open_nursery() as nursery:
                 for subactor, proc, portal in children.values():
                     # TODO: how do we handle remote host spawned actors?
-                    assert portal
-                    nursery.start_soon(wait_for_proc, proc, subactor, cs)
+                    nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
 
         log.debug(f"All subactors for {self} have terminated")
         if errors:
@@ -364,10 +389,18 @@ class ActorNursery:
 async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
     """Create and yield a new ``ActorNursery``.
     """
+    # TODO: figure out supervisors from erlang
+
     actor = current_actor()
     if not actor:
         raise RuntimeError("No actor instance has been defined yet?")
 
-    # TODO: figure out supervisors from erlang
-    async with ActorNursery(actor) as nursery:
-        yield nursery
+    # XXX we need this nursery because TRIP is doing all its stuff with
+    # an `@asynccontextmanager` which has an internal nursery *and* the
+    # task that opens a nursery must also close it - so we need a path
+    # in TRIP to make this all kinda work as well. Note I'm basically
+    # giving up for now - it's probably equivalent amounts of work to
+    # make TRIP vs. `multiprocessing` work here.
+    async with trio.open_nursery() as nursery:
+        async with ActorNursery(actor, nursery) as anursery:
+            yield anursery
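
Note on the module-forwarding scheme in the hunks above: the parent-side change in `_trionics.py` resolves each entry in `rpc_module_paths` to its source file (`mod.__file__`) and hands the subactor a `{module_path: file_path}` mapping, while the child-side change in `_actor.py` appends each file's directory to `sys.path` before re-importing the module by name, since a `trio-run-in-process` child starts a fresh interpreter that may not share the parent's module search path. The standalone sketch below illustrates that round trip; the helper names `resolve_rpc_modules` and `load_rpc_modules` are made up for illustration and are not part of tractor's API, and the `sys.path` trick (like the patch itself) only covers top-level modules, not dotted package paths.

import importlib
import os
import sys
from types import ModuleType
from typing import Dict, Iterable


def resolve_rpc_modules(module_names: Iterable[str]) -> Dict[str, str]:
    # Parent side: map each importable module name to the absolute path
    # of its source file so it can be "sent" to a child process.
    mods = {}
    for name in module_names:
        mod = importlib.import_module(name)
        mods[name] = os.path.abspath(mod.__file__)
    return mods


def load_rpc_modules(mods: Dict[str, str]) -> Dict[str, ModuleType]:
    # Child side: make each module importable by name in a fresh
    # interpreter by appending its containing directory to sys.path,
    # then import it normally.  A dotted "pkg.mod" path would instead
    # need the parent directory of "pkg" on sys.path.
    loaded = {}
    for name, filepath in mods.items():
        sys.path.append(os.path.dirname(filepath))
        loaded[name] = importlib.import_module(name)
    return loaded


# Example round trip (hypothetical module name):
#   mods = resolve_rpc_modules(["my_rpc_mod"])   # in the parent
#   ...ship `mods` to the subprocess...
#   loaded = load_rpc_modules(mods)              # in the child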