From b826ec81032d55565e5539230e0b687a270128c0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Dec 2021 10:57:58 -0500 Subject: [PATCH] Better idea, enable backpressure on opened streams Keeping it disabled on context open will help with detecting any stream connection which was never opened on one side of the task pair. In that case we can report that there was an overrun **and** a stream wasn't opened versus if the stream is explicitly configured not to use bp then we throw the standard overflow. Use `trio.Nursery._closed` to detect "closure" XD since it seems to be the most reliable way to determine if a spawn call will trigger a runtime error. --- tractor/_streaming.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 11a268a..6c57065 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -298,8 +298,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' # if self._eoc: # raise trio.ClosedResourceError('This stream is already ded') - # if self._ctx._error: - # raise self._ctx._error + + if self._ctx._error: + raise self._ctx._error # from None await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -309,6 +310,10 @@ class Context: ''' An inter-actor, ``trio`` task communication context. + NB: This class should never be instatiated directly, it is delivered + by either runtime machinery to a remotely started task or by entering + ``Portal.open_context()``. + Allows maintaining task or protocol specific state between 2 communicating actor tasks. A unique context is created on the callee side/end for every request to a remote actor from a portal. @@ -370,20 +375,26 @@ class Context: Acts as a form of "relay" for a remote error raised in the corresponding remote callee task. + ''' self._error = unpack_error(msg, self.chan) + # TODO: tempted to **not** do this by-reraising in a + # nursery and instead cancel a surrounding scope, detect + # the cancellation, then lookup the error that was set? if self._scope_nursery: async def raiser(): - __tracebackhide__ = True - raise self._error + raise self._error from None - if not self._scope_nursery.cancel_scope.cancel_called: + # from trio.testing import wait_all_tasks_blocked + # await wait_all_tasks_blocked() + if not self._scope_nursery._closed: # type: ignore self._scope_nursery.start_soon(raiser) async def cancel(self) -> None: - '''Cancel this inter-actor-task context. + ''' + Cancel this inter-actor-task context. Request that the far side cancel it's current linked context, Timeout quickly in an attempt to sidestep 2-generals... @@ -444,7 +455,7 @@ class Context: async def open_stream( self, - backpressure: bool = False, + backpressure: Optional[bool] = True, msg_buffer_size: Optional[int] = None, ) -> AsyncGenerator[MsgStream, None]: @@ -555,7 +566,7 @@ class Context: try: self._result = msg['return'] break - except KeyError: + except KeyError as msgerr: if 'yield' in msg: # far end task is still streaming to us so discard @@ -569,7 +580,10 @@ class Context: # internal error should never get here assert msg.get('cid'), ( "Received internal error at portal?") - raise unpack_error(msg, self._portal.channel) + + raise unpack_error( + msg, self._portal.channel + ) from msgerr return self._result