From 700f09ce9bb622723ac8697656d5a44a53205312 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Jul 2021 07:44:03 -0400 Subject: [PATCH] Accept transport closed error during handshake and msg loop --- tractor/_actor.py | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 2ff2314..fac73ce 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -27,6 +27,7 @@ from ._exceptions import ( unpack_error, ModuleNotExposed, is_multi_cancelled, + TransportClosed, ) from . import _debug from ._discovery import get_arbiter @@ -202,7 +203,7 @@ class Actor: enable_modules: List[str] = [], uid: str = None, loglevel: str = None, - arbiter_addr: Optional[Tuple[str, int]] = None, + arbiter_addr: Optional[Tuple[str, int]] = (None, None), spawn_method: Optional[str] = None ) -> None: """This constructor is called in the parent actor **before** the spawning @@ -232,7 +233,7 @@ class Actor: # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = tuple(arbiter_addr) if arbiter_addr is not None else (None, None) + self._arb_addr = tuple(arbiter_addr) # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -338,7 +339,18 @@ class Actor: # send/receive initial handshake response try: uid = await self._do_handshake(chan) - except StopAsyncIteration: + + except ( + trio.BrokenResourceError, + trio.ClosedResourceError, + TransportClosed, + ): + # XXX: This may propagate up from ``Channel._aiter_recv()`` + # and ``MsgpackStream._inter_packets()`` on a read from the + # stream particularly when the runtime is first starting up + # inside ``open_root_actor()`` where there is a check for + # a bound listener on the "arbiter" addr. the reset will be + # because the handshake was never meant to take place. 
log.warning(f"Channel {chan} failed to handshake") return @@ -579,22 +591,39 @@ class Actor: ) await self.cancel_rpc_tasks(chan) + except ( + TransportClosed, + trio.BrokenResourceError, + trio.ClosedResourceError + ): + # channels "breaking" is ok since we don't have a teardown + # handshake for them (yet) and instead we simply bail out + # of the message loop and expect the teardown sequence + # to clean up. + log.error(f"{chan} form {chan.uid} closed abruptly") + # raise + except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") + except (Exception, trio.MultiError) as err: # ship any "internal" exception (i.e. one from internal machinery # not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: await self._parent_chan.send(pack_error(err)) - raise + # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" + raise + except trio.Cancelled: # debugging only log.debug(f"Msg loop was cancelled for {chan}") raise + finally: + # msg debugging for when the machinery is brokey log.debug( f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}")