forked from goodboy/tractor
Specially remap TCP 104-connection-reset to `TransportClosed`
Since we currently have no real "discovery protocol" between process trees, the current naive approach is to check via a connect-and-drop whether a TCP server is bound to a particular address during root actor startup. This was a historical decision with no real grounding beyond taking a simple approach to get something working when the project was first started. This is obviously problematic from an error-handling perspective, since we need to be able to avoid such quick connect-and-drops from cancelling an "arbiter"'s (registry actor's) channel-msg loop machinery (which would propagate and cancel the actor). For now we map this particular TCP error, which `trio` remaps to a `trio.BrokenResourceError`, to our own internal `TransportClosed`, which is swallowed by channel message loop processing and indicates a graceful teardown of the far end actor.
transport_cleaning
parent
a2d400583f
commit
1edf5c2f06
|
@ -263,7 +263,7 @@ class Actor:
|
||||||
self._parent_chan: Optional[Channel] = None
|
self._parent_chan: Optional[Channel] = None
|
||||||
self._forkserver_info: Optional[
|
self._forkserver_info: Optional[
|
||||||
Tuple[Any, Any, Any, Any, Any]] = None
|
Tuple[Any, Any, Any, Any, Any]] = None
|
||||||
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore
|
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa
|
||||||
|
|
||||||
async def wait_for_peer(
|
async def wait_for_peer(
|
||||||
self, uid: Tuple[str, str]
|
self, uid: Tuple[str, str]
|
||||||
|
@ -341,8 +341,8 @@ class Actor:
|
||||||
uid = await self._do_handshake(chan)
|
uid = await self._do_handshake(chan)
|
||||||
|
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
# trio.BrokenResourceError,
|
||||||
trio.ClosedResourceError,
|
# trio.ClosedResourceError,
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
):
|
):
|
||||||
# XXX: This may propagate up from ``Channel._aiter_recv()``
|
# XXX: This may propagate up from ``Channel._aiter_recv()``
|
||||||
|
@ -592,20 +592,16 @@ class Actor:
|
||||||
|
|
||||||
except (
|
except (
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
trio.BrokenResourceError,
|
|
||||||
trio.ClosedResourceError
|
|
||||||
):
|
):
|
||||||
# channels "breaking" is ok since we don't have a teardown
|
# channels "breaking" (for TCP streams by EOF or 104
|
||||||
# handshake for them (yet) and instead we simply bail out
|
# connection-reset) is ok since we don't have a teardown
|
||||||
# of the message loop and expect the teardown sequence
|
# handshake for them (yet) and instead we simply bail out of
|
||||||
# to clean up.
|
# the message loop and expect the teardown sequence to clean
|
||||||
log.error(f"{chan} form {chan.uid} closed abruptly")
|
# up.
|
||||||
# raise
|
log.debug(f'channel from {chan.uid} closed abruptly:\n{chan}')
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
|
||||||
log.error(f"{chan} form {chan.uid} broke")
|
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
|
|
||||||
# ship any "internal" exception (i.e. one from internal machinery
|
# ship any "internal" exception (i.e. one from internal machinery
|
||||||
# not from an rpc task) to parent
|
# not from an rpc task) to parent
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
|
|
|
@ -57,7 +57,26 @@ class MsgpackTCPStream:
|
||||||
use_list=False,
|
use_list=False,
|
||||||
)
|
)
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
|
try:
|
||||||
data = await self.stream.receive_some(2**10)
|
data = await self.stream.receive_some(2**10)
|
||||||
|
|
||||||
|
except trio.BrokenResourceError as err:
|
||||||
|
msg = err.args[0]
|
||||||
|
|
||||||
|
# XXX: handle connection-reset-by-peer the same as a EOF.
|
||||||
|
# we're currently remapping this since we allow
|
||||||
|
# a quick connect then drop for root actors when
|
||||||
|
# checking to see if there exists an "arbiter"
|
||||||
|
# on the chosen sockaddr (``_root.py:108`` or thereabouts)
|
||||||
|
if '[Errno 104]' in msg:
|
||||||
|
raise TransportClosed(
|
||||||
|
f'{self} was broken with {msg}'
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
log.trace(f"received {data}") # type: ignore
|
log.trace(f"received {data}") # type: ignore
|
||||||
|
|
||||||
if data == b'':
|
if data == b'':
|
||||||
|
@ -175,11 +194,13 @@ class Channel:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
log.debug(f"Closing {self}")
|
log.debug(
|
||||||
|
f'Closing channel to {self.uid} '
|
||||||
|
f'{self.laddr} -> {self.raddr}'
|
||||||
|
)
|
||||||
assert self.msgstream
|
assert self.msgstream
|
||||||
await self.msgstream.stream.aclose()
|
await self.msgstream.stream.aclose()
|
||||||
self._closed = True
|
self._closed = True
|
||||||
log.error(f'CLOSING CHAN {self}')
|
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
await self.connect()
|
await self.connect()
|
||||||
|
|
|
@ -105,6 +105,11 @@ async def open_root_actor(
|
||||||
arbiter_found = False
|
arbiter_found = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# TODO: this connect-and-bail forces us to have to carefully
|
||||||
|
# rewrap TCP 104-connection-reset errors as EOF so as to avoid
|
||||||
|
# propagating cancel-causing errors to the channel-msg loop
|
||||||
|
# machinery. Likely it would be better to eventually have
|
||||||
|
# a "discovery" protocol with basic handshake instead.
|
||||||
async with _connect_chan(host, port):
|
async with _connect_chan(host, port):
|
||||||
arbiter_found = True
|
arbiter_found = True
|
||||||
|
|
||||||
|
@ -174,8 +179,11 @@ async def open_root_actor(
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
logger.info("Shutting down root actor")
|
logger.info("Shutting down root actor")
|
||||||
|
try:
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await actor.cancel()
|
await actor.cancel()
|
||||||
|
except Exception as err:
|
||||||
|
log.warning('Root was already cancelled')
|
||||||
finally:
|
finally:
|
||||||
_state._current_actor = None
|
_state._current_actor = None
|
||||||
logger.info("Root actor terminated")
|
logger.info("Root actor terminated")
|
||||||
|
|
Loading…
Reference in New Issue