diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 026e257..5e99723 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -3,12 +3,15 @@ Process spawning. Mostly just wrapping around ``multiprocessing``. """ +import inspect import multiprocessing as mp - -# from . import log +import platform +from typing import Any, List, Dict import trio import trio_run_in_process +from trio_typing import TaskStatus +from async_generator import aclosing try: from multiprocessing import semaphore_tracker # type: ignore @@ -23,12 +26,24 @@ from typing import Tuple from . import _forkserver_override from ._state import current_actor -from ._actor import Actor +from .log import get_logger +from ._portal import Portal +from ._actor import Actor, ActorFailure +log = get_logger('tractor') + _ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore +if platform.system() == 'Windows': + async def proc_waiter(proc: mp.Process) -> None: + await trio.hazmat.WaitForSingleObject(proc.sentinel) +else: + async def proc_waiter(proc: mp.Process) -> None: + await trio.hazmat.wait_readable(proc.sentinel) + + def try_set_start_method(name: str) -> mp.context.BaseContext: """Attempt to set the start method for ``multiprocess.Process`` spawning. @@ -60,73 +75,203 @@ def is_main_process() -> bool: return mp.current_process().name == 'MainProcess' +async def exhaust_portal( + portal: Portal, + actor: Actor +) -> Any: + """Pull final result from portal (assuming it has one). + + If the main task is an async generator do our best to consume + what's left of it. + """ + try: + log.debug(f"Waiting on final result from {actor.uid}") + final = res = await portal.result() + # if it's an async-gen then alert that we're cancelling it + if inspect.isasyncgen(res): + final = [] + log.warning( + f"Blindly consuming asyncgen for {actor.uid}") + with trio.fail_after(1): + async with aclosing(res) as agen: + async for item in agen: + log.debug(f"Consuming item {item}") + final.append(item) + except (Exception, trio.MultiError) as err: + # we reraise in the parent task via a ``trio.MultiError`` + return err + else: + return final + + +async def cancel_on_completion( + portal: Portal, + actor: Actor, + errors: List[Exception], + task_status=trio.TASK_STATUS_IGNORED, +) -> None: + """Cancel actor gracefully once it's "main" portal's + result arrives. + + Should only be called for actors spawned with `run_in_actor()`. + """ + with trio.CancelScope() as cs: + task_status.started(cs) + # if this call errors we store the exception for later + # in ``errors`` which will be reraised inside + # a MultiError and we still send out a cancel request + result = await exhaust_portal(portal, actor) + if isinstance(result, Exception): + errors[actor.uid] = result + log.warning( + f"Cancelling {portal.channel.uid} after error {result}" + ) + else: + log.info(f"Cancelling {portal.channel.uid} gracefully") + + # cancel the process now that we have a final result + await portal.cancel_actor() + + # XXX: lol, this will never get run without a shield above.. + # if cs.cancelled_caught: + # log.warning( + # "Result waiter was cancelled, process may have died") + + async def new_proc( name: str, - actor: Actor, + actor_nursery: 'ActorNursery', + subactor: Actor, + errors: Dict[str, Exception], # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], - nursery: trio.Nursery = None, + begin_wait_phase: trio.Event, use_trip: bool = True, + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> mp.Process: """Create a new ``multiprocessing.Process`` using the spawn method as configured using ``try_set_start_method()``. """ - if use_trip: # trio_run_in_process - mng = trio_run_in_process.open_in_process( - actor._trip_main, - bind_addr, - parent_addr, - nursery=nursery, - ) - # XXX playing with trip logging - # l = log.get_console_log(level='debug', name=None, _root_name='trio-run-in-process') - # import logging - # logger = logging.getLogger("trio-run-in-process") - # logger.setLevel('DEBUG') - proc = await mng.__aenter__() - proc.mng = mng - return proc - else: - # use multiprocessing - start_method = _ctx.get_start_method() - if start_method == 'forkserver': - # XXX do our hackery on the stdlib to avoid multiple - # forkservers (one at each subproc layer). - fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver only once - # and pass its ipc info to downstream children - # forkserver.set_forkserver_preload(rpc_module_paths) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr(resource_tracker._resource_tracker, '_pid', None), - resource_tracker._resource_tracker._fd, - ) - else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - resource_tracker._resource_tracker._pid, - resource_tracker._resource_tracker._fd, - ) = curr_actor._forkserver_info - else: - fs_info = (None, None, None, None, None) + cancel_scope = None - return _ctx.Process( - target=actor._mp_main, - args=( + async with trio.open_nursery() as nursery: + if use_trip: + # trio_run_in_process + async with trio_run_in_process.open_in_process( + subactor._trip_main, bind_addr, - fs_info, - start_method, - parent_addr - ), - # daemon=True, - name=name, - ) + parent_addr, + ) as proc: + log.info(f"Started {proc}") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = ( + subactor, proc, portal) + task_status.started(portal) + + # wait for ActorNursery.wait() to be called + await actor_nursery._join_procs.wait() + + if portal in actor_nursery._cancel_after_result_on_exit: + cancel_scope = await nursery.start( + cancel_on_completion, portal, subactor, errors) + + # TRIP blocks here until process is complete + else: + # `multiprocessing` + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). + fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver + # only once and pass its ipc info to downstream + # children + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr( + resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, + ) + else: + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, + ) = curr_actor._forkserver_info + else: + fs_info = (None, None, None, None, None) + + proc = _ctx.Process( + target=subactor._mp_main, + args=( + bind_addr, + fs_info, + start_method, + parent_addr + ), + # daemon=True, + name=name, + ) + # `multiprocessing` only (since no async interface): + # register the process before start in case we get a cancel + # request before the actor has fully spawned - then we can wait + # for it to fully come up before sending a cancel request + actor_nursery._children[subactor.uid] = (subactor, proc, None) + + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") + + log.info(f"Started {proc}") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = (subactor, proc, portal) + + # unblock parent task + task_status.started(portal) + + # wait for ActorNursery.wait() to be called + # this is required to ensure synchronization + # with startup and registration of this actor in + # ActorNursery.run_in_actor() + await actor_nursery._join_procs.wait() + + if portal in actor_nursery._cancel_after_result_on_exit: + cancel_scope = await nursery.start( + cancel_on_completion, portal, subactor, errors) + + # TODO: timeout block here? + if proc.is_alive(): + await proc_waiter(proc) + proc.join() + + log.debug(f"Joined {proc}") + # pop child entry to indicate we are no longer managing this subactor + subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + # cancel result waiter that may have been spawned in + # tandem if not done already + if cancel_scope: + log.warning( + f"Cancelling existing result waiter task for {subactor.uid}") + cancel_scope.cancel() diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 9808935..49f7838 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -1,42 +1,37 @@ """ ``trio`` inspired apis and helpers """ -import inspect -import importlib -import platform import multiprocessing as mp from typing import Tuple, List, Dict, Optional, Any import typing import trio -from async_generator import asynccontextmanager, aclosing -import trio_run_in_process +from async_generator import asynccontextmanager from ._state import current_actor from .log import get_logger, get_loglevel -from ._actor import Actor, ActorFailure +from ._actor import Actor # , ActorFailure from ._portal import Portal from . import _spawn -if platform.system() == 'Windows': - async def proc_waiter(proc: mp.Process) -> None: - await trio.hazmat.WaitForSingleObject(proc.sentinel) -else: - async def proc_waiter(proc: mp.Process) -> None: - await trio.hazmat.wait_readable(proc.sentinel) - - log = get_logger('tractor') class ActorNursery: """Spawn scoped subprocess actors. """ - def __init__(self, actor: Actor, nursery: trio.Nursery) -> None: + def __init__( + self, + actor: Actor, + ria_nursery: trio.Nursery, + da_nursery: trio.Nursery, + errors: Dict[str, Exception], + ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor - self._nursery = nursery + self._ria_nursery = ria_nursery + self._da_nursery = da_nursery self._children: Dict[ Tuple[str, str], Tuple[Actor, mp.Process, Optional[Portal]] @@ -45,10 +40,8 @@ class ActorNursery: # cancelled when their "main" result arrives self._cancel_after_result_on_exit: set = set() self.cancelled: bool = False - # self._aexitstack = contextlib.AsyncExitStack() - - async def __aenter__(self): - return self + self._join_procs = trio.Event() + self.errors = errors async def start_actor( self, @@ -57,51 +50,34 @@ class ActorNursery: statespace: Optional[Dict[str, Any]] = None, rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor + nursery: trio.Nursery = None, ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() - mods = {} - for path in rpc_module_paths or (): - mod = importlib.import_module(path) - mods[path] = mod.__file__ - - actor = Actor( + subactor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=mods, + rpc_module_paths=rpc_module_paths, statespace=statespace, # global proc state vars loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr assert parent_addr - proc = await _spawn.new_proc( + + # start a task to spawn a process + # blocks until process has been started and a portal setup + nursery = nursery or self._da_nursery + return await nursery.start( + _spawn.new_proc, name, - actor, + self, + subactor, + self.errors, bind_addr, parent_addr, - self._nursery, + nursery, ) - # `multiprocessing` only (since no async interface): - # register the process before start in case we get a cancel - # request before the actor has fully spawned - then we can wait - # for it to fully come up before sending a cancel request - self._children[actor.uid] = (actor, proc, None) - - if not isinstance(proc, trio_run_in_process.process.Process): - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") - - log.info(f"Started {proc}") - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await self._actor.wait_for_peer(actor.uid) - portal = Portal(chan) - self._children[actor.uid] = (actor, proc, portal) - - return portal async def run_in_actor( self, @@ -127,6 +103,8 @@ class ActorNursery: bind_addr=bind_addr, statespace=statespace, loglevel=loglevel, + # use the run_in_actor nursery + nursery=self._ria_nursery, ) # this marks the actor to be cancelled after its portal result # is retreived, see ``wait()`` below. @@ -140,153 +118,9 @@ class ActorNursery: async def wait(self) -> None: """Wait for all subactors to complete. - - This is probably the most complicated (and confusing, sorry) - function that does all the clever crap to deal with cancellation, - error propagation, and graceful subprocess tear down. """ - async def exhaust_portal(portal, actor): - """Pull final result from portal (assuming it has one). - - If the main task is an async generator do our best to consume - what's left of it. - """ - try: - log.debug(f"Waiting on final result from {actor.uid}") - final = res = await portal.result() - # if it's an async-gen then alert that we're cancelling it - if inspect.isasyncgen(res): - final = [] - log.warning( - f"Blindly consuming asyncgen for {actor.uid}") - with trio.fail_after(1): - async with aclosing(res) as agen: - async for item in agen: - log.debug(f"Consuming item {item}") - final.append(item) - except (Exception, trio.MultiError) as err: - # we reraise in the parent task via a ``trio.MultiError`` - return err - else: - return final - - async def cancel_on_completion( - portal: Portal, - actor: Actor, - task_status=trio.TASK_STATUS_IGNORED, - ) -> None: - """Cancel actor gracefully once it's "main" portal's - result arrives. - - Should only be called for actors spawned with `run_in_actor()`. - """ - with trio.CancelScope() as cs: - task_status.started(cs) - # if this call errors we store the exception for later - # in ``errors`` which will be reraised inside - # a MultiError and we still send out a cancel request - result = await exhaust_portal(portal, actor) - if isinstance(result, Exception): - errors.append(result) - log.warning( - f"Cancelling {portal.channel.uid} after error {result}" - ) - else: - log.info(f"Cancelling {portal.channel.uid} gracefully") - - # cancel the process now that we have a final result - await portal.cancel_actor() - - # XXX: lol, this will never get run without a shield above.. - # if cs.cancelled_caught: - # log.warning( - # "Result waiter was cancelled, process may have died") - - async def wait_for_proc( - proc: mp.Process, - actor: Actor, - portal: Portal, - cancel_scope: Optional[trio._core._run.CancelScope] = None, - ) -> None: - # please god don't hang - if not isinstance(proc, trio_run_in_process.process.Process): - # TODO: timeout block here? - if proc.is_alive(): - await proc_waiter(proc) - proc.join() - else: - # trio_run_in_process blocking wait - if errors: - multierror = trio.MultiError(errors) - # import pdb; pdb.set_trace() - # try: - # with trio.CancelScope(shield=True): - # await proc.mng.__aexit__( - # type(multierror), - # multierror, - # multierror.__traceback__, - # ) - # except BaseException as err: - # import pdb; pdb.set_trace() - # pass - # else: - await proc.mng.__aexit__(None, None, None) - # proc.nursery.cancel_scope.cancel() - - log.debug(f"Joined {proc}") - # indicate we are no longer managing this subactor - self._children.pop(actor.uid) - - # proc terminated, cancel result waiter that may have - # been spawned in tandem if not done already - if cancel_scope: # and not portal._cancelled: - log.warning( - f"Cancelling existing result waiter task for {actor.uid}") - cancel_scope.cancel() - log.debug(f"Waiting on all subactors to complete") - # since we pop each child subactor on termination, - # iterate a copy - children = self._children.copy() - errors: List[Exception] = [] - # wait on run_in_actor() tasks, unblocks when all complete - async with trio.open_nursery() as nursery: - # async with self._nursery as nursery: - for subactor, proc, portal in children.values(): - cs = None - # portal from ``run_in_actor()`` - if portal in self._cancel_after_result_on_exit: - cs = await nursery.start( - cancel_on_completion, portal, subactor) - # TODO: how do we handle remote host spawned actors? - nursery.start_soon( - wait_for_proc, proc, subactor, portal, cs) - - if errors: - multierror = trio.MultiError(errors) - if not self.cancelled: - # bubble up error(s) here and expect to be called again - # once the nursery has been cancelled externally (ex. - # from within __aexit__() if an error is caught around - # ``self.wait()`` then, ``self.cancel()`` is called - # immediately, in the default supervisor strat, after - # which in turn ``self.wait()`` is called again.) - raise trio.MultiError(errors) - - # wait on all `start_actor()` subactors to complete - # if errors were captured above and we have not been cancelled - # then these ``start_actor()`` spawned actors will block until - # cancelled externally - children = self._children.copy() - async with trio.open_nursery() as nursery: - for subactor, proc, portal in children.values(): - # TODO: how do we handle remote host spawned actors? - nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) - - log.debug(f"All subactors for {self} have terminated") - if errors: - # always raise any error if we're also cancelled - raise trio.MultiError(errors) + self._join_procs.set() async def cancel(self, hard_kill: bool = False) -> None: """Cancel this nursery by instructing each subactor to cancel @@ -304,7 +138,7 @@ class ActorNursery: log.debug(f"Cancelling nursery") with trio.move_on_after(3) as cs: - async with trio.open_nursery() as n: + async with trio.open_nursery() as nursery: for subactor, proc, portal in self._children.values(): if hard_kill: do_hard_kill(proc) @@ -331,59 +165,20 @@ class ActorNursery: # spawn cancel tasks for each sub-actor assert portal - n.start_soon(portal.cancel_actor) + nursery.start_soon(portal.cancel_actor) # if we cancelled the cancel (we hung cancelling remote actors) # then hard kill all sub-processes if cs.cancelled_caught: log.error(f"Failed to gracefully cancel {self}, hard killing!") - async with trio.open_nursery() as n: + async with trio.open_nursery(): for subactor, proc, portal in self._children.values(): - n.start_soon(do_hard_kill, proc) + nursery.start_soon(do_hard_kill, proc) # mark ourselves as having (tried to have) cancelled all subactors self.cancelled = True await self.wait() - async def __aexit__(self, etype, value, tb): - """Wait on all subactor's main routines to complete. - """ - # XXX: this is effectively the (for now) lone - # cancellation/supervisor strategy (one-cancels-all) - # which exactly mimicks trio's behaviour - if etype is not None: - try: - # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case - # the `else:` block here might not complete? - # For now, shield both. - with trio.CancelScope(shield=True): - if etype in (trio.Cancelled, KeyboardInterrupt): - log.warning( - f"Nursery for {current_actor().uid} was " - f"cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {etype}, ") - await self.cancel() - except trio.MultiError as merr: - if value not in merr.exceptions: - raise trio.MultiError(merr.exceptions + [value]) - raise - else: - log.debug(f"Waiting on subactors {self._children} to complete") - try: - await self.wait() - except (Exception, trio.MultiError) as err: - log.warning(f"Nursery cancelling due to {err}") - if self._children: - with trio.CancelScope(shield=True): - await self.cancel() - raise - - log.debug(f"Nursery teardown complete") - @asynccontextmanager async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: @@ -395,12 +190,67 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: if not actor: raise RuntimeError("No actor instance has been defined yet?") - # XXX we need this nursery because TRIP is doing all its stuff with + # XXX we use these nurseries because TRIP is doing all its stuff with # an `@asynccontextmanager` which has an internal nursery *and* the # task that opens a nursery must also close it - so we need a path - # in TRIP to make this all kinda work as well. Note I'm basically - # giving up for now - it's probably equivalent amounts of work to - # make TRIP vs. `multiprocessing` work here. - async with trio.open_nursery() as nursery: - async with ActorNursery(actor, nursery) as anursery: - yield anursery + # in TRIP to make this all kinda work as well. + errors: Dict[str, Exception] = {} + async with trio.open_nursery() as da_nursery: + try: + async with trio.open_nursery() as ria_nursery: + anursery = ActorNursery( + actor, ria_nursery, da_nursery, errors + ) + try: + # spawning of actors happens in this scope after + # we yield to the caller. + yield anursery + log.debug( + f"Waiting on subactors {anursery._children}" + "to complete" + ) + # anursery.wait() + # except (trio.Cancelled, KeyboardInterrupt) as err: + except (BaseException, Exception) as err: + anursery._join_procs.set() + try: + # XXX: hypothetically an error could be raised and then + # a cancel signal shows up slightly after in which case + # the `else:` block here might not complete? + # For now, shield both. + with trio.CancelScope(shield=True): + if err in (trio.Cancelled, KeyboardInterrupt): + log.warning( + f"Nursery for {current_actor().uid} was " + f"cancelled with {err}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with {err}, ") + await anursery.cancel() + except trio.MultiError as merr: + if err not in merr.exceptions: + raise trio.MultiError(merr.exceptions + [err]) + else: + raise + + # last bit before first nursery block end + log.debug(f"Waiting on all subactors to complete") + anursery._join_procs.set() + # ria_nursery scope + except (Exception, trio.MultiError) as err: + log.warning(f"Nursery cancelling due to {err}") + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + raise + finally: + if errors: + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + if len(errors) > 1: + raise trio.MultiError(errors.values()) + else: + raise list(errors.values())[0] + log.debug(f"Nursery teardown complete")