forked from goodboy/tractor
Set stream "end of channel" after shielded check!
Another face palm that was causing serious issues for code that is using the `.shielded` feature.. Add a bunch more detailed comments for all this subtlety and hopefully get it right once and for all. Also aggregated the `trio` errors that should trigger closure inside `.aclose()`, hopefully that's right too.prehardkill
parent
186968d3e7
commit
f7b7c3fb93
|
@ -61,7 +61,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
return msg['yield']
|
||||
|
||||
async def receive(self):
|
||||
# see ``.aclose()`` for an alt to always checking this
|
||||
# see ``.aclose()`` for notes on the old behaviour prior to
|
||||
# introducing this
|
||||
if self._eoc:
|
||||
raise trio.EndOfChannel
|
||||
|
||||
|
@ -81,12 +82,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
if msg.get('stop'):
|
||||
log.debug(f"{self} was stopped at remote end")
|
||||
|
||||
# when the send is closed we assume the stream has
|
||||
# terminated and signal this local iterator to stop
|
||||
await self.aclose()
|
||||
# # when the send is closed we assume the stream has
|
||||
# # terminated and signal this local iterator to stop
|
||||
# await self.aclose()
|
||||
|
||||
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
||||
# raise a ``StopAsyncIteration``.
|
||||
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||
# block below it will trigger ``.aclose()``.
|
||||
raise trio.EndOfChannel
|
||||
|
||||
# TODO: test that shows stream raising an expected error!!!
|
||||
|
@ -97,44 +99,45 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
else:
|
||||
raise
|
||||
|
||||
except trio.ClosedResourceError:
|
||||
# XXX: this indicates that a `stop` message was
|
||||
# sent by the far side of the underlying channel.
|
||||
# Currently this is triggered by calling ``.aclose()`` on
|
||||
except (
|
||||
trio.ClosedResourceError, # by self._rx_chan
|
||||
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
|
||||
trio.Cancelled, # by local cancellation
|
||||
):
|
||||
# XXX: we close the stream on any of these error conditions:
|
||||
|
||||
# a ``ClosedResourceError`` indicates that the internal
|
||||
# feeder memory receive channel was closed likely by the
|
||||
# runtime after the associated transport-channel
|
||||
# disconnected or broke.
|
||||
|
||||
# an ``EndOfChannel`` indicates either the internal recv
|
||||
# memchan exhausted **or** we raisesd it just above after
|
||||
# receiving a `stop` message from the far end of the stream.
|
||||
|
||||
# Previously this was triggered by calling ``.aclose()`` on
|
||||
# the send side of the channel inside
|
||||
# ``Actor._push_result()``, but maybe it should be put here?
|
||||
# to avoid exposing the internal mem chan closing mechanism?
|
||||
# in theory we could instead do some flushing of the channel
|
||||
# if needed to ensure all consumers are complete before
|
||||
# triggering closure too early?
|
||||
# ``Actor._push_result()`` (should still be commented code
|
||||
# there - which should eventually get removed), but now the
|
||||
# 'stop' message handling has been put just above.
|
||||
|
||||
# Locally, we want to close this stream gracefully, by
|
||||
# TODO: Locally, we want to close this stream gracefully, by
|
||||
# terminating any local consumers tasks deterministically.
|
||||
# We **don't** want to be closing this send channel and not
|
||||
# relaying a final value to remaining consumers who may not
|
||||
# have been scheduled to receive it yet?
|
||||
|
||||
# lots of testing to do here
|
||||
# One we have broadcast support, we **don't** want to be
|
||||
# closing this stream and not flushing a final value to
|
||||
# remaining (clone) consumers who may not have been
|
||||
# scheduled to receive it yet.
|
||||
|
||||
# when the send is closed we assume the stream has
|
||||
# terminated and signal this local iterator to stop
|
||||
await self.aclose()
|
||||
|
||||
raise trio.EndOfChannel
|
||||
raise # propagate
|
||||
|
||||
# if not isinstance(self, MsgStream):
|
||||
# # XXX: this was how we handled this originally for the
|
||||
# # single direction case?
|
||||
# raise trio.EndOfChannel
|
||||
|
||||
# else:
|
||||
# # in 2-way case raise the closed error
|
||||
# raise # propagate
|
||||
|
||||
except trio.Cancelled:
|
||||
# relay cancels to the remote task
|
||||
await self.aclose()
|
||||
raise
|
||||
# except trio.Cancelled:
|
||||
# # relay cancels to the remote task
|
||||
# await self.aclose()
|
||||
# raise
|
||||
|
||||
@contextmanager
|
||||
def shield(
|
||||
|
@ -158,8 +161,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
on close.
|
||||
|
||||
"""
|
||||
self._eoc = True
|
||||
|
||||
# XXX: keep proper adherance to trio's `.aclose()` semantics:
|
||||
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
||||
rx_chan = self._rx_chan
|
||||
|
@ -184,6 +185,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
log.warning(f"{self} is shielded, portal channel being kept alive")
|
||||
return
|
||||
|
||||
# XXX: This must be set **AFTER** the shielded test above!
|
||||
self._eoc = True
|
||||
|
||||
# NOTE: this is super subtle IPC messaging stuff:
|
||||
# Relay stop iteration to far end **iff** we're
|
||||
# in bidirectional mode. If we're only streaming
|
||||
|
@ -195,9 +199,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
# isn't expecting messages to be sent by the caller.
|
||||
# Thus, we must check that this context DOES NOT
|
||||
# have a portal reference to ensure this is indeed the callee
|
||||
# side and can relay a 'stop'. In the bidirectional case,
|
||||
# `Context.open_stream()` will create the `Actor._cids2qs`
|
||||
# entry from a call to `Actor.get_memchans()`.
|
||||
# side and can relay a 'stop'.
|
||||
|
||||
# In the bidirectional case, `Context.open_stream()` will create
|
||||
# the `Actor._cids2qs` entry from a call to
|
||||
# `Actor.get_memchans()` and will send the stop message in
|
||||
# ``__aexit__()`` on teardown so it **does not** need to be
|
||||
# called here.
|
||||
if not self._ctx._portal:
|
||||
try:
|
||||
# only for 2 way streams can we can send
|
||||
|
@ -210,32 +218,41 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
# it can't traverse the transport.
|
||||
log.debug(f'Channel for {self} was already closed')
|
||||
|
||||
# close the local mem chan??!?
|
||||
# close the local mem chan ``self._rx_chan`` ??!?
|
||||
|
||||
# NOT if we're a ``MsgStream``!
|
||||
# DEFINITELY NOT if we're a bi-dir ``MsgStream``!
|
||||
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
|
||||
# the potential final result from the surrounding inter-actor
|
||||
# `Context` so we don't want to close it until that context has
|
||||
# run to completion.
|
||||
|
||||
# XXX: Notes on old behaviour.
|
||||
# XXX: Notes on old behaviour:
|
||||
# await rx_chan.aclose()
|
||||
|
||||
# In the receive-only case, ``Portal.open_stream_from()`` should
|
||||
# call this explicitly on teardown but additionally if for some
|
||||
# reason stream consumer code tries to manually receive a new
|
||||
# In the receive-only case, ``Portal.open_stream_from()`` used
|
||||
# to rely on this call explicitly on teardown such that a new
|
||||
# call to ``.receive()`` after ``rx_chan`` had been closed, would
|
||||
# result in us raising a ``trio.EndOfChannel`` (since we
|
||||
# remapped the ``trio.ClosedResourceError`). However, now if for some
|
||||
# reason the stream's consumer code tries to manually receive a new
|
||||
# value before ``.aclose()`` is called **but** the far end has
|
||||
# stopped `.receive()` **must** raise ``trio.EndofChannel`` in
|
||||
# order to avoid an infinite hang on ``.__anext__()``. So we can
|
||||
# instead uncomment this check and close the underlying msg-loop
|
||||
# mem chan below and not then **not** check for ``self._eoc`` in
|
||||
# ``.receive()`` (if for some reason we think that check is
|
||||
# a bottle neck - not likely) such that the
|
||||
# ``trio.ClosedResourceError`` would instead trigger the
|
||||
# ``trio.EndOfChannel`` in ``.receive()`` (as it originally was
|
||||
# before bi-dir streaming support).
|
||||
# order to avoid an infinite hang on ``.__anext__()``; this is
|
||||
# why we added ``self._eoc`` to denote stream closure indepedent
|
||||
# of ``rx_chan``.
|
||||
|
||||
# if not isinstance(self, MsgStream):
|
||||
# await rx_chan.aclose()
|
||||
# In theory we could still use this old method and close the
|
||||
# underlying msg-loop mem chan as above and then **not** check
|
||||
# for ``self._eoc`` in ``.receive()`` (if for some reason we
|
||||
# think that check is a bottle neck - not likely) **but** then
|
||||
# we would need to map the resulting
|
||||
# ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in
|
||||
# ``.receive()`` (as it originally was before bi-dir streaming
|
||||
# support) in order to trigger stream closure. The old behaviour
|
||||
# is arguably more confusing since we lose detection of the
|
||||
# runtime's closure of ``rx_chan`` in the case where we may
|
||||
# still need to consume msgs that are "in transit" from the far
|
||||
# end (eg. for ``Context.result()``).
|
||||
|
||||
# TODO: but make it broadcasting to consumers
|
||||
# def clone(self):
|
||||
|
@ -260,6 +277,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
|||
self,
|
||||
data: Any
|
||||
) -> None:
|
||||
'''Send a message over this stream to the far end.
|
||||
|
||||
'''
|
||||
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
|
||||
|
||||
|
||||
|
@ -418,7 +438,8 @@ class Context:
|
|||
yield rchan
|
||||
|
||||
except trio.EndOfChannel:
|
||||
# stream iteration stop signal
|
||||
# likely the far end sent us a 'stop' message to
|
||||
# terminate the stream.
|
||||
raise
|
||||
|
||||
else:
|
||||
|
|
Loading…
Reference in New Issue