diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 512c432..8e8edf8 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -57,6 +57,9 @@ class Context: timeout quickly to sidestep 2-generals... """ + assert self._portal, ( + "No portal found, this is likely a callee side context") + cid = self.cid with trio.move_on_after(0.5) as cs: cs.shield = True @@ -95,7 +98,20 @@ def stream(func): """ func._tractor_stream_function = True sig = inspect.signature(func) - if 'ctx' not in sig.parameters: + params = sig.parameters + if 'stream' not in params and 'ctx' in params: + warnings.warn( + "`@tractr.stream decorated funcs should now declare a `stream` " + " arg, `ctx` is now designated for use with @tractor.context", + DeprecationWarning, + stacklevel=2, + ) + + if ( + 'ctx' not in params and + 'to_trio' not in params and + 'stream' not in params + ): raise TypeError( "The first argument to the stream function " f"{func.__name__} must be `ctx: tractor.Context`" @@ -120,7 +136,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: Context, rx_chan: trio.abc.ReceiveChannel, - portal: 'Portal', # noqa + portal: 'Portal', # type: ignore # noqa ) -> None: self._ctx = ctx self._rx_chan = rx_chan @@ -152,7 +168,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # raise the error message raise unpack_error(msg, self._portal.channel) - except trio.ClosedResourceError: + except (trio.ClosedResourceError, StopAsyncIteration): # XXX: this indicates that a `stop` message was # sent by the far side of the underlying channel. # Currently this is triggered by calling ``.aclose()`` on @@ -184,7 +200,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): @contextmanager def shield( self - ) -> Iterator['ReceiveStream']: # noqa + ) -> Iterator['ReceiveMsgStream']: # noqa """Shield this stream's underlying channel such that a local consumer task can be cancelled (and possibly restarted) using ``trio.Cancelled``. @@ -243,4 +259,4 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): self, data: Any ) -> None: - await self._chan.send({'yield': data, 'cid': self._cid}) + await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})