Adjustments for non-frozen context dataclass change

bistream_backup
Tyler Goodlet 2021-06-13 20:21:49 -04:00
parent 3fa36f64ac
commit 8f468a8c86
1 changed files with 34 additions and 14 deletions

View File

@ -1,9 +1,13 @@
""" """
Messaging pattern APIs and helpers. Messaging pattern APIs and helpers.
NOTE: this module is likely deprecated by the new bi-directional streaming
support provided by ``tractor.Context.open_stream()`` and friends.
""" """
import inspect import inspect
import typing import typing
from typing import Dict, Any, Set, Callable from typing import Dict, Any, Set, Callable, List, Tuple
from functools import partial from functools import partial
from async_generator import aclosing from async_generator import aclosing
@ -20,7 +24,7 @@ log = get_logger('messaging')
async def fan_out_to_ctxs( async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: Dict[str, set], topics2ctxs: Dict[str, list],
packetizer: typing.Callable = None, packetizer: typing.Callable = None,
) -> None: ) -> None:
"""Request and fan out quotes to each subscribed actor channel. """Request and fan out quotes to each subscribed actor channel.
@ -34,24 +38,27 @@ async def fan_out_to_ctxs(
async for published in pub_gen: async for published in pub_gen:
ctx_payloads: Dict[str, Any] = {} ctx_payloads: List[Tuple[Context, Any]] = []
for topic, data in published.items(): for topic, data in published.items():
log.debug(f"publishing {topic, data}") log.debug(f"publishing {topic, data}")
# build a new dict packet or invoke provided packetizer # build a new dict packet or invoke provided packetizer
if packetizer is None: if packetizer is None:
packet = {topic: data} packet = {topic: data}
else: else:
packet = packetizer(topic, data) packet = packetizer(topic, data)
for ctx in topics2ctxs.get(topic, set()):
ctx_payloads.setdefault(ctx, {}).update(packet), for ctx in topics2ctxs.get(topic, list()):
ctx_payloads.append((ctx, packet))
if not ctx_payloads: if not ctx_payloads:
log.debug(f"Unconsumed values:\n{published}") log.debug(f"Unconsumed values:\n{published}")
# deliver to each subscriber (fan out) # deliver to each subscriber (fan out)
if ctx_payloads: if ctx_payloads:
for ctx, payload in ctx_payloads.items(): for ctx, payload in ctx_payloads:
try: try:
await ctx.send_yield(payload) await ctx.send_yield(payload)
except ( except (
@ -60,15 +67,24 @@ async def fan_out_to_ctxs(
ConnectionRefusedError, ConnectionRefusedError,
): ):
log.warning(f"{ctx.chan} went down?") log.warning(f"{ctx.chan} went down?")
for ctx_set in topics2ctxs.values(): for ctx_list in topics2ctxs.values():
ctx_set.discard(ctx) try:
ctx_list.remove(ctx)
except ValueError:
continue
if not get_topics(): if not get_topics():
log.warning(f"No subscribers left for {pub_gen}") log.warning(f"No subscribers left for {pub_gen}")
break break
def modify_subs(topics2ctxs, topics, ctx): def modify_subs(
topics2ctxs: Dict[str, List[Context]],
topics: Set[str],
ctx: Context,
) -> None:
"""Absolute symbol subscription list for each quote stream. """Absolute symbol subscription list for each quote stream.
Effectively a symbol subscription api. Effectively a symbol subscription api.
@ -77,7 +93,7 @@ def modify_subs(topics2ctxs, topics, ctx):
# update map from each symbol to requesting client's chan # update map from each symbol to requesting client's chan
for topic in topics: for topic in topics:
topics2ctxs.setdefault(topic, set()).add(ctx) topics2ctxs.setdefault(topic, list()).append(ctx)
# remove any existing symbol subscriptions if symbol is not # remove any existing symbol subscriptions if symbol is not
# found in ``symbols`` # found in ``symbols``
@ -85,10 +101,14 @@ def modify_subs(topics2ctxs, topics, ctx):
for topic in filter( for topic in filter(
lambda topic: topic not in topics, topics2ctxs.copy() lambda topic: topic not in topics, topics2ctxs.copy()
): ):
ctx_set = topics2ctxs.get(topic) ctx_list = topics2ctxs.get(topic)
ctx_set.discard(ctx) if ctx_list:
try:
ctx_list.remove(ctx)
except ValueError:
pass
if not ctx_set: if not ctx_list:
# pop empty sets which will trigger bg quoter task termination # pop empty sets which will trigger bg quoter task termination
topics2ctxs.pop(topic) topics2ctxs.pop(topic)
@ -256,7 +276,7 @@ def pub(
respawn = True respawn = True
finally: finally:
# remove all subs for this context # remove all subs for this context
modify_subs(topics2ctxs, (), ctx) modify_subs(topics2ctxs, set(), ctx)
# if there are truly no more subscriptions with this broker # if there are truly no more subscriptions with this broker
# drop from broker subs dict # drop from broker subs dict