
Merge pull request #267 from goodboy/acked_remote_cancels

Acked remote cancels
goodboy 2021-12-03 09:51:41 -05:00 committed by GitHub
commit ae6d751d71
7 changed files with 352 additions and 141 deletions

View File

@@ -0,0 +1,16 @@
+This (finally) adds fully acknowledged remote cancellation messaging
+support for both explicit ``Portal.cancel_actor()`` calls as well as
+when there are "runtime-wide" cancellations (e.g. during KBI or general
+actor nursery exception handling which causes a full actor
+"crash"/termination).
+
+You can think of this as the ideal case of the 2-generals problem, where
+the actor requesting the cancel of its child is always able to receive
+back the ACK to that request. This leads to a more deterministic shutdown
+of the child since the parent is able to wait for the child to fully
+respond to the request. On a localhost setup, where the parent can
+monitor the state of the child through process or other OS APIs instead
+of solely through IPC messaging, the parent can know whether or not the
+child decided to cancel with more certainty. In the case of separate
+hosts, we still rely on a simple timeout approach until such a time
+where we prefer to get "fancier".
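
As a quick illustration of the feature this entry describes, here is a
minimal sketch of an explicit, now-acked cancel (it assumes the public
``tractor`` nursery/portal API visible elsewhere in this diff; the actor
name is made up):

    import trio
    import tractor


    async def main() -> None:
        async with tractor.open_nursery() as an:
            # spawn a subactor which just idles serving RPC requests
            portal = await an.start_actor('idler')

            # explicit cancel request; with this change the call only
            # returns ``True`` once the child has ACKed the cancel
            # (and ``False`` on timeout or an already-closed channel).
            acked = await portal.cancel_actor()
            print(f'cancel acked: {acked}')


    trio.run(main)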

View File

@@ -128,7 +128,11 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
     if len(exceptions) == 2:
         # sometimes oddly now there's an embedded BrokenResourceError ?
-        exceptions = exceptions[1].exceptions
+        for exc in exceptions:
+            excs = getattr(exc, 'exceptions', None)
+            if excs:
+                exceptions = excs
+                break

     assert len(exceptions) == num_subactors

View File

@@ -2,6 +2,7 @@
 Actor primitives and helpers
 """
+from __future__ import annotations
 from collections import defaultdict
 from functools import partial
 from itertools import chain
@@ -10,7 +11,7 @@ import importlib.util
 import inspect
 import uuid
 import typing
-from typing import Dict, List, Tuple, Any, Optional, Union
+from typing import List, Tuple, Any, Optional, Union
 from types import ModuleType
 import sys
 import os
@@ -48,7 +49,7 @@ async def _invoke(
     cid: str,
     chan: Channel,
     func: typing.Callable,
-    kwargs: Dict[str, Any],
+    kwargs: dict[str, Any],
     is_rpc: bool = True,
     task_status: TaskStatus[
         Union[trio.CancelScope, BaseException]
@@ -57,6 +58,8 @@ async def _invoke(
     '''
     Invoke local func and deliver result(s) over provided channel.
+
+    This is the core "RPC task" starting machinery.
     '''
     __tracebackhide__ = True
     treat_as_gen = False
@@ -263,14 +266,51 @@ def _get_mod_abspath(module):
 _lifetime_stack: ExitStack = ExitStack()


+async def try_ship_error_to_parent(
+    channel: Channel,
+    err: Union[Exception, trio.MultiError],
+
+) -> None:
+    with trio.CancelScope(shield=True):
+        try:
+            # internal error so ship to parent without cid
+            await channel.send(pack_error(err))
+        except (
+            trio.ClosedResourceError,
+            trio.BrokenResourceError,
+        ):
+            log.error(
+                f"Failed to ship error to parent "
+                f"{channel.uid}, channel was closed"
+            )
+
+
 class Actor:
-    """The fundamental concurrency primitive.
-
-    An *actor* is the combination of a regular Python process
-    executing a ``trio`` task tree, communicating
-    with other actors through "portals" which provide a native async API
-    around various IPC transport "channels".
-    """
+    '''
+    The fundamental "runtime" concurrency primitive.
+
+    An *actor* is the combination of a regular Python process executing
+    a ``trio`` task tree, communicating with other actors through
+    "memory boundary portals" - which provide a native async API around
+    IPC transport "channels" which themselves encapsulate various
+    (swappable) network protocols.
+
+    Each "actor" is a ``trio.run()`` scheduled "runtime" composed of
+    many concurrent tasks in a single thread. The "runtime" tasks
+    conduct a slew of low(er) level functions to make it possible for
+    message passing between actors as well as the ability to create new
+    actors (aka new "runtimes" in new processes which are supervised
+    via a nursery construct). Each task which sends messages to a task
+    in a "peer" (not necessarily a parent-child, depth hierarchy) does
+    so via an "address", which maps IPC connections across memory
+    boundaries, and a task request id which allows for per-actor
+    tasks to send and receive messages to specific peer-actor tasks
+    with which there is an ongoing RPC/IPC dialog.
+
+    '''
+    # ugh, we need to get rid of this and replace with a "registry" sys
+    # https://github.com/goodboy/tractor/issues/216
     is_arbiter: bool = False

     # nursery placeholders filled in by `_async_main()` after fork
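
The expanded docstring above describes the per-process "runtime" model;
a minimal sketch of what that means in user code (assuming the public
nursery API shown elsewhere in this diff) might look like:

    import trio
    import tractor


    async def add_one(x: int) -> int:
        # this runs in a *separate* process: its own ``trio.run()``
        # scheduled "runtime" with its own task tree
        return x + 1


    async def main() -> None:
        async with tractor.open_nursery() as an:
            # spawn a new process-level "runtime", schedule ``add_one``
            # as a task inside it, and get back the "memory boundary
            # portal" handle for the cross-process dialog
            portal = await an.run_in_actor(add_one, x=1)
            assert await portal.result() == 2


    trio.run(main)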
@@ -279,7 +319,7 @@ class Actor:
     _server_n: Optional[trio.Nursery] = None

     # Information about `__main__` from parent
-    _parent_main_data: Dict[str, str]
+    _parent_main_data: dict[str, str]
     _parent_chan_cs: Optional[trio.CancelScope] = None

     # syncs for setup/teardown sequences
@@ -317,7 +357,7 @@ class Actor:
             mods[name] = _get_mod_abspath(mod)

         self.enable_modules = mods
-        self._mods: Dict[str, ModuleType] = {}
+        self._mods: dict[str, ModuleType] = {}

         # TODO: consider making this a dynamically defined
         # @dataclass once we get py3.7
@@ -340,12 +380,12 @@ class Actor:
         self._ongoing_rpc_tasks = trio.Event()
         self._ongoing_rpc_tasks.set()
         # (chan, cid) -> (cancel_scope, func)
-        self._rpc_tasks: Dict[
+        self._rpc_tasks: dict[
             Tuple[Channel, str],
             Tuple[trio.CancelScope, typing.Callable, trio.Event]
         ] = {}
         # map {uids -> {callids -> waiter queues}}
-        self._cids2qs: Dict[
+        self._cids2qs: dict[
             Tuple[Tuple[str, str], str],
             Tuple[
                 trio.abc.SendChannel[Any],
@@ -356,7 +396,7 @@ class Actor:
         self._parent_chan: Optional[Channel] = None
         self._forkserver_info: Optional[
             Tuple[Any, Any, Any, Any, Any]] = None
-        self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}  # type: ignore # noqa
+        self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {}  # type: ignore # noqa

     async def wait_for_peer(
         self, uid: Tuple[str, str]
@@ -441,8 +481,8 @@ class Actor:
             # we need this for ``msgspec`` for some reason?
             # for now, it's been put in the stream backend.
             # trio.BrokenResourceError,
             # trio.ClosedResourceError,
             TransportClosed,
         ):
             # XXX: This may propagate up from ``Channel._aiter_recv()``
@@ -482,7 +522,50 @@ class Actor:
         # process received responses.
         try:
             await self._process_messages(chan)
+
+        except trio.Cancelled:
+            log.cancel(f"Msg loop was cancelled for {chan}")
+            raise
+
         finally:
+
+            # This is set in ``Portal.cancel_actor()``. So if
+            # the peer was cancelled we try to wait for them
+            # to tear down their side of the connection before
+            # moving on with closing our own side.
+            local_nursery = self._actoruid2nursery.get(chan.uid)
+            if (
+                local_nursery
+            ):
+                log.cancel(f"Waiting on cancel request to peer {chan.uid}")
+
+                # XXX: this is a soft wait on the channel (and its
+                # underlying transport protocol) to close from the
+                # remote peer side, since we presume that any channel
+                # which is mapped to a sub-actor (i.e. it's managed by
+                # one of our local nurseries) has had a cancel message
+                # sent to the peer, likely by this very actor which is
+                # now in a cancelled condition, while the local runtime
+                # here was (presumably) in the middle of msg loop
+                # processing.
+                with trio.move_on_after(0.1) as cs:
+                    cs.shield = True
+                    # Attempt to wait for the far end to close the
+                    # channel and bail after timeout (2-generals on
+                    # closure).
+                    assert chan.msgstream
+                    async for msg in chan.msgstream.drain():
+                        # try to deliver any lingering msgs
+                        # before we destroy the channel.
+                        # This accomplishes deterministic
+                        # ``Portal.cancel_actor()`` cancellation by
+                        # making sure any RPC response to that call
+                        # is delivered to the local calling task.
+                        # TODO: factor this into a helper?
+                        log.runtime(f'drained {msg} for {chan.uid}')
+                        cid = msg.get('cid')
+                        if cid:
+                            # deliver response to local caller/waiter
+                            await self._push_result(chan, cid, msg)

+                await local_nursery.exited.wait()

         # channel cleanup sequence
@@ -534,7 +617,7 @@ class Actor:
         self,
         chan: Channel,
         cid: str,
-        msg: Dict[str, Any],
+        msg: dict[str, Any],
     ) -> None:
         """Push an RPC result to the local consumer's queue.
         """
@@ -593,10 +676,12 @@ class Actor:
         func: str,
         kwargs: dict
     ) -> Tuple[str, trio.abc.ReceiveChannel]:
-        """Send a ``'cmd'`` message to a remote actor and return a
+        '''
+        Send a ``'cmd'`` message to a remote actor and return a
         caller id and a ``trio.Queue`` that can be used to wait for
         responses delivered by the local message processing loop.
-        """
+
+        '''
         cid = str(uuid.uuid4())
         assert chan.uid
         send_chan, recv_chan = self.get_memchans(chan.uid, cid)
@@ -609,11 +694,14 @@ class Actor:
         chan: Channel,
         shield: bool = False,
         task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
     ) -> None:
-        """Process messages for the channel async-RPC style.
+        '''
+        Process messages for the channel async-RPC style.

         Receive multiplexed RPC requests and deliver responses over ``chan``.
-        """
+
+        '''
         # TODO: once https://github.com/python-trio/trio/issues/467 gets
         # worked out we'll likely want to use that!
         msg = None
@@ -692,8 +780,9 @@ class Actor:
                         # msg loop and break out into
                         # ``_async_main()``
                         log.cancel(
-                            f"Actor {self.uid} was remotely cancelled;"
-                            " waiting on cancellation completion..")
+                            f"Actor {self.uid} was remotely cancelled "
+                            f"by {chan.uid}"
+                        )
                         await _invoke(
                             self, cid, chan, func, kwargs, is_rpc=False
                         )
@@ -789,17 +878,12 @@ class Actor:
             # machinery not from an rpc task) to parent
             log.exception("Actor errored:")
             if self._parent_chan:
-                await self._parent_chan.send(pack_error(err))
+                await try_ship_error_to_parent(self._parent_chan, err)

             # if this is the `MainProcess` we expect the error broadcasting
             # above to trigger an error at consuming portal "checkpoints"
             raise

-        except trio.Cancelled:
-            # debugging only
-            log.runtime(f"Msg loop was cancelled for {chan}")
-            raise
-
         finally:
             # msg debugging for when the machinery is brokey
             log.runtime(
@@ -891,6 +975,7 @@ class Actor:
         # establish primary connection with immediate parent
         self._parent_chan = None
         if parent_addr is not None:
+
             self._parent_chan, accept_addr_rent = await self._from_parent(
                 parent_addr)
@@ -994,14 +1079,7 @@ class Actor:
             )

             if self._parent_chan:
-                with trio.CancelScope(shield=True):
-                    try:
-                        # internal error so ship to parent without cid
-                        await self._parent_chan.send(pack_error(err))
-                    except trio.ClosedResourceError:
-                        log.error(
-                            f"Failed to ship error to parent "
-                            f"{self._parent_chan.uid}, channel was closed")
+                await try_ship_error_to_parent(self._parent_chan, err)

             # always!
             log.exception("Actor errored:")
@@ -1283,7 +1361,7 @@ class Arbiter(Actor):

     def __init__(self, *args, **kwargs):

-        self._registry: Dict[
+        self._registry: dict[
             Tuple[str, str],
             Tuple[str, int],
         ] = {}
@@ -1300,7 +1378,7 @@ class Arbiter(Actor):

     async def get_registry(
         self
-    ) -> Dict[Tuple[str, str], Tuple[str, int]]:
+    ) -> dict[Tuple[str, str], Tuple[str, int]]:
         '''Return current name registry.

         This method is async to allow for cross-actor invocation.

View File

@@ -6,9 +6,10 @@ from __future__ import annotations
 import platform
 import struct
 import typing
+from collections.abc import AsyncGenerator, AsyncIterator
 from typing import (
     Any, Tuple, Optional,
-    Type, Protocol, TypeVar
+    Type, Protocol, TypeVar,
 )

 from tricycle import BufferedReceiveStream
@@ -46,6 +47,7 @@ MsgType = TypeVar("MsgType")
 class MsgTransport(Protocol[MsgType]):

     stream: trio.SocketStream
+    drained: list[MsgType]

     def __init__(self, stream: trio.SocketStream) -> None:
         ...
@@ -63,6 +65,11 @@ class MsgTransport(Protocol[MsgType]):
     def connected(self) -> bool:
         ...

+    # defining this sync otherwise it causes a mypy error because it
+    # can't figure out it's a generator i guess?..?
+    def drain(self) -> AsyncIterator[dict]:
+        ...
+
     @property
     def laddr(self) -> Tuple[str, int]:
         ...
@@ -93,7 +100,10 @@ class MsgpackTCPStream:
         self._agen = self._iter_packets()
         self._send_lock = trio.StrictFIFOLock()

-    async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
+        # public i guess?
+        self.drained: list[dict] = []
+
+    async def _iter_packets(self) -> AsyncGenerator[dict, None]:
         """Yield packets from the underlying stream.
         """
         unpacker = msgpack.Unpacker(
@@ -132,7 +142,7 @@ class MsgpackTCPStream:

                 if data == b'':
                     raise TransportClosed(
-                        f'transport {self} was already closed prior ro read'
+                        f'transport {self} was already closed prior to read'
                     )

                 unpacker.feed(data)
@@ -156,6 +166,20 @@ class MsgpackTCPStream:
     async def recv(self) -> Any:
         return await self._agen.asend(None)

+    async def drain(self) -> AsyncIterator[dict]:
+        '''
+        Drain the stream's remaining messages sent from
+        the far end until the connection is closed by
+        the peer.
+
+        '''
+        try:
+            async for msg in self._iter_packets():
+                self.drained.append(msg)
+        except TransportClosed:
+            for msg in self.drained:
+                yield msg
+
     def __aiter__(self):
         return self._agen
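
For context, a sketch of how the teardown side (see the channel
handler hunk in ``Actor`` above) consumes ``drain()``; the ``actor``
and ``chan`` parameters are assumed to be live runtime objects (an
``Actor`` and a handshook ``Channel``) and the helper name is made up:

    import trio


    async def drain_to_waiters(actor, chan) -> None:
        # soft, shielded wait: bail after a short timeout if the far
        # end never closes its side (2-generals on closure)
        with trio.move_on_after(0.1) as cs:
            cs.shield = True
            async for msg in chan.msgstream.drain():
                cid = msg.get('cid')
                if cid:
                    # hand straggler responses (e.g. a cancel ACK)
                    # to any locally waiting caller task
                    await actor._push_result(chan, cid, msg)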
@@ -164,7 +188,8 @@ class MsgpackTCPStream:

 class MsgspecTCPStream(MsgpackTCPStream):
-    '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
+    '''
+    A ``trio.SocketStream`` delivering ``msgpack`` formatted data
     using ``msgspec``.

     '''
@@ -184,7 +209,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
         self.encode = msgspec.Encoder().encode
         self.decode = msgspec.Decoder().decode  # dict[str, Any])

-    async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
+    async def _iter_packets(self) -> AsyncGenerator[dict, None]:
         '''Yield packets from the underlying stream.

         '''
@@ -259,9 +284,12 @@ def get_msg_transport(

 class Channel:
-    '''An inter-process channel for communication between (remote) actors.
+    '''
+    An inter-process channel for communication between (remote) actors.

-    Currently the only supported transport is a ``trio.SocketStream``.
+    Wraps a ``MsgStream``: transport + encoding IPC connection.
+    Currently we only support ``trio.SocketStream`` for transport
+    (aka TCP).

     '''
     def __init__(
@@ -299,10 +327,12 @@ class Channel:
         # set after handshake - always uid of far end
         self.uid: Optional[Tuple[str, str]] = None

-        # set if far end actor errors internally
-        self._exc: Optional[Exception] = None
         self._agen = self._aiter_recv()
+        self._exc: Optional[Exception] = None  # set if far end actor errors
         self._closed: bool = False
+
+        # flag set on ``Portal.cancel_actor()`` indicating
+        # remote (peer) cancellation of the far end actor runtime.
+        self._cancel_called: bool = False

     @classmethod
     def from_stream(
@@ -441,9 +471,11 @@ class Channel:
     async def _aiter_recv(
         self
-    ) -> typing.AsyncGenerator[Any, None]:
-        """Async iterate items from underlying stream.
-        """
+    ) -> AsyncGenerator[Any, None]:
+        '''
+        Async iterate items from underlying stream.
+
+        '''
         assert self.msgstream
         while True:
             try:
@@ -473,9 +505,11 @@ class Channel:
 async def _connect_chan(
     host: str, port: int
 ) -> typing.AsyncGenerator[Channel, None]:
-    """Create and connect a channel with disconnect on context manager
+    '''
+    Create and connect a channel with disconnect on context manager
     teardown.
-    """
+
+    '''
     chan = Channel((host, port))
     await chan.connect()
     yield chan

View File

@ -1,5 +1,6 @@
""" """
Portal api Memory boundary "Portals": an API for structured
concurrency linked tasks running in disparate memory domains.
""" """
import importlib import importlib
@@ -21,7 +22,6 @@ from .log import get_logger
 from ._exceptions import (
     unpack_error,
     NoResult,
-    # RemoteActorError,
     ContextCancelled,
 )
 from ._streaming import Context, ReceiveMsgStream
@@ -35,10 +35,12 @@ async def maybe_open_nursery(
     nursery: trio.Nursery = None,
     shield: bool = False,
 ) -> AsyncGenerator[trio.Nursery, Any]:
-    """Create a new nursery if None provided.
+    '''
+    Create a new nursery if None provided.

     Blocks on exit as expected if no input nursery is provided.
-    """
+
+    '''
     if nursery is not None:
         yield nursery
     else:
@@ -87,14 +89,18 @@ class Portal:
     like having a "portal" between the separate actor memory spaces.

     '''
+    # the timeout for a remote cancel request sent to
+    # a (peer) actor.
+    cancel_timeout = 0.5
+
     def __init__(self, channel: Channel) -> None:
         self.channel = channel
-        # when this is set to a tuple returned from ``_submit()`` then
-        # it is expected that ``result()`` will be awaited at some point
         # during the portal's lifetime
         self._result_msg: Optional[dict] = None
-        # set when _submit_for_result is called
+
+        # When this is set to a tuple returned from ``_submit()`` then
+        # it is expected that ``result()`` will be awaited at some
+        # point. Set when _submit_for_result is called
         self._expect_result: Optional[
             Tuple[str, Any, str, Dict[str, Any]]
         ] = None
@@ -199,36 +205,46 @@ class Portal:
         # we'll need to .aclose all those channels here
         await self._cancel_streams()

-    async def cancel_actor(self):
-        """Cancel the actor on the other end of this portal.
-        """
-        if not self.channel.connected():
-            log.cancel("This portal is already closed can't cancel")
-            return False
+    async def cancel_actor(
+        self,
+        timeout: float = None,
+
+    ) -> bool:
+        '''
+        Cancel the actor on the other end of this portal.
+
+        '''
+        if not self.channel.connected():
+            log.cancel("This channel is already closed, can't cancel")
+            return False

         await self._cancel_streams()

         log.cancel(
             f"Sending actor cancel request to {self.channel.uid} on "
             f"{self.channel}")
+
+        self.channel._cancel_called = True
+
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with a proper shield
-            with trio.move_on_after(0.5) as cancel_scope:
-                cancel_scope.shield = True
+            with trio.move_on_after(timeout or self.cancel_timeout) as cs:
+                cs.shield = True

                 await self.run_from_ns('self', 'cancel')
                 return True

-            if cancel_scope.cancelled_caught:
+            if cs.cancelled_caught:
                 log.cancel(f"May have failed to cancel {self.channel.uid}")

             # if we get here some weird cancellation case happened
             return False

-        except trio.ClosedResourceError:
+        except (
+            trio.ClosedResourceError,
+            trio.BrokenResourceError,
+        ):
             log.cancel(
-                f"{self.channel} for {self.channel.uid} was already closed?")
+                f"{self.channel} for {self.channel.uid} was already closed or broken?")
             return False

     async def run_from_ns(
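
A short fragment showing the new per-call timeout override added above
(the wrapper name is made up and ``portal`` is assumed to be a
connected ``Portal`` instance):

    async def cancel_with_patience(portal) -> None:
        # override the class-level ``cancel_timeout`` (0.5s) per call
        acked = await portal.cancel_actor(timeout=2.0)
        if not acked:
            # no ACK came back: the peer may be unreachable, already
            # dead, or just too slow to respond within the timeout
            ...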
@@ -237,7 +253,9 @@ class Portal:
         function_name: str,
         **kwargs,
     ) -> Any:
-        """Run a function from a (remote) namespace in a new task on the far-end actor.
+        '''
+        Run a function from a (remote) namespace in a new task on the
+        far-end actor.

         This is a more explicit way to run tasks in a remote-process
         actor using explicit object-path syntax. Hint: this is how
@@ -246,9 +264,11 @@ class Portal:
         Note::

             A special namespace `self` can be used to invoke `Actor`
-            instance methods in the remote runtime. Currently this should only
-            be used for `tractor` internals.
-        """
+            instance methods in the remote runtime. Currently this
+            should only be used for ``tractor`` runtime internals.
+
+        '''
         msg = await self._return_once(
             *(await self._submit(namespace_path, function_name, kwargs))
         )
@@ -447,7 +467,8 @@ class Portal:
         except (
             BaseException,

-            # more specifically, we need to handle:
+            # more specifically, we need to handle these but are not
+            # sure it's worth being pedantic:
             # Exception,
             # trio.Cancelled,
             # trio.MultiError,
@dataclass @dataclass
class LocalPortal: class LocalPortal:
"""A 'portal' to a local ``Actor``. '''
A 'portal' to a local ``Actor``.
A compatibility shim for normal portals but for invoking functions A compatibility shim for normal portals but for invoking functions
using an in process actor instance. using an in process actor instance.
"""
'''
actor: 'Actor' # type: ignore # noqa actor: 'Actor' # type: ignore # noqa
channel: Channel channel: Channel
async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
"""Run a requested local function from a namespace path and '''
Run a requested local function from a namespace path and
return it's result. return it's result.
""" '''
obj = self.actor if ns == 'self' else importlib.import_module(ns) obj = self.actor if ns == 'self' else importlib.import_module(ns)
func = getattr(obj, func_name) func = getattr(obj, func_name)
return await func(**kwargs) return await func(**kwargs)
@@ -522,10 +546,13 @@ async def open_portal(
     shield: bool = False,

 ) -> AsyncGenerator[Portal, None]:
-    """Open a ``Portal`` through the provided ``channel``.
+    '''
+    Open a ``Portal`` through the provided ``channel``.

-    Spawns a background task to handle message processing.
-    """
+    Spawns a background task to handle message processing (normally
+    done by the actor-runtime implicitly).
+
+    '''
     actor = current_actor()
     assert actor
     was_connected = False
@@ -553,7 +580,6 @@ async def open_portal(
         portal = Portal(channel)
         try:
             yield portal
-
         finally:
             await portal.aclose()

View File

@ -5,7 +5,11 @@ Machinery for actor process spawning using multiple backends.
import sys import sys
import multiprocessing as mp import multiprocessing as mp
import platform import platform
from typing import Any, Dict, Optional from typing import (
Any, Dict, Optional, Union, Callable,
TypeVar,
)
from collections.abc import Awaitable, Coroutine
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@@ -41,6 +45,7 @@ from ._exceptions import ActorFailure

 log = get_logger('tractor')

+ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)

 # placeholder for an mp start context if so using that backend
 _ctx: Optional[mp.context.BaseContext] = None
@@ -97,14 +102,17 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:

 async def exhaust_portal(
     portal: Portal,
     actor: Actor
+
 ) -> Any:
-    """Pull final result from portal (assuming it has one).
+    '''
+    Pull final result from portal (assuming it has one).

     If the main task is an async generator do our best to consume
     what's left of it.
-    """
+
+    '''
     try:
         log.debug(f"Waiting on final result from {actor.uid}")
@@ -126,18 +134,19 @@ async def exhaust_portal(
 async def cancel_on_completion(
     portal: Portal,
     actor: Actor,
     errors: Dict[Tuple[str, str], Exception],
+
 ) -> None:
-    """
+    '''
     Cancel actor gracefully once its "main" portal's
     result arrives.

     Should only be called for actors spawned with `run_in_actor()`.
-    """
+
+    '''
     # if this call errors we store the exception for later
     # in ``errors`` which will be reraised inside
     # a MultiError and we still send out a cancel request
@@ -175,10 +184,37 @@ async def do_hard_kill(
     # XXX: should pretty much never get here unless we have
     # to move the bits from ``proc.__aexit__()`` out and
     # into here.
-    log.critical(f"HARD KILLING {proc}")
+    log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
     proc.kill()


+async def soft_wait(
+    proc: ProcessType,
+    wait_func: Callable[
+        [ProcessType],
+        Awaitable,
+    ],
+    portal: Portal,
+
+) -> None:
+    # Wait for proc termination but **don't yet** call
+    # ``trio.Process.__aexit__()`` (it tears down stdio
+    # which will kill any waiting remote pdb trace).
+    # This is a "soft" (cancellable) join/reap.
+    try:
+        await wait_func(proc)
+    except trio.Cancelled:
+        # if cancelled during a soft wait, cancel the child
+        # actor before entering the hard reap sequence
+        # below. This means we try to do a graceful teardown
+        # via sending a cancel message before getting out
+        # zombie killing tools.
+        with trio.CancelScope(shield=True):
+            await portal.cancel_actor()
+        raise
+
+
 async def new_proc(
     name: str,
@@ -195,11 +231,14 @@ async def new_proc(
     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED

 ) -> None:
-    """
-    Create a new ``multiprocessing.Process`` using the
-    spawn method as configured using ``try_set_start_method()``.
-    """
+    '''
+    Create a new ``Process`` using a "spawn method" (as configured with
+    ``try_set_start_method()``).
+
+    This routine should be started in an actor runtime task and the
+    logic here is to be considered the core supervision strategy.
+
+    '''
     # mark the new actor with the global spawn method
     subactor._spawn_method = _spawn_method
     uid = subactor.uid
@@ -230,17 +269,19 @@ async def new_proc(
     ]

     cancelled_during_spawn: bool = False
+    proc: Optional[trio.Process] = None
     try:
-        proc = await trio.open_process(spawn_cmd)
-        log.runtime(f"Started {proc}")
-
-        # wait for actor to spawn and connect back to us
-        # channel should have handshake completed by the
-        # local actor by the time we get a ref to it
         try:
+            proc = await trio.open_process(spawn_cmd)
+            log.runtime(f"Started {proc}")
+
+            # wait for actor to spawn and connect back to us
+            # channel should have handshake completed by the
+            # local actor by the time we get a ref to it
             event, chan = await actor_nursery._actor.wait_for_peer(
                 subactor.uid)
         except trio.Cancelled:
             cancelled_during_spawn = True
             # we may cancel before the child connects back in which
@@ -250,7 +291,8 @@ async def new_proc(
             # don't clobber an ongoing pdb
             if is_root_process():
                 await maybe_wait_for_debugger()
-            else:
+
+            elif proc is not None:
                 async with acquire_debug_lock(uid):
                     # soft wait on the proc to terminate
                     with trio.move_on_after(0.5):
@@ -291,21 +333,14 @@ async def new_proc(
                     errors
                 )

-            # Wait for proc termination but **don't yet** call
-            # ``trio.Process.__aexit__()`` (it tears down stdio
-            # which will kill any waiting remote pdb trace).
-            # This is a "soft" (cancellable) join/reap.
-            try:
-                await proc.wait()
-            except trio.Cancelled:
-                # if cancelled during a soft wait, cancel the child
-                # actor before entering the hard reap sequence
-                # below. This means we try to do a graceful teardown
-                # via sending a cancel message before getting out
-                # zombie killing tools.
-                with trio.CancelScope(shield=True):
-                    await portal.cancel_actor()
-                raise
+            # This is a "soft" (cancellable) join/reap which
+            # will remote cancel the actor on a ``trio.Cancelled``
+            # condition.
+            await soft_wait(
+                proc,
+                trio.Process.wait,
+                portal
+            )

             # cancel result waiter that may have been spawned in
             # tandem if not done already
@@ -320,23 +355,26 @@ async def new_proc(
             # killing the process too early.
             log.cancel(f'Hard reap sequence starting for {uid}')

-            with trio.CancelScope(shield=True):
+            if proc:
+                with trio.CancelScope(shield=True):

-                # don't clobber an ongoing pdb
-                if cancelled_during_spawn:
-                    # Try again to avoid TTY clobbering.
-                    async with acquire_debug_lock(uid):
-                        with trio.move_on_after(0.5):
-                            await proc.wait()
+                    # don't clobber an ongoing pdb
+                    if cancelled_during_spawn:
+                        # Try again to avoid TTY clobbering.
+                        async with acquire_debug_lock(uid):
+                            with trio.move_on_after(0.5):
+                                await proc.wait()

-                if is_root_process():
-                    await maybe_wait_for_debugger()
+                    if is_root_process():
+                        await maybe_wait_for_debugger()

-                if proc.poll() is None:
-                    log.cancel(f"Attempting to hard kill {proc}")
-                    await do_hard_kill(proc)
+                    if proc.poll() is None:
+                        log.cancel(f"Attempting to hard kill {proc}")
+                        await do_hard_kill(proc)

-                log.debug(f"Joined {proc}")
+                    log.debug(f"Joined {proc}")
+
+            else:
+                log.warning('Nursery cancelled before sub-proc started')

             if not cancelled_during_spawn:
                 # pop child entry to indicate we're no longer managing this
@@ -351,6 +389,7 @@ async def new_proc(
                 actor_nursery=actor_nursery,
                 subactor=subactor,
                 errors=errors,
+
                 # passed through to actor main
                 bind_addr=bind_addr,
                 parent_addr=parent_addr,
@@ -469,7 +508,14 @@ async def mp_new_proc(
                 errors
             )

-            await proc_waiter(proc)
+            # This is a "soft" (cancellable) join/reap which
+            # will remote cancel the actor on a ``trio.Cancelled``
+            # condition.
+            await soft_wait(
+                proc,
+                proc_waiter,
+                portal
+            )

             # cancel result waiter that may have been spawned in
             # tandem if not done already

View File

@@ -52,6 +52,7 @@ class ActorNursery:
         self.cancelled: bool = False
         self._join_procs = trio.Event()
         self.errors = errors
+        self.exited = trio.Event()

     async def start_actor(
         self,
@@ -207,7 +208,8 @@ class ActorNursery:
                 # spawn cancel tasks for each sub-actor
                 assert portal
-                nursery.start_soon(portal.cancel_actor)
+                if portal.channel.connected():
+                    nursery.start_soon(portal.cancel_actor)

         # if we cancelled the cancel (we hung cancelling remote actors)
         # then hard kill all sub-processes
@@ -401,18 +403,23 @@ async def open_nursery(
         async with open_root_actor(**kwargs) as actor:
             assert actor is current_actor()

-            # try:
-            async with _open_and_supervise_one_cancels_all_nursery(
-                actor
-            ) as anursery:
-                yield anursery
+            try:
+                async with _open_and_supervise_one_cancels_all_nursery(
+                    actor
+                ) as anursery:
+                    yield anursery
+            finally:
+                anursery.exited.set()

         else:  # sub-nursery case
-            async with _open_and_supervise_one_cancels_all_nursery(
-                actor
-            ) as anursery:
-                yield anursery
+            try:
+                async with _open_and_supervise_one_cancels_all_nursery(
+                    actor
+                ) as anursery:
+                    yield anursery
+            finally:
+                anursery.exited.set()

     finally:
         log.debug("Nursery teardown complete")
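
Finally, a trio-only sketch of the coordination the new ``exited``
event enables: the channel-teardown task (see the channel handler hunk
in ``Actor`` above) blocks until the nursery block has fully unwound.
The helper names here are made up for illustration:

    import trio


    async def teardown_waiter(exited: trio.Event) -> None:
        # stand-in for the msg-loop teardown task which must not
        # close the channel until the local nursery has exited
        await exited.wait()
        print('safe to tear down the channel now')


    async def main() -> None:
        exited = trio.Event()
        async with trio.open_nursery() as n:
            n.start_soon(teardown_waiter, exited)
            await trio.sleep(0.1)  # nursery "body" doing work
            exited.set()  # mirrors the new ``finally:`` blocks above


    trio.run(main)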