Set stream "end of channel" after shielded check!
Another face palm that was causing serious issues for code that is using the `.shielded` feature.. Add a bunch more detailed comments for all this subtlety and hopefully get it right once and for all. Also aggregated the `trio` errors that should trigger closure inside `.aclose()`, hopefully that's right too.transport_hardening
							parent
							
								
									23dabb9502
								
							
						
					
					
						commit
						c92fc33b7c
					
				|  | @ -61,7 +61,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|         return msg['yield'] |         return msg['yield'] | ||||||
| 
 | 
 | ||||||
|     async def receive(self): |     async def receive(self): | ||||||
|         # see ``.aclose()`` for an alt to always checking this |         # see ``.aclose()`` for notes on the old behaviour prior to | ||||||
|  |         # introducing this | ||||||
|         if self._eoc: |         if self._eoc: | ||||||
|             raise trio.EndOfChannel |             raise trio.EndOfChannel | ||||||
| 
 | 
 | ||||||
|  | @ -81,12 +82,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|             if msg.get('stop'): |             if msg.get('stop'): | ||||||
|                 log.debug(f"{self} was stopped at remote end") |                 log.debug(f"{self} was stopped at remote end") | ||||||
| 
 | 
 | ||||||
|                 # when the send is closed we assume the stream has |                 # # when the send is closed we assume the stream has | ||||||
|                 # terminated and signal this local iterator to stop |                 # # terminated and signal this local iterator to stop | ||||||
|                 await self.aclose() |                 # await self.aclose() | ||||||
| 
 | 
 | ||||||
|                 # XXX: this causes ``ReceiveChannel.__anext__()`` to |                 # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||||
|                 # raise a ``StopAsyncIteration``. |                 # raise a ``StopAsyncIteration`` **and** in our catch | ||||||
|  |                 # block below it will trigger ``.aclose()``. | ||||||
|                 raise trio.EndOfChannel |                 raise trio.EndOfChannel | ||||||
| 
 | 
 | ||||||
|             # TODO: test that shows stream raising an expected error!!! |             # TODO: test that shows stream raising an expected error!!! | ||||||
|  | @ -97,24 +99,34 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|             else: |             else: | ||||||
|                 raise |                 raise | ||||||
| 
 | 
 | ||||||
|         except trio.ClosedResourceError: |         except ( | ||||||
|             # XXX: this indicates that a `stop` message was |             trio.ClosedResourceError,  # by self._rx_chan | ||||||
|             # sent by the far side of the underlying channel. |             trio.EndOfChannel,  # by self._rx_chan or `stop` msg from far end | ||||||
|             # Currently this is triggered by calling ``.aclose()`` on |             trio.Cancelled,  # by local cancellation | ||||||
|  |         ): | ||||||
|  |             # XXX: we close the stream on any of these error conditions: | ||||||
|  | 
 | ||||||
|  |             # a ``ClosedResourceError`` indicates that the internal | ||||||
|  |             # feeder memory receive channel was closed likely by the | ||||||
|  |             # runtime after the associated transport-channel | ||||||
|  |             # disconnected or broke. | ||||||
|  | 
 | ||||||
|  |             # an ``EndOfChannel`` indicates either the internal recv | ||||||
|  |             # memchan exhausted **or** we raisesd it just above after | ||||||
|  |             # receiving a `stop` message from the far end of the stream. | ||||||
|  | 
 | ||||||
|  |             # Previously this was triggered by calling ``.aclose()`` on | ||||||
|             # the send side of the channel inside |             # the send side of the channel inside | ||||||
|             # ``Actor._push_result()``, but maybe it should be put here? |             # ``Actor._push_result()`` (should still be commented code | ||||||
|             # to avoid exposing the internal mem chan closing mechanism? |             # there - which should eventually get removed), but now the | ||||||
|             # in theory we could instead do some flushing of the channel |             # 'stop' message handling has been put just above. | ||||||
|             # if needed to ensure all consumers are complete before |  | ||||||
|             # triggering closure too early? |  | ||||||
| 
 | 
 | ||||||
|             # Locally, we want to close this stream gracefully, by |             # TODO: Locally, we want to close this stream gracefully, by | ||||||
|             # terminating any local consumers tasks deterministically. |             # terminating any local consumers tasks deterministically. | ||||||
|             # We **don't** want to be closing this send channel and not |             # One we have broadcast support, we **don't** want to be | ||||||
|             # relaying a final value to remaining consumers who may not |             # closing this stream and not flushing a final value to | ||||||
|             # have been scheduled to receive it yet? |             # remaining (clone) consumers who may not have been | ||||||
| 
 |             # scheduled to receive it yet. | ||||||
|             # lots of testing to do here |  | ||||||
| 
 | 
 | ||||||
|             # when the send is closed we assume the stream has |             # when the send is closed we assume the stream has | ||||||
|             # terminated and signal this local iterator to stop |             # terminated and signal this local iterator to stop | ||||||
|  | @ -122,10 +134,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
| 
 | 
 | ||||||
|             raise  # propagate |             raise  # propagate | ||||||
| 
 | 
 | ||||||
|         except trio.Cancelled: |         # except trio.Cancelled: | ||||||
|             # relay cancels to the remote task |         #     # relay cancels to the remote task | ||||||
|             await self.aclose() |         #     await self.aclose() | ||||||
|             raise |         #     raise | ||||||
| 
 | 
 | ||||||
|     @contextmanager |     @contextmanager | ||||||
|     def shield( |     def shield( | ||||||
|  | @ -149,8 +161,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|         on close. |         on close. | ||||||
| 
 | 
 | ||||||
|         """ |         """ | ||||||
|         self._eoc = True |  | ||||||
| 
 |  | ||||||
|         # XXX: keep proper adherance to trio's `.aclose()` semantics: |         # XXX: keep proper adherance to trio's `.aclose()` semantics: | ||||||
|         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose |         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||||
|         rx_chan = self._rx_chan |         rx_chan = self._rx_chan | ||||||
|  | @ -175,6 +185,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|             log.warning(f"{self} is shielded, portal channel being kept alive") |             log.warning(f"{self} is shielded, portal channel being kept alive") | ||||||
|             return |             return | ||||||
| 
 | 
 | ||||||
|  |         # XXX: This must be set **AFTER** the shielded test above! | ||||||
|  |         self._eoc = True | ||||||
|  | 
 | ||||||
|         # NOTE: this is super subtle IPC messaging stuff: |         # NOTE: this is super subtle IPC messaging stuff: | ||||||
|         # Relay stop iteration to far end **iff** we're |         # Relay stop iteration to far end **iff** we're | ||||||
|         # in bidirectional mode. If we're only streaming |         # in bidirectional mode. If we're only streaming | ||||||
|  | @ -186,9 +199,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|         # isn't expecting messages to be sent by the caller. |         # isn't expecting messages to be sent by the caller. | ||||||
|         # Thus, we must check that this context DOES NOT |         # Thus, we must check that this context DOES NOT | ||||||
|         # have a portal reference to ensure this is indeed the callee |         # have a portal reference to ensure this is indeed the callee | ||||||
|         # side and can relay a 'stop'. In the bidirectional case, |         # side and can relay a 'stop'. | ||||||
|         # `Context.open_stream()` will create the `Actor._cids2qs` | 
 | ||||||
|         # entry from a call to `Actor.get_memchans()`. |         # In the bidirectional case, `Context.open_stream()` will create | ||||||
|  |         # the `Actor._cids2qs` entry from a call to | ||||||
|  |         # `Actor.get_memchans()` and will send the stop message in | ||||||
|  |         # ``__aexit__()`` on teardown so it **does not** need to be | ||||||
|  |         # called here. | ||||||
|         if not self._ctx._portal: |         if not self._ctx._portal: | ||||||
|             try: |             try: | ||||||
|                 # only for 2 way streams can we can send |                 # only for 2 way streams can we can send | ||||||
|  | @ -201,32 +218,41 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|                 # it can't traverse the transport. |                 # it can't traverse the transport. | ||||||
|                 log.debug(f'Channel for {self} was already closed') |                 log.debug(f'Channel for {self} was already closed') | ||||||
| 
 | 
 | ||||||
|         # close the local mem chan??!? |         # close the local mem chan ``self._rx_chan`` ??!? | ||||||
| 
 | 
 | ||||||
|         # NOT if we're a ``MsgStream``! |         # DEFINITELY NOT if we're a bi-dir ``MsgStream``! | ||||||
|         # BECAUSE this same core-msg-loop mem recv-chan is used to deliver |         # BECAUSE this same core-msg-loop mem recv-chan is used to deliver | ||||||
|         # the potential final result from the surrounding inter-actor |         # the potential final result from the surrounding inter-actor | ||||||
|         # `Context` so we don't want to close it until that context has |         # `Context` so we don't want to close it until that context has | ||||||
|         # run to completion. |         # run to completion. | ||||||
| 
 | 
 | ||||||
|         # XXX: Notes on old behaviour. |         # XXX: Notes on old behaviour: | ||||||
|  |         # await rx_chan.aclose() | ||||||
| 
 | 
 | ||||||
|         # In the receive-only case, ``Portal.open_stream_from()`` should |         # In the receive-only case, ``Portal.open_stream_from()`` used | ||||||
|         # call this explicitly on teardown but additionally if for some |         # to rely on this call explicitly on teardown such that a new | ||||||
|         # reason stream consumer code tries to manually receive a new |         # call to ``.receive()`` after ``rx_chan`` had been closed, would | ||||||
|  |         # result in us raising a ``trio.EndOfChannel`` (since we | ||||||
|  |         # remapped the ``trio.ClosedResourceError`). However, now if for some | ||||||
|  |         # reason the stream's consumer code tries to manually receive a new | ||||||
|         # value before ``.aclose()`` is called **but** the far end has |         # value before ``.aclose()`` is called **but** the far end has | ||||||
|         # stopped `.receive()` **must** raise ``trio.EndofChannel`` in |         # stopped `.receive()` **must** raise ``trio.EndofChannel`` in | ||||||
|         # order to avoid an infinite hang on ``.__anext__()``. So we can |         # order to avoid an infinite hang on ``.__anext__()``; this is | ||||||
|         # instead uncomment this check and close the underlying msg-loop |         # why we added ``self._eoc`` to denote stream closure indepedent | ||||||
|         # mem chan below and not then **not** check for ``self._eoc`` in |         # of ``rx_chan``. | ||||||
|         # ``.receive()`` (if for some reason we think that check is |  | ||||||
|         # a bottle neck - not likely) such that the |  | ||||||
|         # ``trio.ClosedResourceError`` would instead trigger the |  | ||||||
|         # ``trio.EndOfChannel`` in ``.receive()`` (as it originally was |  | ||||||
|         # before bi-dir streaming support). |  | ||||||
| 
 | 
 | ||||||
|         # if not isinstance(self, MsgStream): |         # In theory we could still use this old method and close the | ||||||
|         #     await rx_chan.aclose() |         # underlying msg-loop mem chan as above and then **not** check | ||||||
|  |         # for ``self._eoc`` in ``.receive()`` (if for some reason we | ||||||
|  |         # think that check is a bottle neck - not likely) **but** then | ||||||
|  |         # we would need to map the resulting | ||||||
|  |         # ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in | ||||||
|  |         # ``.receive()`` (as it originally was before bi-dir streaming | ||||||
|  |         # support) in order to trigger stream closure. The old behaviour | ||||||
|  |         # is arguably more confusing since we lose detection of the | ||||||
|  |         # runtime's closure of ``rx_chan`` in the case where we may | ||||||
|  |         # still need to consume msgs that are "in transit" from the far | ||||||
|  |         # end (eg. for ``Context.result()``). | ||||||
| 
 | 
 | ||||||
|     # TODO: but make it broadcasting to consumers |     # TODO: but make it broadcasting to consumers | ||||||
|     # def clone(self): |     # def clone(self): | ||||||
|  | @ -251,6 +277,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): | ||||||
|         self, |         self, | ||||||
|         data: Any |         data: Any | ||||||
|     ) -> None: |     ) -> None: | ||||||
|  |         '''Send a message over this stream to the far end. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) |         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -409,7 +438,8 @@ class Context: | ||||||
|                 yield rchan |                 yield rchan | ||||||
| 
 | 
 | ||||||
|             except trio.EndOfChannel: |             except trio.EndOfChannel: | ||||||
|                 # stream iteration stop signal |                 # likely the far end sent us a 'stop' message to | ||||||
|  |                 # terminate the stream. | ||||||
|                 raise |                 raise | ||||||
| 
 | 
 | ||||||
|             else: |             else: | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue