Drop stream shielding; it was from a legacy design
The whole origin was not having an explicit open/close semantic for streams. We have that now so this internal mechanic isn't needed and further our streams become more correct by having `.aclose()` be independent of cancellation.tokio_backup
parent
a105e32e34
commit
0c6e7ca351
|
@ -313,12 +313,12 @@ async def test_respawn_consumer_task(
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
|
|
||||||
# shield stream's underlying channel from cancellation
|
# shield stream's underlying channel from cancellation
|
||||||
with stream.shield():
|
# with stream.shield():
|
||||||
|
|
||||||
async for v in stream:
|
async for v in stream:
|
||||||
print(f'from stream: {v}')
|
print(f'from stream: {v}')
|
||||||
expect.remove(v)
|
expect.remove(v)
|
||||||
received.append(v)
|
received.append(v)
|
||||||
|
|
||||||
print('exited consume')
|
print('exited consume')
|
||||||
|
|
||||||
|
|
|
@ -46,11 +46,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
self,
|
self,
|
||||||
ctx: 'Context', # typing: ignore # noqa
|
ctx: 'Context', # typing: ignore # noqa
|
||||||
rx_chan: trio.abc.ReceiveChannel,
|
rx_chan: trio.abc.ReceiveChannel,
|
||||||
shield: bool = False,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
self._ctx = ctx
|
self._ctx = ctx
|
||||||
self._rx_chan = rx_chan
|
self._rx_chan = rx_chan
|
||||||
self._shielded = shield
|
|
||||||
|
|
||||||
# flag to denote end of stream
|
# flag to denote end of stream
|
||||||
self._eoc: bool = False
|
self._eoc: bool = False
|
||||||
|
@ -103,7 +101,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError, # by self._rx_chan
|
trio.ClosedResourceError, # by self._rx_chan
|
||||||
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
|
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
|
||||||
trio.Cancelled, # by local cancellation
|
|
||||||
|
# Wait why would we do an implicit close on cancel? THAT'S
|
||||||
|
# NOT HOW MEM CHANS WORK!!?!?!?!?
|
||||||
|
# trio.Cancelled, # by local cancellation
|
||||||
):
|
):
|
||||||
# XXX: we close the stream on any of these error conditions:
|
# XXX: we close the stream on any of these error conditions:
|
||||||
|
|
||||||
|
@ -135,23 +136,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
|
|
||||||
raise # propagate
|
raise # propagate
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def shield(
|
|
||||||
self
|
|
||||||
) -> Iterator['ReceiveMsgStream']: # noqa
|
|
||||||
"""Shield this stream's underlying channel such that a local consumer task
|
|
||||||
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
|
|
||||||
|
|
||||||
Note that here, "shielding" here guards against relaying
|
|
||||||
a ``'stop'`` message to the far end of the stream thus keeping
|
|
||||||
the stream machinery active and ready for further use, it does
|
|
||||||
not have anything to do with an internal ``trio.CancelScope``.
|
|
||||||
|
|
||||||
"""
|
|
||||||
self._shielded = True
|
|
||||||
yield self
|
|
||||||
self._shielded = False
|
|
||||||
|
|
||||||
async def aclose(self):
|
async def aclose(self):
|
||||||
"""Cancel associated remote actor task and local memory channel
|
"""Cancel associated remote actor task and local memory channel
|
||||||
on close.
|
on close.
|
||||||
|
@ -169,18 +153,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
||||||
return
|
return
|
||||||
|
|
||||||
# TODO: broadcasting to multiple consumers
|
# if self._shielded:
|
||||||
# stats = rx_chan.statistics()
|
# log.warning(f"{self} is shielded, portal channel being kept alive")
|
||||||
# if stats.open_receive_channels > 1:
|
|
||||||
# # if we've been cloned don't kill the stream
|
|
||||||
# log.debug(
|
|
||||||
# "there are still consumers running keeping stream alive")
|
|
||||||
# return
|
# return
|
||||||
|
|
||||||
if self._shielded:
|
|
||||||
log.warning(f"{self} is shielded, portal channel being kept alive")
|
|
||||||
return
|
|
||||||
|
|
||||||
# XXX: This must be set **AFTER** the shielded test above!
|
# XXX: This must be set **AFTER** the shielded test above!
|
||||||
self._eoc = True
|
self._eoc = True
|
||||||
|
|
||||||
|
@ -397,7 +373,6 @@ class Context:
|
||||||
async def open_stream(
|
async def open_stream(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
shield: bool = False,
|
|
||||||
|
|
||||||
) -> AsyncGenerator[MsgStream, None]:
|
) -> AsyncGenerator[MsgStream, None]:
|
||||||
'''Open a ``MsgStream``, a bi-directional stream connected to the
|
'''Open a ``MsgStream``, a bi-directional stream connected to the
|
||||||
|
@ -455,7 +430,6 @@ class Context:
|
||||||
async with MsgStream(
|
async with MsgStream(
|
||||||
ctx=self,
|
ctx=self,
|
||||||
rx_chan=recv_chan,
|
rx_chan=recv_chan,
|
||||||
shield=shield,
|
|
||||||
) as rchan:
|
) as rchan:
|
||||||
|
|
||||||
if self._portal:
|
if self._portal:
|
||||||
|
|
Loading…
Reference in New Issue