Always raise end of channel ; see if it fixes CI

prehardkill
Tyler Goodlet 2021-06-14 00:06:14 -04:00
parent 6c039f7581
commit caaf15b75a
1 changed files with 13 additions and 5 deletions

View File

@ -61,7 +61,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
return msg['yield'] return msg['yield']
async def receive(self): async def receive(self):
# see ``.aclose()`` to an alt to always checking this # see ``.aclose()`` for an alt to always checking this
if self._eoc: if self._eoc:
raise trio.EndOfChannel raise trio.EndOfChannel
@ -80,7 +80,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
if msg.get('stop'): if msg.get('stop'):
log.debug(f"{self} was stopped at remote end") log.debug(f"{self} was stopped at remote end")
self._eoc = True
# when the send is closed we assume the stream has # when the send is closed we assume the stream has
# terminated and signal this local iterator to stop # terminated and signal this local iterator to stop
@ -121,7 +120,16 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# terminated and signal this local iterator to stop # terminated and signal this local iterator to stop
await self.aclose() await self.aclose()
raise # propagate raise trio.EndOfChannel
# if not isinstance(self, MsgStream):
# # XXX: this was how we handled this originally for the
# # single direction case?
# raise trio.EndOfChannel
# else:
# # in 2-way case raise the closed error
# raise # propagate
except trio.Cancelled: except trio.Cancelled:
# relay cancels to the remote task # relay cancels to the remote task
@ -150,6 +158,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
on close. on close.
""" """
self._eoc = True
# XXX: keep proper adherance to trio's `.aclose()` semantics: # XXX: keep proper adherance to trio's `.aclose()` semantics:
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan rx_chan = self._rx_chan
@ -200,8 +210,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# it can't traverse the transport. # it can't traverse the transport.
log.debug(f'Channel for {self} was already closed') log.debug(f'Channel for {self} was already closed')
self._eoc = True
# close the local mem chan??!? # close the local mem chan??!?
# NOT if we're a ``MsgStream``! # NOT if we're a ``MsgStream``!