diff --git a/tractor/_actor.py b/tractor/_actor.py index 52b301f..3f2a89c 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -41,26 +41,16 @@ async def _invoke( kwargs: Dict[str, Any], task_status=trio.TASK_STATUS_IGNORED ): - """Invoke local func and return results over provided channel. + """Invoke local func and deliver result(s) over provided channel. """ - sig = inspect.signature(func) treat_as_gen = False cs = None cancel_scope = trio.CancelScope() ctx = Context(chan, cid, cancel_scope) _context.set(ctx) if getattr(func, '_tractor_stream_function', False): - if 'ctx' not in sig.parameters: - raise TypeError( - "The first argument to the stream function " - f"{func.__name__} must be `ctx: tractor.Context`" - ) + # handle decorated ``@tractor.stream`` async functions kwargs['ctx'] = ctx - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `ctx` kwarg in its - # signature will be treated as one. treat_as_gen = True try: is_async_partial = False diff --git a/tractor/_streaming.py b/tractor/_streaming.py index df7a783..9ed0f14 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -1,3 +1,4 @@ +import inspect from contextvars import ContextVar from dataclasses import dataclass from typing import Any @@ -39,4 +40,10 @@ def stream(func): """Mark an async function as a streaming routine. """ func._tractor_stream_function = True + sig = inspect.signature(func) + if 'ctx' not in sig.parameters: + raise TypeError( + "The first argument to the stream function " + f"{func.__name__} must be `ctx: tractor.Context`" + ) return func diff --git a/tractor/msg.py b/tractor/msg.py index 59842ef..cd064a0 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -12,7 +12,7 @@ import wrapt from .log import get_logger from . import current_actor -from ._streaming import Context, stream +from ._streaming import Context __all__ = ['pub'] @@ -97,29 +97,32 @@ def pub( ): """Publisher async generator decorator. - A publisher can be called multiple times from different actors - but will only spawn a finite set of internal tasks to stream values - to each caller. The ``tasks` argument to the decorator (``Set[str]``) - specifies the names of the mutex set of publisher tasks. - When the publisher function is called, an argument ``task_name`` must be - passed to specify which task (of the set named in ``tasks``) should be - used. This allows for using the same publisher with different input - (arguments) without allowing more concurrent tasks then necessary. + A publisher can be called multiple times from different actors but + will only spawn a finite set of internal tasks to stream values to + each caller. The ``tasks: Set[str]`` argument to the decorator + specifies the names of the mutex set of publisher tasks. When the + publisher function is called, an argument ``task_name`` must be + passed to specify which task (of the set named in ``tasks``) should + be used. This allows for using the same publisher with different + input (arguments) without allowing more concurrent tasks then + necessary. - Values yielded from the decorated async generator - must be ``Dict[str, Dict[str, Any]]`` where the fist level key is the - topic string an determines which subscription the packet will be delivered - to and the value is a packet ``Dict[str, Any]`` by default of the form: + Values yielded from the decorated async generator must be + ``Dict[str, Dict[str, Any]]`` where the fist level key is the topic + string and determines which subscription the packet will be + delivered to and the value is a packet ``Dict[str, Any]`` by default + of the form: .. ::python - {topic: value} + {topic: str: value: Any} - The caller can instead opt to pass a ``packetizer`` callback who's return - value will be delivered as the published response. + The caller can instead opt to pass a ``packetizer`` callback who's + return value will be delivered as the published response. - The decorated function must *accept* an argument :func:`get_topics` which - dynamically returns the tuple of current subscriber topics: + The decorated async generator function must accept an argument + :func:`get_topics` which dynamically returns the tuple of current + subscriber topics: .. code:: python @@ -162,15 +165,15 @@ def pub( print(f"Subscriber received {value}") - Here, you don't need to provide the ``ctx`` argument since the remote actor - provides it automatically to the spawned task. If you were to call - ``pub_service()`` directly from a wrapping function you would need to - provide this explicitly. + Here, you don't need to provide the ``ctx`` argument since the + remote actor provides it automatically to the spawned task. If you + were to call ``pub_service()`` directly from a wrapping function you + would need to provide this explicitly. - Remember you only need this if you need *a finite set of tasks* running in - a single actor to stream data to an arbitrary number of subscribers. If you - are ok to have a new task running for every call to ``pub_service()`` then - probably don't need this. + Remember you only need this if you need *a finite set of tasks* + running in a single actor to stream data to an arbitrary number of + subscribers. If you are ok to have a new task running for every call + to ``pub_service()`` then probably don't need this. """ # handle the decorator not called with () case if wrapped is None: @@ -181,10 +184,7 @@ def pub( for name in tasks: task2lock[name] = trio.StrictFIFOLock() - async def takes_ctx(get_topics, ctx=None): - pass - - @wrapt.decorator(adapter=takes_ctx) + @wrapt.decorator async def wrapper(agen, instance, args, kwargs): # this is used to extract arguments properly as per # the `wrapt` docs @@ -249,7 +249,6 @@ def pub( # invoke it await _execute(*args, **kwargs) - funcname = wrapped.__name__ if not inspect.isasyncgenfunction(wrapped): raise TypeError( @@ -261,4 +260,8 @@ def pub( "`get_topics` argument" ) - return wrapper(stream(wrapped)) + # XXX: manually monkey the wrapped function since + # ``wrapt.decorator`` doesn't seem to want to play nice with its + # whole "adapter" thing... + wrapped._tractor_stream_function = True # type: ignore + return wrapper(wrapped)