Accept transport closed error during handshake and msg loop

transport_cleaning
Tyler Goodlet 2021-07-01 07:44:03 -04:00
parent 80e100f818
commit 32b4ae0603
1 changed files with 32 additions and 3 deletions

View File

@ -27,6 +27,7 @@ from ._exceptions import (
unpack_error,
ModuleNotExposed,
is_multi_cancelled,
TransportClosed,
)
from . import _debug
from ._discovery import get_arbiter
@ -202,7 +203,7 @@ class Actor:
enable_modules: List[str] = [],
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
arbiter_addr: Optional[Tuple[str, int]] = (None, None),
spawn_method: Optional[str] = None
) -> None:
"""This constructor is called in the parent actor **before** the spawning
@ -338,7 +339,18 @@ class Actor:
# send/receive initial handshake response
try:
uid = await self._do_handshake(chan)
except StopAsyncIteration:
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
TransportClosed,
):
# XXX: This may propagate up from ``Channel._aiter_recv()``
# and ``MsgpackStream._inter_packets()`` on a read from the
# stream particularly when the runtime is first starting up
# inside ``open_root_actor()`` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.warning(f"Channel {chan} failed to handshake")
return
@ -578,22 +590,39 @@ class Actor:
)
await self.cancel_rpc_tasks(chan)
except (
TransportClosed,
trio.BrokenResourceError,
trio.ClosedResourceError
):
# channels "breaking" is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out
# of the message loop and expect the teardown sequence
# to clean up.
log.error(f"{chan} form {chan.uid} closed abruptly")
# raise
except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke")
except (Exception, trio.MultiError) as err:
# ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent
log.exception("Actor errored:")
if self._parent_chan:
await self._parent_chan.send(pack_error(err))
raise
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
raise
except trio.Cancelled:
# debugging only
log.debug(f"Msg loop was cancelled for {chan}")
raise
finally:
# msg debugging for when he machinery is brokey
log.debug(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")