From 92b540d518ac11a6e0eacb27342737b78a85f01f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Dec 2021 19:19:53 -0500 Subject: [PATCH] Add internal msg stream backpressure controls In preparation for supporting both backpressure detection (through an optional error) as well as control over the msg channel buffer size, add internal configuration flags for both to contexts. Also adjust `Context._err_on_from_remote_msg()` -> `._maybe..` such that it can be called and will only raise if a scope nursery has been set. Add a `Context._error` for stashing the remote task's error that may be delivered in an `'error'` message. --- tractor/_streaming.py | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 79cc956..11a268a 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -190,7 +190,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # was it shouldn't matter since it's unlikely a user # will try to re-use a stream after attemping to close # it). - await self._ctx.send_stop() + with trio.CancelScope(shield=True): + await self._ctx.send_stop() except ( trio.BrokenResourceError, @@ -283,11 +284,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): class MsgStream(ReceiveMsgStream, trio.abc.Channel): - """ + ''' Bidirectional message stream for use within an inter-actor actor ``Context```. - """ + ''' async def send( self, data: Any @@ -297,6 +298,8 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' # if self._eoc: # raise trio.ClosedResourceError('This stream is already ded') + # if self._ctx._error: + # raise self._ctx._error await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -330,6 +333,7 @@ class Context: # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa _result: Optional[Any] = False + _error: Optional[BaseException] = None # status flags _cancel_called: bool = False @@ -340,6 +344,8 @@ class Context: # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None + _backpressure: bool = False + async def send_yield(self, data: Any) -> None: warnings.warn( @@ -353,23 +359,28 @@ class Context: async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) - def _error_from_remote_msg( + def _maybe_error_from_remote_msg( self, msg: Dict[str, Any], ) -> None: - '''Unpack and raise a msg error into the local scope + ''' + Unpack and raise a msg error into the local scope nursery for this context. Acts as a form of "relay" for a remote error raised in the corresponding remote callee task. ''' - assert self._scope_nursery + self._error = unpack_error(msg, self.chan) - async def raiser(): - raise unpack_error(msg, self.chan) + if self._scope_nursery: - self._scope_nursery.start_soon(raiser) + async def raiser(): + __tracebackhide__ = True + raise self._error + + if not self._scope_nursery.cancel_scope.cancel_called: + self._scope_nursery.start_soon(raiser) async def cancel(self) -> None: '''Cancel this inter-actor-task context. @@ -433,6 +444,8 @@ class Context: async def open_stream( self, + backpressure: bool = False, + msg_buffer_size: Optional[int] = None, ) -> AsyncGenerator[MsgStream, None]: ''' @@ -482,7 +495,9 @@ class Context: ctx = actor.get_context( self.chan, self.cid, + msg_buffer_size=msg_buffer_size, ) + ctx._backpressure = backpressure assert ctx is self # XXX: If the underlying channel feeder receive mem chan has