forked from goodboy/tractor
				
			Move `Context.open_stream()` impl to `._streaming`
Exactly like how it's organized for `Portal.open_context()`, put the main streaming API `@acm` with the `MsgStream` code and bind the method to the new module func. Other, - rename `Context.result()` -> `.wait_for_result()` to better match the blocking semantics and rebind `.result()` as deprecated. - add doc-str for `Context.maybe_raise()`.remotes/1757153874605917753/main
							parent
							
								
									54386900e0
								
							
						
					
					
						commit
						18b4618b5f
					
				|  | @ -86,7 +86,10 @@ from .msg import ( | |||
| from ._ipc import ( | ||||
|     Channel, | ||||
| ) | ||||
| from ._streaming import MsgStream | ||||
| from ._streaming import ( | ||||
|     MsgStream, | ||||
|     open_stream_from_ctx, | ||||
| ) | ||||
| from ._state import ( | ||||
|     current_actor, | ||||
|     debug_mode, | ||||
|  | @ -978,198 +981,6 @@ class Context: | |||
|             assert self._scope | ||||
|             self._scope.cancel() | ||||
| 
 | ||||
|     # TODO? should we move this to `._streaming` much like we | ||||
|     # moved `Portal.open_context()`'s def to this mod? | ||||
|     @acm | ||||
|     async def open_stream( | ||||
|         self, | ||||
|         allow_overruns: bool|None = False, | ||||
|         msg_buffer_size: int|None = None, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[MsgStream, None]: | ||||
|         ''' | ||||
|         Open a ``MsgStream``, a bi-directional stream connected to the | ||||
|         cross-actor (far end) task for this ``Context``. | ||||
| 
 | ||||
|         This context manager must be entered on both the caller and | ||||
|         callee for the stream to logically be considered "connected". | ||||
| 
 | ||||
|         A ``MsgStream`` is currently "one-shot" use, meaning if you | ||||
|         close it you can not "re-open" it for streaming and instead you | ||||
|         must re-establish a new surrounding ``Context`` using | ||||
|         ``Portal.open_context()``.  In the future this may change but | ||||
|         currently there seems to be no obvious reason to support | ||||
|         "re-opening": | ||||
|           - pausing a stream can be done with a message. | ||||
|           - task errors will normally require a restart of the entire | ||||
|             scope of the inter-actor task context due to the nature of | ||||
|             ``trio``'s cancellation system. | ||||
| 
 | ||||
|         ''' | ||||
|         actor: Actor = self._actor | ||||
| 
 | ||||
|         # If the surrounding context has been cancelled by some | ||||
|         # task with a handle to THIS, we error here immediately | ||||
|         # since it likely means the surrounding lexical-scope has | ||||
|         # errored, been `trio.Cancelled` or at the least | ||||
|         # `Context.cancel()` was called by some task. | ||||
|         if self._cancel_called: | ||||
| 
 | ||||
|             # XXX NOTE: ALWAYS RAISE any remote error here even if | ||||
|             # it's an expected `ContextCancelled` due to a local | ||||
|             # task having called `.cancel()`! | ||||
|             # | ||||
|             # WHY: we expect the error to always bubble up to the | ||||
|             # surrounding `Portal.open_context()` call and be | ||||
|             # absorbed there (silently) and we DO NOT want to | ||||
|             # actually try to stream - a cancel msg was already | ||||
|             # sent to the other side! | ||||
|             self.maybe_raise( | ||||
|                 raise_ctxc_from_self_call=True, | ||||
|             ) | ||||
|             # NOTE: this is diff then calling | ||||
|             # `._maybe_raise_remote_err()` specifically | ||||
|             # because we want to raise a ctxc on any task entering this `.open_stream()` | ||||
|             # AFTER cancellation was already been requested, | ||||
|             # we DO NOT want to absorb any ctxc ACK silently! | ||||
|             # if self._remote_error: | ||||
|             #     raise self._remote_error | ||||
| 
 | ||||
|             # XXX NOTE: if no `ContextCancelled` has been responded | ||||
|             # back from the other side (yet), we raise a different | ||||
|             # runtime error indicating that this task's usage of | ||||
|             # `Context.cancel()` and then `.open_stream()` is WRONG! | ||||
|             task: str = trio.lowlevel.current_task().name | ||||
|             raise RuntimeError( | ||||
|                 'Stream opened after `Context.cancel()` called..?\n' | ||||
|                 f'task: {actor.uid[0]}:{task}\n' | ||||
|                 f'{self}' | ||||
|             ) | ||||
| 
 | ||||
|         if ( | ||||
|             not self._portal | ||||
|             and not self._started_called | ||||
|         ): | ||||
|             raise RuntimeError( | ||||
|                 'Context.started()` must be called before opening a stream' | ||||
|             ) | ||||
| 
 | ||||
|         # NOTE: in one way streaming this only happens on the | ||||
|         # parent-ctx-task side (on the side that calls | ||||
|         # `Actor.start_remote_task()`) so if you try to send | ||||
|         # a stop from the caller to the callee in the | ||||
|         # single-direction-stream case you'll get a lookup error | ||||
|         # currently. | ||||
|         ctx: Context = actor.get_context( | ||||
|             chan=self.chan, | ||||
|             cid=self.cid, | ||||
|             nsf=self._nsf, | ||||
|             # side=self.side, | ||||
| 
 | ||||
|             msg_buffer_size=msg_buffer_size, | ||||
|             allow_overruns=allow_overruns, | ||||
|         ) | ||||
|         ctx._allow_overruns: bool = allow_overruns | ||||
|         assert ctx is self | ||||
| 
 | ||||
|         # XXX: If the underlying channel feeder receive mem chan has | ||||
|         # been closed then likely client code has already exited | ||||
|         # a ``.open_stream()`` block prior or there was some other | ||||
|         # unanticipated error or cancellation from ``trio``. | ||||
| 
 | ||||
|         if ctx._rx_chan._closed: | ||||
|             raise trio.ClosedResourceError( | ||||
|                 'The underlying channel for this stream was already closed!\n' | ||||
|             ) | ||||
| 
 | ||||
|         # NOTE: implicitly this will call `MsgStream.aclose()` on | ||||
|         # `.__aexit__()` due to stream's parent `Channel` type! | ||||
|         # | ||||
|         # XXX NOTE XXX: ensures the stream is "one-shot use", | ||||
|         # which specifically means that on exit, | ||||
|         # - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to | ||||
|         #   the far end indicating that the caller exited | ||||
|         #   the streaming context purposefully by letting | ||||
|         #   the exit block exec. | ||||
|         # - this is diff from the cancel/error case where | ||||
|         #   a cancel request from this side or an error | ||||
|         #   should be sent to the far end indicating the | ||||
|         #   stream WAS NOT just closed normally/gracefully. | ||||
|         async with MsgStream( | ||||
|             ctx=self, | ||||
|             rx_chan=ctx._rx_chan, | ||||
|         ) as stream: | ||||
| 
 | ||||
|             # NOTE: we track all existing streams per portal for | ||||
|             # the purposes of attempting graceful closes on runtime | ||||
|             # cancel requests. | ||||
|             if self._portal: | ||||
|                 self._portal._streams.add(stream) | ||||
| 
 | ||||
|             try: | ||||
|                 self._stream_opened: bool = True | ||||
|                 self._stream = stream | ||||
| 
 | ||||
|                 # XXX: do we need this? | ||||
|                 # ensure we aren't cancelled before yielding the stream | ||||
|                 # await trio.lowlevel.checkpoint() | ||||
|                 yield stream | ||||
| 
 | ||||
|                 # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|                 # wait for any immediate child in debug before popping the | ||||
|                 # context from the runtime msg loop otherwise inside | ||||
|                 # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in | ||||
|                 # the case where that msg is global debugger unlock (via | ||||
|                 # a "stop" msg for a stream), this can result in a deadlock | ||||
|                 # where the root is waiting on the lock to clear but the | ||||
|                 # child has already cleared it and clobbered IPC. | ||||
|                 # | ||||
|                 # await maybe_wait_for_debugger() | ||||
| 
 | ||||
|                 # XXX TODO: pretty sure this isn't needed (see | ||||
|                 # note above this block) AND will result in | ||||
|                 # a double `.send_stop()` call. The only reason to | ||||
|                 # put it here would be to due with "order" in | ||||
|                 # terms of raising any remote error (as per | ||||
|                 # directly below) or bc the stream's | ||||
|                 # `.__aexit__()` block might not get run | ||||
|                 # (doubtful)? Either way if we did put this back | ||||
|                 # in we also need a state var to avoid the double | ||||
|                 # stop-msg send.. | ||||
|                 # | ||||
|                 # await stream.aclose() | ||||
| 
 | ||||
|             # NOTE: absorb and do not raise any | ||||
|             # EoC received from the other side such that | ||||
|             # it is not raised inside the surrounding | ||||
|             # context block's scope! | ||||
|             except trio.EndOfChannel as eoc: | ||||
|                 if ( | ||||
|                     eoc | ||||
|                     and | ||||
|                     stream.closed | ||||
|                 ): | ||||
|                     # sanity, can remove? | ||||
|                     assert eoc is stream._eoc | ||||
| 
 | ||||
|                     log.warning( | ||||
|                         'Stream was terminated by EoC\n\n' | ||||
|                         # NOTE: won't show the error <Type> but | ||||
|                         # does show txt followed by IPC msg. | ||||
|                         f'{str(eoc)}\n' | ||||
|                     ) | ||||
| 
 | ||||
|             finally: | ||||
|                 if self._portal: | ||||
|                     try: | ||||
|                         self._portal._streams.remove(stream) | ||||
|                     except KeyError: | ||||
|                         log.warning( | ||||
|                             f'Stream was already destroyed?\n' | ||||
|                             f'actor: {self.chan.uid}\n' | ||||
|                             f'ctx id: {self.cid}' | ||||
|                         ) | ||||
| 
 | ||||
|     # TODO: replace all the `._maybe_raise_remote_err()` usage | ||||
|     # with instances of this!! | ||||
|     def maybe_raise( | ||||
|  | @ -1178,6 +989,14 @@ class Context: | |||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> Exception|None: | ||||
|         ''' | ||||
|         Check for for a remote error delivered by the runtime from | ||||
|         our peer (task); if set immediately raise. | ||||
| 
 | ||||
|         This is a convenience wrapper for | ||||
|         `._maybe_raise_remote_err(self._remote_error)`. | ||||
| 
 | ||||
|         ''' | ||||
|         __tracebackhide__: bool = hide_tb | ||||
|         if re := self._remote_error: | ||||
|             return self._maybe_raise_remote_err( | ||||
|  | @ -1290,8 +1109,7 @@ class Context: | |||
| 
 | ||||
|         raise remote_error | ||||
| 
 | ||||
|     # TODO: change  to `.wait_for_result()`? | ||||
|     async def result( | ||||
|     async def wait_for_result( | ||||
|         self, | ||||
|         hide_tb: bool = True, | ||||
| 
 | ||||
|  | @ -1380,18 +1198,27 @@ class Context: | |||
|                 (not self._cancel_called) | ||||
|             ) | ||||
|         ) | ||||
|         # TODO: eventually make `.outcome: Outcome` and thus return | ||||
|         # `self.outcome.unwrap()` here! | ||||
|         return self.outcome | ||||
| 
 | ||||
|     # TODO: switch this with above! | ||||
|     # -[ ] should be named `.wait_for_outcome()` and instead do | ||||
|     #     a `.outcome.Outcome.unwrap()` ? | ||||
|     # | ||||
|     # @property | ||||
|     # def result(self) -> Any|None: | ||||
|     #     if self._final_result_is_set(): | ||||
|     #         return self._result | ||||
| 
 | ||||
|     #     raise RuntimeError('No result is available!') | ||||
|     async def result( | ||||
|         self, | ||||
|         *args, | ||||
|         **kwargs, | ||||
|     ) -> Any|Exception: | ||||
|         log.warning( | ||||
|             '`Context.result()` is DEPRECATED!\n' | ||||
|             'Use `Context.[no]wait_for_result()` instead!\n' | ||||
|         ) | ||||
|         return await self.wait_for_result( | ||||
|             *args, | ||||
|             **kwargs, | ||||
|         ) | ||||
| 
 | ||||
|     @property | ||||
|     def maybe_error(self) -> BaseException|None: | ||||
|  | @ -1447,6 +1274,9 @@ class Context: | |||
|         return self._result is not Unresolved | ||||
| 
 | ||||
|     # def get_result_nowait(self) -> Any|None: | ||||
|     # def get_outcome_nowait(self) -> Any|None: | ||||
|     # def recv_result_nowait(self) -> Any|None: | ||||
|     # def receive_outcome_nowait(self) -> Any|None: | ||||
|     # TODO: use `outcome.Outcome` here instead? | ||||
|     @property | ||||
|     def outcome(self) -> ( | ||||
|  | @ -1476,7 +1306,6 @@ class Context: | |||
|     def has_outcome(self) -> bool: | ||||
|         return bool(self.maybe_error) or self._final_result_is_set() | ||||
| 
 | ||||
|     # @property | ||||
|     def repr_outcome( | ||||
|         self, | ||||
|         show_error_fields: bool = False, | ||||
|  | @ -1498,7 +1327,8 @@ class Context: | |||
|             # just deliver the type name. | ||||
|             if ( | ||||
|                 (reprol := getattr(merr, 'reprol', False)) | ||||
|                 and show_error_fields | ||||
|                 and | ||||
|                 show_error_fields | ||||
|             ): | ||||
|                 return reprol() | ||||
| 
 | ||||
|  | @ -1515,10 +1345,6 @@ class Context: | |||
|                     repr(merr) | ||||
|                 ) | ||||
| 
 | ||||
|             # just the type name | ||||
|             # else:  # but wen? | ||||
|             #     return type(merr).__name__ | ||||
| 
 | ||||
|             # for all other errors show their regular output | ||||
|             return ( | ||||
|                 str(merr) | ||||
|  | @ -1572,7 +1398,7 @@ class Context: | |||
|                 _,  # any non-unresolved value | ||||
|                 None, | ||||
|             ) if self._final_result_is_set(): | ||||
|                 status = 'returned' | ||||
|                 status = 'result-returned' | ||||
| 
 | ||||
|             # normal operation but still in a pre-`Return`-result | ||||
|             # dialog phase | ||||
|  | @ -1940,6 +1766,11 @@ class Context: | |||
|             # ow, indicate unable to deliver by default | ||||
|             return False | ||||
| 
 | ||||
|     # NOTE: similar to `Portal.open_context()`, this impl is found in | ||||
|     # the `._streaming`` mod to make reading/groking the details | ||||
|     # simpler code-org-wise. | ||||
|     open_stream = open_stream_from_ctx | ||||
| 
 | ||||
| 
 | ||||
| # TODO: exception tb masking by using a manual | ||||
| # `.__aexit__()`/.__aenter__()` pair on a type? | ||||
|  |  | |||
|  | @ -26,6 +26,7 @@ import inspect | |||
| from pprint import pformat | ||||
| from typing import ( | ||||
|     Any, | ||||
|     AsyncGenerator, | ||||
|     Callable, | ||||
|     AsyncIterator, | ||||
|     TYPE_CHECKING, | ||||
|  | @ -51,6 +52,7 @@ from tractor.msg import ( | |||
| ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._runtime import Actor | ||||
|     from ._context import Context | ||||
|     from ._ipc import Channel | ||||
| 
 | ||||
|  | @ -550,6 +552,213 @@ class MsgStream(trio.abc.Channel): | |||
|     #     ... | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_stream_from_ctx( | ||||
|     ctx: Context, | ||||
|     allow_overruns: bool|None = False, | ||||
|     msg_buffer_size: int|None = None, | ||||
| 
 | ||||
| ) -> AsyncGenerator[MsgStream, None]: | ||||
|     ''' | ||||
|     Open a `MsgStream`, a bi-directional msg transport dialog | ||||
|     connected to the cross-actor peer task for an IPC `Context`. | ||||
| 
 | ||||
|     This context manager must be entered in both the "parent" (task | ||||
|     which entered `Portal.open_context()`) and "child" (RPC task | ||||
|     which is decorated by `@context`) tasks for the stream to | ||||
|     logically be considered "open"; if one side begins sending to an | ||||
|     un-opened peer, depending on policy config, msgs will either be | ||||
|     queued until the other side opens and/or a `StreamOverrun` will | ||||
|     (eventually) be raised. | ||||
| 
 | ||||
|                          ------ - ------ | ||||
| 
 | ||||
|     Runtime semantics design: | ||||
| 
 | ||||
|     A `MsgStream` session adheres to "one-shot use" semantics, | ||||
|     meaning if you close the scope it **can not** be "re-opened". | ||||
| 
 | ||||
|     Instead you must re-establish a new surrounding RPC `Context` | ||||
|     (RTC: remote task context?) using `Portal.open_context()`. | ||||
| 
 | ||||
|     In the future this *design choice* may need to be changed but | ||||
|     currently there seems to be no obvious reason to support such | ||||
|     semantics.. | ||||
| 
 | ||||
|     - "pausing a stream" can be supported with a message implemented | ||||
|       by the `tractor` application dev. | ||||
| 
 | ||||
|     - any remote error will normally require a restart of the entire | ||||
|       `trio.Task`'s scope due to the nature of `trio`'s cancellation | ||||
|       (`CancelScope`) system and semantics (level triggered). | ||||
| 
 | ||||
|     ''' | ||||
|     actor: Actor = ctx._actor | ||||
| 
 | ||||
|     # If the surrounding context has been cancelled by some | ||||
|     # task with a handle to THIS, we error here immediately | ||||
|     # since it likely means the surrounding lexical-scope has | ||||
|     # errored, been `trio.Cancelled` or at the least | ||||
|     # `Context.cancel()` was called by some task. | ||||
|     if ctx._cancel_called: | ||||
| 
 | ||||
|         # XXX NOTE: ALWAYS RAISE any remote error here even if | ||||
|         # it's an expected `ContextCancelled` due to a local | ||||
|         # task having called `.cancel()`! | ||||
|         # | ||||
|         # WHY: we expect the error to always bubble up to the | ||||
|         # surrounding `Portal.open_context()` call and be | ||||
|         # absorbed there (silently) and we DO NOT want to | ||||
|         # actually try to stream - a cancel msg was already | ||||
|         # sent to the other side! | ||||
|         ctx.maybe_raise( | ||||
|             raise_ctxc_from_self_call=True, | ||||
|         ) | ||||
|         # NOTE: this is diff then calling | ||||
|         # `._maybe_raise_remote_err()` specifically | ||||
|         # because we want to raise a ctxc on any task entering this `.open_stream()` | ||||
|         # AFTER cancellation was already been requested, | ||||
|         # we DO NOT want to absorb any ctxc ACK silently! | ||||
|         # if ctx._remote_error: | ||||
|         #     raise ctx._remote_error | ||||
| 
 | ||||
|         # XXX NOTE: if no `ContextCancelled` has been responded | ||||
|         # back from the other side (yet), we raise a different | ||||
|         # runtime error indicating that this task's usage of | ||||
|         # `Context.cancel()` and then `.open_stream()` is WRONG! | ||||
|         task: str = trio.lowlevel.current_task().name | ||||
|         raise RuntimeError( | ||||
|             'Stream opened after `Context.cancel()` called..?\n' | ||||
|             f'task: {actor.uid[0]}:{task}\n' | ||||
|             f'{ctx}' | ||||
|         ) | ||||
| 
 | ||||
|     if ( | ||||
|         not ctx._portal | ||||
|         and not ctx._started_called | ||||
|     ): | ||||
|         raise RuntimeError( | ||||
|             'Context.started()` must be called before opening a stream' | ||||
|         ) | ||||
| 
 | ||||
|     # NOTE: in one way streaming this only happens on the | ||||
|     # parent-ctx-task side (on the side that calls | ||||
|     # `Actor.start_remote_task()`) so if you try to send | ||||
|     # a stop from the caller to the callee in the | ||||
|     # single-direction-stream case you'll get a lookup error | ||||
|     # currently. | ||||
|     ctx: Context = actor.get_context( | ||||
|         chan=ctx.chan, | ||||
|         cid=ctx.cid, | ||||
|         nsf=ctx._nsf, | ||||
|         # side=ctx.side, | ||||
| 
 | ||||
|         msg_buffer_size=msg_buffer_size, | ||||
|         allow_overruns=allow_overruns, | ||||
|     ) | ||||
|     ctx._allow_overruns: bool = allow_overruns | ||||
|     assert ctx is ctx | ||||
| 
 | ||||
|     # XXX: If the underlying channel feeder receive mem chan has | ||||
|     # been closed then likely client code has already exited | ||||
|     # a ``.open_stream()`` block prior or there was some other | ||||
|     # unanticipated error or cancellation from ``trio``. | ||||
| 
 | ||||
|     if ctx._rx_chan._closed: | ||||
|         raise trio.ClosedResourceError( | ||||
|             'The underlying channel for this stream was already closed!\n' | ||||
|         ) | ||||
| 
 | ||||
|     # NOTE: implicitly this will call `MsgStream.aclose()` on | ||||
|     # `.__aexit__()` due to stream's parent `Channel` type! | ||||
|     # | ||||
|     # XXX NOTE XXX: ensures the stream is "one-shot use", | ||||
|     # which specifically means that on exit, | ||||
|     # - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to | ||||
|     #   the far end indicating that the caller exited | ||||
|     #   the streaming context purposefully by letting | ||||
|     #   the exit block exec. | ||||
|     # - this is diff from the cancel/error case where | ||||
|     #   a cancel request from this side or an error | ||||
|     #   should be sent to the far end indicating the | ||||
|     #   stream WAS NOT just closed normally/gracefully. | ||||
|     async with MsgStream( | ||||
|         ctx=ctx, | ||||
|         rx_chan=ctx._rx_chan, | ||||
|     ) as stream: | ||||
| 
 | ||||
|         # NOTE: we track all existing streams per portal for | ||||
|         # the purposes of attempting graceful closes on runtime | ||||
|         # cancel requests. | ||||
|         if ctx._portal: | ||||
|             ctx._portal._streams.add(stream) | ||||
| 
 | ||||
|         try: | ||||
|             ctx._stream_opened: bool = True | ||||
|             ctx._stream = stream | ||||
| 
 | ||||
|             # XXX: do we need this? | ||||
|             # ensure we aren't cancelled before yielding the stream | ||||
|             # await trio.lowlevel.checkpoint() | ||||
|             yield stream | ||||
| 
 | ||||
|             # XXX: (MEGA IMPORTANT) if this is a root opened process we | ||||
|             # wait for any immediate child in debug before popping the | ||||
|             # context from the runtime msg loop otherwise inside | ||||
|             # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in | ||||
|             # the case where that msg is global debugger unlock (via | ||||
|             # a "stop" msg for a stream), this can result in a deadlock | ||||
|             # where the root is waiting on the lock to clear but the | ||||
|             # child has already cleared it and clobbered IPC. | ||||
|             # | ||||
|             # await maybe_wait_for_debugger() | ||||
| 
 | ||||
|             # XXX TODO: pretty sure this isn't needed (see | ||||
|             # note above this block) AND will result in | ||||
|             # a double `.send_stop()` call. The only reason to | ||||
|             # put it here would be to due with "order" in | ||||
|             # terms of raising any remote error (as per | ||||
|             # directly below) or bc the stream's | ||||
|             # `.__aexit__()` block might not get run | ||||
|             # (doubtful)? Either way if we did put this back | ||||
|             # in we also need a state var to avoid the double | ||||
|             # stop-msg send.. | ||||
|             # | ||||
|             # await stream.aclose() | ||||
| 
 | ||||
|         # NOTE: absorb and do not raise any | ||||
|         # EoC received from the other side such that | ||||
|         # it is not raised inside the surrounding | ||||
|         # context block's scope! | ||||
|         except trio.EndOfChannel as eoc: | ||||
|             if ( | ||||
|                 eoc | ||||
|                 and | ||||
|                 stream.closed | ||||
|             ): | ||||
|                 # sanity, can remove? | ||||
|                 assert eoc is stream._eoc | ||||
| 
 | ||||
|                 log.warning( | ||||
|                     'Stream was terminated by EoC\n\n' | ||||
|                     # NOTE: won't show the error <Type> but | ||||
|                     # does show txt followed by IPC msg. | ||||
|                     f'{str(eoc)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|         finally: | ||||
|             if ctx._portal: | ||||
|                 try: | ||||
|                     ctx._portal._streams.remove(stream) | ||||
|                 except KeyError: | ||||
|                     log.warning( | ||||
|                         f'Stream was already destroyed?\n' | ||||
|                         f'actor: {ctx.chan.uid}\n' | ||||
|                         f'ctx id: {ctx.cid}' | ||||
|                     ) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def stream(func: Callable) -> Callable: | ||||
|     ''' | ||||
|     Mark an async function as a streaming routine with ``@stream``. | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue