Finally, deterministic remote cancellation support

On msg loop termination we now check and see if a channel is associated
with a child-actor registered in some local task's nursery. If so, we
attempt to wait on channel closure initiated from the child side (by
draining the underlying msg stream) so as to avoid closing it too early
resulting in the child not relaying its termination status response. This
means we now support the ideal case in 2-general's where we get back the
ack to the closure request instead of just ignoring it and timing out XD

The main implementation detail is that when `Portal.cancel_actor()`
remotely calls `Actor.cancel()` we actually wait for the RPC response
from that request before allowing the channel shutdown sequence to
engage. The new msg stream draining support enables this.

Also, factor child-to-parent error propagation logic into a helper func
and improve some docs (yeah yeah y'all don't like the ''', i don't
care - it makes my eyes not hurt).
acked_backup
Tyler Goodlet 2021-12-01 22:17:09 -05:00
parent d817f1a658
commit d81eb1a51e
1 changed files with 105 additions and 28 deletions

View File

@ -2,6 +2,7 @@
Actor primitives and helpers Actor primitives and helpers
""" """
from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
from itertools import chain from itertools import chain
@ -57,6 +58,8 @@ async def _invoke(
''' '''
Invoke local func and deliver result(s) over provided channel. Invoke local func and deliver result(s) over provided channel.
This is the core "RPC task" starting machinery.
''' '''
__tracebackhide__ = True __tracebackhide__ = True
treat_as_gen = False treat_as_gen = False
@ -263,14 +266,51 @@ def _get_mod_abspath(module):
_lifetime_stack: ExitStack = ExitStack() _lifetime_stack: ExitStack = ExitStack()
class Actor: async def try_ship_error_to_parent(
"""The fundamental concurrency primitive. actor: Actor,
err: Exception,
An *actor* is the combination of a regular Python process ) -> None:
executing a ``trio`` task tree, communicating with trio.CancelScope(shield=True):
with other actors through "portals" which provide a native async API try:
around various IPC transport "channels". # internal error so ship to parent without cid
""" await actor._parent_chan.send(pack_error(err))
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
):
log.error(
f"Failed to ship error to parent "
f"{actor._parent_chan.uid}, channel was closed"
)
class Actor:
'''
The fundamental "runtime" concurrency primitive.
An *actor* is the combination of a regular Python process executing
a ``trio`` task tree, communicating with other actors through
"memory boundary portals" - which provide a native async API around
IPC transport "channels" which themselves encapsulate various
(swappable) network protocols.
Each "actor" is ``trio.run()`` scheduled "runtime" composed of many
concurrent tasks in a single thread. The "runtime" tasks conduct
a slew of low(er) level functions to make it possible for message
passing between actors as well as the ability to create new actors
(aka new "runtimes" in new processes which are supervised via
a nursery construct). Each task which sends messages to a task in
a "peer" (not necessarily a parent-child, depth hierarchy)) is able
to do so via an "address", which maps IPC connections across memory
boundaries, and task request id which allows for per-actor
tasks to send and receive messages to specific peer-actor tasks with
which there is an ongoing RPC/IPC dialog.
'''
# ugh, we need to get rid of this and replace with a "registry" sys
# https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False is_arbiter: bool = False
# nursery placeholders filled in by `_async_main()` after fork # nursery placeholders filled in by `_async_main()` after fork
@ -441,8 +481,8 @@ class Actor:
# we need this for ``msgspec`` for some reason? # we need this for ``msgspec`` for some reason?
# for now, it's been put in the stream backend. # for now, it's been put in the stream backend.
# trio.BrokenResourceError, # trio.BrokenResourceError,
# trio.ClosedResourceError, # trio.ClosedResourceError,
TransportClosed, TransportClosed,
): ):
# XXX: This may propagate up from ``Channel._aiter_recv()`` # XXX: This may propagate up from ``Channel._aiter_recv()``
@ -482,7 +522,49 @@ class Actor:
# process received reponses. # process received reponses.
try: try:
await self._process_messages(chan) await self._process_messages(chan)
except trio.Cancelled:
log.cancel(f"Msg loop was cancelled for {chan}")
raise
finally: finally:
# This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
local_nursery = self._actoruid2nursery.get(chan.uid)
if (
local_nursery
):
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the remote
# peer side since we presume that any channel which
# is mapped to a sub-actor (i.e. it's managed by
# one of our local nurseries)
# message is sent to the peer likely by this actor which is
# now in a cancelled condition) when the local runtime here
# is now cancelled while (presumably) in the middle of msg
# loop processing.
with trio.move_on_after(0.1) as cs:
cs.shield = True
# Attempt to wait for the far end to close the channel
# and bail after timeout (2-generals on closure).
async for msg in chan.msgstream.drain():
# try to deliver any lingering msgs
# before we destroy the channel.
# This accomplishes deterministic
# ``Portal.cancel_actor()`` cancellation by
# making sure any RPC response to that call is
# delivered the local calling task.
# TODO: factor this into a helper?
log.runtime(f'drained {msg} for {chan.uid}')
cid = msg.get('cid')
if cid:
# deliver response to local caller/waiter
await self._push_result(chan, cid, msg)
await local_nursery.exited.wait()
# channel cleanup sequence # channel cleanup sequence
@ -593,10 +675,12 @@ class Actor:
func: str, func: str,
kwargs: dict kwargs: dict
) -> Tuple[str, trio.abc.ReceiveChannel]: ) -> Tuple[str, trio.abc.ReceiveChannel]:
"""Send a ``'cmd'`` message to a remote actor and return a '''
Send a ``'cmd'`` message to a remote actor and return a
caller id and a ``trio.Queue`` that can be used to wait for caller id and a ``trio.Queue`` that can be used to wait for
responses delivered by the local message processing loop. responses delivered by the local message processing loop.
"""
'''
cid = str(uuid.uuid4()) cid = str(uuid.uuid4())
assert chan.uid assert chan.uid
send_chan, recv_chan = self.get_memchans(chan.uid, cid) send_chan, recv_chan = self.get_memchans(chan.uid, cid)
@ -609,11 +693,14 @@ class Actor:
chan: Channel, chan: Channel,
shield: bool = False, shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Process messages for the channel async-RPC style. '''
Process messages for the channel async-RPC style.
Receive multiplexed RPC requests and deliver responses over ``chan``. Receive multiplexed RPC requests and deliver responses over ``chan``.
"""
'''
# TODO: once https://github.com/python-trio/trio/issues/467 gets # TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that! # worked out we'll likely want to use that!
msg = None msg = None
@ -692,8 +779,9 @@ class Actor:
# msg loop and break out into # msg loop and break out into
# ``_async_main()`` # ``_async_main()``
log.cancel( log.cancel(
f"Actor {self.uid} was remotely cancelled;" f"Actor {self.uid} was remotely cancelled "
" waiting on cancellation completion..") f"by {chan.uid}"
)
await _invoke( await _invoke(
self, cid, chan, func, kwargs, is_rpc=False self, cid, chan, func, kwargs, is_rpc=False
) )
@ -789,17 +877,12 @@ class Actor:
# machinery not from an rpc task) to parent # machinery not from an rpc task) to parent
log.exception("Actor errored:") log.exception("Actor errored:")
if self._parent_chan: if self._parent_chan:
await self._parent_chan.send(pack_error(err)) await try_ship_error_to_parent(self, err)
# if this is the `MainProcess` we expect the error broadcasting # if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints" # above to trigger an error at consuming portal "checkpoints"
raise raise
except trio.Cancelled:
# debugging only
log.runtime(f"Msg loop was cancelled for {chan}")
raise
finally: finally:
# msg debugging for when he machinery is brokey # msg debugging for when he machinery is brokey
log.runtime( log.runtime(
@ -891,6 +974,7 @@ class Actor:
# establish primary connection with immediate parent # establish primary connection with immediate parent
self._parent_chan = None self._parent_chan = None
if parent_addr is not None: if parent_addr is not None:
self._parent_chan, accept_addr_rent = await self._from_parent( self._parent_chan, accept_addr_rent = await self._from_parent(
parent_addr) parent_addr)
@ -994,14 +1078,7 @@ class Actor:
) )
if self._parent_chan: if self._parent_chan:
with trio.CancelScope(shield=True): await try_ship_error_to_parent(self, err)
try:
# internal error so ship to parent without cid
await self._parent_chan.send(pack_error(err))
except trio.ClosedResourceError:
log.error(
f"Failed to ship error to parent "
f"{self._parent_chan.uid}, channel was closed")
# always! # always!
log.exception("Actor errored:") log.exception("Actor errored:")