From fbb6af47f8bec54e07fa7756f0c120f17ced2d66 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Jan 2019 12:19:01 -0500 Subject: [PATCH] Add a pub-sub messaging decorator API Add a draft pub-sub API `@tractor.msg.pub` which allows for decorating an asyn generator which can stream topic keyed dictionaries for delivery to multiple calling / consuming tasks. --- tractor/__init__.py | 2 + tractor/msg.py | 154 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 tractor/msg.py diff --git a/tractor/__init__.py b/tractor/__init__.py index 5ffcfd5..4214c2d 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -18,6 +18,7 @@ from ._actor import ( from ._trionics import open_nursery from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed +from . import msg __all__ = [ @@ -30,6 +31,7 @@ __all__ = [ 'MultiError', 'RemoteActorError', 'ModuleNotExposed', + 'msg' ] diff --git a/tractor/msg.py b/tractor/msg.py new file mode 100644 index 0000000..16c3982 --- /dev/null +++ b/tractor/msg.py @@ -0,0 +1,154 @@ +""" +Messaging pattern APIs and helpers. +""" +import typing +from typing import Dict +from functools import partial + +import trio +import wrapt + +from ._ipc import Context +from .log import get_logger +from . import current_actor + +__all__ = ['pub'] + +log = get_logger('messaging') + + +async def fan_out_to_ctxs( + pub_gen: typing.AsyncGenerator, + topics2ctxs: Dict[str, Context], + topic_key: str = 'key', +) -> None: + """Request and fan out quotes to each subscribed actor channel. + """ + def get_topics(): + return tuple(topics2ctxs.keys()) + + async for published in pub_gen( + get_topics=get_topics, + ): + ctx_payloads = {} + for packet_key, data in published.items(): + # grab each suscription topic using provided key for lookup + topic = data[topic_key] + # build a new dict packet for passing to multiple channels + packet = {packet_key: data} + for ctx in topics2ctxs.get(topic, set()): + ctx_payloads.setdefault(ctx, {}).update(packet), + + # deliver to each subscriber (fan out) + if ctx_payloads: + for ctx, payload in ctx_payloads.items(): + try: + await ctx.send_yield(payload) + except ( + # That's right, anything you can think of... + trio.ClosedStreamError, ConnectionResetError, + ConnectionRefusedError, + ): + log.warn(f"{ctx.chan} went down?") + for ctx_set in topics2ctxs.values(): + ctx_set.discard(ctx) + + if not any(topics2ctxs.values()): + log.warn(f"No subscribers left for {pub_gen.__name__}") + break + + +def modify_subs(topics2ctxs, topics, ctx): + """Absolute symbol subscription list for each quote stream. + + Effectively a symbol subscription api. + """ + log.info(f"{ctx.chan} changed subscription to {topics}") + + # update map from each symbol to requesting client's chan + for ticker in topics: + topics2ctxs.setdefault(ticker, set()).add(ctx) + + # remove any existing symbol subscriptions if symbol is not + # found in ``symbols`` + # TODO: this can likely be factored out into the pub-sub api + for ticker in filter( + lambda topic: topic not in topics, topics2ctxs.copy() + ): + ctx_set = topics2ctxs.get(ticker) + ctx_set.discard(ctx) + + if not ctx_set: + # pop empty sets which will trigger bg quoter task termination + topics2ctxs.pop(ticker) + + +def pub(*, tasks=()): + """Publisher async generator decorator. + + A publisher can be called many times from different actor's + remote tasks but will only spawn one internal task to deliver + values to all callers. Values yielded from the decorated + async generator are sent back to each calling task, filtered by + topic on the producer (server) side. + + Must be called with a topic as the first arg. + """ + task2lock = {} + for name in tasks: + task2lock[name] = trio.StrictFIFOLock() + + @wrapt.decorator + async def wrapper(agen, instance, args, kwargs): + if tasks: + task_key = kwargs.pop('task_key') + if not task_key: + raise TypeError( + f"{agen} must be called with a `task_key` named argument " + f"with a falue from {tasks}") + + # pop required kwargs used internally + ctx = kwargs.pop('ctx') + topics = kwargs.pop('topics') + topic_key = kwargs.pop('topic_key') + + lock = task2lock[task_key] + ss = current_actor().statespace + all_subs = ss.setdefault('_subs', {}) + topics2ctxs = all_subs.setdefault(task_key, {}) + + try: + modify_subs(topics2ctxs, topics, ctx) + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # we'll take over and spawn it again + # update map from each symbol to requesting client's chan + async with lock: + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False + log.info(f"Spawning data feed task for {agen.__name__}") + try: + # unblocks when no more symbols subscriptions exist + # and the streamer task terminates + await fan_out_to_ctxs( + pub_gen=partial(agen, *args, **kwargs), + topics2ctxs=topics2ctxs, + topic_key=topic_key, + ) + log.info(f"Terminating stream task for {task_key}" + f" for {agen.__name__}") + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True + finally: + # remove all subs for this context + modify_subs(topics2ctxs, (), ctx) + + # if there are truly no more subscriptions with this broker + # drop from broker subs dict + if not any(topics2ctxs.values()): + log.info(f"No more subscriptions for publisher {task_key}") + + return wrapper