forked from goodboy/tractor
				
			
						commit
						ffd10e193e
					
				| 
						 | 
					@ -169,10 +169,13 @@ async def open_root_actor(
 | 
				
			||||||
                logger.exception("Actor crashed:")
 | 
					                logger.exception("Actor crashed:")
 | 
				
			||||||
                await _debug._maybe_enter_pm(err)
 | 
					                await _debug._maybe_enter_pm(err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # always re-raise
 | 
				
			||||||
                raise
 | 
					                raise
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            finally:
 | 
					            finally:
 | 
				
			||||||
                logger.info("Shutting down root actor")
 | 
					                logger.info("Shutting down root actor")
 | 
				
			||||||
                await actor.cancel()
 | 
					                with trio.CancelScope(shield=True):
 | 
				
			||||||
 | 
					                    await actor.cancel()
 | 
				
			||||||
    finally:
 | 
					    finally:
 | 
				
			||||||
        _state._current_actor = None
 | 
					        _state._current_actor = None
 | 
				
			||||||
        logger.info("Root actor terminated")
 | 
					        logger.info("Root actor terminated")
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2,14 +2,13 @@
 | 
				
			||||||
Machinery for actor process spawning using multiple backends.
 | 
					Machinery for actor process spawning using multiple backends.
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
import inspect
 | 
					 | 
				
			||||||
import multiprocessing as mp
 | 
					import multiprocessing as mp
 | 
				
			||||||
import platform
 | 
					import platform
 | 
				
			||||||
from typing import Any, Dict, Optional
 | 
					from typing import Any, Dict, Optional
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import trio
 | 
					import trio
 | 
				
			||||||
from trio_typing import TaskStatus
 | 
					from trio_typing import TaskStatus
 | 
				
			||||||
from async_generator import aclosing, asynccontextmanager
 | 
					from async_generator import asynccontextmanager
 | 
				
			||||||
 | 
					
 | 
				
			||||||
try:
 | 
					try:
 | 
				
			||||||
    from multiprocessing import semaphore_tracker  # type: ignore
 | 
					    from multiprocessing import semaphore_tracker  # type: ignore
 | 
				
			||||||
| 
						 | 
					@ -128,7 +127,9 @@ async def cancel_on_completion(
 | 
				
			||||||
    Should only be called for actors spawned with `run_in_actor()`.
 | 
					    Should only be called for actors spawned with `run_in_actor()`.
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    with trio.CancelScope() as cs:
 | 
					    with trio.CancelScope() as cs:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        task_status.started(cs)
 | 
					        task_status.started(cs)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # if this call errors we store the exception for later
 | 
					        # if this call errors we store the exception for later
 | 
				
			||||||
        # in ``errors`` which will be reraised inside
 | 
					        # in ``errors`` which will be reraised inside
 | 
				
			||||||
        # a MultiError and we still send out a cancel request
 | 
					        # a MultiError and we still send out a cancel request
 | 
				
			||||||
| 
						 | 
					@ -138,6 +139,7 @@ async def cancel_on_completion(
 | 
				
			||||||
            log.warning(
 | 
					            log.warning(
 | 
				
			||||||
                f"Cancelling {portal.channel.uid} after error {result}"
 | 
					                f"Cancelling {portal.channel.uid} after error {result}"
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            log.info(
 | 
					            log.info(
 | 
				
			||||||
                f"Cancelling {portal.channel.uid} gracefully "
 | 
					                f"Cancelling {portal.channel.uid} gracefully "
 | 
				
			||||||
| 
						 | 
					@ -202,7 +204,7 @@ async def spawn_subactor(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def new_proc(
 | 
					async def new_proc(
 | 
				
			||||||
    name: str,
 | 
					    name: str,
 | 
				
			||||||
    actor_nursery: 'ActorNursery',  # type: ignore
 | 
					    actor_nursery: 'ActorNursery',  # type: ignore  # noqa
 | 
				
			||||||
    subactor: Actor,
 | 
					    subactor: Actor,
 | 
				
			||||||
    errors: Dict[Tuple[str, str], Exception],
 | 
					    errors: Dict[Tuple[str, str], Exception],
 | 
				
			||||||
    # passed through to actor main
 | 
					    # passed through to actor main
 | 
				
			||||||
| 
						 | 
					@ -221,8 +223,8 @@ async def new_proc(
 | 
				
			||||||
    # mark the new actor with the global spawn method
 | 
					    # mark the new actor with the global spawn method
 | 
				
			||||||
    subactor._spawn_method = _spawn_method
 | 
					    subactor._spawn_method = _spawn_method
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async with trio.open_nursery() as nursery:
 | 
					    if use_trio_run_in_process or _spawn_method == 'trio':
 | 
				
			||||||
        if use_trio_run_in_process or _spawn_method == 'trio':
 | 
					        async with trio.open_nursery() as nursery:
 | 
				
			||||||
            async with spawn_subactor(
 | 
					            async with spawn_subactor(
 | 
				
			||||||
                subactor,
 | 
					                subactor,
 | 
				
			||||||
                parent_addr,
 | 
					                parent_addr,
 | 
				
			||||||
| 
						 | 
					@ -261,7 +263,11 @@ async def new_proc(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if portal in actor_nursery._cancel_after_result_on_exit:
 | 
					                if portal in actor_nursery._cancel_after_result_on_exit:
 | 
				
			||||||
                    cancel_scope = await nursery.start(
 | 
					                    cancel_scope = await nursery.start(
 | 
				
			||||||
                        cancel_on_completion, portal, subactor, errors)
 | 
					                        cancel_on_completion,
 | 
				
			||||||
 | 
					                        portal,
 | 
				
			||||||
 | 
					                        subactor,
 | 
				
			||||||
 | 
					                        errors
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # Wait for proc termination but **dont' yet** call
 | 
					                # Wait for proc termination but **dont' yet** call
 | 
				
			||||||
                # ``trio.Process.__aexit__()`` (it tears down stdio
 | 
					                # ``trio.Process.__aexit__()`` (it tears down stdio
 | 
				
			||||||
| 
						 | 
					@ -275,65 +281,108 @@ async def new_proc(
 | 
				
			||||||
                # no actor zombies allowed
 | 
					                # no actor zombies allowed
 | 
				
			||||||
                # with trio.CancelScope(shield=True):
 | 
					                # with trio.CancelScope(shield=True):
 | 
				
			||||||
                await proc.wait()
 | 
					                await proc.wait()
 | 
				
			||||||
        else:
 | 
					
 | 
				
			||||||
            # `multiprocessing`
 | 
					            log.debug(f"Joined {proc}")
 | 
				
			||||||
            assert _ctx
 | 
					            # pop child entry to indicate we no longer managing this subactor
 | 
				
			||||||
            start_method = _ctx.get_start_method()
 | 
					            subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
 | 
				
			||||||
            if start_method == 'forkserver':
 | 
					
 | 
				
			||||||
                # XXX do our hackery on the stdlib to avoid multiple
 | 
					            # cancel result waiter that may have been spawned in
 | 
				
			||||||
                # forkservers (one at each subproc layer).
 | 
					            # tandem if not done already
 | 
				
			||||||
                fs = forkserver._forkserver
 | 
					            if cancel_scope:
 | 
				
			||||||
                curr_actor = current_actor()
 | 
					                log.warning(
 | 
				
			||||||
                if is_main_process() and not curr_actor._forkserver_info:
 | 
					                    "Cancelling existing result waiter task for "
 | 
				
			||||||
                    # if we're the "main" process start the forkserver
 | 
					                    f"{subactor.uid}")
 | 
				
			||||||
                    # only once and pass its ipc info to downstream
 | 
					                cancel_scope.cancel()
 | 
				
			||||||
                    # children
 | 
					    else:
 | 
				
			||||||
                    # forkserver.set_forkserver_preload(enable_modules)
 | 
					        # `multiprocessing`
 | 
				
			||||||
                    forkserver.ensure_running()
 | 
					        # async with trio.open_nursery() as nursery:
 | 
				
			||||||
                    fs_info = (
 | 
					        await mp_new_proc(
 | 
				
			||||||
                        fs._forkserver_address,
 | 
					            name=name,
 | 
				
			||||||
                        fs._forkserver_alive_fd,
 | 
					            actor_nursery=actor_nursery,
 | 
				
			||||||
                        getattr(fs, '_forkserver_pid', None),
 | 
					            subactor=subactor,
 | 
				
			||||||
                        getattr(
 | 
					            errors=errors,
 | 
				
			||||||
                            resource_tracker._resource_tracker, '_pid', None),
 | 
					            # passed through to actor main
 | 
				
			||||||
                        resource_tracker._resource_tracker._fd,
 | 
					            bind_addr=bind_addr,
 | 
				
			||||||
                    )
 | 
					            parent_addr=parent_addr,
 | 
				
			||||||
                else:
 | 
					            _runtime_vars=_runtime_vars,
 | 
				
			||||||
                    assert curr_actor._forkserver_info
 | 
					            task_status=task_status,
 | 
				
			||||||
                    fs_info = (
 | 
					        )
 | 
				
			||||||
                        fs._forkserver_address,
 | 
					
 | 
				
			||||||
                        fs._forkserver_alive_fd,
 | 
					
 | 
				
			||||||
                        fs._forkserver_pid,
 | 
					async def mp_new_proc(
 | 
				
			||||||
                        resource_tracker._resource_tracker._pid,
 | 
					
 | 
				
			||||||
                        resource_tracker._resource_tracker._fd,
 | 
					    name: str,
 | 
				
			||||||
                     ) = curr_actor._forkserver_info
 | 
					    actor_nursery: 'ActorNursery',  # type: ignore  # noqa
 | 
				
			||||||
 | 
					    subactor: Actor,
 | 
				
			||||||
 | 
					    errors: Dict[Tuple[str, str], Exception],
 | 
				
			||||||
 | 
					    # passed through to actor main
 | 
				
			||||||
 | 
					    bind_addr: Tuple[str, int],
 | 
				
			||||||
 | 
					    parent_addr: Tuple[str, int],
 | 
				
			||||||
 | 
					    _runtime_vars: Dict[str, Any],  # serialized and sent to _child
 | 
				
			||||||
 | 
					    *,
 | 
				
			||||||
 | 
					    use_trio_run_in_process: bool = False,
 | 
				
			||||||
 | 
					    task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					    async with trio.open_nursery() as nursery:
 | 
				
			||||||
 | 
					        assert _ctx
 | 
				
			||||||
 | 
					        start_method = _ctx.get_start_method()
 | 
				
			||||||
 | 
					        if start_method == 'forkserver':
 | 
				
			||||||
 | 
					            # XXX do our hackery on the stdlib to avoid multiple
 | 
				
			||||||
 | 
					            # forkservers (one at each subproc layer).
 | 
				
			||||||
 | 
					            fs = forkserver._forkserver
 | 
				
			||||||
 | 
					            curr_actor = current_actor()
 | 
				
			||||||
 | 
					            if is_main_process() and not curr_actor._forkserver_info:
 | 
				
			||||||
 | 
					                # if we're the "main" process start the forkserver
 | 
				
			||||||
 | 
					                # only once and pass its ipc info to downstream
 | 
				
			||||||
 | 
					                # children
 | 
				
			||||||
 | 
					                # forkserver.set_forkserver_preload(enable_modules)
 | 
				
			||||||
 | 
					                forkserver.ensure_running()
 | 
				
			||||||
 | 
					                fs_info = (
 | 
				
			||||||
 | 
					                    fs._forkserver_address,
 | 
				
			||||||
 | 
					                    fs._forkserver_alive_fd,
 | 
				
			||||||
 | 
					                    getattr(fs, '_forkserver_pid', None),
 | 
				
			||||||
 | 
					                    getattr(
 | 
				
			||||||
 | 
					                        resource_tracker._resource_tracker, '_pid', None),
 | 
				
			||||||
 | 
					                    resource_tracker._resource_tracker._fd,
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                fs_info = (None, None, None, None, None)
 | 
					                assert curr_actor._forkserver_info
 | 
				
			||||||
 | 
					                fs_info = (
 | 
				
			||||||
 | 
					                    fs._forkserver_address,
 | 
				
			||||||
 | 
					                    fs._forkserver_alive_fd,
 | 
				
			||||||
 | 
					                    fs._forkserver_pid,
 | 
				
			||||||
 | 
					                    resource_tracker._resource_tracker._pid,
 | 
				
			||||||
 | 
					                    resource_tracker._resource_tracker._fd,
 | 
				
			||||||
 | 
					                 ) = curr_actor._forkserver_info
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            fs_info = (None, None, None, None, None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            proc = _ctx.Process(  # type: ignore
 | 
					        proc: mp.Process = _ctx.Process(  # type: ignore
 | 
				
			||||||
                target=_mp_main,
 | 
					            target=_mp_main,
 | 
				
			||||||
                args=(
 | 
					            args=(
 | 
				
			||||||
                    subactor,
 | 
					                subactor,
 | 
				
			||||||
                    bind_addr,
 | 
					                bind_addr,
 | 
				
			||||||
                    fs_info,
 | 
					                fs_info,
 | 
				
			||||||
                    start_method,
 | 
					                start_method,
 | 
				
			||||||
                    parent_addr,
 | 
					                parent_addr,
 | 
				
			||||||
                ),
 | 
					            ),
 | 
				
			||||||
                # daemon=True,
 | 
					            # daemon=True,
 | 
				
			||||||
                name=name,
 | 
					            name=name,
 | 
				
			||||||
            )
 | 
					        )
 | 
				
			||||||
            # `multiprocessing` only (since no async interface):
 | 
					        # `multiprocessing` only (since no async interface):
 | 
				
			||||||
            # register the process before start in case we get a cancel
 | 
					        # register the process before start in case we get a cancel
 | 
				
			||||||
            # request before the actor has fully spawned - then we can wait
 | 
					        # request before the actor has fully spawned - then we can wait
 | 
				
			||||||
            # for it to fully come up before sending a cancel request
 | 
					        # for it to fully come up before sending a cancel request
 | 
				
			||||||
            actor_nursery._children[subactor.uid] = (subactor, proc, None)
 | 
					        actor_nursery._children[subactor.uid] = (subactor, proc, None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            proc.start()
 | 
					        proc.start()
 | 
				
			||||||
            if not proc.is_alive():
 | 
					        if not proc.is_alive():
 | 
				
			||||||
                raise ActorFailure("Couldn't start sub-actor?")
 | 
					            raise ActorFailure("Couldn't start sub-actor?")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            log.info(f"Started {proc}")
 | 
					        log.info(f"Started {proc}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
            # wait for actor to spawn and connect back to us
 | 
					            # wait for actor to spawn and connect back to us
 | 
				
			||||||
            # channel should have handshake completed by the
 | 
					            # channel should have handshake completed by the
 | 
				
			||||||
            # local actor by the time we get a ref to it
 | 
					            # local actor by the time we get a ref to it
 | 
				
			||||||
| 
						 | 
					@ -356,23 +405,60 @@ async def new_proc(
 | 
				
			||||||
            # awaited and reported upwards to the supervisor.
 | 
					            # awaited and reported upwards to the supervisor.
 | 
				
			||||||
            await actor_nursery._join_procs.wait()
 | 
					            await actor_nursery._join_procs.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        finally:
 | 
				
			||||||
 | 
					            # XXX: in the case we were cancelled before the sub-proc
 | 
				
			||||||
 | 
					            # registered itself back we must be sure to try and clean
 | 
				
			||||||
 | 
					            # any process we may have started.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            reaping_cancelled: bool = False
 | 
				
			||||||
 | 
					            cancel_scope: Optional[trio.CancelScope] = None
 | 
				
			||||||
 | 
					            cancel_exc: Optional[trio.Cancelled] = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if portal in actor_nursery._cancel_after_result_on_exit:
 | 
					            if portal in actor_nursery._cancel_after_result_on_exit:
 | 
				
			||||||
                cancel_scope = await nursery.start(
 | 
					                try:
 | 
				
			||||||
                    cancel_on_completion, portal, subactor, errors)
 | 
					                    # async with trio.open_nursery() as n:
 | 
				
			||||||
 | 
					                    # n.cancel_scope.shield = True
 | 
				
			||||||
 | 
					                    cancel_scope = await nursery.start(
 | 
				
			||||||
 | 
					                        cancel_on_completion,
 | 
				
			||||||
 | 
					                        portal,
 | 
				
			||||||
 | 
					                        subactor,
 | 
				
			||||||
 | 
					                        errors
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                except trio.Cancelled as err:
 | 
				
			||||||
 | 
					                    cancel_exc = err
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # if the reaping task was cancelled we may have hit
 | 
				
			||||||
 | 
					                    # a race where the subproc disconnected before we
 | 
				
			||||||
 | 
					                    # could send it a message to cancel (classic 2 generals)
 | 
				
			||||||
 | 
					                    # in that case, wait shortly then kill the process.
 | 
				
			||||||
 | 
					                    reaping_cancelled = True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    if proc.is_alive():
 | 
				
			||||||
 | 
					                        with trio.move_on_after(0.1) as cs:
 | 
				
			||||||
 | 
					                            cs.shield = True
 | 
				
			||||||
 | 
					                            await proc_waiter(proc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        if cs.cancelled_caught:
 | 
				
			||||||
 | 
					                            proc.terminate()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if not reaping_cancelled and proc.is_alive():
 | 
				
			||||||
 | 
					                await proc_waiter(proc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # TODO: timeout block here?
 | 
					            # TODO: timeout block here?
 | 
				
			||||||
            if proc.is_alive():
 | 
					 | 
				
			||||||
                await proc_waiter(proc)
 | 
					 | 
				
			||||||
            proc.join()
 | 
					            proc.join()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # This is again common logic for all backends:
 | 
					            log.debug(f"Joined {proc}")
 | 
				
			||||||
 | 
					            # pop child entry to indicate we are no longer managing subactor
 | 
				
			||||||
 | 
					            subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        log.debug(f"Joined {proc}")
 | 
					            # cancel result waiter that may have been spawned in
 | 
				
			||||||
        # pop child entry to indicate we are no longer managing this subactor
 | 
					            # tandem if not done already
 | 
				
			||||||
        subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
 | 
					            if cancel_scope:
 | 
				
			||||||
        # cancel result waiter that may have been spawned in
 | 
					                log.warning(
 | 
				
			||||||
        # tandem if not done already
 | 
					                    "Cancelling existing result waiter task for "
 | 
				
			||||||
        if cancel_scope:
 | 
					                    f"{subactor.uid}")
 | 
				
			||||||
            log.warning(
 | 
					                cancel_scope.cancel()
 | 
				
			||||||
                f"Cancelling existing result waiter task for {subactor.uid}")
 | 
					
 | 
				
			||||||
            cancel_scope.cancel()
 | 
					            elif reaping_cancelled:  # let the cancellation bubble up
 | 
				
			||||||
 | 
					                assert cancel_exc
 | 
				
			||||||
 | 
					                raise cancel_exc
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue