diff --git a/newsfragments/267.misc.rst b/newsfragments/267.misc.rst new file mode 100644 index 0000000..727a879 --- /dev/null +++ b/newsfragments/267.misc.rst @@ -0,0 +1,16 @@ +This (finally) adds fully acknowledged remote cancellation messaging +support for both explicit ``Portal.cancel_actor()`` calls as well as +when there is a "runtime-wide" cancellations (eg. during KBI or general +actor nursery exception handling which causes a full actor +"crash"/termination). + +You can think of this as the most ideal case in 2-generals where the +actor requesting the cancel of its child is able to always receive back +the ACK to that request. This leads to a more deterministic shutdown of +the child where the parent is able to wait for the child to fully +respond to the request. On a localhost setup, where the parent can +monitor the state of the child through process or other OS APIs instead +of solely through IPC messaging, the parent can know whether or not the +child decided to cancel with more certainty. In the case of separate +hosts, we still rely on a simple timeout approach until such a time +where we prefer to get "fancier". diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 9f8ae0d..c346806 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -128,7 +128,11 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): if len(exceptions) == 2: # sometimes oddly now there's an embedded BrokenResourceError ? - exceptions = exceptions[1].exceptions + for exc in exceptions: + excs = getattr(exc, 'exceptions', None) + if excs: + exceptions = excs + break assert len(exceptions) == num_subactors diff --git a/tractor/_actor.py b/tractor/_actor.py index a9ce438..de86df1 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -2,6 +2,7 @@ Actor primitives and helpers """ +from __future__ import annotations from collections import defaultdict from functools import partial from itertools import chain @@ -10,7 +11,7 @@ import importlib.util import inspect import uuid import typing -from typing import Dict, List, Tuple, Any, Optional, Union +from typing import List, Tuple, Any, Optional, Union from types import ModuleType import sys import os @@ -48,7 +49,7 @@ async def _invoke( cid: str, chan: Channel, func: typing.Callable, - kwargs: Dict[str, Any], + kwargs: dict[str, Any], is_rpc: bool = True, task_status: TaskStatus[ Union[trio.CancelScope, BaseException] @@ -57,6 +58,8 @@ async def _invoke( ''' Invoke local func and deliver result(s) over provided channel. + This is the core "RPC task" starting machinery. + ''' __tracebackhide__ = True treat_as_gen = False @@ -263,14 +266,51 @@ def _get_mod_abspath(module): _lifetime_stack: ExitStack = ExitStack() -class Actor: - """The fundamental concurrency primitive. +async def try_ship_error_to_parent( + channel: Channel, + err: Union[Exception, trio.MultiError], - An *actor* is the combination of a regular Python process - executing a ``trio`` task tree, communicating - with other actors through "portals" which provide a native async API - around various IPC transport "channels". - """ +) -> None: + with trio.CancelScope(shield=True): + try: + # internal error so ship to parent without cid + await channel.send(pack_error(err)) + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + log.error( + f"Failed to ship error to parent " + f"{channel.uid}, channel was closed" + ) + + +class Actor: + ''' + The fundamental "runtime" concurrency primitive. 
+ + An *actor* is the combination of a regular Python process executing + a ``trio`` task tree, communicating with other actors through + "memory boundary portals" - which provide a native async API around + IPC transport "channels" which themselves encapsulate various + (swappable) network protocols. + + + Each "actor" is ``trio.run()`` scheduled "runtime" composed of many + concurrent tasks in a single thread. The "runtime" tasks conduct + a slew of low(er) level functions to make it possible for message + passing between actors as well as the ability to create new actors + (aka new "runtimes" in new processes which are supervised via + a nursery construct). Each task which sends messages to a task in + a "peer" (not necessarily a parent-child, depth hierarchy)) is able + to do so via an "address", which maps IPC connections across memory + boundaries, and task request id which allows for per-actor + tasks to send and receive messages to specific peer-actor tasks with + which there is an ongoing RPC/IPC dialog. + + ''' + # ugh, we need to get rid of this and replace with a "registry" sys + # https://github.com/goodboy/tractor/issues/216 is_arbiter: bool = False # nursery placeholders filled in by `_async_main()` after fork @@ -279,7 +319,7 @@ class Actor: _server_n: Optional[trio.Nursery] = None # Information about `__main__` from parent - _parent_main_data: Dict[str, str] + _parent_main_data: dict[str, str] _parent_chan_cs: Optional[trio.CancelScope] = None # syncs for setup/teardown sequences @@ -317,7 +357,7 @@ class Actor: mods[name] = _get_mod_abspath(mod) self.enable_modules = mods - self._mods: Dict[str, ModuleType] = {} + self._mods: dict[str, ModuleType] = {} # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 @@ -340,12 +380,12 @@ class Actor: self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks.set() # (chan, cid) -> (cancel_scope, func) - self._rpc_tasks: Dict[ + self._rpc_tasks: dict[ Tuple[Channel, str], Tuple[trio.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} - self._cids2qs: Dict[ + self._cids2qs: dict[ Tuple[Tuple[str, str], str], Tuple[ trio.abc.SendChannel[Any], @@ -356,7 +396,7 @@ class Actor: self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ Tuple[Any, Any, Any, Any, Any]] = None - self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa + self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa async def wait_for_peer( self, uid: Tuple[str, str] @@ -441,8 +481,8 @@ class Actor: # we need this for ``msgspec`` for some reason? # for now, it's been put in the stream backend. # trio.BrokenResourceError, - # trio.ClosedResourceError, + TransportClosed, ): # XXX: This may propagate up from ``Channel._aiter_recv()`` @@ -482,7 +522,50 @@ class Actor: # process received reponses. try: await self._process_messages(chan) + + except trio.Cancelled: + log.cancel(f"Msg loop was cancelled for {chan}") + raise + finally: + # This is set in ``Portal.cancel_actor()``. So if + # the peer was cancelled we try to wait for them + # to tear down their side of the connection before + # moving on with closing our own side. 
+ local_nursery = self._actoruid2nursery.get(chan.uid) + if ( + local_nursery + ): + log.cancel(f"Waiting on cancel request to peer {chan.uid}") + # XXX: this is a soft wait on the channel (and its + # underlying transport protocol) to close from the remote + # peer side since we presume that any channel which + # is mapped to a sub-actor (i.e. it's managed by + # one of our local nurseries) + # message is sent to the peer likely by this actor which is + # now in a cancelled condition) when the local runtime here + # is now cancelled while (presumably) in the middle of msg + # loop processing. + with trio.move_on_after(0.1) as cs: + cs.shield = True + # Attempt to wait for the far end to close the channel + # and bail after timeout (2-generals on closure). + assert chan.msgstream + async for msg in chan.msgstream.drain(): + # try to deliver any lingering msgs + # before we destroy the channel. + # This accomplishes deterministic + # ``Portal.cancel_actor()`` cancellation by + # making sure any RPC response to that call is + # delivered the local calling task. + # TODO: factor this into a helper? + log.runtime(f'drained {msg} for {chan.uid}') + cid = msg.get('cid') + if cid: + # deliver response to local caller/waiter + await self._push_result(chan, cid, msg) + + await local_nursery.exited.wait() # channel cleanup sequence @@ -534,7 +617,7 @@ class Actor: self, chan: Channel, cid: str, - msg: Dict[str, Any], + msg: dict[str, Any], ) -> None: """Push an RPC result to the local consumer's queue. """ @@ -593,10 +676,12 @@ class Actor: func: str, kwargs: dict ) -> Tuple[str, trio.abc.ReceiveChannel]: - """Send a ``'cmd'`` message to a remote actor and return a + ''' + Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. - """ + + ''' cid = str(uuid.uuid4()) assert chan.uid send_chan, recv_chan = self.get_memchans(chan.uid, cid) @@ -609,11 +694,14 @@ class Actor: chan: Channel, shield: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: - """Process messages for the channel async-RPC style. + ''' + Process messages for the channel async-RPC style. Receive multiplexed RPC requests and deliver responses over ``chan``. - """ + + ''' # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! 
msg = None @@ -692,8 +780,9 @@ class Actor: # msg loop and break out into # ``_async_main()`` log.cancel( - f"Actor {self.uid} was remotely cancelled;" - " waiting on cancellation completion..") + f"Actor {self.uid} was remotely cancelled " + f"by {chan.uid}" + ) await _invoke( self, cid, chan, func, kwargs, is_rpc=False ) @@ -789,17 +878,12 @@ class Actor: # machinery not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: - await self._parent_chan.send(pack_error(err)) + await try_ship_error_to_parent(self._parent_chan, err) # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" raise - except trio.Cancelled: - # debugging only - log.runtime(f"Msg loop was cancelled for {chan}") - raise - finally: # msg debugging for when he machinery is brokey log.runtime( @@ -891,6 +975,7 @@ class Actor: # establish primary connection with immediate parent self._parent_chan = None if parent_addr is not None: + self._parent_chan, accept_addr_rent = await self._from_parent( parent_addr) @@ -994,14 +1079,7 @@ class Actor: ) if self._parent_chan: - with trio.CancelScope(shield=True): - try: - # internal error so ship to parent without cid - await self._parent_chan.send(pack_error(err)) - except trio.ClosedResourceError: - log.error( - f"Failed to ship error to parent " - f"{self._parent_chan.uid}, channel was closed") + await try_ship_error_to_parent(self._parent_chan, err) # always! log.exception("Actor errored:") @@ -1283,7 +1361,7 @@ class Arbiter(Actor): def __init__(self, *args, **kwargs): - self._registry: Dict[ + self._registry: dict[ Tuple[str, str], Tuple[str, int], ] = {} @@ -1300,7 +1378,7 @@ class Arbiter(Actor): async def get_registry( self - ) -> Dict[Tuple[str, str], Tuple[str, int]]: + ) -> dict[Tuple[str, str], Tuple[str, int]]: '''Return current name registry. This method is async to allow for cross-actor invocation. diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 28bef97..47f8cd9 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -6,9 +6,10 @@ from __future__ import annotations import platform import struct import typing +from collections.abc import AsyncGenerator, AsyncIterator from typing import ( Any, Tuple, Optional, - Type, Protocol, TypeVar + Type, Protocol, TypeVar, ) from tricycle import BufferedReceiveStream @@ -46,6 +47,7 @@ MsgType = TypeVar("MsgType") class MsgTransport(Protocol[MsgType]): stream: trio.SocketStream + drained: list[MsgType] def __init__(self, stream: trio.SocketStream) -> None: ... @@ -63,6 +65,11 @@ class MsgTransport(Protocol[MsgType]): def connected(self) -> bool: ... + # defining this sync otherwise it causes a mypy error because it + # can't figure out it's a generator i guess?..? + def drain(self) -> AsyncIterator[dict]: + ... + @property def laddr(self) -> Tuple[str, int]: ... @@ -93,7 +100,10 @@ class MsgpackTCPStream: self._agen = self._iter_packets() self._send_lock = trio.StrictFIFOLock() - async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: + # public i guess? + self.drained: list[dict] = [] + + async def _iter_packets(self) -> AsyncGenerator[dict, None]: """Yield packets from the underlying stream. 
""" unpacker = msgpack.Unpacker( @@ -132,7 +142,7 @@ class MsgpackTCPStream: if data == b'': raise TransportClosed( - f'transport {self} was already closed prior ro read' + f'transport {self} was already closed prior to read' ) unpacker.feed(data) @@ -156,6 +166,20 @@ class MsgpackTCPStream: async def recv(self) -> Any: return await self._agen.asend(None) + async def drain(self) -> AsyncIterator[dict]: + ''' + Drain the stream's remaining messages sent from + the far end until the connection is closed by + the peer. + + ''' + try: + async for msg in self._iter_packets(): + self.drained.append(msg) + except TransportClosed: + for msg in self.drained: + yield msg + def __aiter__(self): return self._agen @@ -164,7 +188,8 @@ class MsgpackTCPStream: class MsgspecTCPStream(MsgpackTCPStream): - '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data + ''' + A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgspec``. ''' @@ -184,7 +209,7 @@ class MsgspecTCPStream(MsgpackTCPStream): self.encode = msgspec.Encoder().encode self.decode = msgspec.Decoder().decode # dict[str, Any]) - async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: + async def _iter_packets(self) -> AsyncGenerator[dict, None]: '''Yield packets from the underlying stream. ''' @@ -259,9 +284,12 @@ def get_msg_transport( class Channel: - '''An inter-process channel for communication between (remote) actors. + ''' + An inter-process channel for communication between (remote) actors. - Currently the only supported transport is a ``trio.SocketStream``. + Wraps a ``MsgStream``: transport + encoding IPC connection. + Currently we only support ``trio.SocketStream`` for transport + (aka TCP). ''' def __init__( @@ -299,10 +327,12 @@ class Channel: # set after handshake - always uid of far end self.uid: Optional[Tuple[str, str]] = None - # set if far end actor errors internally - self._exc: Optional[Exception] = None self._agen = self._aiter_recv() + self._exc: Optional[Exception] = None # set if far end actor errors self._closed: bool = False + # flag set on ``Portal.cancel_actor()`` indicating + # remote (peer) cancellation of the far end actor runtime. + self._cancel_called: bool = False # set on ``Portal.cancel_actor()`` @classmethod def from_stream( @@ -441,9 +471,11 @@ class Channel: async def _aiter_recv( self - ) -> typing.AsyncGenerator[Any, None]: - """Async iterate items from underlying stream. - """ + ) -> AsyncGenerator[Any, None]: + ''' + Async iterate items from underlying stream. + + ''' assert self.msgstream while True: try: @@ -473,9 +505,11 @@ class Channel: async def _connect_chan( host: str, port: int ) -> typing.AsyncGenerator[Channel, None]: - """Create and connect a channel with disconnect on context manager + ''' + Create and connect a channel with disconnect on context manager teardown. - """ + + ''' chan = Channel((host, port)) await chan.connect() yield chan diff --git a/tractor/_portal.py b/tractor/_portal.py index 80fc902..70339fa 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -1,5 +1,6 @@ """ -Portal api +Memory boundary "Portals": an API for structured +concurrency linked tasks running in disparate memory domains. 
""" import importlib @@ -21,7 +22,6 @@ from .log import get_logger from ._exceptions import ( unpack_error, NoResult, - # RemoteActorError, ContextCancelled, ) from ._streaming import Context, ReceiveMsgStream @@ -35,10 +35,12 @@ async def maybe_open_nursery( nursery: trio.Nursery = None, shield: bool = False, ) -> AsyncGenerator[trio.Nursery, Any]: - """Create a new nursery if None provided. + ''' + Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. - """ + + ''' if nursery is not None: yield nursery else: @@ -87,14 +89,18 @@ class Portal: like having a "portal" between the seperate actor memory spaces. ''' + # the timeout for a remote cancel request sent to + # a(n) (peer) actor. + cancel_timeout = 0.5 + def __init__(self, channel: Channel) -> None: self.channel = channel - # when this is set to a tuple returned from ``_submit()`` then - # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime self._result_msg: Optional[dict] = None - # set when _submit_for_result is called + # When this is set to a tuple returned from ``_submit()`` then + # it is expected that ``result()`` will be awaited at some + # point. Set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None @@ -199,36 +205,46 @@ class Portal: # we'll need to .aclose all those channels here await self._cancel_streams() - async def cancel_actor(self): - """Cancel the actor on the other end of this portal. - """ - if not self.channel.connected(): - log.cancel("This portal is already closed can't cancel") - return False + async def cancel_actor( + self, + timeout: float = None, - await self._cancel_streams() + ) -> bool: + ''' + Cancel the actor on the other end of this portal. + + ''' + if not self.channel.connected(): + log.cancel("This channel is already closed can't cancel") + return False log.cancel( f"Sending actor cancel request to {self.channel.uid} on " f"{self.channel}") + + self.channel._cancel_called = True + try: # send cancel cmd - might not get response # XXX: sure would be nice to make this work with a proper shield - with trio.move_on_after(0.5) as cancel_scope: - cancel_scope.shield = True + with trio.move_on_after(timeout or self.cancel_timeout) as cs: + cs.shield = True await self.run_from_ns('self', 'cancel') return True - if cancel_scope.cancelled_caught: + if cs.cancelled_caught: log.cancel(f"May have failed to cancel {self.channel.uid}") # if we get here some weird cancellation case happened return False - except trio.ClosedResourceError: + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): log.cancel( - f"{self.channel} for {self.channel.uid} was already closed?") + f"{self.channel} for {self.channel.uid} was already closed or broken?") return False async def run_from_ns( @@ -237,7 +253,9 @@ class Portal: function_name: str, **kwargs, ) -> Any: - """Run a function from a (remote) namespace in a new task on the far-end actor. + ''' + Run a function from a (remote) namespace in a new task on the + far-end actor. This is a more explitcit way to run tasks in a remote-process actor using explicit object-path syntax. Hint: this is how @@ -246,9 +264,11 @@ class Portal: Note:: A special namespace `self` can be used to invoke `Actor` - instance methods in the remote runtime. Currently this should only - be used for `tractor` internals. - """ + instance methods in the remote runtime. 
Currently this + should only be used solely for ``tractor`` runtime + internals. + + ''' msg = await self._return_once( *(await self._submit(namespace_path, function_name, kwargs)) ) @@ -447,7 +467,8 @@ class Portal: except ( BaseException, - # more specifically, we need to handle: + # more specifically, we need to handle these but not + # sure it's worth being pedantic: # Exception, # trio.Cancelled, # trio.MultiError, @@ -495,19 +516,22 @@ class Portal: @dataclass class LocalPortal: - """A 'portal' to a local ``Actor``. + ''' + A 'portal' to a local ``Actor``. A compatibility shim for normal portals but for invoking functions using an in process actor instance. - """ + + ''' actor: 'Actor' # type: ignore # noqa channel: Channel async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: - """Run a requested local function from a namespace path and + ''' + Run a requested local function from a namespace path and return it's result. - """ + ''' obj = self.actor if ns == 'self' else importlib.import_module(ns) func = getattr(obj, func_name) return await func(**kwargs) @@ -522,10 +546,13 @@ async def open_portal( shield: bool = False, ) -> AsyncGenerator[Portal, None]: - """Open a ``Portal`` through the provided ``channel``. + ''' + Open a ``Portal`` through the provided ``channel``. - Spawns a background task to handle message processing. - """ + Spawns a background task to handle message processing (normally + done by the actor-runtime implicitly). + + ''' actor = current_actor() assert actor was_connected = False @@ -553,7 +580,6 @@ async def open_portal( portal = Portal(channel) try: yield portal - finally: await portal.aclose() diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 6ede566..c7eb2d2 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -5,7 +5,11 @@ Machinery for actor process spawning using multiple backends. import sys import multiprocessing as mp import platform -from typing import Any, Dict, Optional +from typing import ( + Any, Dict, Optional, Union, Callable, + TypeVar, +) +from collections.abc import Awaitable, Coroutine import trio from trio_typing import TaskStatus @@ -41,6 +45,7 @@ from ._exceptions import ActorFailure log = get_logger('tractor') +ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) # placeholder for an mp start context if so using that backend _ctx: Optional[mp.context.BaseContext] = None @@ -97,14 +102,17 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: async def exhaust_portal( + portal: Portal, actor: Actor + ) -> Any: - """Pull final result from portal (assuming it has one). + ''' + Pull final result from portal (assuming it has one). If the main task is an async generator do our best to consume what's left of it. - """ + ''' try: log.debug(f"Waiting on final result from {actor.uid}") @@ -126,18 +134,19 @@ async def exhaust_portal( async def cancel_on_completion( + portal: Portal, actor: Actor, errors: Dict[Tuple[str, str], Exception], ) -> None: - """ + ''' Cancel actor gracefully once it's "main" portal's result arrives. Should only be called for actors spawned with `run_in_actor()`. - """ + ''' # if this call errors we store the exception for later # in ``errors`` which will be reraised inside # a MultiError and we still send out a cancel request @@ -175,10 +184,37 @@ async def do_hard_kill( # XXX: should pretty much never get here unless we have # to move the bits from ``proc.__aexit__()`` out and # into here. 
- log.critical(f"HARD KILLING {proc}") + log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}") proc.kill() +async def soft_wait( + + proc: ProcessType, + wait_func: Callable[ + [ProcessType], + Awaitable, + ], + portal: Portal, + +) -> None: + # Wait for proc termination but **dont' yet** call + # ``trio.Process.__aexit__()`` (it tears down stdio + # which will kill any waiting remote pdb trace). + # This is a "soft" (cancellable) join/reap. + try: + await wait_func(proc) + except trio.Cancelled: + # if cancelled during a soft wait, cancel the child + # actor before entering the hard reap sequence + # below. This means we try to do a graceful teardown + # via sending a cancel message before getting out + # zombie killing tools. + with trio.CancelScope(shield=True): + await portal.cancel_actor() + raise + + async def new_proc( name: str, @@ -195,11 +231,14 @@ async def new_proc( task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: - """ - Create a new ``multiprocessing.Process`` using the - spawn method as configured using ``try_set_start_method()``. + ''' + Create a new ``Process`` using a "spawn method" as (configured using + ``try_set_start_method()``). - """ + This routine should be started in a actor runtime task and the logic + here is to be considered the core supervision strategy. + + ''' # mark the new actor with the global spawn method subactor._spawn_method = _spawn_method uid = subactor.uid @@ -230,17 +269,19 @@ async def new_proc( ] cancelled_during_spawn: bool = False + proc: Optional[trio.Process] = None try: - proc = await trio.open_process(spawn_cmd) - - log.runtime(f"Started {proc}") - - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it try: + proc = await trio.open_process(spawn_cmd) + + log.runtime(f"Started {proc}") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it event, chan = await actor_nursery._actor.wait_for_peer( subactor.uid) + except trio.Cancelled: cancelled_during_spawn = True # we may cancel before the child connects back in which @@ -250,7 +291,8 @@ async def new_proc( # don't clobber an ongoing pdb if is_root_process(): await maybe_wait_for_debugger() - else: + + elif proc is not None: async with acquire_debug_lock(uid): # soft wait on the proc to terminate with trio.move_on_after(0.5): @@ -291,21 +333,14 @@ async def new_proc( errors ) - # Wait for proc termination but **dont' yet** call - # ``trio.Process.__aexit__()`` (it tears down stdio - # which will kill any waiting remote pdb trace). - # This is a "soft" (cancellable) join/reap. - try: - await proc.wait() - except trio.Cancelled: - # if cancelled during a soft wait, cancel the child - # actor before entering the hard reap sequence - # below. This means we try to do a graceful teardown - # via sending a cancel message before getting out - # zombie killing tools. - with trio.CancelScope(shield=True): - await portal.cancel_actor() - raise + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. + await soft_wait( + proc, + trio.Process.wait, + portal + ) # cancel result waiter that may have been spawned in # tandem if not done already @@ -320,23 +355,26 @@ async def new_proc( # killing the process too early. 
log.cancel(f'Hard reap sequence starting for {uid}') - with trio.CancelScope(shield=True): + if proc: + with trio.CancelScope(shield=True): - # don't clobber an ongoing pdb - if cancelled_during_spawn: - # Try again to avoid TTY clobbering. - async with acquire_debug_lock(uid): - with trio.move_on_after(0.5): - await proc.wait() + # don't clobber an ongoing pdb + if cancelled_during_spawn: + # Try again to avoid TTY clobbering. + async with acquire_debug_lock(uid): + with trio.move_on_after(0.5): + await proc.wait() - if is_root_process(): - await maybe_wait_for_debugger() + if is_root_process(): + await maybe_wait_for_debugger() - if proc.poll() is None: - log.cancel(f"Attempting to hard kill {proc}") - await do_hard_kill(proc) + if proc.poll() is None: + log.cancel(f"Attempting to hard kill {proc}") + await do_hard_kill(proc) - log.debug(f"Joined {proc}") + log.debug(f"Joined {proc}") + else: + log.warning('Nursery cancelled before sub-proc started') if not cancelled_during_spawn: # pop child entry to indicate we no longer managing this @@ -351,6 +389,7 @@ async def new_proc( actor_nursery=actor_nursery, subactor=subactor, errors=errors, + # passed through to actor main bind_addr=bind_addr, parent_addr=parent_addr, @@ -469,7 +508,14 @@ async def mp_new_proc( errors ) - await proc_waiter(proc) + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. + await soft_wait( + proc, + proc_waiter, + portal + ) # cancel result waiter that may have been spawned in # tandem if not done already diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 1259be6..3f381ef 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -52,6 +52,7 @@ class ActorNursery: self.cancelled: bool = False self._join_procs = trio.Event() self.errors = errors + self.exited = trio.Event() async def start_actor( self, @@ -207,7 +208,8 @@ class ActorNursery: # spawn cancel tasks for each sub-actor assert portal - nursery.start_soon(portal.cancel_actor) + if portal.channel.connected(): + nursery.start_soon(portal.cancel_actor) # if we cancelled the cancel (we hung cancelling remote actors) # then hard kill all sub-processes @@ -401,18 +403,23 @@ async def open_nursery( async with open_root_actor(**kwargs) as actor: assert actor is current_actor() - # try: + try: + async with _open_and_supervise_one_cancels_all_nursery( + actor + ) as anursery: + yield anursery + finally: + anursery.exited.set() + + else: # sub-nursery case + + try: async with _open_and_supervise_one_cancels_all_nursery( actor ) as anursery: yield anursery - - else: # sub-nursery case - - async with _open_and_supervise_one_cancels_all_nursery( - actor - ) as anursery: - yield anursery + finally: + anursery.exited.set() finally: log.debug("Nursery teardown complete")
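For readers who want the new behaviour from the user side, a minimal usage sketch follows. It is not part of the patch itself and only assumes the public ``tractor.open_nursery()`` / ``Portal.cancel_actor()`` API touched above; the actor name and timeout value are illustrative::

    import trio
    import tractor


    async def main():
        async with tractor.open_nursery() as n:

            # spawn a child actor and get a portal to it
            portal = await n.start_actor('sleeper')

            # ... submit work through the portal ...

            # With this change the cancel request is best-effort
            # acknowledged: the call shields itself and waits up to
            # ``Portal.cancel_timeout`` (or the passed ``timeout``)
            # for the child's response before the channel is torn
            # down, returning ``True`` only if the request was
            # delivered and answered in time.
            acked = await portal.cancel_actor(timeout=0.5)
            assert acked


    if __name__ == '__main__':
        trio.run(main)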
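On the runtime side, the densest part of the change is the peer-channel teardown in ``_actor.py``: a shielded, time-bounded drain of any lingering messages (so a cancel ACK still reaches its waiting caller) plus a wait on the nursery's new ``exited`` event. A simplified, self-contained sketch of that pattern is below; the names are illustrative stand-ins, not the real runtime objects::

    import trio


    async def wait_on_peer_teardown(
        lingering_msgs,       # async iterator of drained msgs, cf. ``MsgTransport.drain()``
        deliver_response,     # async callback routing a response to its waiting caller
        nursery_exited: trio.Event,
        timeout: float = 0.1,
    ) -> None:
        # Shield the wait so a runtime-wide cancellation can't abort
        # the ACK hand-off, but bound it since the peer may never
        # answer (the 2-generals caveat from the newsfragment).
        with trio.move_on_after(timeout) as cs:
            cs.shield = True

            async for msg in lingering_msgs:
                cid = msg.get('cid')
                if cid:
                    # e.g. the response to ``Portal.cancel_actor()``
                    await deliver_response(cid, msg)

            # wait for the local actor-nursery block to fully exit
            await nursery_exited.wait()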