Add options streaming

Well that was a doozy; had to rejig pretty much all of it.

The deats:
- Track broker components in a new `DataFeed` namedtuple
- port to new list based batch quotes (not dicts any more)
- lock access to cached broker-client / data-feed instantiation
- respawn tasks that fail due to the network
kivy_mainline_and_py3.8
Tyler Goodlet 2018-11-30 01:34:32 -05:00
parent cd7d8d024d
commit c7cf0cde9c
1 changed files with 162 additions and 97 deletions

View File

@ -7,7 +7,10 @@ from itertools import cycle
import socket
import json
from types import ModuleType
from typing import Coroutine, Callable, Dict, List
import typing
from typing import Coroutine, Callable, Dict, List, Any
import contextlib
from operator import itemgetter
import trio
import tractor
@ -56,12 +59,14 @@ async def stream_quotes(
_cache = {} # ticker to quote caching
while True: # use an event here to trigger exit?
prequote_start = time.time()
# tickers = list(tickers2chans.keys())
with trio.move_on_after(3) as cancel_scope:
quotes = await request_quotes()
postquote_start = time.time()
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warn("Quote query timed out after 3 seconds, retrying...")
@ -69,18 +74,20 @@ async def stream_quotes(
# quotes = await wait_for_network(partial(get_quotes, tickers))
quotes = await wait_for_network(request_quotes)
postquote_start = time.time()
new_quotes = {}
new_quotes = []
if diff_cached:
# if cache is enabled then only deliver "new" changes
for symbol, quote in quotes.items():
# If cache is enabled then only deliver "new" changes.
# Useful for polling setups but obviously should be
# disabled if you're rx-ing event data.
for quote in quotes:
symbol = quote['symbol']
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
new_quotes[symbol] = quote
new_quotes.append(quote)
else:
new_quotes = quotes
@ -101,31 +108,51 @@ async def stream_quotes(
await trio.sleep(delay)
class DataFeed(typing.NamedTuple):
"""A per broker "data feed" container.
A structure to keep track of components used by
real-time data daemons.
"""
mod: ModuleType
client: object
quoter_keys: List[str] = ['stock', 'option']
tasks: Dict[str, trio._core._run.Task] = dict.fromkeys(
quoter_keys, False)
quoters: Dict[str, typing.Coroutine] = {}
subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}}
async def fan_out_to_chans(
brokermod: ModuleType,
feed: DataFeed,
get_quotes: Coroutine,
tickers2chans: Dict[str, tractor.Channel],
symbols2chans: Dict[str, tractor.Channel],
rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
cid: str = None,
) -> None:
"""Request and fan out quotes to each subscribed actor channel.
"""
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
broker_limit = getattr(feed.mod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec")
async def request():
"""Get quotes for current symbol subscription set.
"""
return await get_quotes(list(tickers2chans.keys()))
return await get_quotes(list(symbols2chans.keys()))
async for quotes in stream_quotes(brokermod, request, rate):
async for quotes in stream_quotes(
feed.mod, request, rate,
diff_cached=diff_cached,
):
chan_payloads = {}
for symbol, quote in quotes.items():
for quote in quotes:
# is this too QT specific?
symbol = quote['symbol']
# set symbol quotes for each subscriber
for chan, cid in tickers2chans.get(symbol, set()):
for chan, cid in symbols2chans.get(quote['key'], set()):
chan_payloads.setdefault(
chan,
{'yield': {}, 'cid': cid}
@ -142,41 +169,21 @@ async def fan_out_to_chans(
ConnectionRefusedError,
):
log.warn(f"{chan} went down?")
for chanset in tickers2chans.values():
for chanset in symbols2chans.values():
chanset.discard((chan, cid))
if not any(tickers2chans.values()):
log.warn(f"No subs left for broker {brokermod.name}, exiting task")
if not any(symbols2chans.values()):
log.warn(f"No subs left for broker {feed.mod.name}, exiting task")
break
log.info(f"Terminating stream quoter task for {brokermod.name}")
log.info(f"Terminating stream quoter task for {feed.mod.name}")
async def get_cached_client(broker, tickers):
"""Get or create the current actor's cached broker client.
"""
# check if a cached client is in the local actor's statespace
clients = tractor.current_actor().statespace.setdefault('clients', {})
try:
return clients[broker]
except KeyError:
log.info(f"Creating new client for broker {broker}")
brokermod = get_brokermod(broker)
# TODO: move to AsyncExitStack in 3.7
client_cntxmng = brokermod.get_client()
client = await client_cntxmng.__aenter__()
get_quotes = await brokermod.quoter(client, tickers)
clients[broker] = (
brokermod, client, client_cntxmng, get_quotes)
return brokermod, client, client_cntxmng, get_quotes
async def symbol_data(broker, tickers):
async def symbol_data(broker: str, tickers: List[str]):
"""Retrieve baseline symbol info from broker.
"""
_, client, _, get_quotes = await get_cached_client(broker, tickers)
return await client.symbol_data(tickers)
feed = await get_cached_feed(broker)
return await feed.client.symbol_data(tickers)
async def smoke_quote(get_quotes, tickers, broker):
@ -192,8 +199,9 @@ async def smoke_quote(get_quotes, tickers, broker):
# since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for symbols {tickers}")
quotes = await get_quotes(tickers)
# report any tickers that aren't returned in the first quote
invalid_tickers = set(tickers) - set(quotes)
invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes))
for symbol in invalid_tickers:
tickers.remove(symbol)
log.warn(
@ -202,7 +210,8 @@ async def smoke_quote(get_quotes, tickers, broker):
# pop any tickers that return "empty" quotes
payload = {}
for symbol, quote in quotes.items():
for quote in quotes:
symbol = quote['symbol']
if quote is None:
log.warn(
f"Symbol `{symbol}` not found by broker"
@ -210,6 +219,12 @@ async def smoke_quote(get_quotes, tickers, broker):
# XXX: not this mutates the input list (for now)
tickers.remove(symbol)
continue
# report any unknown/invalid symbols (QT specific)
if quote.get('low52w', False) is None:
log.warn(
f"{symbol} seems to be defunct")
payload[symbol] = quote
return payload
@ -218,23 +233,25 @@ async def smoke_quote(get_quotes, tickers, broker):
###########################################
def modify_quote_stream(broker, tickers, chan=None, cid=None):
async def modify_quote_stream(broker, feed_type, tickers, chan=None, cid=None):
"""Absolute symbol subscription list for each quote stream.
Effectively a consumer subscription api.
Effectively a symbol subscription api.
"""
log.info(f"{chan} changed symbol subscription to {tickers}")
ss = tractor.current_actor().statespace
broker2tickersubs = ss['broker2tickersubs']
tickers2chans = broker2tickersubs.get(broker)
feed = await get_cached_feed(broker)
symbols2chans = feed.subscriptions[feed_type]
# update map from each symbol to requesting client's chan
for ticker in tickers:
tickers2chans.setdefault(ticker, set()).add((chan, cid))
symbols2chans.setdefault(ticker, set()).add((chan, cid))
# remove any existing symbol subscriptions if symbol is not
# found in ``tickers``
# TODO: this can likely be factored out into the pub-sub api
for ticker in filter(
lambda ticker: ticker not in tickers, tickers2chans.copy()
lambda ticker: ticker not in tickers, symbols2chans.copy()
):
chanset = tickers2chans.get(ticker)
chanset = symbols2chans.get(ticker)
# XXX: cid will be different on unsub call
for item in chanset.copy():
if chan in item:
@ -242,12 +259,42 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None):
if not chanset:
# pop empty sets which will trigger bg quoter task termination
tickers2chans.pop(ticker)
symbols2chans.pop(ticker)
async def get_cached_feed(
brokername: str,
) -> DataFeed:
"""Get/create a ``DataFeed`` from/in the current actor.
"""
# check if a cached client is in the local actor's statespace
ss = tractor.current_actor().statespace
feeds = ss['feeds']
lock = feeds['_lock']
feed_stack = ss['feed_stacks'][brokername]
async with lock:
try:
feed = feeds[brokername]
log.info(f"Subscribing with existing `{brokername}` daemon")
return feed
except KeyError:
log.info(f"Creating new client for broker {brokername}")
brokermod = get_brokermod(brokername)
client = await feed_stack.enter_async_context(
brokermod.get_client())
feed = DataFeed(
mod=brokermod,
client=client,
)
feeds[brokername] = feed
return feed
async def start_quote_stream(
broker: str,
tickers: List[str],
symbols: List[Any],
feed_type: str = 'stock',
diff_cached: bool = True,
chan: tractor.Channel = None,
cid: str = None,
) -> None:
@ -263,57 +310,75 @@ async def start_quote_stream(
get_console_log(actor.loglevel)
# pull global vars from local actor
ss = actor.statespace
broker2tickersubs = ss['broker2tickersubs']
clients = ss['clients']
dtasks = ss['dtasks']
tickers = list(tickers)
# broker2symbolsubs = ss.setdefault('broker2symbolsubs', {})
ss.setdefault('feeds', {'_lock': trio.Lock()})
feed_stacks = ss.setdefault('feed_stacks', {})
symbols = list(symbols)
log.info(
f"{chan.uid} subscribed to {broker} for tickers {tickers}")
brokermod, client, _, get_quotes = await get_cached_client(broker, tickers)
if broker not in broker2tickersubs:
tickers2chans = broker2tickersubs.setdefault(broker, {})
else:
log.info(f"Subscribing with existing `{broker}` daemon")
tickers2chans = broker2tickersubs[broker]
f"{chan.uid} subscribed to {broker} for symbols {symbols}")
feed_stack = feed_stacks.setdefault(broker, contextlib.AsyncExitStack())
# another actor task may have already created it
feed = await get_cached_feed(broker)
symbols2chans = feed.subscriptions[feed_type]
if feed_type == 'stock':
get_quotes = feed.quoters.setdefault(
'stock',
await feed.mod.stock_quoter(feed.client, symbols)
)
# do a smoke quote (note this mutates the input list and filters
# out bad symbols for now)
payload = await smoke_quote(get_quotes, tickers, broker)
payload = await smoke_quote(get_quotes, symbols, broker)
# push initial smoke quote response for client initialization
await chan.send({'yield': payload, 'cid': cid})
elif feed_type == 'option':
# FIXME: yeah we need maybe a more general way to specify
# the arg signature for the option feed beasides a symbol
# + expiry date.
get_quotes = feed.quoters.setdefault(
'option',
await feed.mod.option_quoter(feed.client, symbols)
)
# update map from each symbol to requesting client's chan
modify_quote_stream(broker, tickers, chan=chan, cid=cid)
await modify_quote_stream(broker, feed_type, symbols, chan, cid)
try:
if broker not in dtasks:
# no quoter task yet so start a daemon task
log.info(f"Spawning quoter task for {brokermod.name}")
if not feed.tasks.get(feed_type):
# no data feeder task yet; so start one
respawn = True
log.info(f"Spawning data feed task for {feed.mod.name}")
while respawn:
respawn = False
try:
async with trio.open_nursery() as nursery:
nursery.start_soon(partial(
fan_out_to_chans, brokermod, get_quotes, tickers2chans,
cid=cid)
nursery.start_soon(
partial(
fan_out_to_chans, feed, get_quotes,
symbols2chans,
diff_cached=diff_cached,
cid=cid
)
dtasks.add(broker)
)
feed.tasks[feed_type] = True
except trio.BrokenResourceError:
log.exception("Respawning failed data feed task")
respawn = True
# unblocks when no more symbols subscriptions exist and the
# quote streamer task terminates (usually because another call
# was made to `modify_quoter` to unsubscribe from streaming
# symbols)
log.info(f"Terminated quoter task for {brokermod.name}")
# TODO: move to AsyncExitStack in 3.7
for _, _, cntxmng, _ in clients.values():
# FIXME: yes I know there's no error handling..
await cntxmng.__aexit__(None, None, None)
finally:
log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}")
feed.tasks.pop(feed_type)
# if there are truly no more subscriptions with this broker
# drop from broker subs dict
if not any(tickers2chans.values()):
if not any(symbols2chans.values()):
log.info(f"No more subscriptions for {broker}")
broker2tickersubs.pop(broker, None)
dtasks.discard(broker)
# broker2symbolsubs.pop(broker, None)
# destroy the API client
await feed_stack.aclose()
async def stream_to_file(