Set stream "end of channel" after shielded check!
Another face palm that was causing serious issues for code that is using the `.shielded` feature.. Add a bunch more detailed comments for all this subtlety and hopefully get it right once and for all. Also aggregated the `trio` errors that should trigger closure inside `.aclose()`, hopefully that's right too.round_2_ci_windows
							parent
							
								
									3d633408fc
								
							
						
					
					
						commit
						1703171bea
					
				|  | @ -61,7 +61,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         return msg['yield'] | ||||
| 
 | ||||
|     async def receive(self): | ||||
|         # see ``.aclose()`` for an alt to always checking this | ||||
|         # see ``.aclose()`` for notes on the old behaviour prior to | ||||
|         # introducing this | ||||
|         if self._eoc: | ||||
|             raise trio.EndOfChannel | ||||
| 
 | ||||
|  | @ -81,12 +82,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|             if msg.get('stop'): | ||||
|                 log.debug(f"{self} was stopped at remote end") | ||||
| 
 | ||||
|                 # when the send is closed we assume the stream has | ||||
|                 # terminated and signal this local iterator to stop | ||||
|                 await self.aclose() | ||||
|                 # # when the send is closed we assume the stream has | ||||
|                 # # terminated and signal this local iterator to stop | ||||
|                 # await self.aclose() | ||||
| 
 | ||||
|                 # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||
|                 # raise a ``StopAsyncIteration``. | ||||
|                 # raise a ``StopAsyncIteration`` **and** in our catch | ||||
|                 # block below it will trigger ``.aclose()``. | ||||
|                 raise trio.EndOfChannel | ||||
| 
 | ||||
|             # TODO: test that shows stream raising an expected error!!! | ||||
|  | @ -97,24 +99,34 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         except trio.ClosedResourceError: | ||||
|             # XXX: this indicates that a `stop` message was | ||||
|             # sent by the far side of the underlying channel. | ||||
|             # Currently this is triggered by calling ``.aclose()`` on | ||||
|         except ( | ||||
|             trio.ClosedResourceError,  # by self._rx_chan | ||||
|             trio.EndOfChannel,  # by self._rx_chan or `stop` msg from far end | ||||
|             trio.Cancelled,  # by local cancellation | ||||
|         ): | ||||
|             # XXX: we close the stream on any of these error conditions: | ||||
| 
 | ||||
|             # a ``ClosedResourceError`` indicates that the internal | ||||
|             # feeder memory receive channel was closed likely by the | ||||
|             # runtime after the associated transport-channel | ||||
|             # disconnected or broke. | ||||
| 
 | ||||
|             # an ``EndOfChannel`` indicates either the internal recv | ||||
|             # memchan exhausted **or** we raisesd it just above after | ||||
|             # receiving a `stop` message from the far end of the stream. | ||||
| 
 | ||||
|             # Previously this was triggered by calling ``.aclose()`` on | ||||
|             # the send side of the channel inside | ||||
|             # ``Actor._push_result()``, but maybe it should be put here? | ||||
|             # to avoid exposing the internal mem chan closing mechanism? | ||||
|             # in theory we could instead do some flushing of the channel | ||||
|             # if needed to ensure all consumers are complete before | ||||
|             # triggering closure too early? | ||||
|             # ``Actor._push_result()`` (should still be commented code | ||||
|             # there - which should eventually get removed), but now the | ||||
|             # 'stop' message handling has been put just above. | ||||
| 
 | ||||
|             # Locally, we want to close this stream gracefully, by | ||||
|             # TODO: Locally, we want to close this stream gracefully, by | ||||
|             # terminating any local consumers tasks deterministically. | ||||
|             # We **don't** want to be closing this send channel and not | ||||
|             # relaying a final value to remaining consumers who may not | ||||
|             # have been scheduled to receive it yet? | ||||
| 
 | ||||
|             # lots of testing to do here | ||||
|             # One we have broadcast support, we **don't** want to be | ||||
|             # closing this stream and not flushing a final value to | ||||
|             # remaining (clone) consumers who may not have been | ||||
|             # scheduled to receive it yet. | ||||
| 
 | ||||
|             # when the send is closed we assume the stream has | ||||
|             # terminated and signal this local iterator to stop | ||||
|  | @ -122,10 +134,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
| 
 | ||||
|             raise  # propagate | ||||
| 
 | ||||
|         except trio.Cancelled: | ||||
|             # relay cancels to the remote task | ||||
|             await self.aclose() | ||||
|             raise | ||||
|         # except trio.Cancelled: | ||||
|         #     # relay cancels to the remote task | ||||
|         #     await self.aclose() | ||||
|         #     raise | ||||
| 
 | ||||
|     @contextmanager | ||||
|     def shield( | ||||
|  | @ -149,8 +161,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         on close. | ||||
| 
 | ||||
|         """ | ||||
|         self._eoc = True | ||||
| 
 | ||||
|         # XXX: keep proper adherance to trio's `.aclose()` semantics: | ||||
|         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|         rx_chan = self._rx_chan | ||||
|  | @ -175,6 +185,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|             log.warning(f"{self} is shielded, portal channel being kept alive") | ||||
|             return | ||||
| 
 | ||||
|         # XXX: This must be set **AFTER** the shielded test above! | ||||
|         self._eoc = True | ||||
| 
 | ||||
|         # NOTE: this is super subtle IPC messaging stuff: | ||||
|         # Relay stop iteration to far end **iff** we're | ||||
|         # in bidirectional mode. If we're only streaming | ||||
|  | @ -186,9 +199,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         # isn't expecting messages to be sent by the caller. | ||||
|         # Thus, we must check that this context DOES NOT | ||||
|         # have a portal reference to ensure this is indeed the callee | ||||
|         # side and can relay a 'stop'. In the bidirectional case, | ||||
|         # `Context.open_stream()` will create the `Actor._cids2qs` | ||||
|         # entry from a call to `Actor.get_memchans()`. | ||||
|         # side and can relay a 'stop'. | ||||
| 
 | ||||
|         # In the bidirectional case, `Context.open_stream()` will create | ||||
|         # the `Actor._cids2qs` entry from a call to | ||||
|         # `Actor.get_memchans()` and will send the stop message in | ||||
|         # ``__aexit__()`` on teardown so it **does not** need to be | ||||
|         # called here. | ||||
|         if not self._ctx._portal: | ||||
|             try: | ||||
|                 # only for 2 way streams can we can send | ||||
|  | @ -201,32 +218,41 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|                 # it can't traverse the transport. | ||||
|                 log.debug(f'Channel for {self} was already closed') | ||||
| 
 | ||||
|         # close the local mem chan??!? | ||||
|         # close the local mem chan ``self._rx_chan`` ??!? | ||||
| 
 | ||||
|         # NOT if we're a ``MsgStream``! | ||||
|         # DEFINITELY NOT if we're a bi-dir ``MsgStream``! | ||||
|         # BECAUSE this same core-msg-loop mem recv-chan is used to deliver | ||||
|         # the potential final result from the surrounding inter-actor | ||||
|         # `Context` so we don't want to close it until that context has | ||||
|         # run to completion. | ||||
| 
 | ||||
|         # XXX: Notes on old behaviour. | ||||
|         # XXX: Notes on old behaviour: | ||||
|         # await rx_chan.aclose() | ||||
| 
 | ||||
|         # In the receive-only case, ``Portal.open_stream_from()`` should | ||||
|         # call this explicitly on teardown but additionally if for some | ||||
|         # reason stream consumer code tries to manually receive a new | ||||
|         # In the receive-only case, ``Portal.open_stream_from()`` used | ||||
|         # to rely on this call explicitly on teardown such that a new | ||||
|         # call to ``.receive()`` after ``rx_chan`` had been closed, would | ||||
|         # result in us raising a ``trio.EndOfChannel`` (since we | ||||
|         # remapped the ``trio.ClosedResourceError`). However, now if for some | ||||
|         # reason the stream's consumer code tries to manually receive a new | ||||
|         # value before ``.aclose()`` is called **but** the far end has | ||||
|         # stopped `.receive()` **must** raise ``trio.EndofChannel`` in | ||||
|         # order to avoid an infinite hang on ``.__anext__()``. So we can | ||||
|         # instead uncomment this check and close the underlying msg-loop | ||||
|         # mem chan below and not then **not** check for ``self._eoc`` in | ||||
|         # ``.receive()`` (if for some reason we think that check is | ||||
|         # a bottle neck - not likely) such that the | ||||
|         # ``trio.ClosedResourceError`` would instead trigger the | ||||
|         # ``trio.EndOfChannel`` in ``.receive()`` (as it originally was | ||||
|         # before bi-dir streaming support). | ||||
|         # order to avoid an infinite hang on ``.__anext__()``; this is | ||||
|         # why we added ``self._eoc`` to denote stream closure indepedent | ||||
|         # of ``rx_chan``. | ||||
| 
 | ||||
|         # if not isinstance(self, MsgStream): | ||||
|         #     await rx_chan.aclose() | ||||
|         # In theory we could still use this old method and close the | ||||
|         # underlying msg-loop mem chan as above and then **not** check | ||||
|         # for ``self._eoc`` in ``.receive()`` (if for some reason we | ||||
|         # think that check is a bottle neck - not likely) **but** then | ||||
|         # we would need to map the resulting | ||||
|         # ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in | ||||
|         # ``.receive()`` (as it originally was before bi-dir streaming | ||||
|         # support) in order to trigger stream closure. The old behaviour | ||||
|         # is arguably more confusing since we lose detection of the | ||||
|         # runtime's closure of ``rx_chan`` in the case where we may | ||||
|         # still need to consume msgs that are "in transit" from the far | ||||
|         # end (eg. for ``Context.result()``). | ||||
| 
 | ||||
|     # TODO: but make it broadcasting to consumers | ||||
|     # def clone(self): | ||||
|  | @ -251,6 +277,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): | |||
|         self, | ||||
|         data: Any | ||||
|     ) -> None: | ||||
|         '''Send a message over this stream to the far end. | ||||
| 
 | ||||
|         ''' | ||||
|         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -409,7 +438,8 @@ class Context: | |||
|                 yield rchan | ||||
| 
 | ||||
|             except trio.EndOfChannel: | ||||
|                 # stream iteration stop signal | ||||
|                 # likely the far end sent us a 'stop' message to | ||||
|                 # terminate the stream. | ||||
|                 raise | ||||
| 
 | ||||
|             else: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue