Explicitly formalize context/streaming teardown
Add clear teardown semantics for `Context` such that the remote side cancellation propagation happens only on error or if client code explicitly requests it (either by exit flag to `Portal.open_context()` or by manually calling `Context.cancel()`). Add `Context.result()` to wait on and capture the final result from a remote context function; any lingering msg sequence will be consumed/discarded. Changes in order to make this possible: - pass the runtime msg loop's feeder receive channel in to the context on the calling (portal opening) side such that a final 'return' msg can be waited upon using `Context.result()` which delivers the final return value from the callee side `@tractor.context` async function. - always await a final result from the target context function in `Portal.open_context()`'s `__aexit__()` if the context has not been (requested to be) cancelled by client code on block exit. - add an internal `Context._cancel_called` for context "cancel requested" tracking (much like `trio`'s cancel scope). - allow flagging a stream as terminated using an internal `._eoc` flag which will mark the stream as stopped for iteration. - drop `StopAsyncIteration` catching in `.receive()`; it does nothing.transport_hardening
							parent
							
								
									0e6f017929
								
							
						
					
					
						commit
						a146034cb7
					
				|  | @ -17,7 +17,12 @@ from async_generator import asynccontextmanager | ||||||
| from ._state import current_actor | from ._state import current_actor | ||||||
| from ._ipc import Channel | from ._ipc import Channel | ||||||
| from .log import get_logger | from .log import get_logger | ||||||
| from ._exceptions import unpack_error, NoResult, RemoteActorError | from ._exceptions import ( | ||||||
|  |     unpack_error, | ||||||
|  |     NoResult, | ||||||
|  |     RemoteActorError, | ||||||
|  |     ContextCancelled, | ||||||
|  | ) | ||||||
| from ._streaming import Context, ReceiveMsgStream | from ._streaming import Context, ReceiveMsgStream | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -84,7 +89,7 @@ class Portal: | ||||||
|         ns: str, |         ns: str, | ||||||
|         func: str, |         func: str, | ||||||
|         kwargs, |         kwargs, | ||||||
|     ) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: |     ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: | ||||||
|         """Submit a function to be scheduled and run by actor, return the |         """Submit a function to be scheduled and run by actor, return the | ||||||
|         associated caller id, response queue, response type str, |         associated caller id, response queue, response type str, | ||||||
|         first message packet as a tuple. |         first message packet as a tuple. | ||||||
|  | @ -327,7 +332,14 @@ class Portal: | ||||||
|             # message right now since there shouldn't be a reason to |             # message right now since there shouldn't be a reason to | ||||||
|             # stop and restart the stream, right? |             # stop and restart the stream, right? | ||||||
|             try: |             try: | ||||||
|  | 
 | ||||||
|  |                 # We are for sure done with this stream and no more | ||||||
|  |                 # messages are expected to be delivered from the | ||||||
|  |                 # runtime's msg loop. | ||||||
|  |                 await recv_chan.aclose() | ||||||
|  | 
 | ||||||
|                 await ctx.cancel() |                 await ctx.cancel() | ||||||
|  | 
 | ||||||
|             except trio.ClosedResourceError: |             except trio.ClosedResourceError: | ||||||
|                 # if the far end terminates before we send a cancel the |                 # if the far end terminates before we send a cancel the | ||||||
|                 # underlying transport-channel may already be closed. |                 # underlying transport-channel may already be closed. | ||||||
|  | @ -337,18 +349,21 @@ class Portal: | ||||||
| 
 | 
 | ||||||
|     @asynccontextmanager |     @asynccontextmanager | ||||||
|     async def open_context( |     async def open_context( | ||||||
|  | 
 | ||||||
|         self, |         self, | ||||||
|         func: Callable, |         func: Callable, | ||||||
|  |         cancel_on_exit: bool = False, | ||||||
|         **kwargs, |         **kwargs, | ||||||
|  | 
 | ||||||
|     ) -> AsyncGenerator[Tuple[Context, Any], None]: |     ) -> AsyncGenerator[Tuple[Context, Any], None]: | ||||||
|         """Open an inter-actor task context. |         '''Open an inter-actor task context. | ||||||
| 
 | 
 | ||||||
|         This is a synchronous API which allows for deterministic |         This is a synchronous API which allows for deterministic | ||||||
|         setup/teardown of a remote task. The yielded ``Context`` further |         setup/teardown of a remote task. The yielded ``Context`` further | ||||||
|         allows for opening bidirectional streams - see |         allows for opening bidirectional streams, explicit cancellation | ||||||
|         ``Context.open_stream()``. |         and synchronized final result collection. See ``tractor.Context``. | ||||||
| 
 | 
 | ||||||
|         """ |         ''' | ||||||
|         # conduct target func method structural checks |         # conduct target func method structural checks | ||||||
|         if not inspect.iscoroutinefunction(func) and ( |         if not inspect.iscoroutinefunction(func) and ( | ||||||
|             getattr(func, '_tractor_contex_function', False) |             getattr(func, '_tractor_contex_function', False) | ||||||
|  | @ -358,8 +373,8 @@ class Portal: | ||||||
| 
 | 
 | ||||||
|         fn_mod_path, fn_name = func_deats(func) |         fn_mod_path, fn_name = func_deats(func) | ||||||
| 
 | 
 | ||||||
|  |         recv_chan: Optional[trio.MemoryReceiveChannel] = None | ||||||
| 
 | 
 | ||||||
|         recv_chan: trio.ReceiveMemoryChannel = None |  | ||||||
|         try: |         try: | ||||||
|             cid, recv_chan, functype, first_msg = await self._submit( |             cid, recv_chan, functype, first_msg = await self._submit( | ||||||
|                 fn_mod_path, fn_name, kwargs) |                 fn_mod_path, fn_name, kwargs) | ||||||
|  | @ -383,13 +398,38 @@ class Portal: | ||||||
| 
 | 
 | ||||||
|             # deliver context instance and .started() msg value in open |             # deliver context instance and .started() msg value in open | ||||||
|             # tuple. |             # tuple. | ||||||
|             ctx = Context(self.channel, cid, _portal=self) |             ctx = Context( | ||||||
|  |                 self.channel, | ||||||
|  |                 cid, | ||||||
|  |                 _portal=self, | ||||||
|  |                 _recv_chan=recv_chan, | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|             try: |             try: | ||||||
|                 yield ctx, first |                 yield ctx, first | ||||||
| 
 | 
 | ||||||
|             finally: |                 if cancel_on_exit: | ||||||
|                     await ctx.cancel() |                     await ctx.cancel() | ||||||
| 
 | 
 | ||||||
|  |                 else: | ||||||
|  |                     if not ctx._cancel_called: | ||||||
|  |                         await ctx.result() | ||||||
|  | 
 | ||||||
|  |             except ContextCancelled: | ||||||
|  |                 # if the context was cancelled by client code | ||||||
|  |                 # then we don't need to raise since user code | ||||||
|  |                 # is expecting this. | ||||||
|  |                 if not ctx._cancel_called: | ||||||
|  |                     raise | ||||||
|  | 
 | ||||||
|  |             except BaseException: | ||||||
|  |                 # the context cancels itself on any deviation | ||||||
|  |                 await ctx.cancel() | ||||||
|  |                 raise | ||||||
|  | 
 | ||||||
|  |             finally: | ||||||
|  |                 log.info(f'Context for {func.__name__} completed') | ||||||
|  | 
 | ||||||
|         finally: |         finally: | ||||||
|             if recv_chan is not None: |             if recv_chan is not None: | ||||||
|                 await recv_chan.aclose() |                 await recv_chan.aclose() | ||||||
|  |  | ||||||
|  | @ -35,10 +35,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|     which invoked a remote streaming function using `Portal.run()`. |     which invoked a remote streaming function using `Portal.run()`. | ||||||
| 
 | 
 | ||||||
|     Termination rules: |     Termination rules: | ||||||
|  | 
 | ||||||
|     - if the local task signals stop iteration a cancel signal is |     - if the local task signals stop iteration a cancel signal is | ||||||
|       relayed to the remote task indicating to stop streaming |       relayed to the remote task indicating to stop streaming | ||||||
|     - if the remote task signals the end of a stream, raise a |     - if the remote task signals the end of a stream, raise | ||||||
|       ``StopAsyncIteration`` to terminate the local ``async for`` |       a ``StopAsyncIteration`` to terminate the local ``async for`` | ||||||
| 
 | 
 | ||||||
|     """ |     """ | ||||||
|     def __init__( |     def __init__( | ||||||
|  | @ -51,12 +52,19 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|         self._rx_chan = rx_chan |         self._rx_chan = rx_chan | ||||||
|         self._shielded = shield |         self._shielded = shield | ||||||
| 
 | 
 | ||||||
|  |         # flag to denote end of stream | ||||||
|  |         self._eoc: bool = False | ||||||
|  | 
 | ||||||
|     # delegate directly to underlying mem channel |     # delegate directly to underlying mem channel | ||||||
|     def receive_nowait(self): |     def receive_nowait(self): | ||||||
|         msg = self._rx_chan.receive_nowait() |         msg = self._rx_chan.receive_nowait() | ||||||
|         return msg['yield'] |         return msg['yield'] | ||||||
| 
 | 
 | ||||||
|     async def receive(self): |     async def receive(self): | ||||||
|  |         # see ``.aclose()`` to an alt to always checking this | ||||||
|  |         if self._eoc: | ||||||
|  |             raise trio.EndOfChannel | ||||||
|  | 
 | ||||||
|         try: |         try: | ||||||
|             msg = await self._rx_chan.receive() |             msg = await self._rx_chan.receive() | ||||||
|             return msg['yield'] |             return msg['yield'] | ||||||
|  | @ -72,9 +80,14 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
| 
 | 
 | ||||||
|             if msg.get('stop'): |             if msg.get('stop'): | ||||||
|                 log.debug(f"{self} was stopped at remote end") |                 log.debug(f"{self} was stopped at remote end") | ||||||
|  |                 self._eoc = True | ||||||
|  | 
 | ||||||
|                 # when the send is closed we assume the stream has |                 # when the send is closed we assume the stream has | ||||||
|                 # terminated and signal this local iterator to stop |                 # terminated and signal this local iterator to stop | ||||||
|                 await self.aclose() |                 await self.aclose() | ||||||
|  | 
 | ||||||
|  |                 # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||||
|  |                 # raise a ``StopAsyncIteration``. | ||||||
|                 raise trio.EndOfChannel |                 raise trio.EndOfChannel | ||||||
| 
 | 
 | ||||||
|             # TODO: test that shows stream raising an expected error!!! |             # TODO: test that shows stream raising an expected error!!! | ||||||
|  | @ -85,7 +98,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|             else: |             else: | ||||||
|                 raise |                 raise | ||||||
| 
 | 
 | ||||||
|         except (trio.ClosedResourceError, StopAsyncIteration): |         except trio.ClosedResourceError: | ||||||
|             # XXX: this indicates that a `stop` message was |             # XXX: this indicates that a `stop` message was | ||||||
|             # sent by the far side of the underlying channel. |             # sent by the far side of the underlying channel. | ||||||
|             # Currently this is triggered by calling ``.aclose()`` on |             # Currently this is triggered by calling ``.aclose()`` on | ||||||
|  | @ -108,8 +121,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|             # terminated and signal this local iterator to stop |             # terminated and signal this local iterator to stop | ||||||
|             await self.aclose() |             await self.aclose() | ||||||
| 
 | 
 | ||||||
|             # await self._ctx.send_stop() |             raise  # propagate | ||||||
|             raise StopAsyncIteration |  | ||||||
| 
 | 
 | ||||||
|         except trio.Cancelled: |         except trio.Cancelled: | ||||||
|             # relay cancels to the remote task |             # relay cancels to the remote task | ||||||
|  | @ -138,12 +150,16 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|         on close. |         on close. | ||||||
| 
 | 
 | ||||||
|         """ |         """ | ||||||
|         # TODO: proper adherance to trio's `.aclose()` semantics: |         # XXX: keep proper adherance to trio's `.aclose()` semantics: | ||||||
|         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose |         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||||
|         rx_chan = self._rx_chan |         rx_chan = self._rx_chan | ||||||
| 
 | 
 | ||||||
|         if rx_chan._closed: |         if rx_chan._closed: | ||||||
|             log.warning(f"{self} is already closed") |             log.warning(f"{self} is already closed") | ||||||
|  | 
 | ||||||
|  |             # this stream has already been closed so silently succeed as | ||||||
|  |             # per ``trio.AsyncResource`` semantics. | ||||||
|  |             # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||||
|             return |             return | ||||||
| 
 | 
 | ||||||
|         # TODO: broadcasting to multiple consumers |         # TODO: broadcasting to multiple consumers | ||||||
|  | @ -173,12 +189,45 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||||
|         # `Context.open_stream()` will create the `Actor._cids2qs` |         # `Context.open_stream()` will create the `Actor._cids2qs` | ||||||
|         # entry from a call to `Actor.get_memchans()`. |         # entry from a call to `Actor.get_memchans()`. | ||||||
|         if not self._ctx._portal: |         if not self._ctx._portal: | ||||||
|  |             try: | ||||||
|                 # only for 2 way streams can we can send |                 # only for 2 way streams can we can send | ||||||
|                 # stop from the caller side |                 # stop from the caller side | ||||||
|                 await self._ctx.send_stop() |                 await self._ctx.send_stop() | ||||||
| 
 | 
 | ||||||
|         # close the local mem chan |             except trio.BrokenResourceError: | ||||||
|         await rx_chan.aclose() |                 # the underlying channel may already have been pulled | ||||||
|  |                 # in which case our stop message is meaningless since | ||||||
|  |                 # it can't traverse the transport. | ||||||
|  |                 log.debug(f'Channel for {self} was already closed') | ||||||
|  | 
 | ||||||
|  |         self._eoc = True | ||||||
|  | 
 | ||||||
|  |         # close the local mem chan??!? | ||||||
|  | 
 | ||||||
|  |         # NOT if we're a ``MsgStream``! | ||||||
|  |         # BECAUSE this same core-msg-loop mem recv-chan is used to deliver | ||||||
|  |         # the potential final result from the surrounding inter-actor | ||||||
|  |         # `Context` so we don't want to close it until that context has | ||||||
|  |         # run to completion. | ||||||
|  | 
 | ||||||
|  |         # XXX: Notes on old behaviour. | ||||||
|  | 
 | ||||||
|  |         # In the receive-only case, ``Portal.open_stream_from()`` should | ||||||
|  |         # call this explicitly on teardown but additionally if for some | ||||||
|  |         # reason stream consumer code tries to manually receive a new | ||||||
|  |         # value before ``.aclose()`` is called **but** the far end has | ||||||
|  |         # stopped `.receive()` **must** raise ``trio.EndofChannel`` in | ||||||
|  |         # order to avoid an infinite hang on ``.__anext__()``. So we can | ||||||
|  |         # instead uncomment this check and close the underlying msg-loop | ||||||
|  |         # mem chan below and not then **not** check for ``self._eoc`` in | ||||||
|  |         # ``.receive()`` (if for some reason we think that check is | ||||||
|  |         # a bottle neck - not likely) such that the | ||||||
|  |         # ``trio.ClosedResourceError`` would instead trigger the | ||||||
|  |         # ``trio.EndOfChannel`` in ``.receive()`` (as it originally was | ||||||
|  |         # before bi-dir streaming support). | ||||||
|  | 
 | ||||||
|  |         # if not isinstance(self, MsgStream): | ||||||
|  |         #     await rx_chan.aclose() | ||||||
| 
 | 
 | ||||||
|     # TODO: but make it broadcasting to consumers |     # TODO: but make it broadcasting to consumers | ||||||
|     # def clone(self): |     # def clone(self): | ||||||
|  | @ -206,29 +255,29 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): | ||||||
|         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) |         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @dataclass(frozen=True) | @dataclass | ||||||
| class Context: | class Context: | ||||||
|     """An IAC (inter-actor communication) context. |     '''An inter-actor task communication context. | ||||||
| 
 | 
 | ||||||
|     Allows maintaining task or protocol specific state between communicating |     Allows maintaining task or protocol specific state between | ||||||
|     actors. A unique context is created on the receiving end for every request |     2 communicating actor tasks. A unique context is created on the | ||||||
|     to a remote actor. |     callee side/end for every request to a remote actor from a portal. | ||||||
| 
 | 
 | ||||||
|     A context can be cancelled and (eventually) restarted from |     A context can be cancelled and (possibly eventually restarted) from | ||||||
|     either side of the underlying IPC channel. |     either side of the underlying IPC channel. | ||||||
| 
 | 
 | ||||||
|     A context can be used to open task oriented message streams. |     A context can be used to open task oriented message streams and can | ||||||
|  |     be thought of as an IPC aware inter-actor cancel scope. | ||||||
| 
 | 
 | ||||||
|     """ |     ''' | ||||||
|     chan: Channel |     chan: Channel | ||||||
|     cid: str |     cid: str | ||||||
| 
 | 
 | ||||||
|     # TODO: should we have seperate types for caller vs. callee |  | ||||||
|     # side contexts? The caller always opens a portal whereas the callee |  | ||||||
|     # is always responding back through a context-stream |  | ||||||
| 
 |  | ||||||
|     # only set on the caller side |     # only set on the caller side | ||||||
|     _portal: Optional['Portal'] = None    # type: ignore # noqa |     _portal: Optional['Portal'] = None    # type: ignore # noqa | ||||||
|  |     _recv_chan: Optional[trio.MemoryReceiveChannel] = None | ||||||
|  |     _result: Optional[Any] = False | ||||||
|  |     _cancel_called: bool = False | ||||||
| 
 | 
 | ||||||
|     # only set on the callee side |     # only set on the callee side | ||||||
|     _cancel_scope: Optional[trio.CancelScope] = None |     _cancel_scope: Optional[trio.CancelScope] = None | ||||||
|  | @ -247,12 +296,14 @@ class Context: | ||||||
|         await self.chan.send({'stop': True, 'cid': self.cid}) |         await self.chan.send({'stop': True, 'cid': self.cid}) | ||||||
| 
 | 
 | ||||||
|     async def cancel(self) -> None: |     async def cancel(self) -> None: | ||||||
|         """Cancel this inter-actor-task context. |         '''Cancel this inter-actor-task context. | ||||||
| 
 | 
 | ||||||
|         Request that the far side cancel it's current linked context, |         Request that the far side cancel it's current linked context, | ||||||
|         timeout quickly to sidestep 2-generals... |         Timeout quickly in an attempt to sidestep 2-generals... | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         self._cancel_called = True | ||||||
| 
 | 
 | ||||||
|         """ |  | ||||||
|         if self._portal:  # caller side: |         if self._portal:  # caller side: | ||||||
|             if not self._portal: |             if not self._portal: | ||||||
|                 raise RuntimeError( |                 raise RuntimeError( | ||||||
|  | @ -290,17 +341,31 @@ class Context: | ||||||
|             # https://github.com/goodboy/tractor/issues/36 |             # https://github.com/goodboy/tractor/issues/36 | ||||||
|             self._cancel_scope.cancel() |             self._cancel_scope.cancel() | ||||||
| 
 | 
 | ||||||
|     # TODO: do we need a restart api? |  | ||||||
|     # async def restart(self) -> None: |  | ||||||
|     #     pass |  | ||||||
| 
 |  | ||||||
|     @asynccontextmanager |     @asynccontextmanager | ||||||
|     async def open_stream( |     async def open_stream( | ||||||
|  | 
 | ||||||
|         self, |         self, | ||||||
|         shield: bool = False, |         shield: bool = False, | ||||||
|     ) -> AsyncGenerator[MsgStream, None]: |  | ||||||
|         # TODO |  | ||||||
| 
 | 
 | ||||||
|  |     ) -> AsyncGenerator[MsgStream, None]: | ||||||
|  |         '''Open a ``MsgStream``, a bi-directional stream connected to the | ||||||
|  |         cross-actor (far end) task for this ``Context``. | ||||||
|  | 
 | ||||||
|  |         This context manager must be entered on both the caller and | ||||||
|  |         callee for the stream to logically be considered "connected". | ||||||
|  | 
 | ||||||
|  |         A ``MsgStream`` is currently "one-shot" use, meaning if you | ||||||
|  |         close it you can not "re-open" it for streaming and instead you | ||||||
|  |         must re-establish a new surrounding ``Context`` using | ||||||
|  |         ``Portal.open_context()``.  In the future this may change but | ||||||
|  |         currently there seems to be no obvious reason to support | ||||||
|  |         "re-opening": | ||||||
|  |             - pausing a stream can be done with a message. | ||||||
|  |             - task errors will normally require a restart of the entire | ||||||
|  |               scope of the inter-actor task context due to the nature of | ||||||
|  |               ``trio``'s cancellation system. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|         actor = current_actor() |         actor = current_actor() | ||||||
| 
 | 
 | ||||||
|         # here we create a mem chan that corresponds to the |         # here we create a mem chan that corresponds to the | ||||||
|  | @ -316,6 +381,19 @@ class Context: | ||||||
|             self.cid |             self.cid | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |         # XXX: If the underlying receive mem chan has been closed then | ||||||
|  |         # likely client code has already exited a ``.open_stream()`` | ||||||
|  |         # block prior. we error here until such a time that we decide | ||||||
|  |         # allowing streams to be "re-connected" is supported and/or | ||||||
|  |         # a good idea. | ||||||
|  |         if recv_chan._closed: | ||||||
|  |             task = trio.lowlevel.current_task().name | ||||||
|  |             raise trio.ClosedResourceError( | ||||||
|  |                 f'stream for {actor.uid[0]}:{task} has already been closed.' | ||||||
|  |                 '\nRe-opening a closed stream is not yet supported!' | ||||||
|  |                 '\nConsider re-calling the containing `@tractor.context` func' | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|         async with MsgStream( |         async with MsgStream( | ||||||
|             ctx=self, |             ctx=self, | ||||||
|             rx_chan=recv_chan, |             rx_chan=recv_chan, | ||||||
|  | @ -326,19 +404,65 @@ class Context: | ||||||
|                 self._portal._streams.add(rchan) |                 self._portal._streams.add(rchan) | ||||||
| 
 | 
 | ||||||
|             try: |             try: | ||||||
|  |                 # ensure we aren't cancelled before delivering | ||||||
|  |                 # the stream | ||||||
|  |                 # await trio.lowlevel.checkpoint() | ||||||
|                 yield rchan |                 yield rchan | ||||||
| 
 | 
 | ||||||
|             except trio.EndOfChannel: |             except trio.EndOfChannel: | ||||||
|  |                 # stream iteration stop signal | ||||||
|                 raise |                 raise | ||||||
| 
 | 
 | ||||||
|             else: |             else: | ||||||
|                 # signal ``StopAsyncIteration`` on far end. |                 # XXX: Make the stream "one-shot use".  On exit, signal | ||||||
|  |                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to the | ||||||
|  |                 # far end. | ||||||
|                 await self.send_stop() |                 await self.send_stop() | ||||||
| 
 | 
 | ||||||
|             finally: |             finally: | ||||||
|                 if self._portal: |                 if self._portal: | ||||||
|                     self._portal._streams.remove(rchan) |                     self._portal._streams.remove(rchan) | ||||||
| 
 | 
 | ||||||
|  |     async def result(self) -> Any: | ||||||
|  |         '''From a caller side, wait for and return the final result from | ||||||
|  |         the callee side task. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         assert self._portal, "Context.result() can not be called from callee!" | ||||||
|  |         assert self._recv_chan | ||||||
|  | 
 | ||||||
|  |         if self._result is False: | ||||||
|  | 
 | ||||||
|  |             if not self._recv_chan._closed:  # type: ignore | ||||||
|  | 
 | ||||||
|  |                 # wait for a final context result consuming | ||||||
|  |                 # and discarding any bi dir stream msgs still | ||||||
|  |                 # in transit from the far end. | ||||||
|  |                 while True: | ||||||
|  | 
 | ||||||
|  |                     msg = await self._recv_chan.receive() | ||||||
|  |                     try: | ||||||
|  |                         self._result = msg['return'] | ||||||
|  |                         break | ||||||
|  |                     except KeyError: | ||||||
|  | 
 | ||||||
|  |                         if 'yield' in msg: | ||||||
|  |                             # far end task is still streaming to us.. | ||||||
|  |                             log.warning(f'Remote stream deliverd {msg}') | ||||||
|  |                             # do disard | ||||||
|  |                             continue | ||||||
|  | 
 | ||||||
|  |                         elif 'stop' in msg: | ||||||
|  |                             log.debug('Remote stream terminated') | ||||||
|  |                             continue | ||||||
|  | 
 | ||||||
|  |                         # internal error should never get here | ||||||
|  |                         assert msg.get('cid'), ( | ||||||
|  |                             "Received internal error at portal?") | ||||||
|  |                         raise unpack_error(msg, self._portal.channel) | ||||||
|  | 
 | ||||||
|  |         return self._result | ||||||
|  | 
 | ||||||
|     async def started(self, value: Optional[Any] = None) -> None: |     async def started(self, value: Optional[Any] = None) -> None: | ||||||
| 
 | 
 | ||||||
|         if self._portal: |         if self._portal: | ||||||
|  | @ -347,6 +471,10 @@ class Context: | ||||||
| 
 | 
 | ||||||
|         await self.chan.send({'started': value, 'cid': self.cid}) |         await self.chan.send({'started': value, 'cid': self.cid}) | ||||||
| 
 | 
 | ||||||
|  |     # TODO: do we need a restart api? | ||||||
|  |     # async def restart(self) -> None: | ||||||
|  |     #     pass | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| def stream(func: Callable) -> Callable: | def stream(func: Callable) -> Callable: | ||||||
|     """Mark an async function as a streaming routine with ``@stream``. |     """Mark an async function as a streaming routine with ``@stream``. | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue