forked from goodboy/tractor
1
0
Fork 0

WIP attempt to relay context error via raised-in-scope-nursery task

prehardkill
Tyler Goodlet 2021-06-24 19:56:05 -04:00
parent e6c9232b45
commit 0a5cf6bbd2
3 changed files with 232 additions and 105 deletions

View File

@ -12,7 +12,7 @@ import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional, Union from typing import Dict, List, Tuple, Any, Optional, Union
from types import ModuleType from types import ModuleType
import signal # import signal
import sys import sys
import os import os
from contextlib import ExitStack from contextlib import ExitStack
@ -31,6 +31,7 @@ from ._exceptions import (
ModuleNotExposed, ModuleNotExposed,
is_multi_cancelled, is_multi_cancelled,
ContextCancelled, ContextCancelled,
TransportClosed,
) )
from . import _debug from . import _debug
from ._discovery import get_arbiter from ._discovery import get_arbiter
@ -47,6 +48,7 @@ class ActorFailure(Exception):
async def _invoke( async def _invoke(
actor: 'Actor', actor: 'Actor',
cid: str, cid: str,
chan: Channel, chan: Channel,
@ -59,10 +61,12 @@ async def _invoke(
"""Invoke local func and deliver result(s) over provided channel. """Invoke local func and deliver result(s) over provided channel.
""" """
treat_as_gen = False treat_as_gen = False
cs = None
cancel_scope = trio.CancelScope() cancel_scope = trio.CancelScope()
ctx = Context(chan, cid, _cancel_scope=cancel_scope) cs: trio.CancelScope = None
context = False
ctx = Context(chan, cid)
context: bool = False
if getattr(func, '_tractor_stream_function', False): if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions # handle decorated ``@tractor.stream`` async functions
@ -150,14 +154,22 @@ async def _invoke(
# context func with support for bi-dir streaming # context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid}) await chan.send({'functype': 'context', 'cid': cid})
with cancel_scope as cs: async with trio.open_nursery() as scope_nursery:
ctx._scope_nursery = scope_nursery
cs = scope_nursery.cancel_scope
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
if cs.cancelled_caught: if cs.cancelled_caught:
if ctx._cancel_called:
msg = f'{func.__name__} cancelled itself',
else:
msg = f'{func.__name__} was remotely cancelled',
# task-contex was cancelled so relay to the cancel to caller # task-contex was cancelled so relay to the cancel to caller
raise ContextCancelled( raise ContextCancelled(
f'{func.__name__} cancelled itself', msg,
suberror_type=trio.Cancelled, suberror_type=trio.Cancelled,
) )
@ -170,20 +182,29 @@ async def _invoke(
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# TODO: maybe we'll want differnet "levels" of debugging if not is_multi_cancelled(err):
log.exception("Actor crashed:")
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ? # eventualy such as ('app', 'supervisory', 'runtime') ?
if not isinstance(err, trio.ClosedResourceError) and (
not is_multi_cancelled(err)) and ( # if not isinstance(err, trio.ClosedResourceError) and (
not isinstance(err, ContextCancelled) # if not is_multi_cancelled(err) and (
if not isinstance(err, ContextCancelled) or (
isinstance(err, ContextCancelled) and ctx._cancel_called
): ):
# XXX: is there any case where we'll want to debug IPC # XXX: is there any case where we'll want to debug IPC
# disconnects? I can't think of a reason that inspecting # disconnects? I can't think of a reason that inspecting
# this type of failure will be useful for respawns or # this type of failure will be useful for respawns or
# recovery logic - the only case is some kind of strange bug # recovery logic - the only case is some kind of strange bug
# in `trio` itself? # in `trio` itself?
entered = await _debug._maybe_enter_pm(err) await _debug._maybe_enter_pm(err)
if not entered:
log.exception("Actor crashed:") # entered = await _debug._maybe_enter_pm(err)
# if not entered:
# log.exception("Actor crashed:")
# always ship errors back to caller # always ship errors back to caller
err_msg = pack_error(err) err_msg = pack_error(err)
@ -192,8 +213,10 @@ async def _invoke(
await chan.send(err_msg) await chan.send(err_msg)
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.warning( # if we can't propagate the error that's a big boo boo
f"Failed to ship error to caller @ {chan.uid}") log.error(
f"Failed to ship error to caller @ {chan.uid} !?"
)
if cs is None: if cs is None:
# error is from above code not from rpc invocation # error is from above code not from rpc invocation
@ -378,19 +401,34 @@ class Actor:
raise mne raise mne
async def _stream_handler( async def _stream_handler(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
) -> None: ) -> None:
"""Entry point for new inbound connections to the channel server. """Entry point for new inbound connections to the channel server.
""" """
self._no_more_peers = trio.Event() # unset self._no_more_peers = trio.Event() # unset
chan = Channel(stream=stream) chan = Channel(stream=stream)
log.info(f"New connection to us {chan}") log.info(f"New connection to us {chan}")
# send/receive initial handshake response # send/receive initial handshake response
try: try:
uid = await self._do_handshake(chan) uid = await self._do_handshake(chan)
except StopAsyncIteration:
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
TransportClosed,
):
# XXX: This may propagate up from ``Channel._aiter_recv()``
# and ``MsgpackStream._inter_packets()`` on a read from the
# stream particularly when the runtime is first starting up
# inside ``open_root_actor()`` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.warning(f"Channel {chan} failed to handshake") log.warning(f"Channel {chan} failed to handshake")
return return
@ -418,10 +456,24 @@ class Actor:
try: try:
await self._process_messages(chan) await self._process_messages(chan)
finally: finally:
# channel cleanup sequence
# for (channel, cid) in self._rpc_tasks.copy():
# if channel is chan:
# with trio.CancelScope(shield=True):
# await self._cancel_task(cid, channel)
# # close all consumer side task mem chans
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
# assert send_chan.cid == cid # type: ignore
# await send_chan.aclose()
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
log.debug(f"Releasing channel {chan} from {chan.uid}") log.debug(f"Releasing channel {chan} from {chan.uid}")
chans = self._peers.get(chan.uid) chans = self._peers.get(chan.uid)
chans.remove(chan) chans.remove(chan)
if not chans: if not chans:
log.debug(f"No more channels for {chan.uid}") log.debug(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None) self._peers.pop(chan.uid, None)
@ -434,14 +486,22 @@ class Actor:
# # XXX: is this necessary (GC should do it?) # # XXX: is this necessary (GC should do it?)
if chan.connected(): if chan.connected():
# if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to
# an error and so we should at least try to terminate
# the channel from this end gracefully.
log.debug(f"Disconnecting channel {chan}") log.debug(f"Disconnecting channel {chan}")
try: try:
# send our msg loop terminate sentinel # send a msg loop terminate sentinel
await chan.send(None) await chan.send(None)
# XXX: do we want this?
# causes "[104] connection reset by peer" on other end
# await chan.aclose() # await chan.aclose()
except trio.BrokenResourceError: except trio.BrokenResourceError:
log.exception( log.warning(f"Channel for {chan.uid} was already closed")
f"Channel for {chan.uid} was already zonked..")
async def _push_result( async def _push_result(
self, self,
@ -451,18 +511,22 @@ class Actor:
) -> None: ) -> None:
"""Push an RPC result to the local consumer's queue. """Push an RPC result to the local consumer's queue.
""" """
actorid = chan.uid # actorid = chan.uid
assert actorid, f"`actorid` can't be {actorid}" assert chan.uid, f"`chan.uid` can't be {chan.uid}"
send_chan, recv_chan = self._cids2qs[(actorid, cid)] send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
assert send_chan.cid == cid # type: ignore assert send_chan.cid == cid # type: ignore
# if 'stop' in msg: if 'error' in msg:
ctx = getattr(recv_chan, '_ctx', None)
# if ctx:
# ctx._error_from_remote_msg(msg)
# log.debug(f"{send_chan} was terminated at remote end") # log.debug(f"{send_chan} was terminated at remote end")
# # indicate to consumer that far end has stopped # # indicate to consumer that far end has stopped
# return await send_chan.aclose() # return await send_chan.aclose()
try: try:
log.debug(f"Delivering {msg} from {actorid} to caller {cid}") log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}")
# maintain backpressure # maintain backpressure
await send_chan.send(msg) await send_chan.send(msg)
@ -481,7 +545,9 @@ class Actor:
self, self,
actorid: Tuple[str, str], actorid: Tuple[str, str],
cid: str cid: str
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
log.debug(f"Getting result queue for {actorid} cid {cid}") log.debug(f"Getting result queue for {actorid} cid {cid}")
try: try:
send_chan, recv_chan = self._cids2qs[(actorid, cid)] send_chan, recv_chan = self._cids2qs[(actorid, cid)]
@ -544,9 +610,15 @@ class Actor:
if channel is chan: if channel is chan:
await self._cancel_task(cid, channel) await self._cancel_task(cid, channel)
# close all consumer side task mem chans
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
# assert send_chan.cid == cid # type: ignore
# await send_chan.aclose()
log.debug( log.debug(
f"Msg loop signalled to terminate for" f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}") f" {chan} from {chan.uid}")
break break
log.trace( # type: ignore log.trace( # type: ignore
@ -641,22 +713,36 @@ class Actor:
) )
await self.cancel_rpc_tasks(chan) await self.cancel_rpc_tasks(chan)
except trio.ClosedResourceError: except (
log.error(f"{chan} form {chan.uid} broke") TransportClosed,
trio.BrokenResourceError,
trio.ClosedResourceError
):
# channels "breaking" is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out
# of the message loop and expect the teardown sequence
# to clean up.
log.error(f"{chan} form {chan.uid} closed abruptly")
# raise
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# ship any "internal" exception (i.e. one from internal machinery # ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent # not from an rpc task) to parent
log.exception("Actor errored:") log.exception("Actor errored:")
if self._parent_chan: if self._parent_chan:
await self._parent_chan.send(pack_error(err)) await self._parent_chan.send(pack_error(err))
raise
# if this is the `MainProcess` we expect the error broadcasting # if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints" # above to trigger an error at consuming portal "checkpoints"
raise
except trio.Cancelled: except trio.Cancelled:
# debugging only # debugging only
log.debug(f"Msg loop was cancelled for {chan}") log.debug(f"Msg loop was cancelled for {chan}")
raise raise
finally: finally:
# msg debugging for when he machinery is brokey
log.debug( log.debug(
f"Exiting msg loop for {chan} from {chan.uid} " f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}") f"with last msg:\n{msg}")
@ -1089,6 +1175,7 @@ class Actor:
parlance. parlance.
""" """
await chan.send(self.uid) await chan.send(self.uid)
# breakpoint()
uid: Tuple[str, str] = tuple(await chan.recv()) uid: Tuple[str, str] = tuple(await chan.recv())
# if not isinstance(uid, tuple): # if not isinstance(uid, tuple):

View File

@ -177,6 +177,7 @@ class Portal:
f"Cancelling all streams with {self.channel.uid}") f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy(): for stream in self._streams.copy():
try: try:
# with trio.CancelScope(shield=True):
await stream.aclose() await stream.aclose()
except trio.ClosedResourceError: except trio.ClosedResourceError:
# don't error the stream having already been closed # don't error the stream having already been closed
@ -369,7 +370,6 @@ class Portal:
recv_chan: Optional[trio.MemoryReceiveChannel] = None recv_chan: Optional[trio.MemoryReceiveChannel] = None
try:
cid, recv_chan, functype, first_msg = await self._submit( cid, recv_chan, functype, first_msg = await self._submit(
fn_mod_path, fn_name, kwargs) fn_mod_path, fn_name, kwargs)
@ -392,16 +392,21 @@ class Portal:
# deliver context instance and .started() msg value in open # deliver context instance and .started() msg value in open
# tuple. # tuple.
try:
async with trio.open_nursery() as scope_nursery:
ctx = Context( ctx = Context(
self.channel, self.channel,
cid, cid,
_portal=self, _portal=self,
_recv_chan=recv_chan, _recv_chan=recv_chan,
_scope_nursery=scope_nursery,
) )
recv_chan._ctx = ctx
try:
yield ctx, first yield ctx, first
log.info(f'Context for {func.__name__} completed')
if cancel_on_exit: if cancel_on_exit:
await ctx.cancel() await ctx.cancel()
@ -409,24 +414,34 @@ class Portal:
if not ctx._cancel_called: if not ctx._cancel_called:
await ctx.result() await ctx.result()
await recv_chan.aclose()
# except TypeError:
# # if fn_name == '_emsd_main':
# import tractor
# await tractor.breakpoint()
except ContextCancelled: except ContextCancelled:
# if the context was cancelled by client code
# then we don't need to raise since user code
# is expecting this.
if not ctx._cancel_called: if not ctx._cancel_called:
raise raise
except BaseException: # if the context was cancelled by client code
# then we don't need to raise since user code
# is expecting this and the block should exit.
else:
log.debug(f'Context {ctx} cancelled gracefully')
except trio.Cancelled:
# the context cancels itself on any deviation # the context cancels itself on any deviation
await ctx.cancel() await ctx.cancel()
raise raise
finally: # finally:
log.info(f'Context for {func.__name__} completed') # log.info(f'Context for {func.__name__} completed')
# finally:
# if recv_chan is not None:
finally:
if recv_chan is not None:
await recv_chan.aclose()
@dataclass @dataclass
class LocalPortal: class LocalPortal:
@ -464,6 +479,7 @@ async def open_portal(
was_connected = False was_connected = False
async with maybe_open_nursery(nursery, shield=shield) as nursery: async with maybe_open_nursery(nursery, shield=shield) as nursery:
if not channel.connected(): if not channel.connected():
await channel.connect() await channel.connect()
was_connected = True was_connected = True
@ -485,11 +501,12 @@ async def open_portal(
portal = Portal(channel) portal = Portal(channel)
try: try:
yield portal yield portal
finally: finally:
await portal.aclose() await portal.aclose()
if was_connected: if was_connected:
# cancel remote channel-msg loop # gracefully signal remote channel-msg loop
await channel.send(None) await channel.send(None)
# cancel background msg loop task # cancel background msg loop task

View File

@ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager
from dataclasses import dataclass from dataclasses import dataclass
from typing import ( from typing import (
Any, Iterator, Optional, Callable, Any, Iterator, Optional, Callable,
AsyncGenerator, AsyncGenerator, Dict,
) )
import warnings import warnings
@ -67,6 +67,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
raise trio.EndOfChannel raise trio.EndOfChannel
try: try:
# if self._ctx.chan.uid[0] == 'brokerd.ib':
# breakpoint()
msg = await self._rx_chan.receive() msg = await self._rx_chan.receive()
return msg['yield'] return msg['yield']
@ -134,6 +137,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
raise # propagate raise # propagate
# except trio.Cancelled:
# if not self._shielded:
# # if shielded we don't propagate a cancelled
# raise
# except trio.Cancelled: # except trio.Cancelled:
# # relay cancels to the remote task # # relay cancels to the remote task
# await self.aclose() # await self.aclose()
@ -165,7 +173,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan rx_chan = self._rx_chan
if rx_chan._closed: if rx_chan._closed: # or self._eoc:
log.warning(f"{self} is already closed") log.warning(f"{self} is already closed")
# this stream has already been closed so silently succeed as # this stream has already been closed so silently succeed as
@ -212,7 +220,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# stop from the caller side # stop from the caller side
await self._ctx.send_stop() await self._ctx.send_stop()
except trio.BrokenResourceError: except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
# the underlying channel may already have been pulled # the underlying channel may already have been pulled
# in which case our stop message is meaningless since # in which case our stop message is meaningless since
# it can't traverse the transport. # it can't traverse the transport.
@ -254,18 +265,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# still need to consume msgs that are "in transit" from the far # still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``). # end (eg. for ``Context.result()``).
# TODO: but make it broadcasting to consumers
# def clone(self):
# """Clone this receive channel allowing for multi-task
# consumption from the same channel.
# """
# return ReceiveStream(
# self._cid,
# self._rx_chan.clone(),
# self._portal,
# )
class MsgStream(ReceiveMsgStream, trio.abc.Channel): class MsgStream(ReceiveMsgStream, trio.abc.Channel):
""" """
@ -282,6 +281,17 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
''' '''
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
# TODO: but make it broadcasting to consumers
def clone(self):
"""Clone this receive channel allowing for multi-task
consumption from the same channel.
"""
return MsgStream(
self._ctx,
self._rx_chan.clone(),
)
@dataclass @dataclass
class Context: class Context:
@ -308,7 +318,7 @@ class Context:
_cancel_called: bool = False _cancel_called: bool = False
# only set on the callee side # only set on the callee side
_cancel_scope: Optional[trio.CancelScope] = None _scope_nursery: Optional[trio.Nursery] = None
async def send_yield(self, data: Any) -> None: async def send_yield(self, data: Any) -> None:
@ -323,6 +333,16 @@ class Context:
async def send_stop(self) -> None: async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid}) await self.chan.send({'stop': True, 'cid': self.cid})
def _error_from_remote_msg(
self,
msg: Dict[str, Any],
) -> None:
async def raiser():
raise unpack_error(msg, self.chan)
self._scope_nursery.start_soon(raiser)
async def cancel(self) -> None: async def cancel(self) -> None:
'''Cancel this inter-actor-task context. '''Cancel this inter-actor-task context.
@ -361,13 +381,16 @@ class Context:
f"{cid} for {self._portal.channel.uid}") f"{cid} for {self._portal.channel.uid}")
else: else:
# ensure callee side # ensure callee side
assert self._cancel_scope assert self._scope_nursery
# TODO: should we have an explicit cancel message # TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an # or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough? # {'error': trio.Cancelled, cid: "blah"} enough?
# This probably gets into the discussion in # This probably gets into the discussion in
# https://github.com/goodboy/tractor/issues/36 # https://github.com/goodboy/tractor/issues/36
self._cancel_scope.cancel() self._scope_nursery.cancel_scope.cancel()
if self._recv_chan:
await self._recv_chan.aclose()
@asynccontextmanager @asynccontextmanager
async def open_stream( async def open_stream(