From 348148ff1ed7a4e321b6ab06e1b3bd6a18deec34 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 13 Jun 2021 19:58:52 -0400 Subject: [PATCH] Explicitly formalize context/streaming teardown Add clear teardown semantics for `Context` such that the remote side cancellation propagation happens only on error or if client code explicitly requests it (either by exit flag to `Portal.open_context()` or by manually calling `Context.cancel()`). Add `Context.result()` to wait on and capture the final result from a remote context function; any lingering msg sequence will be consumed/discarded. Changes in order to make this possible: - pass the runtime msg loop's feeder receive channel in to the context on the calling (portal opening) side such that a final 'return' msg can be waited upon using `Context.result()` which delivers the final return value from the callee side `@tractor.context` async function. - always await a final result from the target context function in `Portal.open_context()`'s `__aexit__()` if the context has not been (requested to be) cancelled by client code on block exit. - add an internal `Context._cancel_called` for context "cancel requested" tracking (much like `trio`'s cancel scope). - allow flagging a stream as terminated using an internal `._eoc` flag which will mark the stream as stopped for iteration. - drop `StopAsyncIteration` catching in `.receive()`; it does nothing. --- tractor/_portal.py | 58 +++++++++++-- tractor/_streaming.py | 194 +++++++++++++++++++++++++++++++++++------- 2 files changed, 210 insertions(+), 42 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 7f29e0e..32e71b0 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -17,7 +17,12 @@ from async_generator import asynccontextmanager from ._state import current_actor from ._ipc import Channel from .log import get_logger -from ._exceptions import unpack_error, NoResult, RemoteActorError +from ._exceptions import ( + unpack_error, + NoResult, + RemoteActorError, + ContextCancelled, +) from ._streaming import Context, ReceiveMsgStream @@ -84,7 +89,7 @@ class Portal: ns: str, func: str, kwargs, - ) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: + ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -327,7 +332,14 @@ class Portal: # message right now since there shouldn't be a reason to # stop and restart the stream, right? try: + + # We are for sure done with this stream and no more + # messages are expected to be delivered from the + # runtime's msg loop. + await recv_chan.aclose() + await ctx.cancel() + except trio.ClosedResourceError: # if the far end terminates before we send a cancel the # underlying transport-channel may already be closed. @@ -337,18 +349,21 @@ class Portal: @asynccontextmanager async def open_context( + self, func: Callable, + cancel_on_exit: bool = False, **kwargs, + ) -> AsyncGenerator[Tuple[Context, Any], None]: - """Open an inter-actor task context. + '''Open an inter-actor task context. This is a synchronous API which allows for deterministic setup/teardown of a remote task. The yielded ``Context`` further - allows for opening bidirectional streams - see - ``Context.open_stream()``. + allows for opening bidirectional streams, explicit cancellation + and synchronized final result collection. See ``tractor.Context``. - """ + ''' # conduct target func method structural checks if not inspect.iscoroutinefunction(func) and ( getattr(func, '_tractor_contex_function', False) @@ -358,8 +373,8 @@ class Portal: fn_mod_path, fn_name = func_deats(func) + recv_chan: Optional[trio.MemoryReceiveChannel] = None - recv_chan: trio.ReceiveMemoryChannel = None try: cid, recv_chan, functype, first_msg = await self._submit( fn_mod_path, fn_name, kwargs) @@ -383,12 +398,37 @@ class Portal: # deliver context instance and .started() msg value in open # tuple. - ctx = Context(self.channel, cid, _portal=self) + ctx = Context( + self.channel, + cid, + _portal=self, + _recv_chan=recv_chan, + ) + try: yield ctx, first - finally: + if cancel_on_exit: + await ctx.cancel() + + else: + if not ctx._cancel_called: + await ctx.result() + + except ContextCancelled: + # if the context was cancelled by client code + # then we don't need to raise since user code + # is expecting this. + if not ctx._cancel_called: + raise + + except BaseException: + # the context cancels itself on any deviation await ctx.cancel() + raise + + finally: + log.info(f'Context for {func.__name__} completed') finally: if recv_chan is not None: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index fb5f8a8..ea01264 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -35,10 +35,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): which invoked a remote streaming function using `Portal.run()`. Termination rules: + - if the local task signals stop iteration a cancel signal is relayed to the remote task indicating to stop streaming - - if the remote task signals the end of a stream, raise a - ``StopAsyncIteration`` to terminate the local ``async for`` + - if the remote task signals the end of a stream, raise + a ``StopAsyncIteration`` to terminate the local ``async for`` """ def __init__( @@ -51,12 +52,19 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self._rx_chan = rx_chan self._shielded = shield + # flag to denote end of stream + self._eoc: bool = False + # delegate directly to underlying mem channel def receive_nowait(self): msg = self._rx_chan.receive_nowait() return msg['yield'] async def receive(self): + # see ``.aclose()`` to an alt to always checking this + if self._eoc: + raise trio.EndOfChannel + try: msg = await self._rx_chan.receive() return msg['yield'] @@ -72,9 +80,14 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): if msg.get('stop'): log.debug(f"{self} was stopped at remote end") + self._eoc = True + # when the send is closed we assume the stream has # terminated and signal this local iterator to stop await self.aclose() + + # XXX: this causes ``ReceiveChannel.__anext__()`` to + # raise a ``StopAsyncIteration``. raise trio.EndOfChannel # TODO: test that shows stream raising an expected error!!! @@ -85,7 +98,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): else: raise - except (trio.ClosedResourceError, StopAsyncIteration): + except trio.ClosedResourceError: # XXX: this indicates that a `stop` message was # sent by the far side of the underlying channel. # Currently this is triggered by calling ``.aclose()`` on @@ -108,8 +121,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # terminated and signal this local iterator to stop await self.aclose() - # await self._ctx.send_stop() - raise StopAsyncIteration + raise # propagate except trio.Cancelled: # relay cancels to the remote task @@ -138,12 +150,16 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): on close. """ - # TODO: proper adherance to trio's `.aclose()` semantics: + # XXX: keep proper adherance to trio's `.aclose()` semantics: # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan if rx_chan._closed: log.warning(f"{self} is already closed") + + # this stream has already been closed so silently succeed as + # per ``trio.AsyncResource`` semantics. + # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose return # TODO: broadcasting to multiple consumers @@ -173,12 +189,45 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # `Context.open_stream()` will create the `Actor._cids2qs` # entry from a call to `Actor.get_memchans()`. if not self._ctx._portal: - # only for 2 way streams can we can send - # stop from the caller side - await self._ctx.send_stop() + try: + # only for 2 way streams can we can send + # stop from the caller side + await self._ctx.send_stop() - # close the local mem chan - await rx_chan.aclose() + except trio.BrokenResourceError: + # the underlying channel may already have been pulled + # in which case our stop message is meaningless since + # it can't traverse the transport. + log.debug(f'Channel for {self} was already closed') + + self._eoc = True + + # close the local mem chan??!? + + # NOT if we're a ``MsgStream``! + # BECAUSE this same core-msg-loop mem recv-chan is used to deliver + # the potential final result from the surrounding inter-actor + # `Context` so we don't want to close it until that context has + # run to completion. + + # XXX: Notes on old behaviour. + + # In the receive-only case, ``Portal.open_stream_from()`` should + # call this explicitly on teardown but additionally if for some + # reason stream consumer code tries to manually receive a new + # value before ``.aclose()`` is called **but** the far end has + # stopped `.receive()` **must** raise ``trio.EndofChannel`` in + # order to avoid an infinite hang on ``.__anext__()``. So we can + # instead uncomment this check and close the underlying msg-loop + # mem chan below and not then **not** check for ``self._eoc`` in + # ``.receive()`` (if for some reason we think that check is + # a bottle neck - not likely) such that the + # ``trio.ClosedResourceError`` would instead trigger the + # ``trio.EndOfChannel`` in ``.receive()`` (as it originally was + # before bi-dir streaming support). + + # if not isinstance(self, MsgStream): + # await rx_chan.aclose() # TODO: but make it broadcasting to consumers # def clone(self): @@ -206,29 +255,29 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) -@dataclass(frozen=True) +@dataclass class Context: - """An IAC (inter-actor communication) context. + '''An inter-actor task communication context. - Allows maintaining task or protocol specific state between communicating - actors. A unique context is created on the receiving end for every request - to a remote actor. + Allows maintaining task or protocol specific state between + 2 communicating actor tasks. A unique context is created on the + callee side/end for every request to a remote actor from a portal. - A context can be cancelled and (eventually) restarted from + A context can be cancelled and (possibly eventually restarted) from either side of the underlying IPC channel. - A context can be used to open task oriented message streams. + A context can be used to open task oriented message streams and can + be thought of as an IPC aware inter-actor cancel scope. - """ + ''' chan: Channel cid: str - # TODO: should we have seperate types for caller vs. callee - # side contexts? The caller always opens a portal whereas the callee - # is always responding back through a context-stream - # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa + _recv_chan: Optional[trio.MemoryReceiveChannel] = None + _result: Optional[Any] = False + _cancel_called: bool = False # only set on the callee side _cancel_scope: Optional[trio.CancelScope] = None @@ -247,12 +296,14 @@ class Context: await self.chan.send({'stop': True, 'cid': self.cid}) async def cancel(self) -> None: - """Cancel this inter-actor-task context. + '''Cancel this inter-actor-task context. Request that the far side cancel it's current linked context, - timeout quickly to sidestep 2-generals... + Timeout quickly in an attempt to sidestep 2-generals... + + ''' + self._cancel_called = True - """ if self._portal: # caller side: if not self._portal: raise RuntimeError( @@ -290,17 +341,31 @@ class Context: # https://github.com/goodboy/tractor/issues/36 self._cancel_scope.cancel() - # TODO: do we need a restart api? - # async def restart(self) -> None: - # pass - @asynccontextmanager async def open_stream( + self, shield: bool = False, - ) -> AsyncGenerator[MsgStream, None]: - # TODO + ) -> AsyncGenerator[MsgStream, None]: + '''Open a ``MsgStream``, a bi-directional stream connected to the + cross-actor (far end) task for this ``Context``. + + This context manager must be entered on both the caller and + callee for the stream to logically be considered "connected". + + A ``MsgStream`` is currently "one-shot" use, meaning if you + close it you can not "re-open" it for streaming and instead you + must re-establish a new surrounding ``Context`` using + ``Portal.open_context()``. In the future this may change but + currently there seems to be no obvious reason to support + "re-opening": + - pausing a stream can be done with a message. + - task errors will normally require a restart of the entire + scope of the inter-actor task context due to the nature of + ``trio``'s cancellation system. + + ''' actor = current_actor() # here we create a mem chan that corresponds to the @@ -316,6 +381,19 @@ class Context: self.cid ) + # XXX: If the underlying receive mem chan has been closed then + # likely client code has already exited a ``.open_stream()`` + # block prior. we error here until such a time that we decide + # allowing streams to be "re-connected" is supported and/or + # a good idea. + if recv_chan._closed: + task = trio.lowlevel.current_task().name + raise trio.ClosedResourceError( + f'stream for {actor.uid[0]}:{task} has already been closed.' + '\nRe-opening a closed stream is not yet supported!' + '\nConsider re-calling the containing `@tractor.context` func' + ) + async with MsgStream( ctx=self, rx_chan=recv_chan, @@ -326,19 +404,65 @@ class Context: self._portal._streams.add(rchan) try: + # ensure we aren't cancelled before delivering + # the stream + # await trio.lowlevel.checkpoint() yield rchan except trio.EndOfChannel: + # stream iteration stop signal raise else: - # signal ``StopAsyncIteration`` on far end. + # XXX: Make the stream "one-shot use". On exit, signal + # ``trio.EndOfChannel``/``StopAsyncIteration`` to the + # far end. await self.send_stop() finally: if self._portal: self._portal._streams.remove(rchan) + async def result(self) -> Any: + '''From a caller side, wait for and return the final result from + the callee side task. + + ''' + assert self._portal, "Context.result() can not be called from callee!" + assert self._recv_chan + + if self._result is False: + + if not self._recv_chan._closed: # type: ignore + + # wait for a final context result consuming + # and discarding any bi dir stream msgs still + # in transit from the far end. + while True: + + msg = await self._recv_chan.receive() + try: + self._result = msg['return'] + break + except KeyError: + + if 'yield' in msg: + # far end task is still streaming to us.. + log.warning(f'Remote stream deliverd {msg}') + # do disard + continue + + elif 'stop' in msg: + log.debug('Remote stream terminated') + continue + + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + raise unpack_error(msg, self._portal.channel) + + return self._result + async def started(self, value: Optional[Any] = None) -> None: if self._portal: @@ -347,6 +471,10 @@ class Context: await self.chan.send({'started': value, 'cid': self.cid}) + # TODO: do we need a restart api? + # async def restart(self) -> None: + # pass + def stream(func: Callable) -> Callable: """Mark an async function as a streaming routine with ``@stream``.