forked from goodboy/tractor
1
0
Fork 0

Adjustments for non-frozen context dataclass change

transport_hardening
Tyler Goodlet 2021-06-13 20:21:49 -04:00
parent fc8d02f963
commit d896d84b28
1 changed files with 34 additions and 14 deletions

View File

@ -1,9 +1,13 @@
"""
Messaging pattern APIs and helpers.
NOTE: this module is likely deprecated by the new bi-directional streaming
support provided by ``tractor.Context.open_stream()`` and friends.
"""
import inspect
import typing
from typing import Dict, Any, Set, Callable
from typing import Dict, Any, Set, Callable, List, Tuple
from functools import partial
from async_generator import aclosing
@ -20,7 +24,7 @@ log = get_logger('messaging')
async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: Dict[str, set],
topics2ctxs: Dict[str, list],
packetizer: typing.Callable = None,
) -> None:
"""Request and fan out quotes to each subscribed actor channel.
@ -34,24 +38,27 @@ async def fan_out_to_ctxs(
async for published in pub_gen:
ctx_payloads: Dict[str, Any] = {}
ctx_payloads: List[Tuple[Context, Any]] = []
for topic, data in published.items():
log.debug(f"publishing {topic, data}")
# build a new dict packet or invoke provided packetizer
if packetizer is None:
packet = {topic: data}
else:
packet = packetizer(topic, data)
for ctx in topics2ctxs.get(topic, set()):
ctx_payloads.setdefault(ctx, {}).update(packet),
for ctx in topics2ctxs.get(topic, list()):
ctx_payloads.append((ctx, packet))
if not ctx_payloads:
log.debug(f"Unconsumed values:\n{published}")
# deliver to each subscriber (fan out)
if ctx_payloads:
for ctx, payload in ctx_payloads.items():
for ctx, payload in ctx_payloads:
try:
await ctx.send_yield(payload)
except (
@ -60,15 +67,24 @@ async def fan_out_to_ctxs(
ConnectionRefusedError,
):
log.warning(f"{ctx.chan} went down?")
for ctx_set in topics2ctxs.values():
ctx_set.discard(ctx)
for ctx_list in topics2ctxs.values():
try:
ctx_list.remove(ctx)
except ValueError:
continue
if not get_topics():
log.warning(f"No subscribers left for {pub_gen}")
break
def modify_subs(topics2ctxs, topics, ctx):
def modify_subs(
topics2ctxs: Dict[str, List[Context]],
topics: Set[str],
ctx: Context,
) -> None:
"""Absolute symbol subscription list for each quote stream.
Effectively a symbol subscription api.
@ -77,7 +93,7 @@ def modify_subs(topics2ctxs, topics, ctx):
# update map from each symbol to requesting client's chan
for topic in topics:
topics2ctxs.setdefault(topic, set()).add(ctx)
topics2ctxs.setdefault(topic, list()).append(ctx)
# remove any existing symbol subscriptions if symbol is not
# found in ``symbols``
@ -85,10 +101,14 @@ def modify_subs(topics2ctxs, topics, ctx):
for topic in filter(
lambda topic: topic not in topics, topics2ctxs.copy()
):
ctx_set = topics2ctxs.get(topic)
ctx_set.discard(ctx)
ctx_list = topics2ctxs.get(topic)
if ctx_list:
try:
ctx_list.remove(ctx)
except ValueError:
pass
if not ctx_set:
if not ctx_list:
# pop empty sets which will trigger bg quoter task termination
topics2ctxs.pop(topic)
@ -256,7 +276,7 @@ def pub(
respawn = True
finally:
# remove all subs for this context
modify_subs(topics2ctxs, (), ctx)
modify_subs(topics2ctxs, set(), ctx)
# if there are truly no more subscriptions with this broker
# drop from broker subs dict