From 3d0de25f9368ec030e619ded372b089e129b8970 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Jan 2019 00:10:13 -0500 Subject: [PATCH] Do proper `wrapt` arg extraction for type checking Use an inner function / closure to properly process required arguments at call time as is recommended in the `wrap` docs. Do async gen and arg introspection at decorate time and raise appropriate type errors. --- tractor/msg.py | 119 +++++++++++++++++++++++++++++-------------------- 1 file changed, 71 insertions(+), 48 deletions(-) diff --git a/tractor/msg.py b/tractor/msg.py index 53c36fd..ebed198 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -1,8 +1,9 @@ """ Messaging pattern APIs and helpers. """ +import inspect import typing -from typing import Dict, Any, Set, Union +from typing import Dict, Any, Set, Union, Callable from functools import partial from async_generator import aclosing @@ -11,6 +12,7 @@ import wrapt from .log import get_logger from . import current_actor +from ._ipc import Context __all__ = ['pub'] @@ -181,58 +183,79 @@ def pub( @wrapt.decorator(adapter=takes_ctx) async def wrapper(agen, instance, args, kwargs): - task_name = None - if tasks: - try: - task_name = kwargs.pop('task_name') - except KeyError: + # this is used to extract arguments properly as per + # the `wrapt` docs + async def _execute( + ctx: Context, + topics: Set[str], + *args, + # *, + task_name: str = None, + packetizer: Callable = None, + **kwargs, + ): + if tasks and task_name is None: raise TypeError( - f"{agen} must be called with a `task_name` named argument " - f"with a falue from {tasks}") + f"{agen} must be called with a `task_name` named " + f"argument with a falue from {tasks}") - # pop required kwargs used internally - ctx = kwargs.pop('ctx') - topics = kwargs.pop('topics') - packetizer = kwargs.pop('packetizer', None) + ss = current_actor().statespace + lockmap = ss.setdefault('_pubtask2lock', task2lock) + lock = lockmap[task_name] - ss = current_actor().statespace - lockmap = ss.setdefault('_pubtask2lock', task2lock) - lock = lockmap[task_name] + all_subs = ss.setdefault('_subs', {}) + topics2ctxs = all_subs.setdefault(task_name, {}) - all_subs = ss.setdefault('_subs', {}) - topics2ctxs = all_subs.setdefault(task_name, {}) + try: + modify_subs(topics2ctxs, topics, ctx) + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # the next waiting task will take over and spawn it again + async with lock: + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False + log.info( + f"Spawning data feed task for {funcname}") + try: + # unblocks when no more symbols subscriptions exist + # and the streamer task terminates + await fan_out_to_ctxs( + pub_async_gen_func=partial( + agen, *args, **kwargs), + topics2ctxs=topics2ctxs, + packetizer=packetizer, + ) + log.info( + f"Terminating stream task {task_name or ''}" + f" for {agen.__name__}") + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True + finally: + # remove all subs for this context + modify_subs(topics2ctxs, (), ctx) - try: - modify_subs(topics2ctxs, topics, ctx) - # block and let existing feed task deliver - # stream data until it is cancelled in which case - # the next waiting task will take over and spawn it again - async with lock: - # no data feeder task yet; so start one - respawn = True - while respawn: - respawn = False - log.info(f"Spawning data feed task for {agen.__name__}") - try: - # unblocks when no more symbols subscriptions exist - # and the streamer task terminates - await fan_out_to_ctxs( - pub_async_gen_func=partial(agen, *args, **kwargs), - topics2ctxs=topics2ctxs, - packetizer=packetizer, - ) - log.info(f"Terminating stream task {task_name or ''}" - f" for {agen.__name__}") - except trio.BrokenResourceError: - log.exception("Respawning failed data feed task") - respawn = True - finally: - # remove all subs for this context - modify_subs(topics2ctxs, (), ctx) + # if there are truly no more subscriptions with this broker + # drop from broker subs dict + if not any(topics2ctxs.values()): + log.info( + f"No more subscriptions for publisher {task_name}") - # if there are truly no more subscriptions with this broker - # drop from broker subs dict - if not any(topics2ctxs.values()): - log.info(f"No more subscriptions for publisher {task_name}") + # invoke it + await _execute(*args, **kwargs) + + + funcname = wrapped.__name__ + if not inspect.isasyncgenfunction(wrapped): + raise TypeError( + f"Publisher {funcname} must be an async generator function" + ) + if 'get_topics' not in inspect.signature(wrapped).parameters: + raise TypeError( + f"Publisher async gen {funcname} must define a " + "`get_topics` argument" + ) return wrapper(wrapped)