Handle broken-pipes from `MsgpackTransport.send()`

Much like we already do in the `._iter_packets()` async-generator which
delivers to `.recv()` and `async for`, handle the `''[Errno 32] Broken
pipe'` case that can show up with unix-domain-socket usage.

Seems like the cause is due to how fast the socket can be torn down
during a registry addr channel ping where,
- the sending side can break the connection faster then the pong side
  can prep its handshake msg,
- the pong side tries to send it's handshake pkt via
  `.SocketStream.send_all()` after the breakage and then raises
  `trio.BrokenResourceError`.
leslies_extra_appendix
Tyler Goodlet 2025-04-01 12:56:28 -04:00
parent 012b5fd8c2
commit a28659c3cd
1 changed files with 27 additions and 1 deletions

View File

@ -412,7 +412,33 @@ class MsgpackTransport(MsgTransport):
# supposedly the fastest says,
# https://stackoverflow.com/a/54027962
size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data)
try:
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
) as trans_err:
loglevel = 'transport'
match trans_err:
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specifc to UDS transport afaik?
# likely todo with races related to how fast
# the socket is setup/torn-down on linux..
):
raise TransportClosed(
message=(
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
),
loglevel=loglevel,
) from trans_err
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
raise trans_err
# ?TODO? does it help ever to dynamically show this
# frame?