Set stream "end of channel" after shielded check!
Another face palm that was causing serious issues for code that is using the `.shielded` feature.. Add a bunch more detailed comments for all this subtlety and hopefully get it right once and for all. Also aggregated the `trio` errors that should trigger closure inside `.aclose()`, hopefully that's right too.bi_streaming
parent
59c8f72952
commit
288e2b5db1
|
@ -61,7 +61,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
return msg['yield']
|
return msg['yield']
|
||||||
|
|
||||||
async def receive(self):
|
async def receive(self):
|
||||||
# see ``.aclose()`` for an alt to always checking this
|
# see ``.aclose()`` for notes on the old behaviour prior to
|
||||||
|
# introducing this
|
||||||
if self._eoc:
|
if self._eoc:
|
||||||
raise trio.EndOfChannel
|
raise trio.EndOfChannel
|
||||||
|
|
||||||
|
@ -81,12 +82,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
if msg.get('stop'):
|
if msg.get('stop'):
|
||||||
log.debug(f"{self} was stopped at remote end")
|
log.debug(f"{self} was stopped at remote end")
|
||||||
|
|
||||||
# when the send is closed we assume the stream has
|
# # when the send is closed we assume the stream has
|
||||||
# terminated and signal this local iterator to stop
|
# # terminated and signal this local iterator to stop
|
||||||
await self.aclose()
|
# await self.aclose()
|
||||||
|
|
||||||
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
||||||
# raise a ``StopAsyncIteration``.
|
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||||
|
# block below it will trigger ``.aclose()``.
|
||||||
raise trio.EndOfChannel
|
raise trio.EndOfChannel
|
||||||
|
|
||||||
# TODO: test that shows stream raising an expected error!!!
|
# TODO: test that shows stream raising an expected error!!!
|
||||||
|
@ -97,24 +99,34 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except (
|
||||||
# XXX: this indicates that a `stop` message was
|
trio.ClosedResourceError, # by self._rx_chan
|
||||||
# sent by the far side of the underlying channel.
|
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
|
||||||
# Currently this is triggered by calling ``.aclose()`` on
|
trio.Cancelled, # by local cancellation
|
||||||
|
):
|
||||||
|
# XXX: we close the stream on any of these error conditions:
|
||||||
|
|
||||||
|
# a ``ClosedResourceError`` indicates that the internal
|
||||||
|
# feeder memory receive channel was closed likely by the
|
||||||
|
# runtime after the associated transport-channel
|
||||||
|
# disconnected or broke.
|
||||||
|
|
||||||
|
# an ``EndOfChannel`` indicates either the internal recv
|
||||||
|
# memchan exhausted **or** we raisesd it just above after
|
||||||
|
# receiving a `stop` message from the far end of the stream.
|
||||||
|
|
||||||
|
# Previously this was triggered by calling ``.aclose()`` on
|
||||||
# the send side of the channel inside
|
# the send side of the channel inside
|
||||||
# ``Actor._push_result()``, but maybe it should be put here?
|
# ``Actor._push_result()`` (should still be commented code
|
||||||
# to avoid exposing the internal mem chan closing mechanism?
|
# there - which should eventually get removed), but now the
|
||||||
# in theory we could instead do some flushing of the channel
|
# 'stop' message handling has been put just above.
|
||||||
# if needed to ensure all consumers are complete before
|
|
||||||
# triggering closure too early?
|
|
||||||
|
|
||||||
# Locally, we want to close this stream gracefully, by
|
# TODO: Locally, we want to close this stream gracefully, by
|
||||||
# terminating any local consumers tasks deterministically.
|
# terminating any local consumers tasks deterministically.
|
||||||
# We **don't** want to be closing this send channel and not
|
# One we have broadcast support, we **don't** want to be
|
||||||
# relaying a final value to remaining consumers who may not
|
# closing this stream and not flushing a final value to
|
||||||
# have been scheduled to receive it yet?
|
# remaining (clone) consumers who may not have been
|
||||||
|
# scheduled to receive it yet.
|
||||||
# lots of testing to do here
|
|
||||||
|
|
||||||
# when the send is closed we assume the stream has
|
# when the send is closed we assume the stream has
|
||||||
# terminated and signal this local iterator to stop
|
# terminated and signal this local iterator to stop
|
||||||
|
@ -122,10 +134,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
|
|
||||||
raise # propagate
|
raise # propagate
|
||||||
|
|
||||||
except trio.Cancelled:
|
# except trio.Cancelled:
|
||||||
# relay cancels to the remote task
|
# # relay cancels to the remote task
|
||||||
await self.aclose()
|
# await self.aclose()
|
||||||
raise
|
# raise
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def shield(
|
def shield(
|
||||||
|
@ -149,8 +161,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
on close.
|
on close.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self._eoc = True
|
|
||||||
|
|
||||||
# XXX: keep proper adherance to trio's `.aclose()` semantics:
|
# XXX: keep proper adherance to trio's `.aclose()` semantics:
|
||||||
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
||||||
rx_chan = self._rx_chan
|
rx_chan = self._rx_chan
|
||||||
|
@ -175,6 +185,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
log.warning(f"{self} is shielded, portal channel being kept alive")
|
log.warning(f"{self} is shielded, portal channel being kept alive")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# XXX: This must be set **AFTER** the shielded test above!
|
||||||
|
self._eoc = True
|
||||||
|
|
||||||
# NOTE: this is super subtle IPC messaging stuff:
|
# NOTE: this is super subtle IPC messaging stuff:
|
||||||
# Relay stop iteration to far end **iff** we're
|
# Relay stop iteration to far end **iff** we're
|
||||||
# in bidirectional mode. If we're only streaming
|
# in bidirectional mode. If we're only streaming
|
||||||
|
@ -186,9 +199,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# isn't expecting messages to be sent by the caller.
|
# isn't expecting messages to be sent by the caller.
|
||||||
# Thus, we must check that this context DOES NOT
|
# Thus, we must check that this context DOES NOT
|
||||||
# have a portal reference to ensure this is indeed the callee
|
# have a portal reference to ensure this is indeed the callee
|
||||||
# side and can relay a 'stop'. In the bidirectional case,
|
# side and can relay a 'stop'.
|
||||||
# `Context.open_stream()` will create the `Actor._cids2qs`
|
|
||||||
# entry from a call to `Actor.get_memchans()`.
|
# In the bidirectional case, `Context.open_stream()` will create
|
||||||
|
# the `Actor._cids2qs` entry from a call to
|
||||||
|
# `Actor.get_memchans()` and will send the stop message in
|
||||||
|
# ``__aexit__()`` on teardown so it **does not** need to be
|
||||||
|
# called here.
|
||||||
if not self._ctx._portal:
|
if not self._ctx._portal:
|
||||||
try:
|
try:
|
||||||
# only for 2 way streams can we can send
|
# only for 2 way streams can we can send
|
||||||
|
@ -201,32 +218,41 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# it can't traverse the transport.
|
# it can't traverse the transport.
|
||||||
log.debug(f'Channel for {self} was already closed')
|
log.debug(f'Channel for {self} was already closed')
|
||||||
|
|
||||||
# close the local mem chan??!?
|
# close the local mem chan ``self._rx_chan`` ??!?
|
||||||
|
|
||||||
# NOT if we're a ``MsgStream``!
|
# DEFINITELY NOT if we're a bi-dir ``MsgStream``!
|
||||||
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
|
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
|
||||||
# the potential final result from the surrounding inter-actor
|
# the potential final result from the surrounding inter-actor
|
||||||
# `Context` so we don't want to close it until that context has
|
# `Context` so we don't want to close it until that context has
|
||||||
# run to completion.
|
# run to completion.
|
||||||
|
|
||||||
# XXX: Notes on old behaviour.
|
# XXX: Notes on old behaviour:
|
||||||
|
# await rx_chan.aclose()
|
||||||
|
|
||||||
# In the receive-only case, ``Portal.open_stream_from()`` should
|
# In the receive-only case, ``Portal.open_stream_from()`` used
|
||||||
# call this explicitly on teardown but additionally if for some
|
# to rely on this call explicitly on teardown such that a new
|
||||||
# reason stream consumer code tries to manually receive a new
|
# call to ``.receive()`` after ``rx_chan`` had been closed, would
|
||||||
|
# result in us raising a ``trio.EndOfChannel`` (since we
|
||||||
|
# remapped the ``trio.ClosedResourceError`). However, now if for some
|
||||||
|
# reason the stream's consumer code tries to manually receive a new
|
||||||
# value before ``.aclose()`` is called **but** the far end has
|
# value before ``.aclose()`` is called **but** the far end has
|
||||||
# stopped `.receive()` **must** raise ``trio.EndofChannel`` in
|
# stopped `.receive()` **must** raise ``trio.EndofChannel`` in
|
||||||
# order to avoid an infinite hang on ``.__anext__()``. So we can
|
# order to avoid an infinite hang on ``.__anext__()``; this is
|
||||||
# instead uncomment this check and close the underlying msg-loop
|
# why we added ``self._eoc`` to denote stream closure indepedent
|
||||||
# mem chan below and not then **not** check for ``self._eoc`` in
|
# of ``rx_chan``.
|
||||||
# ``.receive()`` (if for some reason we think that check is
|
|
||||||
# a bottle neck - not likely) such that the
|
|
||||||
# ``trio.ClosedResourceError`` would instead trigger the
|
|
||||||
# ``trio.EndOfChannel`` in ``.receive()`` (as it originally was
|
|
||||||
# before bi-dir streaming support).
|
|
||||||
|
|
||||||
# if not isinstance(self, MsgStream):
|
# In theory we could still use this old method and close the
|
||||||
# await rx_chan.aclose()
|
# underlying msg-loop mem chan as above and then **not** check
|
||||||
|
# for ``self._eoc`` in ``.receive()`` (if for some reason we
|
||||||
|
# think that check is a bottle neck - not likely) **but** then
|
||||||
|
# we would need to map the resulting
|
||||||
|
# ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in
|
||||||
|
# ``.receive()`` (as it originally was before bi-dir streaming
|
||||||
|
# support) in order to trigger stream closure. The old behaviour
|
||||||
|
# is arguably more confusing since we lose detection of the
|
||||||
|
# runtime's closure of ``rx_chan`` in the case where we may
|
||||||
|
# still need to consume msgs that are "in transit" from the far
|
||||||
|
# end (eg. for ``Context.result()``).
|
||||||
|
|
||||||
# TODO: but make it broadcasting to consumers
|
# TODO: but make it broadcasting to consumers
|
||||||
# def clone(self):
|
# def clone(self):
|
||||||
|
@ -251,6 +277,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
||||||
self,
|
self,
|
||||||
data: Any
|
data: Any
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''Send a message over this stream to the far end.
|
||||||
|
|
||||||
|
'''
|
||||||
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
|
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
|
||||||
|
|
||||||
|
|
||||||
|
@ -409,7 +438,8 @@ class Context:
|
||||||
yield rchan
|
yield rchan
|
||||||
|
|
||||||
except trio.EndOfChannel:
|
except trio.EndOfChannel:
|
||||||
# stream iteration stop signal
|
# likely the far end sent us a 'stop' message to
|
||||||
|
# terminate the stream.
|
||||||
raise
|
raise
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in New Issue