Drop stream shielding; it was from a legacy design

The whole origin was not having an explicit open/close semantic for
streams. We have that now so this internal mechanic isn't needed and
further our streams become more correct by having `.aclose()` be
independent of cancellation.
alpha2
Tyler Goodlet 2021-08-31 20:47:50 -04:00
parent 07e43f88bf
commit af85d35685
2 changed files with 9 additions and 39 deletions

View File

@ -313,7 +313,7 @@ async def test_respawn_consumer_task(
task_status.started(cs) task_status.started(cs)
# shield stream's underlying channel from cancellation # shield stream's underlying channel from cancellation
with stream.shield(): # with stream.shield():
async for v in stream: async for v in stream:
print(f'from stream: {v}') print(f'from stream: {v}')

View File

@ -46,11 +46,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self, self,
ctx: 'Context', # typing: ignore # noqa ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.abc.ReceiveChannel, rx_chan: trio.abc.ReceiveChannel,
shield: bool = False,
) -> None: ) -> None:
self._ctx = ctx self._ctx = ctx
self._rx_chan = rx_chan self._rx_chan = rx_chan
self._shielded = shield
# flag to denote end of stream # flag to denote end of stream
self._eoc: bool = False self._eoc: bool = False
@ -103,7 +101,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
except ( except (
trio.ClosedResourceError, # by self._rx_chan trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
trio.Cancelled, # by local cancellation
# Wait why would we do an implicit close on cancel? THAT'S
# NOT HOW MEM CHANS WORK!!?!?!?!?
# trio.Cancelled, # by local cancellation
): ):
# XXX: we close the stream on any of these error conditions: # XXX: we close the stream on any of these error conditions:
@ -135,23 +136,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
raise # propagate raise # propagate
@contextmanager
def shield(
self
) -> Iterator['ReceiveMsgStream']: # noqa
"""Shield this stream's underlying channel such that a local consumer task
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
Note that here, "shielding" here guards against relaying
a ``'stop'`` message to the far end of the stream thus keeping
the stream machinery active and ready for further use, it does
not have anything to do with an internal ``trio.CancelScope``.
"""
self._shielded = True
yield self
self._shielded = False
async def aclose(self): async def aclose(self):
"""Cancel associated remote actor task and local memory channel """Cancel associated remote actor task and local memory channel
on close. on close.
@ -169,18 +153,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return return
# TODO: broadcasting to multiple consumers
# stats = rx_chan.statistics()
# if stats.open_receive_channels > 1:
# # if we've been cloned don't kill the stream
# log.debug(
# "there are still consumers running keeping stream alive")
# return
if self._shielded:
log.warning(f"{self} is shielded, portal channel being kept alive")
return
# XXX: This must be set **AFTER** the shielded test above! # XXX: This must be set **AFTER** the shielded test above!
self._eoc = True self._eoc = True
@ -397,7 +369,6 @@ class Context:
async def open_stream( async def open_stream(
self, self,
shield: bool = False,
) -> AsyncGenerator[MsgStream, None]: ) -> AsyncGenerator[MsgStream, None]:
'''Open a ``MsgStream``, a bi-directional stream connected to the '''Open a ``MsgStream``, a bi-directional stream connected to the
@ -455,7 +426,6 @@ class Context:
async with MsgStream( async with MsgStream(
ctx=self, ctx=self,
rx_chan=recv_chan, rx_chan=recv_chan,
shield=shield,
) as rchan: ) as rchan:
if self._portal: if self._portal: