forked from goodboy/tractor
Only send stop msg if not received from far end
parent
0b73a4b61e
commit
9e7bed646d
|
@ -53,7 +53,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
|
|
||||||
# delegate directly to underlying mem channel
|
# delegate directly to underlying mem channel
|
||||||
def receive_nowait(self):
|
def receive_nowait(self):
|
||||||
return self._rx_chan.receive_nowait()
|
msg = self._rx_chan.receive_nowait()
|
||||||
|
return msg['yield']
|
||||||
|
|
||||||
async def receive(self):
|
async def receive(self):
|
||||||
try:
|
try:
|
||||||
|
@ -106,6 +107,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# when the send is closed we assume the stream has
|
# when the send is closed we assume the stream has
|
||||||
# terminated and signal this local iterator to stop
|
# terminated and signal this local iterator to stop
|
||||||
await self.aclose()
|
await self.aclose()
|
||||||
|
|
||||||
# await self._ctx.send_stop()
|
# await self._ctx.send_stop()
|
||||||
raise StopAsyncIteration
|
raise StopAsyncIteration
|
||||||
|
|
||||||
|
@ -326,10 +328,14 @@ class Context:
|
||||||
try:
|
try:
|
||||||
yield rchan
|
yield rchan
|
||||||
|
|
||||||
finally:
|
except trio.EndOfChannel:
|
||||||
|
raise
|
||||||
|
|
||||||
|
else:
|
||||||
# signal ``StopAsyncIteration`` on far end.
|
# signal ``StopAsyncIteration`` on far end.
|
||||||
await self.send_stop()
|
await self.send_stop()
|
||||||
|
|
||||||
|
finally:
|
||||||
if self._portal:
|
if self._portal:
|
||||||
self._portal._streams.remove(rchan)
|
self._portal._streams.remove(rchan)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue