forked from goodboy/tractor
Accept transport closed error during handshake and msg loop
parent
1c4d418f81
commit
134a84e39e
|
@ -31,6 +31,7 @@ from ._exceptions import (
|
||||||
ModuleNotExposed,
|
ModuleNotExposed,
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
TransportClosed,
|
||||||
)
|
)
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from ._discovery import get_arbiter
|
from ._discovery import get_arbiter
|
||||||
|
@ -249,7 +250,7 @@ class Actor:
|
||||||
enable_modules: List[str] = [],
|
enable_modules: List[str] = [],
|
||||||
uid: str = None,
|
uid: str = None,
|
||||||
loglevel: str = None,
|
loglevel: str = None,
|
||||||
arbiter_addr: Optional[Tuple[str, int]] = None,
|
arbiter_addr: Optional[Tuple[str, int]] = (None, None),
|
||||||
spawn_method: Optional[str] = None
|
spawn_method: Optional[str] = None
|
||||||
) -> None:
|
) -> None:
|
||||||
"""This constructor is called in the parent actor **before** the spawning
|
"""This constructor is called in the parent actor **before** the spawning
|
||||||
|
@ -279,7 +280,7 @@ class Actor:
|
||||||
# TODO: consider making this a dynamically defined
|
# TODO: consider making this a dynamically defined
|
||||||
# @dataclass once we get py3.7
|
# @dataclass once we get py3.7
|
||||||
self.loglevel = loglevel
|
self.loglevel = loglevel
|
||||||
self._arb_addr = tuple(arbiter_addr) if arbiter_addr is not None else (None, None)
|
self._arb_addr = tuple(arbiter_addr)
|
||||||
|
|
||||||
# marked by the process spawning backend at startup
|
# marked by the process spawning backend at startup
|
||||||
# will be None for the parent most process started manually
|
# will be None for the parent most process started manually
|
||||||
|
@ -385,7 +386,18 @@ class Actor:
|
||||||
# send/receive initial handshake response
|
# send/receive initial handshake response
|
||||||
try:
|
try:
|
||||||
uid = await self._do_handshake(chan)
|
uid = await self._do_handshake(chan)
|
||||||
except StopAsyncIteration:
|
|
||||||
|
except (
|
||||||
|
trio.BrokenResourceError,
|
||||||
|
trio.ClosedResourceError,
|
||||||
|
TransportClosed,
|
||||||
|
):
|
||||||
|
# XXX: This may propagate up from ``Channel._aiter_recv()``
|
||||||
|
# and ``MsgpackStream._inter_packets()`` on a read from the
|
||||||
|
# stream particularly when the runtime is first starting up
|
||||||
|
# inside ``open_root_actor()`` where there is a check for
|
||||||
|
# a bound listener on the "arbiter" addr. the reset will be
|
||||||
|
# because the handshake was never meant took place.
|
||||||
log.warning(f"Channel {chan} failed to handshake")
|
log.warning(f"Channel {chan} failed to handshake")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -636,22 +648,39 @@ class Actor:
|
||||||
)
|
)
|
||||||
await self.cancel_rpc_tasks(chan)
|
await self.cancel_rpc_tasks(chan)
|
||||||
|
|
||||||
|
except (
|
||||||
|
TransportClosed,
|
||||||
|
trio.BrokenResourceError,
|
||||||
|
trio.ClosedResourceError
|
||||||
|
):
|
||||||
|
# channels "breaking" is ok since we don't have a teardown
|
||||||
|
# handshake for them (yet) and instead we simply bail out
|
||||||
|
# of the message loop and expect the teardown sequence
|
||||||
|
# to clean up.
|
||||||
|
log.error(f"{chan} form {chan.uid} closed abruptly")
|
||||||
|
# raise
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
log.error(f"{chan} form {chan.uid} broke")
|
log.error(f"{chan} form {chan.uid} broke")
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# ship any "internal" exception (i.e. one from internal machinery
|
# ship any "internal" exception (i.e. one from internal machinery
|
||||||
# not from an rpc task) to parent
|
# not from an rpc task) to parent
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
if self._parent_chan:
|
if self._parent_chan:
|
||||||
await self._parent_chan.send(pack_error(err))
|
await self._parent_chan.send(pack_error(err))
|
||||||
raise
|
|
||||||
# if this is the `MainProcess` we expect the error broadcasting
|
# if this is the `MainProcess` we expect the error broadcasting
|
||||||
# above to trigger an error at consuming portal "checkpoints"
|
# above to trigger an error at consuming portal "checkpoints"
|
||||||
|
raise
|
||||||
|
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
# debugging only
|
# debugging only
|
||||||
log.debug(f"Msg loop was cancelled for {chan}")
|
log.debug(f"Msg loop was cancelled for {chan}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
# msg debugging for when he machinery is brokey
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Exiting msg loop for {chan} from {chan.uid} "
|
f"Exiting msg loop for {chan} from {chan.uid} "
|
||||||
f"with last msg:\n{msg}")
|
f"with last msg:\n{msg}")
|
||||||
|
|
Loading…
Reference in New Issue