forked from goodboy/tractor
1
0
Fork 0

Go back to only logging crashes if no pdb gets engaged

transport_hardening
Tyler Goodlet 2021-06-29 15:55:03 -04:00
parent f57cc66a7f
commit 6d72a4ef45
1 changed files with 99 additions and 47 deletions

View File

@ -12,6 +12,7 @@ import uuid
import typing
from typing import Dict, List, Tuple, Any, Optional, Union
from types import ModuleType
import signal
import sys
import os
from contextlib import ExitStack
@ -47,6 +48,7 @@ class ActorFailure(Exception):
async def _invoke(
actor: 'Actor',
cid: str,
chan: Channel,
@ -60,13 +62,14 @@ async def _invoke(
"""
treat_as_gen = False
# possibly a traceback object
# (not sure what typing is for this..)
# possible a traceback (not sure what typing is for this..)
tb = None
cancel_scope = trio.CancelScope()
ctx = Context(chan, cid, _cancel_scope=cancel_scope)
context = False
cs: trio.CancelScope = None
ctx = Context(chan, cid)
context: bool = False
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
@ -154,7 +157,9 @@ async def _invoke(
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid})
with cancel_scope as cs:
async with trio.open_nursery() as scope_nursery:
ctx._scope_nursery = scope_nursery
cs = scope_nursery.cancel_scope
task_status.started(cs)
try:
await chan.send({'return': await coro, 'cid': cid})
@ -192,19 +197,32 @@ async def _invoke(
except (Exception, trio.MultiError) as err:
# TODO: maybe we'll want differnet "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
if not isinstance(err, trio.ClosedResourceError) and (
not is_multi_cancelled(err)) and (
not isinstance(err, ContextCancelled)
):
# XXX: is there any case where we'll want to debug IPC
# disconnects? I can't think of a reason that inspecting
# this type of failure will be useful for respawns or
# recovery logic - the only case is some kind of strange bug
# in `trio` itself?
entered = await _debug._maybe_enter_pm(err)
if not entered:
if not is_multi_cancelled(err):
log.exception("Actor crashed:")
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
# if not isinstance(err, trio.ClosedResourceError) and (
# if not is_multi_cancelled(err) and (
entered_debug: bool = False
if not isinstance(err, ContextCancelled) or (
isinstance(err, ContextCancelled) and ctx._cancel_called
):
# XXX: is there any case where we'll want to debug IPC
# disconnects as a default?
#
# I can't think of a reason that inspecting
# this type of failure will be useful for respawns or
# recovery logic - the only case is some kind of strange bug
# in our transport layer itself? Going to keep this
# open ended for now.
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
log.exception("Actor crashed:")
# always ship errors back to caller
@ -214,8 +232,10 @@ async def _invoke(
await chan.send(err_msg)
except trio.ClosedResourceError:
log.warning(
f"Failed to ship error to caller @ {chan.uid}")
# if we can't propagate the error that's a big boo boo
log.error(
f"Failed to ship error to caller @ {chan.uid} !?"
)
if cs is None:
# error is from above code not from rpc invocation
@ -271,7 +291,7 @@ class Actor:
enable_modules: List[str] = [],
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
arbiter_addr: Optional[Tuple[str, int]] = (None, None),
spawn_method: Optional[str] = None
) -> None:
"""This constructor is called in the parent actor **before** the spawning
@ -301,8 +321,7 @@ class Actor:
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.loglevel = loglevel
self._arb_addr = arbiter_addr or (None, None)
self._arb_addr = tuple(arbiter_addr)
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@ -396,12 +415,16 @@ class Actor:
raise mne
async def _stream_handler(
self,
stream: trio.SocketStream,
) -> None:
"""Entry point for new inbound connections to the channel server.
"""
self._no_more_peers = trio.Event() # unset
chan = Channel(stream=stream)
log.runtime(f"New connection to us {chan}")
@ -435,8 +458,9 @@ class Actor:
chans = self._peers[uid]
# TODO: re-use channels for new connections instead of always
# new ones; will require changing all the discovery funcs
# TODO: re-use channels for new connections instead
# of always new ones; will require changing all the
# discovery funcs
if chans:
log.runtime(
f"already have channel(s) for {uid}:{chans}?"
@ -451,10 +475,24 @@ class Actor:
try:
await self._process_messages(chan)
finally:
# channel cleanup sequence
# for (channel, cid) in self._rpc_tasks.copy():
# if channel is chan:
# with trio.CancelScope(shield=True):
# await self._cancel_task(cid, channel)
# # close all consumer side task mem chans
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
# assert send_chan.cid == cid # type: ignore
# await send_chan.aclose()
# Drop ref to channel so it can be gc-ed and disconnected
log.debug(f"Releasing channel {chan} from {chan.uid}")
chans = self._peers.get(chan.uid)
chans.remove(chan)
if not chans:
log.debug(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None)
@ -467,14 +505,22 @@ class Actor:
# # XXX: is this necessary (GC should do it?)
if chan.connected():
# if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to
# an error and so we should at least try to terminate
# the channel from this end gracefully.
log.debug(f"Disconnecting channel {chan}")
try:
# send our msg loop terminate sentinel
# send a msg loop terminate sentinel
await chan.send(None)
# XXX: do we want this?
# causes "[104] connection reset by peer" on other end
# await chan.aclose()
except trio.BrokenResourceError:
log.exception(
f"Channel for {chan.uid} was already zonked..")
log.warning(f"Channel for {chan.uid} was already closed")
async def _push_result(
self,
@ -484,18 +530,22 @@ class Actor:
) -> None:
"""Push an RPC result to the local consumer's queue.
"""
actorid = chan.uid
assert actorid, f"`actorid` can't be {actorid}"
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
# actorid = chan.uid
assert chan.uid, f"`chan.uid` can't be {chan.uid}"
send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
assert send_chan.cid == cid # type: ignore
# if 'stop' in msg:
if 'error' in msg:
ctx = getattr(recv_chan, '_ctx', None)
# if ctx:
# ctx._error_from_remote_msg(msg)
# log.debug(f"{send_chan} was terminated at remote end")
# # indicate to consumer that far end has stopped
# return await send_chan.aclose()
try:
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}")
# maintain backpressure
await send_chan.send(msg)
@ -514,7 +564,9 @@ class Actor:
self,
actorid: Tuple[str, str],
cid: str
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
log.debug(f"Getting result queue for {actorid} cid {cid}")
try:
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
@ -577,9 +629,15 @@ class Actor:
if channel is chan:
await self._cancel_task(cid, channel)
# close all consumer side task mem chans
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
# assert send_chan.cid == cid # type: ignore
# await send_chan.aclose()
log.debug(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break
log.trace( # type: ignore
@ -685,9 +743,6 @@ class Actor:
# caller's teardown sequence to clean up.
log.warning(f"Channel from {chan.uid} closed abruptly")
except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke")
except (Exception, trio.MultiError) as err:
# ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent
@ -750,8 +805,7 @@ class Actor:
# XXX: msgspec doesn't support serializing tuples
# so just cash manually here since it's what our
# internals expect.
address: Tuple[str, int] = value
self._arb_addr = address
self._arb_addr = tuple(value)
else:
setattr(self, attr, value)
@ -1132,7 +1186,6 @@ class Actor:
async def _do_handshake(
self,
chan: Channel
) -> Tuple[str, str]:
"""Exchange (name, UUIDs) identifiers as the first communication step.
@ -1140,10 +1193,11 @@ class Actor:
parlance.
"""
await chan.send(self.uid)
uid: Tuple[str, str] = await chan.recv()
# breakpoint()
uid: Tuple[str, str] = tuple(await chan.recv())
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
# if not isinstance(uid, tuple):
# raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
@ -1208,12 +1262,10 @@ class Arbiter(Actor):
return sockaddrs
async def register_actor(
self,
uid: Tuple[str, str],
sockaddr: Tuple[str, str]
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
) -> None:
name, uuid = tuple(uid)
uid = tuple(uid)
name, uuid = uid
self._registry[uid] = tuple(sockaddr)
# pop and signal all waiter events