From 22670afe58cacc29926c2bcd49179d0cefca4e76 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jan 2019 21:23:49 -0500 Subject: [PATCH] Generalize the publisher/fan-out system Start working toward a more general (on-demand) pub-sub system which can be brought into ``tractor``. Right now this just means making the code in the `fan_out_to_ctxs()` less specific but, eventually I think this function should be coupled with a decorator and shipped as a standard "message pattern". Additionally, - try out making `BrokerFeed` a `@dataclass` - strip out all the `trio.Event` / uneeded nursery / extra task crap from `start_quote_stream()` --- piker/brokers/data.py | 221 ++++++++++++++++++++---------------------- 1 file changed, 103 insertions(+), 118 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index ff959ec0..bc3ff369 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -3,12 +3,13 @@ Live data feed machinery """ import time from functools import partial +from dataclasses import dataclass, field from itertools import cycle import socket import json from types import ModuleType import typing -from typing import Coroutine, Callable, Dict, List, Any +from typing import Coroutine, Callable, Dict, List, Any, Tuple import contextlib from operator import itemgetter @@ -44,8 +45,9 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: async def stream_quotes( + get_topics: typing.Callable, + get_quotes: Coroutine, brokermod: ModuleType, - request_quotes: Coroutine, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -58,8 +60,14 @@ async def stream_quotes( sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching - while True: # use an event here to trigger exit? + async def request_quotes(): + """Get quotes for current symbol subscription set. 
+ """ + symbols = get_topics() + # subscription can be changed at any time + return await get_quotes(symbols) if symbols else () + while True: # use an event here to trigger exit? prequote_start = time.time() with trio.move_on_after(3) as cancel_scope: @@ -71,14 +79,13 @@ async def stream_quotes( if cancelled: log.warn("Quote query timed out after 3 seconds, retrying...") # handle network outages by idling until response is received - # quotes = await wait_for_network(partial(get_quotes, tickers)) quotes = await wait_for_network(request_quotes) - new_quotes = [] + new_quotes = {} if diff_cached: # If cache is enabled then only deliver "new" changes. # Useful for polling setups but obviously should be - # disabled if you're rx-ing event data. + # disabled if you're rx-ing per-tick data. for quote in quotes: symbol = quote['symbol'] last = _cache.setdefault(symbol, {}) @@ -87,10 +94,11 @@ async def stream_quotes( log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - new_quotes.append(quote) + new_quotes[symbol] = quote else: - new_quotes = quotes log.info(f"Delivering quotes:\n{quotes}") + for quote in quotes: + new_quotes[quote['symbol']] = quote yield new_quotes @@ -112,7 +120,8 @@ async def stream_quotes( # TODO: at this point probably just just make this a class and # a lot of these functions should be methods. It will definitely # make stateful UI apps easier to implement -class BrokerFeed(typing.NamedTuple): +@dataclass +class BrokerFeed: """A per broker "client feed" container. 
A structure to keep track of components used by @@ -124,20 +133,26 @@ class BrokerFeed(typing.NamedTuple): mod: ModuleType client: object exit_stack: contextlib.AsyncExitStack - quoter_keys: List[str] = ['stock', 'option'] - tasks: Dict[str, trio.Event] = dict.fromkeys( - quoter_keys, False) - quoters: Dict[str, typing.Coroutine] = {} - subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}} + quoter_keys: Tuple[str] = ('stock', 'option') + locks: Dict[str, trio.StrictFIFOLock] = field( + default_factory=lambda: + {'stock': trio.StrictFIFOLock(), 'option': trio.StrictFIFOLock()} + ) + quoters: Dict[str, typing.Coroutine] = field(default_factory=dict) + subscriptions: Dict[str, Dict[str, set]] = field( + default_factory=partial(dict, **{'option': {}, 'stock': {}}) + ) -async def fan_out_to_chans( +async def fan_out_to_ctxs( + pub_gen: typing.AsyncGenerator, feed: BrokerFeed, get_quotes: Coroutine, - symbols2chans: Dict[str, tractor.Channel], + topics2ctxs: Dict[str, tractor.Context], + topic_key: str = 'key', + packet_key: str = 'symbol', rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue - cid: str = None, ) -> None: """Request and fan out quotes to each subscribed actor channel. """ @@ -146,45 +161,44 @@ async def fan_out_to_chans( rate = broker_limit log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") - async def request(): - """Get quotes for current symbol subscription set. 
- """ - symbols = list(symbols2chans.keys()) - # subscription can be changed at any time - return await get_quotes(symbols) if symbols else () + def get_topics(): + return tuple(topics2ctxs.keys()) - async for quotes in stream_quotes( - feed.mod, request, rate, + async for published in pub_gen( + get_topics, + get_quotes, + feed.mod, + rate, diff_cached=diff_cached, ): - chan_payloads = {} - for quote in quotes: - packet = {quote['symbol']: quote} - for chan, cid in symbols2chans.get(quote['key'], set()): - chan_payloads.setdefault( - (chan, cid), - {'yield': {}, 'cid': cid} - )['yield'].update(packet) + ctx_payloads = {} + for key, data in published.items(): + # grab each subscription topic using provided key for lookup + topic = data[topic_key] + # build a new dict packet for passing to multiple underlying channels + packet = {key: data} + for ctx in topics2ctxs.get(topic, set()): + ctx_payloads.setdefault(ctx, {}).update(packet) # deliver to each subscriber (fan out) - if chan_payloads: - for (chan, cid), payload in chan_payloads.items(): + if ctx_payloads: + for ctx, payload in ctx_payloads.items(): try: - await chan.send(payload) + await ctx.send_yield(payload) except ( # That's right, anything you can think of... 
trio.ClosedStreamError, ConnectionResetError, ConnectionRefusedError, ): - log.warn(f"{chan} went down?") - for chanset in symbols2chans.values(): - chanset.discard((chan, cid)) + log.warn(f"{ctx.chan} went down?") + for ctx_set in topics2ctxs.values(): + ctx_set.discard(ctx) - if not any(symbols2chans.values()): + if not any(topics2ctxs.values()): log.warn(f"No subs left for broker {feed.mod.name}, exiting task") break - log.info(f"Terminating stream quoter task for {feed.mod.name}") + log.info(f"Terminating stream quoter task for {pub_gen.__name__}") async def symbol_data(broker: str, tickers: List[str]): @@ -241,12 +255,12 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -def modify_quote_stream(broker, feed_type, symbols, chan, cid): +def modify_quote_stream(broker, feed_type, symbols, ctx): """Absolute symbol subscription list for each quote stream. Effectively a symbol subscription api. """ - log.info(f"{chan} changed symbol subscription to {symbols}") + log.info(f"{ctx.chan} changed symbol subscription to {symbols}") ss = tractor.current_actor().statespace feed = ss['feeds'].get(broker) if feed is None: @@ -254,26 +268,23 @@ def modify_quote_stream(broker, feed_type, symbols, chan, cid): "`get_cached_feed()` must be called before modifying its stream" ) - symbols2chans = feed.subscriptions[feed_type] + symbols2ctxs = feed.subscriptions[feed_type] # update map from each symbol to requesting client's chan for ticker in symbols: - symbols2chans.setdefault(ticker, set()).add((chan, cid)) + symbols2ctxs.setdefault(ticker, set()).add(ctx) # remove any existing symbol subscriptions if symbol is not # found in ``symbols`` # TODO: this can likely be factored out into the pub-sub api for ticker in filter( - lambda ticker: ticker not in symbols, symbols2chans.copy() + lambda ticker: ticker not in symbols, symbols2ctxs.copy() ): - chanset = symbols2chans.get(ticker) - # XXX: cid will be different on unsub call - for item 
in chanset.copy(): - if (chan, cid) == item: - chanset.discard(item) + ctx_set = symbols2ctxs.get(ticker) + ctx_set.discard(ctx) - if not chanset: + if not ctx_set: # pop empty sets which will trigger bg quoter task termination - symbols2chans.pop(ticker) + symbols2ctxs.pop(ticker) async def get_cached_feed( @@ -310,8 +321,7 @@ async def start_quote_stream( symbols: List[Any], feed_type: str = 'stock', diff_cached: bool = True, - chan: tractor.Channel = None, - cid: str = None, + ctx: tractor.Context = None, rate: int = 3, ) -> None: """Handle per-broker quote stream subscriptions using a "lazy" pub-sub @@ -321,16 +331,17 @@ async def start_quote_stream( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ - actor = tractor.current_actor() - # set log level after fork - get_console_log(actor.loglevel) + # XXX: why do we need this again? + get_console_log(tractor.current_actor().loglevel) + # pull global vars from local actor symbols = list(symbols) log.info( - f"{chan.uid} subscribed to {broker} for symbols {symbols}") + f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") # another actor task may have already created it feed = await get_cached_feed(broker) - symbols2chans = feed.subscriptions[feed_type] + symbols2ctxs = feed.subscriptions[feed_type] + task_is_dead = None if feed_type == 'stock': get_quotes = feed.quoters.setdefault( @@ -341,7 +352,7 @@ async def start_quote_stream( # out bad symbols for now) payload = await smoke_quote(get_quotes, symbols, broker) # push initial smoke quote response for client initialization - await chan.send({'yield': payload, 'cid': cid}) + await ctx.send_yield(payload) elif feed_type == 'option': # FIXME: yeah we need maybe a more general way to specify # the arg signature for the option feed beasides a symbol @@ -355,77 +366,51 @@ async def start_quote_stream( for quote in await get_quotes(symbols) } # push initial smoke quote response for client initialization - 
await chan.send({'yield': payload, 'cid': cid}) + await ctx.send_yield(payload) try: # update map from each symbol to requesting client's chan - modify_quote_stream(broker, feed_type, symbols, chan, cid) + modify_quote_stream(broker, feed_type, symbols, ctx) - # event indicating that task was started and then killed - task_is_dead = feed.tasks.get(feed_type) - if task_is_dead is False: - task_is_dead = trio.Event() - task_is_dead.set() - feed.tasks[feed_type] = task_is_dead + # prevents more than one broker feed task from spawning + lock = feed.locks.get(feed_type) - if not task_is_dead.is_set(): - # block and let existing feed task deliver - # stream data until it is cancelled in which case - # we'll take over and spawn it again - await task_is_dead.wait() - # client channel was likely disconnected - # but we still want to keep the broker task - # alive if there are other consumers (including - # ourselves) - if any(symbols2chans.values()): - log.warn( - f"Data feed task for {feed.mod.name} was cancelled but" - f" there are still active clients, respawning") - - # no data feeder task yet; so start one - respawn = True - while respawn: - respawn = False - log.info(f"Spawning data feed task for {feed.mod.name}") - try: - async with trio.open_nursery() as nursery: - nursery.start_soon( - partial( - fan_out_to_chans, feed, get_quotes, - symbols2chans, - diff_cached=diff_cached, - cid=cid, - rate=rate, - ) + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # we'll take over and spawn it again + async with lock: + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False + log.info(f"Spawning data feed task for {feed.mod.name}") + try: + # unblocks when no more symbols subscriptions exist and the + # quote streamer task terminates + await fan_out_to_ctxs( + stream_quotes, + feed, + get_quotes, + symbols2ctxs, + diff_cached=diff_cached, + rate=rate, ) - # it's alive! 
- task_is_dead.clear() - - except trio.BrokenResourceError: - log.exception("Respawning failed data feed task") - respawn = True - - # unblocks when no more symbols subscriptions exist and the - # quote streamer task terminates (usually because another call - # was made to `modify_quoter` to unsubscribe from streaming - # symbols) + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True finally: - log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}") - task_is_dead.set() - # if we're cancelled externally unsubscribe our quote feed - modify_quote_stream(broker, feed_type, [], chan, cid) + modify_quote_stream(broker, feed_type, [], ctx) # if there are truly no more subscriptions with this broker # drop from broker subs dict - if not any(symbols2chans.values()): - log.info(f"No more subscriptions for {broker}") - # broker2symbolsubs.pop(broker, None) + if not any(symbols2ctxs.values()): + log.info(f"No more subscriptions for broker {broker}") # destroy the API client await feed.exit_stack.aclose() -class DataFeed(object): +class DataFeed: """Data feed client for streaming symbol data from a (remote) ``brokerd`` data daemon. """ @@ -472,7 +457,7 @@ class DataFeed(object): filename=test ) else: - log.info(f"Starting new stream for {self._symbols}") + log.info(f"Starting new stream for {symbols}") # start live streaming from broker daemon quote_gen = await self.portal.run( "piker.brokers.data",