diff --git a/tractor/msg.py b/tractor/msg.py index 560e644..28e3405 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -1,9 +1,13 @@ """ Messaging pattern APIs and helpers. + +NOTE: this module is likely deprecated by the new bi-directional streaming +support provided by ``tractor.Context.open_stream()`` and friends. + """ import inspect import typing -from typing import Dict, Any, Set, Callable +from typing import Dict, Any, Set, Callable, List, Tuple from functools import partial from async_generator import aclosing @@ -20,7 +24,7 @@ log = get_logger('messaging') async def fan_out_to_ctxs( pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy - topics2ctxs: Dict[str, set], + topics2ctxs: Dict[str, list], packetizer: typing.Callable = None, ) -> None: """Request and fan out quotes to each subscribed actor channel. @@ -34,24 +38,27 @@ async def fan_out_to_ctxs( async for published in pub_gen: - ctx_payloads: Dict[str, Any] = {} + ctx_payloads: List[Tuple[Context, Any]] = [] for topic, data in published.items(): log.debug(f"publishing {topic, data}") + # build a new dict packet or invoke provided packetizer if packetizer is None: packet = {topic: data} + else: packet = packetizer(topic, data) - for ctx in topics2ctxs.get(topic, set()): - ctx_payloads.setdefault(ctx, {}).update(packet), + + for ctx in topics2ctxs.get(topic, list()): + ctx_payloads.append((ctx, packet)) if not ctx_payloads: log.debug(f"Unconsumed values:\n{published}") # deliver to each subscriber (fan out) if ctx_payloads: - for ctx, payload in ctx_payloads.items(): + for ctx, payload in ctx_payloads: try: await ctx.send_yield(payload) except ( @@ -60,15 +67,24 @@ async def fan_out_to_ctxs( ConnectionRefusedError, ): log.warning(f"{ctx.chan} went down?") - for ctx_set in topics2ctxs.values(): - ctx_set.discard(ctx) + for ctx_list in topics2ctxs.values(): + try: + ctx_list.remove(ctx) + except ValueError: + continue if not get_topics(): log.warning(f"No subscribers left for {pub_gen}") break -def modify_subs(topics2ctxs, topics, ctx): +def modify_subs( + + topics2ctxs: Dict[str, List[Context]], + topics: Set[str], + ctx: Context, + +) -> None: """Absolute symbol subscription list for each quote stream. Effectively a symbol subscription api. @@ -77,7 +93,7 @@ def modify_subs(topics2ctxs, topics, ctx): # update map from each symbol to requesting client's chan for topic in topics: - topics2ctxs.setdefault(topic, set()).add(ctx) + topics2ctxs.setdefault(topic, list()).append(ctx) # remove any existing symbol subscriptions if symbol is not # found in ``symbols`` @@ -85,10 +101,14 @@ def modify_subs(topics2ctxs, topics, ctx): for topic in filter( lambda topic: topic not in topics, topics2ctxs.copy() ): - ctx_set = topics2ctxs.get(topic) - ctx_set.discard(ctx) + ctx_list = topics2ctxs.get(topic) + if ctx_list: + try: + ctx_list.remove(ctx) + except ValueError: + pass - if not ctx_set: + if not ctx_list: # pop empty sets which will trigger bg quoter task termination topics2ctxs.pop(topic) @@ -256,7 +276,7 @@ def pub( respawn = True finally: # remove all subs for this context - modify_subs(topics2ctxs, (), ctx) + modify_subs(topics2ctxs, set(), ctx) # if there are truly no more subscriptions with this broker # drop from broker subs dict