forked from goodboy/tractor
1
0
Fork 0

Add a warning for soon to be deprecated `ctx` use in `@stream` func

stream_contexts
Tyler Goodlet 2021-04-28 08:39:50 -04:00
parent 36251357b3
commit 80c96cab01
1 changed files with 21 additions and 5 deletions

View File

@ -57,6 +57,9 @@ class Context:
timeout quickly to sidestep 2-generals... timeout quickly to sidestep 2-generals...
""" """
assert self._portal, (
"No portal found, this is likely a callee side context")
cid = self.cid cid = self.cid
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
@ -95,7 +98,20 @@ def stream(func):
""" """
func._tractor_stream_function = True func._tractor_stream_function = True
sig = inspect.signature(func) sig = inspect.signature(func)
if 'ctx' not in sig.parameters: params = sig.parameters
if 'stream' not in params and 'ctx' in params:
warnings.warn(
"`@tractr.stream decorated funcs should now declare a `stream` "
" arg, `ctx` is now designated for use with @tractor.context",
DeprecationWarning,
stacklevel=2,
)
if (
'ctx' not in params and
'to_trio' not in params and
'stream' not in params
):
raise TypeError( raise TypeError(
"The first argument to the stream function " "The first argument to the stream function "
f"{func.__name__} must be `ctx: tractor.Context`" f"{func.__name__} must be `ctx: tractor.Context`"
@ -120,7 +136,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self, self,
ctx: Context, ctx: Context,
rx_chan: trio.abc.ReceiveChannel, rx_chan: trio.abc.ReceiveChannel,
portal: 'Portal', # noqa portal: 'Portal', # type: ignore # noqa
) -> None: ) -> None:
self._ctx = ctx self._ctx = ctx
self._rx_chan = rx_chan self._rx_chan = rx_chan
@ -152,7 +168,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# raise the error message # raise the error message
raise unpack_error(msg, self._portal.channel) raise unpack_error(msg, self._portal.channel)
except trio.ClosedResourceError: except (trio.ClosedResourceError, StopAsyncIteration):
# XXX: this indicates that a `stop` message was # XXX: this indicates that a `stop` message was
# sent by the far side of the underlying channel. # sent by the far side of the underlying channel.
# Currently this is triggered by calling ``.aclose()`` on # Currently this is triggered by calling ``.aclose()`` on
@ -184,7 +200,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
@contextmanager @contextmanager
def shield( def shield(
self self
) -> Iterator['ReceiveStream']: # noqa ) -> Iterator['ReceiveMsgStream']: # noqa
"""Shield this stream's underlying channel such that a local consumer task """Shield this stream's underlying channel such that a local consumer task
can be cancelled (and possibly restarted) using ``trio.Cancelled``. can be cancelled (and possibly restarted) using ``trio.Cancelled``.
@ -243,4 +259,4 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
self, self,
data: Any data: Any
) -> None: ) -> None:
await self._chan.send({'yield': data, 'cid': self._cid}) await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})