forked from goodboy/tractor
Only send stop msg if not received from far end
parent
6559fb72aa
commit
a2e2f7e7a8
|
@ -53,7 +53,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
|
||||
# delegate directly to underlying mem channel
|
||||
def receive_nowait(self):
|
||||
return self._rx_chan.receive_nowait()
|
||||
msg = self._rx_chan.receive_nowait()
|
||||
return msg['yield']
|
||||
|
||||
async def receive(self):
|
||||
try:
|
||||
|
@ -106,6 +107,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
# when the send is closed we assume the stream has
|
||||
# terminated and signal this local iterator to stop
|
||||
await self.aclose()
|
||||
|
||||
# await self._ctx.send_stop()
|
||||
raise StopAsyncIteration
|
||||
|
||||
|
@ -326,10 +328,14 @@ class Context:
|
|||
try:
|
||||
yield rchan
|
||||
|
||||
finally:
|
||||
except trio.EndOfChannel:
|
||||
raise
|
||||
|
||||
else:
|
||||
# signal ``StopAsyncIteration`` on far end.
|
||||
await self.send_stop()
|
||||
|
||||
finally:
|
||||
if self._portal:
|
||||
self._portal._streams.remove(rchan)
|
||||
|
||||
|
|
Loading…
Reference in New Issue