Generalize the publisher/fan-out system

Start working toward a more general (on-demand) pub-sub system which
can be brought into ``tractor``. Right now this just means making
the code in the `fan_out_to_ctxs()` less specific but, eventually
I think this function should be coupled with a decorator and shipped
as a standard "message pattern".

Additionally,
- try out making `BrokerFeed` a `@dataclass`
- strip out all the `trio.Event` / uneeded nursery / extra task crap
  from `start_quote_stream()`
kivy_mainline_and_py3.8
Tyler Goodlet 2019-01-14 21:23:49 -05:00
parent c94ce47aa6
commit 22670afe58
1 changed files with 103 additions and 118 deletions

View File

@ -3,12 +3,13 @@ Live data feed machinery
""" """
import time import time
from functools import partial from functools import partial
from dataclasses import dataclass, field
from itertools import cycle from itertools import cycle
import socket import socket
import json import json
from types import ModuleType from types import ModuleType
import typing import typing
from typing import Coroutine, Callable, Dict, List, Any from typing import Coroutine, Callable, Dict, List, Any, Tuple
import contextlib import contextlib
from operator import itemgetter from operator import itemgetter
@ -44,8 +45,9 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
async def stream_quotes( async def stream_quotes(
get_topics: typing.Callable,
get_quotes: Coroutine,
brokermod: ModuleType, brokermod: ModuleType,
request_quotes: Coroutine,
rate: int = 5, # delay between quote requests rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None: ) -> None:
@ -58,8 +60,14 @@ async def stream_quotes(
sleeptime = round(1. / rate, 3) sleeptime = round(1. / rate, 3)
_cache = {} # ticker to quote caching _cache = {} # ticker to quote caching
while True: # use an event here to trigger exit? async def request_quotes():
"""Get quotes for current symbol subscription set.
"""
symbols = get_topics()
# subscription can be changed at any time
return await get_quotes(symbols) if symbols else ()
while True: # use an event here to trigger exit?
prequote_start = time.time() prequote_start = time.time()
with trio.move_on_after(3) as cancel_scope: with trio.move_on_after(3) as cancel_scope:
@ -71,14 +79,13 @@ async def stream_quotes(
if cancelled: if cancelled:
log.warn("Quote query timed out after 3 seconds, retrying...") log.warn("Quote query timed out after 3 seconds, retrying...")
# handle network outages by idling until response is received # handle network outages by idling until response is received
# quotes = await wait_for_network(partial(get_quotes, tickers))
quotes = await wait_for_network(request_quotes) quotes = await wait_for_network(request_quotes)
new_quotes = [] new_quotes = {}
if diff_cached: if diff_cached:
# If cache is enabled then only deliver "new" changes. # If cache is enabled then only deliver "new" changes.
# Useful for polling setups but obviously should be # Useful for polling setups but obviously should be
# disabled if you're rx-ing event data. # disabled if you're rx-ing per-tick data.
for quote in quotes: for quote in quotes:
symbol = quote['symbol'] symbol = quote['symbol']
last = _cache.setdefault(symbol, {}) last = _cache.setdefault(symbol, {})
@ -87,10 +94,11 @@ async def stream_quotes(
log.info( log.info(
f"New quote {quote['symbol']}:\n{new}") f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote _cache[symbol] = quote
new_quotes.append(quote) new_quotes[symbol] = quote
else: else:
new_quotes = quotes
log.info(f"Delivering quotes:\n{quotes}") log.info(f"Delivering quotes:\n{quotes}")
for quote in quotes:
newquotes[quote['symbol']] = quote
yield new_quotes yield new_quotes
@ -112,7 +120,8 @@ async def stream_quotes(
# TODO: at this point probably just just make this a class and # TODO: at this point probably just just make this a class and
# a lot of these functions should be methods. It will definitely # a lot of these functions should be methods. It will definitely
# make stateful UI apps easier to implement # make stateful UI apps easier to implement
class BrokerFeed(typing.NamedTuple): @dataclass
class BrokerFeed:
"""A per broker "client feed" container. """A per broker "client feed" container.
A structure to keep track of components used by A structure to keep track of components used by
@ -124,20 +133,26 @@ class BrokerFeed(typing.NamedTuple):
mod: ModuleType mod: ModuleType
client: object client: object
exit_stack: contextlib.AsyncExitStack exit_stack: contextlib.AsyncExitStack
quoter_keys: List[str] = ['stock', 'option'] quoter_keys: Tuple[str] = ('stock', 'option')
tasks: Dict[str, trio.Event] = dict.fromkeys( locks: Dict[str, trio.StrictFIFOLock] = field(
quoter_keys, False) default_factory=lambda:
quoters: Dict[str, typing.Coroutine] = {} {'stock': trio.StrictFIFOLock(), 'option': trio.StrictFIFOLock()}
subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}} )
quoters: Dict[str, typing.Coroutine] = field(default_factory=dict)
subscriptions: Dict[str, Dict[str, set]] = field(
default_factory=partial(dict, **{'option': {}, 'stock': {}})
)
async def fan_out_to_chans( async def fan_out_to_ctxs(
pub_gen: typing.AsyncGenerator,
feed: BrokerFeed, feed: BrokerFeed,
get_quotes: Coroutine, get_quotes: Coroutine,
symbols2chans: Dict[str, tractor.Channel], topics2ctxs: Dict[str, tractor.Context],
topic_key: str = 'key',
packet_key: str = 'symbol',
rate: int = 5, # delay between quote requests rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue diff_cached: bool = True, # only deliver "new" quotes to the queue
cid: str = None,
) -> None: ) -> None:
"""Request and fan out quotes to each subscribed actor channel. """Request and fan out quotes to each subscribed actor channel.
""" """
@ -146,45 +161,44 @@ async def fan_out_to_chans(
rate = broker_limit rate = broker_limit
log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec")
async def request(): def get_topics():
"""Get quotes for current symbol subscription set. return tuple(topics2ctxs.keys())
"""
symbols = list(symbols2chans.keys())
# subscription can be changed at any time
return await get_quotes(symbols) if symbols else ()
async for quotes in stream_quotes( async for published in pub_gen(
feed.mod, request, rate, get_topics,
get_quotes,
feed.mod,
rate,
diff_cached=diff_cached, diff_cached=diff_cached,
): ):
chan_payloads = {} ctx_payloads = {}
for quote in quotes: for packet_key, data in published.items():
packet = {quote['symbol']: quote} # grab each suscription topic using provided key for lookup
for chan, cid in symbols2chans.get(quote['key'], set()): topic = data[topic_key]
chan_payloads.setdefault( # build a new dict packet for passing to multiple underlying channels
(chan, cid), packet = {packet_key: data}
{'yield': {}, 'cid': cid} for ctx in topics2ctxs.get(topic, set()):
)['yield'].update(packet) ctx_payloads.setdefault(ctx, {}).update(packet),
# deliver to each subscriber (fan out) # deliver to each subscriber (fan out)
if chan_payloads: if ctx_payloads:
for (chan, cid), payload in chan_payloads.items(): for ctx, payload in ctx_payloads.items():
try: try:
await chan.send(payload) await ctx.send_yield(payload)
except ( except (
# That's right, anything you can think of... # That's right, anything you can think of...
trio.ClosedStreamError, ConnectionResetError, trio.ClosedStreamError, ConnectionResetError,
ConnectionRefusedError, ConnectionRefusedError,
): ):
log.warn(f"{chan} went down?") log.warn(f"{ctx.chan} went down?")
for chanset in symbols2chans.values(): for ctx_set in topics2ctxs.values():
chanset.discard((chan, cid)) ctx_set.discard(ctx)
if not any(symbols2chans.values()): if not any(topics2ctxs.values()):
log.warn(f"No subs left for broker {feed.mod.name}, exiting task") log.warn(f"No subs left for broker {feed.mod.name}, exiting task")
break break
log.info(f"Terminating stream quoter task for {feed.mod.name}") log.info(f"Terminating stream quoter task for {pub_gen.__name__}")
async def symbol_data(broker: str, tickers: List[str]): async def symbol_data(broker: str, tickers: List[str]):
@ -241,12 +255,12 @@ async def smoke_quote(get_quotes, tickers, broker):
########################################### ###########################################
def modify_quote_stream(broker, feed_type, symbols, chan, cid): def modify_quote_stream(broker, feed_type, symbols, ctx):
"""Absolute symbol subscription list for each quote stream. """Absolute symbol subscription list for each quote stream.
Effectively a symbol subscription api. Effectively a symbol subscription api.
""" """
log.info(f"{chan} changed symbol subscription to {symbols}") log.info(f"{ctx.chan} changed symbol subscription to {symbols}")
ss = tractor.current_actor().statespace ss = tractor.current_actor().statespace
feed = ss['feeds'].get(broker) feed = ss['feeds'].get(broker)
if feed is None: if feed is None:
@ -254,26 +268,23 @@ def modify_quote_stream(broker, feed_type, symbols, chan, cid):
"`get_cached_feed()` must be called before modifying its stream" "`get_cached_feed()` must be called before modifying its stream"
) )
symbols2chans = feed.subscriptions[feed_type] symbols2ctxs = feed.subscriptions[feed_type]
# update map from each symbol to requesting client's chan # update map from each symbol to requesting client's chan
for ticker in symbols: for ticker in symbols:
symbols2chans.setdefault(ticker, set()).add((chan, cid)) symbols2ctxs.setdefault(ticker, set()).add(ctx)
# remove any existing symbol subscriptions if symbol is not # remove any existing symbol subscriptions if symbol is not
# found in ``symbols`` # found in ``symbols``
# TODO: this can likely be factored out into the pub-sub api # TODO: this can likely be factored out into the pub-sub api
for ticker in filter( for ticker in filter(
lambda ticker: ticker not in symbols, symbols2chans.copy() lambda ticker: ticker not in symbols, symbols2ctxs.copy()
): ):
chanset = symbols2chans.get(ticker) ctx_set = symbols2ctxs.get(ticker)
# XXX: cid will be different on unsub call ctx_set.discard(ctx)
for item in chanset.copy():
if (chan, cid) == item:
chanset.discard(item)
if not chanset: if not ctx_set:
# pop empty sets which will trigger bg quoter task termination # pop empty sets which will trigger bg quoter task termination
symbols2chans.pop(ticker) symbols2ctxs.pop(ticker)
async def get_cached_feed( async def get_cached_feed(
@ -310,8 +321,7 @@ async def start_quote_stream(
symbols: List[Any], symbols: List[Any],
feed_type: str = 'stock', feed_type: str = 'stock',
diff_cached: bool = True, diff_cached: bool = True,
chan: tractor.Channel = None, ctx: tractor.Context = None,
cid: str = None,
rate: int = 3, rate: int = 3,
) -> None: ) -> None:
"""Handle per-broker quote stream subscriptions using a "lazy" pub-sub """Handle per-broker quote stream subscriptions using a "lazy" pub-sub
@ -321,16 +331,17 @@ async def start_quote_stream(
Since most brokers seems to support batch quote requests we Since most brokers seems to support batch quote requests we
limit to one task per process for now. limit to one task per process for now.
""" """
actor = tractor.current_actor() # XXX: why do we need this again?
# set log level after fork get_console_log(tractor.current_actor().loglevel)
get_console_log(actor.loglevel)
# pull global vars from local actor # pull global vars from local actor
symbols = list(symbols) symbols = list(symbols)
log.info( log.info(
f"{chan.uid} subscribed to {broker} for symbols {symbols}") f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}")
# another actor task may have already created it # another actor task may have already created it
feed = await get_cached_feed(broker) feed = await get_cached_feed(broker)
symbols2chans = feed.subscriptions[feed_type] symbols2ctxs = feed.subscriptions[feed_type]
task_is_dead = None
if feed_type == 'stock': if feed_type == 'stock':
get_quotes = feed.quoters.setdefault( get_quotes = feed.quoters.setdefault(
@ -341,7 +352,7 @@ async def start_quote_stream(
# out bad symbols for now) # out bad symbols for now)
payload = await smoke_quote(get_quotes, symbols, broker) payload = await smoke_quote(get_quotes, symbols, broker)
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
await chan.send({'yield': payload, 'cid': cid}) await ctx.send_yield(payload)
elif feed_type == 'option': elif feed_type == 'option':
# FIXME: yeah we need maybe a more general way to specify # FIXME: yeah we need maybe a more general way to specify
# the arg signature for the option feed beasides a symbol # the arg signature for the option feed beasides a symbol
@ -355,77 +366,51 @@ async def start_quote_stream(
for quote in await get_quotes(symbols) for quote in await get_quotes(symbols)
} }
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
await chan.send({'yield': payload, 'cid': cid}) await ctx.send_yield(payload)
try: try:
# update map from each symbol to requesting client's chan # update map from each symbol to requesting client's chan
modify_quote_stream(broker, feed_type, symbols, chan, cid) modify_quote_stream(broker, feed_type, symbols, ctx)
# event indicating that task was started and then killed # prevents more then one broker feed task from spawning
task_is_dead = feed.tasks.get(feed_type) lock = feed.locks.get(feed_type)
if task_is_dead is False:
task_is_dead = trio.Event()
task_is_dead.set()
feed.tasks[feed_type] = task_is_dead
if not task_is_dead.is_set():
# block and let existing feed task deliver # block and let existing feed task deliver
# stream data until it is cancelled in which case # stream data until it is cancelled in which case
# we'll take over and spawn it again # we'll take over and spawn it again
await task_is_dead.wait() async with lock:
# client channel was likely disconnected
# but we still want to keep the broker task
# alive if there are other consumers (including
# ourselves)
if any(symbols2chans.values()):
log.warn(
f"Data feed task for {feed.mod.name} was cancelled but"
f" there are still active clients, respawning")
# no data feeder task yet; so start one # no data feeder task yet; so start one
respawn = True respawn = True
while respawn: while respawn:
respawn = False respawn = False
log.info(f"Spawning data feed task for {feed.mod.name}") log.info(f"Spawning data feed task for {feed.mod.name}")
try: try:
async with trio.open_nursery() as nursery: # unblocks when no more symbols subscriptions exist and the
nursery.start_soon( # quote streamer task terminates
partial( await fan_out_to_ctxs(
fan_out_to_chans, feed, get_quotes, stream_quotes,
symbols2chans, feed,
get_quotes,
symbols2ctxs,
diff_cached=diff_cached, diff_cached=diff_cached,
cid=cid,
rate=rate, rate=rate,
) )
)
# it's alive!
task_is_dead.clear()
except trio.BrokenResourceError: except trio.BrokenResourceError:
log.exception("Respawning failed data feed task") log.exception("Respawning failed data feed task")
respawn = True respawn = True
# unblocks when no more symbols subscriptions exist and the
# quote streamer task terminates (usually because another call
# was made to `modify_quoter` to unsubscribe from streaming
# symbols)
finally: finally:
log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}")
task_is_dead.set()
# if we're cancelled externally unsubscribe our quote feed # if we're cancelled externally unsubscribe our quote feed
modify_quote_stream(broker, feed_type, [], chan, cid) modify_quote_stream(broker, feed_type, [], ctx)
# if there are truly no more subscriptions with this broker # if there are truly no more subscriptions with this broker
# drop from broker subs dict # drop from broker subs dict
if not any(symbols2chans.values()): if not any(symbols2ctxs.values()):
log.info(f"No more subscriptions for {broker}") log.info(f"No more subscriptions for broker {broker}")
# broker2symbolsubs.pop(broker, None)
# destroy the API client # destroy the API client
await feed.exit_stack.aclose() await feed.exit_stack.aclose()
class DataFeed(object): class DataFeed:
"""Data feed client for streaming symbol data from a (remote) """Data feed client for streaming symbol data from a (remote)
``brokerd`` data daemon. ``brokerd`` data daemon.
""" """
@ -472,7 +457,7 @@ class DataFeed(object):
filename=test filename=test
) )
else: else:
log.info(f"Starting new stream for {self._symbols}") log.info(f"Starting new stream for {symbols}")
# start live streaming from broker daemon # start live streaming from broker daemon
quote_gen = await self.portal.run( quote_gen = await self.portal.run(
"piker.brokers.data", "piker.brokers.data",