diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 11a268a..6c57065 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -298,8 +298,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' # if self._eoc: # raise trio.ClosedResourceError('This stream is already ded') - # if self._ctx._error: - # raise self._ctx._error + + if self._ctx._error: + raise self._ctx._error # from None await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -309,6 +310,10 @@ class Context: ''' An inter-actor, ``trio`` task communication context. + NB: This class should never be instatiated directly, it is delivered + by either runtime machinery to a remotely started task or by entering + ``Portal.open_context()``. + Allows maintaining task or protocol specific state between 2 communicating actor tasks. A unique context is created on the callee side/end for every request to a remote actor from a portal. @@ -370,20 +375,26 @@ class Context: Acts as a form of "relay" for a remote error raised in the corresponding remote callee task. + ''' self._error = unpack_error(msg, self.chan) + # TODO: tempted to **not** do this by-reraising in a + # nursery and instead cancel a surrounding scope, detect + # the cancellation, then lookup the error that was set? if self._scope_nursery: async def raiser(): - __tracebackhide__ = True - raise self._error + raise self._error from None - if not self._scope_nursery.cancel_scope.cancel_called: + # from trio.testing import wait_all_tasks_blocked + # await wait_all_tasks_blocked() + if not self._scope_nursery._closed: # type: ignore self._scope_nursery.start_soon(raiser) async def cancel(self) -> None: - '''Cancel this inter-actor-task context. + ''' + Cancel this inter-actor-task context. Request that the far side cancel it's current linked context, Timeout quickly in an attempt to sidestep 2-generals... @@ -444,7 +455,7 @@ class Context: async def open_stream( self, - backpressure: bool = False, + backpressure: Optional[bool] = True, msg_buffer_size: Optional[int] = None, ) -> AsyncGenerator[MsgStream, None]: @@ -555,7 +566,7 @@ class Context: try: self._result = msg['return'] break - except KeyError: + except KeyError as msgerr: if 'yield' in msg: # far end task is still streaming to us so discard @@ -569,7 +580,10 @@ class Context: # internal error should never get here assert msg.get('cid'), ( "Received internal error at portal?") - raise unpack_error(msg, self._portal.channel) + + raise unpack_error( + msg, self._portal.channel + ) from msgerr return self._result