WIP trying out trio_run_in_process
parent 7c0efce84b
commit 1b7cdfe512
@@ -225,9 +225,19 @@ class Actor:
        the original nursery we need to try and load the local module
        code (if it exists).
        """
        for path in self.rpc_module_paths:
            log.debug(f"Attempting to import {path}")
            self._mods[path] = importlib.import_module(path)
        try:
            for path in self.rpc_module_paths:
                log.debug(f"Attempting to import {path}")
                self._mods[path] = importlib.import_module(path)

                # if self.name != 'arbiter':
                #     importlib.import_module('doggy')
                #     from celery.contrib import rdb; rdb.set_trace()
        except ModuleNotFoundError:
            # it is expected the corresponding `ModuleNotExposed` error
            # will be raised later
            log.error(f"Failed to import {path} in {self.name}")
            raise

    def _get_rpc_func(self, ns, funcname):
        try:
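The `except` branch above only logs and re-raises; per the comment it defers the real failure to lookup time. A rough sketch (an assumption based on that comment, not code from this changeset) of how it might surface once a caller asks for a function from the missing module:

    # hypothetical continuation of _get_rpc_func(), for illustration only
    def _get_rpc_func(self, ns, funcname):
        try:
            return getattr(self._mods[ns], funcname)
        except KeyError:
            # the module was never imported/exposed, so the earlier
            # import failure surfaces here as `ModuleNotExposed`
            raise ModuleNotExposed(f"{ns} is not exposed to this actor")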
@@ -488,7 +498,7 @@ class Actor:
            f"Exiting msg loop for {chan} from {chan.uid} "
            f"with last msg:\n{msg}")

    def _fork_main(
    def _mp_main(
        self,
        accept_addr: Tuple[str, int],
        forkserver_info: Tuple[Any, Any, Any, Any, Any],
@@ -500,13 +510,17 @@ class Actor:
        self._forkserver_info = forkserver_info
        from ._spawn import try_set_start_method
        spawn_ctx = try_set_start_method(start_method)

        if self.loglevel is not None:
            log.info(
                f"Setting loglevel for {self.uid} to {self.loglevel}")
            get_console_log(self.loglevel)

        log.info(
            f"Started new {spawn_ctx.current_process()} for {self.uid}")

        _state._current_actor = self

        log.debug(f"parent_addr is {parent_addr}")
        try:
            trio.run(partial(
@@ -515,6 +529,21 @@ class Actor:
            pass  # handle it the same way trio does?
        log.info(f"Actor {self.uid} terminated")

    async def _trip_main(
        self,
        accept_addr: Tuple[str, int],
        parent_addr: Tuple[str, int] = None
    ) -> None:
        if self.loglevel is not None:
            log.info(
                f"Setting loglevel for {self.uid} to {self.loglevel}")
            get_console_log(self.loglevel)

        log.info(f"Started new TRIP process for {self.uid}")
        _state._current_actor = self
        await self._async_main(accept_addr, parent_addr=parent_addr)
        log.info(f"Actor {self.uid} terminated")

    async def _async_main(
        self,
        accept_addr: Tuple[str, int],
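Worth noting: unlike `_mp_main()`, the new `_trip_main()` never calls `trio.run()` itself, since `trio_run_in_process` invokes its target coroutine inside a fresh trio run in the child process. A minimal, self-contained sketch of that behaviour as assumed here (illustration only, not code from this changeset):

    import trio
    import trio_run_in_process


    async def child(msg: str) -> None:
        # already running under trio inside the subprocess, so no
        # trio.run() is needed here -- the same reason _trip_main() can
        # simply `await self._async_main(...)`
        await trio.sleep(0.1)
        print(msg)


    async def main() -> None:
        # exiting the context manager blocks until the child completes
        async with trio_run_in_process.open_in_process(child, "hello from TRIP"):
            pass


    if __name__ == '__main__':
        trio.run(main)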
@@ -584,6 +613,8 @@ class Actor:
                # blocks here as expected until the channel server is
                # killed (i.e. this actor is cancelled or signalled by the parent)
        except Exception as err:
            # if self.name == 'arbiter':
            #     import pdb; pdb.set_trace()
            if not registered_with_arbiter:
                log.exception(
                    f"Actor errored and failed to register with arbiter "
@@ -598,12 +629,18 @@ class Actor:
                        f"Failed to ship error to parent "
                        f"{self._parent_chan.uid}, channel was closed")
            log.exception("Actor errored:")

            if isinstance(err, ModuleNotFoundError):
                raise
            else:
                # XXX wait, why?
                # causes a hang if I always raise..
                # A parent process does something weird here?
                raise

        finally:
            # if self.name == 'arbiter':
            #     import pdb; pdb.set_trace()
            if registered_with_arbiter:
                await self._do_unreg(arbiter_addr)
        # terminate actor once all it's peers (actors that connected
@@ -5,6 +5,8 @@ Mostly just wrapping around ``multiprocessing``.
"""
import multiprocessing as mp

import trio_run_in_process

try:
    from multiprocessing import semaphore_tracker  # type: ignore
    resource_tracker = semaphore_tracker
@@ -55,54 +57,66 @@ def is_main_process() -> bool:
    return mp.current_process().name == 'MainProcess'


def new_proc(
async def new_proc(
    name: str,
    actor: Actor,
    # passed through to actor main
    bind_addr: Tuple[str, int],
    parent_addr: Tuple[str, int],
    use_trip: bool = True,
) -> mp.Process:
    """Create a new ``multiprocessing.Process`` using the
    spawn method as configured using ``try_set_start_method()``.
    """
    start_method = _ctx.get_start_method()
    if start_method == 'forkserver':
        # XXX do our hackery on the stdlib to avoid multiple
        # forkservers (one at each subproc layer).
        fs = forkserver._forkserver
        curr_actor = current_actor()
        if is_main_process() and not curr_actor._forkserver_info:
            # if we're the "main" process start the forkserver only once
            # and pass its ipc info to downstream children
            # forkserver.set_forkserver_preload(rpc_module_paths)
            forkserver.ensure_running()
            fs_info = (
                fs._forkserver_address,
                fs._forkserver_alive_fd,
                getattr(fs, '_forkserver_pid', None),
                getattr(resource_tracker._resource_tracker, '_pid', None),
                resource_tracker._resource_tracker._fd,
            )
        else:
            assert curr_actor._forkserver_info
            fs_info = (
                fs._forkserver_address,
                fs._forkserver_alive_fd,
                fs._forkserver_pid,
                resource_tracker._resource_tracker._pid,
                resource_tracker._resource_tracker._fd,
            ) = curr_actor._forkserver_info
    else:
        fs_info = (None, None, None, None, None)

    return _ctx.Process(  # type: ignore
        target=actor._fork_main,
        args=(
    if use_trip:  # trio_run_in_process
        mng = trio_run_in_process.open_in_process(
            actor._trip_main,
            bind_addr,
            fs_info,
            start_method,
            parent_addr
        ),
        # daemon=True,
        name=name,
    )
        )
        proc = await mng.__aenter__()
        proc.mng = mng
        return proc
    else:
        # use multiprocessing
        start_method = _ctx.get_start_method()
        if start_method == 'forkserver':
            # XXX do our hackery on the stdlib to avoid multiple
            # forkservers (one at each subproc layer).
            fs = forkserver._forkserver
            curr_actor = current_actor()
            if is_main_process() and not curr_actor._forkserver_info:
                # if we're the "main" process start the forkserver only once
                # and pass its ipc info to downstream children
                # forkserver.set_forkserver_preload(rpc_module_paths)
                forkserver.ensure_running()
                fs_info = (
                    fs._forkserver_address,
                    fs._forkserver_alive_fd,
                    getattr(fs, '_forkserver_pid', None),
                    getattr(resource_tracker._resource_tracker, '_pid', None),
                    resource_tracker._resource_tracker._fd,
                )
            else:
                assert curr_actor._forkserver_info
                fs_info = (
                    fs._forkserver_address,
                    fs._forkserver_alive_fd,
                    fs._forkserver_pid,
                    resource_tracker._resource_tracker._pid,
                    resource_tracker._resource_tracker._fd,
                ) = curr_actor._forkserver_info
        else:
            fs_info = (None, None, None, None, None)

        return _ctx.Process(
            target=actor._mp_main,
            args=(
                bind_addr,
                fs_info,
                start_method,
                parent_addr
            ),
            # daemon=True,
            name=name,
        )
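The `use_trip` branch enters `open_in_process()` by hand (`await mng.__aenter__()`) and stashes the manager on `proc.mng` because the subprocess has to outlive `new_proc()`; the matching exit happens later when the nursery reaps the child. A sketch of the same lifetime split expressed with `contextlib.AsyncExitStack` (illustration only; `start_trip_proc` is a hypothetical helper, not part of this changeset):

    from contextlib import AsyncExitStack

    import trio_run_in_process


    async def start_trip_proc(actor, bind_addr, parent_addr):
        # enter the context manager now, but keep it open so the caller
        # (e.g. the actor nursery) can close it once the child exits
        stack = AsyncExitStack()
        proc = await stack.enter_async_context(
            trio_run_in_process.open_in_process(
                actor._trip_main,
                bind_addr,
                parent_addr,
            )
        )
        # closing the stack later is equivalent to the diff's
        # `await proc.mng.__aexit__(None, None, None)`
        return proc, stack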
@@ -9,6 +9,7 @@ import typing

import trio
from async_generator import asynccontextmanager, aclosing
import trio_run_in_process

from ._state import current_actor
from .log import get_logger, get_loglevel
@@ -64,20 +65,23 @@ class ActorNursery:
            arbiter_addr=current_actor()._arb_addr,
        )
        parent_addr = self._actor.accept_addr
        proc = _spawn.new_proc(
        assert parent_addr
        proc = await _spawn.new_proc(
            name,
            actor,
            bind_addr,
            parent_addr,
        )
        # `multiprocessing` only (since no async interface):
        # register the process before start in case we get a cancel
        # request before the actor has fully spawned - then we can wait
        # for it to fully come up before sending a cancel request
        self._children[actor.uid] = (actor, proc, None)

        proc.start()
        if not proc.is_alive():
            raise ActorFailure("Couldn't start sub-actor?")
        if not isinstance(proc, trio_run_in_process.process.Process):
            proc.start()
            if not proc.is_alive():
                raise ActorFailure("Couldn't start sub-actor?")

        log.info(f"Started {proc}")
        # wait for actor to spawn and connect back to us
@@ -193,12 +197,17 @@ class ActorNursery:
        actor: Actor,
        cancel_scope: Optional[trio.CancelScope] = None,
    ) -> None:
        # TODO: timeout block here?
        if proc.is_alive():
            await proc_waiter(proc)

        # please god don't hang
        proc.join()
        if not isinstance(proc, trio_run_in_process.process.Process):
            # TODO: timeout block here?
            if proc.is_alive():
                await proc_waiter(proc)
            proc.join()
        else:
            # trio_run_in_process blocking wait
            await proc.mng.__aexit__(None, None, None)
            # proc.nursery.cancel_scope.cancel()

        log.debug(f"Joined {proc}")
        # indicate we are no longer managing this subactor
        self._children.pop(actor.uid)
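A small aside on the `isinstance()` dispatch used above: since the trip branch tacks the context manager onto `proc.mng`, the same split could also be done by duck-typing. A hedged sketch (hypothetical `reap_proc` helper, not in this changeset; `proc_waiter` is the platform wait helper already used in the hunk above):

    async def reap_proc(proc) -> None:
        mng = getattr(proc, 'mng', None)
        if mng is not None:
            # trio_run_in_process child: exiting the manager waits for it
            await mng.__aexit__(None, None, None)
        else:
            # multiprocessing child: wait without blocking the event loop,
            # then join the process handle
            if proc.is_alive():
                await proc_waiter(proc)
            proc.join()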