Don't clobber msg loop mem chan on rx stream close
Revert this change since it really is poking at internals and doesn't make a lot of sense. If the context is going to be cancelled then the msg loop will tear down the feed memory channel when ready, we don't need to be clobbering it and confusing the runtime machinery lol.try_msgspec
parent
6c039f7581
commit
715dc9afc6
tractor
|
@ -332,12 +332,6 @@ class Portal:
|
||||||
# message right now since there shouldn't be a reason to
|
# message right now since there shouldn't be a reason to
|
||||||
# stop and restart the stream, right?
|
# stop and restart the stream, right?
|
||||||
try:
|
try:
|
||||||
|
|
||||||
# We are for sure done with this stream and no more
|
|
||||||
# messages are expected to be delivered from the
|
|
||||||
# runtime's msg loop.
|
|
||||||
await recv_chan.aclose()
|
|
||||||
|
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
|
|
|
@ -61,7 +61,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
return msg['yield']
|
return msg['yield']
|
||||||
|
|
||||||
async def receive(self):
|
async def receive(self):
|
||||||
# see ``.aclose()`` to an alt to always checking this
|
# see ``.aclose()`` for an alt to always checking this
|
||||||
if self._eoc:
|
if self._eoc:
|
||||||
raise trio.EndOfChannel
|
raise trio.EndOfChannel
|
||||||
|
|
||||||
|
@ -80,7 +80,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
|
|
||||||
if msg.get('stop'):
|
if msg.get('stop'):
|
||||||
log.debug(f"{self} was stopped at remote end")
|
log.debug(f"{self} was stopped at remote end")
|
||||||
self._eoc = True
|
|
||||||
|
|
||||||
# when the send is closed we assume the stream has
|
# when the send is closed we assume the stream has
|
||||||
# terminated and signal this local iterator to stop
|
# terminated and signal this local iterator to stop
|
||||||
|
@ -150,6 +149,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
on close.
|
on close.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
self._eoc = True
|
||||||
|
|
||||||
# XXX: keep proper adherance to trio's `.aclose()` semantics:
|
# XXX: keep proper adherance to trio's `.aclose()` semantics:
|
||||||
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
||||||
rx_chan = self._rx_chan
|
rx_chan = self._rx_chan
|
||||||
|
@ -200,8 +201,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# it can't traverse the transport.
|
# it can't traverse the transport.
|
||||||
log.debug(f'Channel for {self} was already closed')
|
log.debug(f'Channel for {self} was already closed')
|
||||||
|
|
||||||
self._eoc = True
|
|
||||||
|
|
||||||
# close the local mem chan??!?
|
# close the local mem chan??!?
|
||||||
|
|
||||||
# NOT if we're a ``MsgStream``!
|
# NOT if we're a ``MsgStream``!
|
||||||
|
|
Loading…
Reference in New Issue