diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 79cc956..11a268a 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -190,7 +190,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # was it shouldn't matter since it's unlikely a user # will try to re-use a stream after attemping to close # it). - await self._ctx.send_stop() + with trio.CancelScope(shield=True): + await self._ctx.send_stop() except ( trio.BrokenResourceError, @@ -283,11 +284,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): class MsgStream(ReceiveMsgStream, trio.abc.Channel): - """ + ''' Bidirectional message stream for use within an inter-actor actor ``Context```. - """ + ''' async def send( self, data: Any @@ -297,6 +298,8 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' # if self._eoc: # raise trio.ClosedResourceError('This stream is already ded') + # if self._ctx._error: + # raise self._ctx._error await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -330,6 +333,7 @@ class Context: # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa _result: Optional[Any] = False + _error: Optional[BaseException] = None # status flags _cancel_called: bool = False @@ -340,6 +344,8 @@ class Context: # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None + _backpressure: bool = False + async def send_yield(self, data: Any) -> None: warnings.warn( @@ -353,23 +359,28 @@ class Context: async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) - def _error_from_remote_msg( + def _maybe_error_from_remote_msg( self, msg: Dict[str, Any], ) -> None: - '''Unpack and raise a msg error into the local scope + ''' + Unpack and raise a msg error into the local scope nursery for this context. Acts as a form of "relay" for a remote error raised in the corresponding remote callee task. ''' - assert self._scope_nursery + self._error = unpack_error(msg, self.chan) - async def raiser(): - raise unpack_error(msg, self.chan) + if self._scope_nursery: - self._scope_nursery.start_soon(raiser) + async def raiser(): + __tracebackhide__ = True + raise self._error + + if not self._scope_nursery.cancel_scope.cancel_called: + self._scope_nursery.start_soon(raiser) async def cancel(self) -> None: '''Cancel this inter-actor-task context. @@ -433,6 +444,8 @@ class Context: async def open_stream( self, + backpressure: bool = False, + msg_buffer_size: Optional[int] = None, ) -> AsyncGenerator[MsgStream, None]: ''' @@ -482,7 +495,9 @@ class Context: ctx = actor.get_context( self.chan, self.cid, + msg_buffer_size=msg_buffer_size, ) + ctx._backpressure = backpressure assert ctx is self # XXX: If the underlying channel feeder receive mem chan has