From 6d72a4ef454f117175d7b513e932862c24911d60 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 29 Jun 2021 15:55:03 -0400
Subject: [PATCH] Go back to only logging crashes if no pdb gets engaged

---
 tractor/_actor.py | 146 +++++++++++++++++++++++++++++++---------------
 1 file changed, 99 insertions(+), 47 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index f2e1ffb..ab7a321 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -12,6 +12,7 @@ import uuid
 import typing
 from typing import Dict, List, Tuple, Any, Optional, Union
 from types import ModuleType
+import signal
 import sys
 import os
 from contextlib import ExitStack
@@ -47,6 +48,7 @@ class ActorFailure(Exception):
 
 
 async def _invoke(
+
     actor: 'Actor',
     cid: str,
     chan: Channel,
@@ -60,13 +62,14 @@
     """
     treat_as_gen = False
 
-    # possibly a traceback object
-    # (not sure what typing is for this..)
+    # possibly a traceback (not sure what typing is for this..)
     tb = None
 
     cancel_scope = trio.CancelScope()
-    ctx = Context(chan, cid, _cancel_scope=cancel_scope)
-    context = False
+    cs: trio.CancelScope = None
+
+    ctx = Context(chan, cid)
+    context: bool = False
 
     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
@@ -154,7 +157,9 @@
             # context func with support for bi-dir streaming
             await chan.send({'functype': 'context', 'cid': cid})
 
-            with cancel_scope as cs:
+            async with trio.open_nursery() as scope_nursery:
+                ctx._scope_nursery = scope_nursery
+                cs = scope_nursery.cancel_scope
                 task_status.started(cs)
                 try:
                     await chan.send({'return': await coro, 'cid': cid})
@@ -192,19 +197,32 @@
 
     except (Exception, trio.MultiError) as err:
 
-        # TODO: maybe we'll want differnet "levels" of debugging
-        # eventualy such as ('app', 'supervisory', 'runtime') ?
-        if not isinstance(err, trio.ClosedResourceError) and (
-            not is_multi_cancelled(err)) and (
-            not isinstance(err, ContextCancelled)
-        ):
-            # XXX: is there any case where we'll want to debug IPC
-            # disconnects? I can't think of a reason that inspecting
-            # this type of failure will be useful for respawns or
-            # recovery logic - the only case is some kind of strange bug
-            # in `trio` itself?
-            entered = await _debug._maybe_enter_pm(err)
-            if not entered:
+        if not is_multi_cancelled(err):
+
+            log.exception("Actor crashed:")
+
+            # TODO: maybe we'll want different "levels" of debugging
+            # eventually such as ('app', 'supervisory', 'runtime') ?
+
+            # if not isinstance(err, trio.ClosedResourceError) and (
+            # if not is_multi_cancelled(err) and (
+
+            entered_debug: bool = False
+            if not isinstance(err, ContextCancelled) or (
+                isinstance(err, ContextCancelled) and ctx._cancel_called
+            ):
+                # XXX: is there any case where we'll want to debug IPC
+                # disconnects as a default?
+                #
+                # I can't think of a reason that inspecting
+                # this type of failure will be useful for respawns or
+                # recovery logic - the only case is some kind of strange bug
+                # in our transport layer itself? Going to keep this
+                # open ended for now.
+
+                entered_debug = await _debug._maybe_enter_pm(err)
+
+            if not entered_debug:
                 log.exception("Actor crashed:")
 
         # always ship errors back to caller
@@ -214,8 +232,10 @@
             await chan.send(err_msg)
         except trio.ClosedResourceError:
-            log.warning(
-                f"Failed to ship error to caller @ {chan.uid}")
+            # if we can't propagate the error that's a big boo boo
+            log.error(
+                f"Failed to ship error to caller @ {chan.uid} !?"
+            )
 
         if cs is None:
             # error is from above code not from rpc invocation
             task_status.started(err)
@@ -271,7 +291,7 @@ class Actor:
         enable_modules: List[str] = [],
         uid: str = None,
         loglevel: str = None,
-        arbiter_addr: Optional[Tuple[str, int]] = None,
+        arbiter_addr: Optional[Tuple[str, int]] = (None, None),
         spawn_method: Optional[str] = None
     ) -> None:
         """This constructor is called in the parent actor **before** the spawning
@@ -301,8 +321,7 @@
         # TODO: consider making this a dynamically defined
         # @dataclass once we get py3.7
         self.loglevel = loglevel
-
-        self._arb_addr = arbiter_addr or (None, None)
+        self._arb_addr = tuple(arbiter_addr)
 
         # marked by the process spawning backend at startup
         # will be None for the parent most process started manually
@@ -396,12 +415,16 @@
             raise mne
 
     async def _stream_handler(
+
         self,
         stream: trio.SocketStream,
+
     ) -> None:
         """Entry point for new inbound connections to the channel server.
+
         """
         self._no_more_peers = trio.Event()  # unset
+
         chan = Channel(stream=stream)
         log.runtime(f"New connection to us {chan}")
 
@@ -435,8 +458,9 @@
 
         chans = self._peers[uid]
 
-        # TODO: re-use channels for new connections instead of always
-        # new ones; will require changing all the discovery funcs
+        # TODO: re-use channels for new connections instead
+        # of always new ones; will require changing all the
+        # discovery funcs
         if chans:
             log.runtime(
                 f"already have channel(s) for {uid}:{chans}?"
@@ -451,10 +475,24 @@
         try:
             await self._process_messages(chan)
         finally:
+
+            # channel cleanup sequence
+
+            # for (channel, cid) in self._rpc_tasks.copy():
+            #     if channel is chan:
+            #         with trio.CancelScope(shield=True):
+            #             await self._cancel_task(cid, channel)
+
+            #     # close all consumer side task mem chans
+            #     send_chan, _ = self._cids2qs[(chan.uid, cid)]
+            #     assert send_chan.cid == cid  # type: ignore
+            #     await send_chan.aclose()
+
             # Drop ref to channel so it can be gc-ed and disconnected
             log.debug(f"Releasing channel {chan} from {chan.uid}")
             chans = self._peers.get(chan.uid)
             chans.remove(chan)
+
             if not chans:
                 log.debug(f"No more channels for {chan.uid}")
                 self._peers.pop(chan.uid, None)
@@ -467,14 +505,22 @@
 
             #
             # XXX: is this necessary (GC should do it?)
             if chan.connected():
+                # if the channel is still connected it may mean the far
+                # end has not closed and we may have gotten here due to
+                # an error and so we should at least try to terminate
+                # the channel from this end gracefully.
+
                 log.debug(f"Disconnecting channel {chan}")
                 try:
-                    # send our msg loop terminate sentinel
+                    # send a msg loop terminate sentinel
                     await chan.send(None)
+
+                    # XXX: do we want this?
+                    # causes "[104] connection reset by peer" on other end
                     # await chan.aclose()
+
                 except trio.BrokenResourceError:
-                    log.exception(
-                        f"Channel for {chan.uid} was already zonked..")
+                    log.warning(f"Channel for {chan.uid} was already closed")
 
 
@@ -484,23 +530,27 @@
     async def _push_result(
         self,
         chan: Channel,
         cid: str,
         msg: Dict[str, Any]
     ) -> None:
         """Push an RPC result to the local consumer's queue.
""" - actorid = chan.uid - assert actorid, f"`actorid` can't be {actorid}" - send_chan, recv_chan = self._cids2qs[(actorid, cid)] + # actorid = chan.uid + assert chan.uid, f"`chan.uid` can't be {chan.uid}" + send_chan, recv_chan = self._cids2qs[(chan.uid, cid)] assert send_chan.cid == cid # type: ignore - # if 'stop' in msg: + if 'error' in msg: + ctx = getattr(recv_chan, '_ctx', None) + # if ctx: + # ctx._error_from_remote_msg(msg) + # log.debug(f"{send_chan} was terminated at remote end") # # indicate to consumer that far end has stopped # return await send_chan.aclose() try: - log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}") # maintain backpressure await send_chan.send(msg) @@ -514,7 +564,9 @@ class Actor: self, actorid: Tuple[str, str], cid: str + ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: + log.debug(f"Getting result queue for {actorid} cid {cid}") try: send_chan, recv_chan = self._cids2qs[(actorid, cid)] @@ -577,9 +629,15 @@ class Actor: if channel is chan: await self._cancel_task(cid, channel) + # close all consumer side task mem chans + # send_chan, _ = self._cids2qs[(chan.uid, cid)] + # assert send_chan.cid == cid # type: ignore + # await send_chan.aclose() + log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") + break log.trace( # type: ignore @@ -685,9 +743,6 @@ class Actor: # caller's teardown sequence to clean up. log.warning(f"Channel from {chan.uid} closed abruptly") - except trio.ClosedResourceError: - log.error(f"{chan} form {chan.uid} broke") - except (Exception, trio.MultiError) as err: # ship any "internal" exception (i.e. one from internal machinery # not from an rpc task) to parent @@ -750,8 +805,7 @@ class Actor: # XXX: msgspec doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - address: Tuple[str, int] = value - self._arb_addr = address + self._arb_addr = tuple(value) else: setattr(self, attr, value) @@ -1132,7 +1186,6 @@ class Actor: async def _do_handshake( self, chan: Channel - ) -> Tuple[str, str]: """Exchange (name, UUIDs) identifiers as the first communication step. @@ -1140,10 +1193,11 @@ class Actor: parlance. """ await chan.send(self.uid) - uid: Tuple[str, str] = await chan.recv() + # breakpoint() + uid: Tuple[str, str] = tuple(await chan.recv()) - if not isinstance(uid, tuple): - raise ValueError(f"{uid} is not a valid uid?!") + # if not isinstance(uid, tuple): + # raise ValueError(f"{uid} is not a valid uid?!") chan.uid = uid log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") @@ -1208,12 +1262,10 @@ class Arbiter(Actor): return sockaddrs async def register_actor( - self, - uid: Tuple[str, str], - sockaddr: Tuple[str, str] - + self, uid: Tuple[str, str], sockaddr: Tuple[str, int] ) -> None: - name, uuid = tuple(uid) + uid = tuple(uid) + name, uuid = uid self._registry[uid] = tuple(sockaddr) # pop and signal all waiter events