forked from goodboy/tractor
1
0
Fork 0

Map broken stream errs to transport closed; msgspec seems to be racy

msgspec_infect_asyncio
Tyler Goodlet 2021-09-05 18:48:09 -04:00
parent c27b00687c
commit b8b264ae54
2 changed files with 12 additions and 2 deletions

View File

@ -430,7 +430,10 @@ class Actor:
uid = await self._do_handshake(chan) uid = await self._do_handshake(chan)
except ( except (
# we need this for ``msgspec`` for some reason?
# for now, it's been put in the stream backend.
# trio.BrokenResourceError, # trio.BrokenResourceError,
# trio.ClosedResourceError, # trio.ClosedResourceError,
TransportClosed, TransportClosed,
): ):
@ -797,7 +800,7 @@ class Actor:
# XXX: msgspec doesn't support serializing tuples # XXX: msgspec doesn't support serializing tuples
# so just cash manually here since it's what our # so just cash manually here since it's what our
# internals expect. # internals expect.
address: Tuple[str, int] = tuple(value) address: Tuple[str, int] = tuple(value) if value else value
self._arb_addr = address self._arb_addr = address
else: else:

View File

@ -148,7 +148,14 @@ class MsgspecTCPStream(MsgpackTCPStream):
try: try:
header = await self.recv_stream.receive_exactly(4) header = await self.recv_stream.receive_exactly(4)
except (ValueError): except (
ValueError,
# not sure entirely why we need this but without it we
# seem to be getting racy failures here on
# arbiter/registry name subs..
trio.BrokenResourceError,
):
raise TransportClosed( raise TransportClosed(
f'transport {self} was already closed prior ro read' f'transport {self} was already closed prior ro read'
) )