forked from goodboy/tractor
1
0
Fork 0

Validate stream functions at decorate time

stream_functions
Tyler Goodlet 2019-03-29 19:10:32 -04:00
parent 5c0ae47cf5
commit f885b02c73
3 changed files with 44 additions and 44 deletions

View File

@ -41,26 +41,16 @@ async def _invoke(
kwargs: Dict[str, Any], kwargs: Dict[str, Any],
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
"""Invoke local func and return results over provided channel. """Invoke local func and deliver result(s) over provided channel.
""" """
sig = inspect.signature(func)
treat_as_gen = False treat_as_gen = False
cs = None cs = None
cancel_scope = trio.CancelScope() cancel_scope = trio.CancelScope()
ctx = Context(chan, cid, cancel_scope) ctx = Context(chan, cid, cancel_scope)
_context.set(ctx) _context.set(ctx)
if getattr(func, '_tractor_stream_function', False): if getattr(func, '_tractor_stream_function', False):
if 'ctx' not in sig.parameters: # handle decorated ``@tractor.stream`` async functions
raise TypeError(
"The first argument to the stream function "
f"{func.__name__} must be `ctx: tractor.Context`"
)
kwargs['ctx'] = ctx kwargs['ctx'] = ctx
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `ctx` kwarg in its
# signature will be treated as one.
treat_as_gen = True treat_as_gen = True
try: try:
is_async_partial = False is_async_partial = False

View File

@ -1,3 +1,4 @@
import inspect
from contextvars import ContextVar from contextvars import ContextVar
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
@ -39,4 +40,10 @@ def stream(func):
"""Mark an async function as a streaming routine. """Mark an async function as a streaming routine.
""" """
func._tractor_stream_function = True func._tractor_stream_function = True
sig = inspect.signature(func)
if 'ctx' not in sig.parameters:
raise TypeError(
"The first argument to the stream function "
f"{func.__name__} must be `ctx: tractor.Context`"
)
return func return func

View File

@ -12,7 +12,7 @@ import wrapt
from .log import get_logger from .log import get_logger
from . import current_actor from . import current_actor
from ._streaming import Context, stream from ._streaming import Context
__all__ = ['pub'] __all__ = ['pub']
@ -97,29 +97,32 @@ def pub(
): ):
"""Publisher async generator decorator. """Publisher async generator decorator.
A publisher can be called multiple times from different actors A publisher can be called multiple times from different actors but
but will only spawn a finite set of internal tasks to stream values will only spawn a finite set of internal tasks to stream values to
to each caller. The ``tasks` argument to the decorator (``Set[str]``) each caller. The ``tasks: Set[str]`` argument to the decorator
specifies the names of the mutex set of publisher tasks. specifies the names of the mutex set of publisher tasks. When the
When the publisher function is called, an argument ``task_name`` must be publisher function is called, an argument ``task_name`` must be
passed to specify which task (of the set named in ``tasks``) should be passed to specify which task (of the set named in ``tasks``) should
used. This allows for using the same publisher with different input be used. This allows for using the same publisher with different
(arguments) without allowing more concurrent tasks then necessary. input (arguments) without allowing more concurrent tasks then
necessary.
Values yielded from the decorated async generator Values yielded from the decorated async generator must be
must be ``Dict[str, Dict[str, Any]]`` where the fist level key is the ``Dict[str, Dict[str, Any]]`` where the fist level key is the topic
topic string an determines which subscription the packet will be delivered string and determines which subscription the packet will be
to and the value is a packet ``Dict[str, Any]`` by default of the form: delivered to and the value is a packet ``Dict[str, Any]`` by default
of the form:
.. ::python .. ::python
{topic: value} {topic: str: value: Any}
The caller can instead opt to pass a ``packetizer`` callback who's return The caller can instead opt to pass a ``packetizer`` callback who's
value will be delivered as the published response. return value will be delivered as the published response.
The decorated function must *accept* an argument :func:`get_topics` which The decorated async generator function must accept an argument
dynamically returns the tuple of current subscriber topics: :func:`get_topics` which dynamically returns the tuple of current
subscriber topics:
.. code:: python .. code:: python
@ -162,15 +165,15 @@ def pub(
print(f"Subscriber received {value}") print(f"Subscriber received {value}")
Here, you don't need to provide the ``ctx`` argument since the remote actor Here, you don't need to provide the ``ctx`` argument since the
provides it automatically to the spawned task. If you were to call remote actor provides it automatically to the spawned task. If you
``pub_service()`` directly from a wrapping function you would need to were to call ``pub_service()`` directly from a wrapping function you
provide this explicitly. would need to provide this explicitly.
Remember you only need this if you need *a finite set of tasks* running in Remember you only need this if you need *a finite set of tasks*
a single actor to stream data to an arbitrary number of subscribers. If you running in a single actor to stream data to an arbitrary number of
are ok to have a new task running for every call to ``pub_service()`` then subscribers. If you are ok to have a new task running for every call
probably don't need this. to ``pub_service()`` then probably don't need this.
""" """
# handle the decorator not called with () case # handle the decorator not called with () case
if wrapped is None: if wrapped is None:
@ -181,10 +184,7 @@ def pub(
for name in tasks: for name in tasks:
task2lock[name] = trio.StrictFIFOLock() task2lock[name] = trio.StrictFIFOLock()
async def takes_ctx(get_topics, ctx=None): @wrapt.decorator
pass
@wrapt.decorator(adapter=takes_ctx)
async def wrapper(agen, instance, args, kwargs): async def wrapper(agen, instance, args, kwargs):
# this is used to extract arguments properly as per # this is used to extract arguments properly as per
# the `wrapt` docs # the `wrapt` docs
@ -249,7 +249,6 @@ def pub(
# invoke it # invoke it
await _execute(*args, **kwargs) await _execute(*args, **kwargs)
funcname = wrapped.__name__ funcname = wrapped.__name__
if not inspect.isasyncgenfunction(wrapped): if not inspect.isasyncgenfunction(wrapped):
raise TypeError( raise TypeError(
@ -261,4 +260,8 @@ def pub(
"`get_topics` argument" "`get_topics` argument"
) )
return wrapper(stream(wrapped)) # XXX: manually monkey the wrapped function since
# ``wrapt.decorator`` doesn't seem to want to play nice with its
# whole "adapter" thing...
wrapped._tractor_stream_function = True # type: ignore
return wrapper(wrapped)