Allow recording data feeds to disk

Add a couple functions for storing and retrieving live json data feed
recordings to disk using a very rudimentary character + newline delimited
format.

Also, split out the pub-sub logic from `stream_quotes()` into a new
func, `fan_out_to_chans()`. Eventually I want to formalize this pattern
into a decorator exposed through `tractor`.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-11-22 15:56:02 -05:00
parent f038fdd42f
commit c23982393d
1 changed files with 111 additions and 53 deletions

View File

@ -3,9 +3,11 @@ Live data feed machinery
""" """
import time import time
from functools import partial from functools import partial
from itertools import cycle
import socket import socket
import json
from types import ModuleType from types import ModuleType
from typing import Coroutine, Callable, Dict from typing import Coroutine, Callable, Dict, List
import trio import trio
import tractor import tractor
@ -14,7 +16,7 @@ from ..log import get_logger, get_console_log
from . import get_brokermod from . import get_brokermod
log = get_logger('broker.core') log = get_logger('broker.data')
async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
@ -40,11 +42,9 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
async def stream_quotes( async def stream_quotes(
brokermod: ModuleType, brokermod: ModuleType,
get_quotes: Coroutine, request_quotes: Coroutine,
tickers2chans: Dict[str, tractor.Channel],
rate: int = 5, # delay between quote requests rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue diff_cached: bool = True, # only deliver "new" quotes to the queue
cid: str = None,
) -> None: ) -> None:
"""Stream quotes for a sequence of tickers at the given ``rate`` """Stream quotes for a sequence of tickers at the given ``rate``
per second. per second.
@ -52,53 +52,84 @@ async def stream_quotes(
A stock-broker client ``get_quotes()`` async context manager must be A stock-broker client ``get_quotes()`` async context manager must be
provided which returns an async quote retrieval function. provided which returns an async quote retrieval function.
""" """
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
sleeptime = round(1. / rate, 3) sleeptime = round(1. / rate, 3)
_cache = {} # ticker to quote caching _cache = {} # ticker to quote caching
while True: # use an event here to trigger exit? while True: # use an event here to trigger exit?
prequote_start = time.time() prequote_start = time.time()
if not any(tickers2chans.values()): # tickers = list(tickers2chans.keys())
log.warn(f"No subs left for broker {brokermod.name}, exiting task")
break
tickers = list(tickers2chans.keys())
with trio.move_on_after(3) as cancel_scope: with trio.move_on_after(3) as cancel_scope:
quotes = await get_quotes(tickers) quotes = await request_quotes()
cancelled = cancel_scope.cancelled_caught cancelled = cancel_scope.cancelled_caught
if cancelled: if cancelled:
log.warn("Quote query timed out after 3 seconds, retrying...") log.warn("Quote query timed out after 3 seconds, retrying...")
# handle network outages by idling until response is received # handle network outages by idling until response is received
quotes = await wait_for_network(partial(get_quotes, tickers)) # quotes = await wait_for_network(partial(get_quotes, tickers))
quotes = await wait_for_network(request_quotes)
postquote_start = time.time() postquote_start = time.time()
new_quotes = {}
if diff_cached:
# if cache is enabled then only deliver "new" changes
for symbol, quote in quotes.items():
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
new_quotes[symbol] = quote
else:
new_quotes = quotes
yield new_quotes
# latency monitoring
req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3)
tot = req_time + proc_time
log.debug(f"Request + processing took {tot}")
delay = sleeptime - tot
if delay <= 0:
log.warn(
f"Took {req_time} (request) + {proc_time} (processing) "
f"= {tot} secs (> {sleeptime}) for processing quotes?")
else:
log.debug(f"Sleeping for {delay}")
await trio.sleep(delay)
async def fan_out_to_chans(
brokermod: ModuleType,
get_quotes: Coroutine,
tickers2chans: Dict[str, tractor.Channel],
rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
cid: str = None,
) -> None:
"""Request and fan out quotes to each subscribed actor channel.
"""
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
async def request():
"""Get quotes for current symbol subscription set.
"""
return await get_quotes(list(tickers2chans.keys()))
async for quotes in stream_quotes(brokermod, request, rate):
chan_payloads = {} chan_payloads = {}
for symbol, quote in quotes.items(): for symbol, quote in quotes.items():
if diff_cached: # set symbol quotes for each subscriber
# if cache is enabled then only deliver "new" changes for chan, cid in tickers2chans.get(symbol, set()):
last = _cache.setdefault(symbol, {}) chan_payloads.setdefault(
new = set(quote.items()) - set(last.items()) chan,
if new: {'yield': {}, 'cid': cid}
log.info( )['yield'][symbol] = quote
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
for chan, cid in tickers2chans.get(symbol, set()):
chan_payloads.setdefault(
chan,
{'yield': {}, 'cid': cid}
)['yield'][symbol] = quote
else:
for chan, cid in tickers2chans[symbol]:
chan_payloads.setdefault(
chan,
{'yield': {}, 'cid': cid}
)['yield'][symbol] = quote
# deliver to each subscriber (fan out) # deliver to each subscriber (fan out)
if chan_payloads: if chan_payloads:
@ -114,19 +145,9 @@ async def stream_quotes(
for chanset in tickers2chans.values(): for chanset in tickers2chans.values():
chanset.discard((chan, cid)) chanset.discard((chan, cid))
# latency monitoring if not any(tickers2chans.values()):
req_time = round(postquote_start - prequote_start, 3) log.warn(f"No subs left for broker {brokermod.name}, exiting task")
proc_time = round(time.time() - postquote_start, 3) break
tot = req_time + proc_time
log.debug(f"Request + processing took {tot}")
delay = sleeptime - tot
if delay <= 0:
log.warn(
f"Took {req_time} (request) + {proc_time} (processing) "
f"= {tot} secs (> {sleeptime}) for processing quotes?")
else:
log.debug(f"Sleeping for {delay}")
await trio.sleep(delay)
log.info(f"Terminating stream quoter task for {brokermod.name}") log.info(f"Terminating stream quoter task for {brokermod.name}")
@ -226,7 +247,7 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None):
async def start_quote_stream( async def start_quote_stream(
broker: str, broker: str,
tickers: [str], tickers: List[str],
chan: tractor.Channel = None, chan: tractor.Channel = None,
cid: str = None, cid: str = None,
) -> None: ) -> None:
@ -256,8 +277,8 @@ async def start_quote_stream(
log.info(f"Subscribing with existing `{broker}` daemon") log.info(f"Subscribing with existing `{broker}` daemon")
tickers2chans = broker2tickersubs[broker] tickers2chans = broker2tickersubs[broker]
# do a smoke quote (note this mutates the input list and filters out bad # do a smoke quote (note this mutates the input list and filters
# symbols for now) # out bad symbols for now)
payload = await smoke_quote(get_quotes, tickers, broker) payload = await smoke_quote(get_quotes, tickers, broker)
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
await chan.send({'yield': payload, 'cid': cid}) await chan.send({'yield': payload, 'cid': cid})
@ -271,7 +292,7 @@ async def start_quote_stream(
log.info(f"Spawning quoter task for {brokermod.name}") log.info(f"Spawning quoter task for {brokermod.name}")
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
nursery.start_soon(partial( nursery.start_soon(partial(
stream_quotes, brokermod, get_quotes, tickers2chans, fan_out_to_chans, brokermod, get_quotes, tickers2chans,
cid=cid) cid=cid)
) )
dtasks.add(broker) dtasks.add(broker)
@ -293,3 +314,40 @@ async def start_quote_stream(
log.info(f"No more subscriptions for {broker}") log.info(f"No more subscriptions for {broker}")
broker2tickersubs.pop(broker, None) broker2tickersubs.pop(broker, None)
dtasks.discard(broker) dtasks.discard(broker)
async def stream_to_file(
watchlist_name: str,
filename: str,
portal: tractor._portal.Portal,
tickers: List[str],
brokermod: ModuleType,
rate: int,
):
"""Record client side received quotes to file ``filename``.
"""
# an async generator instance
agen = await portal.run(
"piker.brokers.data", 'start_quote_stream',
broker=brokermod.name, tickers=tickers)
fname = filename or f'{watchlist_name}.jsonstream'
with open(fname, 'a') as f:
async for quotes in agen:
f.write(json.dumps(quotes))
f.write('\n--\n')
return fname
async def stream_from_file(
filename: str,
):
with open(filename, 'r') as quotes_file:
content = quotes_file.read()
pkts = content.split('--')[:-1] # simulate 2 separate quote packets
payloads = [json.loads(pkt) for pkt in pkts]
for payload in cycle(payloads):
yield payload
await trio.sleep(0.3)