Add a normalizer routine which emits quote differentials/ticks

its_happening
Tyler Goodlet 2020-08-09 00:03:09 -04:00
parent 75824f7afa
commit 5fe8e420b8
1 changed file with 98 additions and 17 deletions
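The gist of the change: instead of forwarding every polled quote wholesale, each new quote is diffed against the last cached sample for its symbol and only the changed fields, plus a computed volume delta, are formatted and pushed downstream, which approximates per-tick data on top of a polling API. A minimal, self-contained sketch of that diffing idea (the symbol and values below are made up for illustration, not taken from the patch):

    # compare the newest polled quote against the previous sample and
    # keep only the (key, value) pairs that changed
    last_quote = {'symbol': 'AAPL', 'lastTradePrice': 440.25, 'volume': 1000}
    new_quote = {'symbol': 'AAPL', 'lastTradePrice': 441.00, 'volume': 1150}

    changed = dict(set(new_quote.items()) - set(last_quote.items()))
    # -> {'lastTradePrice': 441.0, 'volume': 1150}

    tick = dict(changed)
    tick['symbol'] = new_quote['symbol']
    tick['volume_delta'] = new_quote['volume'] - last_quote.get('volume', 0)
    print(tick)  # a "pseudo-tick": only what moved since the last sample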


@@ -9,6 +9,7 @@ from datetime import datetime
 from functools import partial
 import itertools
 import configparser
+from pprint import pformat
 from typing import (
     List, Tuple, Dict, Any, Iterator, NamedTuple,
     AsyncGenerator,
@@ -838,7 +839,8 @@ _qt_stock_keys = {
     # 'low52w': 'low52w',  # put in info widget
     # 'high52w': 'high52w',
     # "lastTradePriceTrHrs": 7.99,
-    'lastTradeTime': ('fill_time', datetime.fromisoformat),
+    # 'lastTradeTime': ('fill_time', datetime.fromisoformat),
+    'lastTradeTime': 'fill_time',
     "lastTradeTick": 'tick',  # ("Equal", "Up", "Down")
     # "symbolId": 3575753,
     # "tier": "",
@@ -914,6 +916,7 @@ def format_stock_quote(
         new[new_key] = value
         displayable[new_key] = display_value

+    new['displayable'] = displayable

     return new, displayable
@@ -974,6 +977,7 @@ def format_option_quote(
     quote: dict,
     symbol_data: dict,
     keymap: dict = _qt_option_keys,
+    include_displayables: bool = True,
 ) -> Tuple[dict, dict]:
     """Remap a list of quote dicts ``quotes`` using the mapping of old keys
     -> new keys ``keymap`` returning 2 dicts: one with raw data and the other
@@ -1061,7 +1065,10 @@ async def get_cached_client(
         await client._exit_stack.aclose()


-async def smoke_quote(get_quotes, tickers):  # , broker):
+async def smoke_quote(
+    get_quotes,
+    tickers
+):
     """Do an initial "smoke" request for symbols in ``tickers`` filtering
     out any symbols not supported by the broker queried in the call to
     ``get_quotes()``.
@@ -1100,6 +1107,7 @@ async def smoke_quote(get_quotes, tickers):  # , broker):
            log.error(
                f"{symbol} seems to be defunct")

+        quote['symbol'] = symbol
        payload[symbol] = quote

    return payload
@@ -1108,20 +1116,90 @@ async def smoke_quote(get_quotes, tickers):  # , broker):
 ###########################################

+# unbounded, shared between streaming tasks
+_symbol_info_cache = {}
+

 # function to format packets delivered to subscribers
 def packetizer(
     topic: str,
     quotes: Dict[str, Any],
-    formatter: Callable,
-    symbol_data: Dict[str, Any],
 ) -> Dict[str, Any]:
     """Normalize quotes by name into dicts using broker-specific
     processing.
     """
-    new = {}
-    for quote in quotes:
-        new[quote['symbol']], _ = formatter(quote, symbol_data)
+    # repack into symbol keyed dict
+    return {q['symbol']: q for q in quotes}
+
+
+def normalize(
+    quotes: Dict[str, Any],
+    _cache: Dict[str, Any],  # dict held in scope of the streaming loop
+    formatter: Callable,
+) -> Dict[str, Any]:
+    """Deliver normalized quotes by name into dicts using
+    broker-specific processing; only emit changes differing from the
+    last quote sample creating a pseudo-tick type datum.
+    """
+    new = {}
+    # XXX: this is effectively emitting "sampled ticks"
+    # useful for polling setups but obviously should be
+    # disabled if you're already rx-ing per-tick data.
+    for quote in quotes:
+        symbol = quote['symbol']
+
+        # look up last quote from cache
+        last = _cache.setdefault(symbol, {})
+        _cache[symbol] = quote
+
+        # compute volume difference
+        last_volume = last.get('volume', 0)
+        current_volume = quote['volume']
+        volume_diff = current_volume - last_volume
+
+        # find all keys that have changed to a new value compared
+        # to the last quote received
+        changed = set(quote.items()) - set(last.items())
+        if changed:
+            log.info(f"New quote {symbol}:\n{changed}")
+
+            # TODO: can we reduce the # of iterations here and in
+            # called funcs?
+            payload = {k: quote[k] for k, v in changed}
+            payload['symbol'] = symbol  # required by formatter
+
+            # TODO: we should probably do the "computed" fields
+            # processing found inside this func in a downstream actor?
+            fquote, _ = formatter(payload, _symbol_info_cache)
+            fquote['key'] = fquote['symbol'] = symbol
+
+            # if there was volume likely the last size of
+            # shares traded is useful info and it's possible
+            # that the set difference from above will disregard
+            # a "size" value since the same # of shares were traded
+            # volume = payload.get('volume')
+            if volume_diff:
+                if volume_diff < 0:
+                    log.error(f"Uhhh {symbol} volume: {volume_diff} ?")
+
+                fquote['volume_delta'] = volume_diff
+
+                # TODO: We can emit 2 ticks here:
+                # - one for the volume differential
+                # - one for the last known trade size
+                # The first in theory can be unwound and
+                # interpolated assuming the broker passes an
+                # accurate daily VWAP value.
+                # To make this work we need a universal ``size``
+                # field that is normalized before hitting this logic.
+                fquote['size'] = quote.get('lastTradeSize', 0)
+
+            if 'last' not in fquote:
+                fquote['last'] = quote.get('lastTradePrice', float('nan'))
+
+            new[symbol] = fquote
+
+    if new:
+        log.info(f"New quotes:\n{pformat(new)}")

     return new
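Note the ``_cache`` argument: it is a plain dict held by the streaming task for its whole lifetime, so the set difference above is always taken against the previous poll. A standalone toy (not the module's code; ``emit_changes`` is only an illustrative stand-in for that caching pattern) showing the consequence, namely that identical polls produce nothing and only deltas flow through:

    # toy stand-in for the per-task cache behaviour used by ``normalize()``
    _cache: dict = {}

    def emit_changes(quote: dict) -> dict:
        last = _cache.setdefault(quote['symbol'], {})
        _cache[quote['symbol']] = quote
        return dict(set(quote.items()) - set(last.items()))

    print(emit_changes({'symbol': 'AAPL', 'lastTradePrice': 440.25, 'volume': 1000}))
    # first poll: cache was empty so every field is "new"
    print(emit_changes({'symbol': 'AAPL', 'lastTradePrice': 440.25, 'volume': 1000}))
    # identical poll: {} -> nothing would be emitted downstream
    print(emit_changes({'symbol': 'AAPL', 'lastTradePrice': 441.00, 'volume': 1150}))
    # only the two changed fields come out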
@@ -1130,13 +1208,12 @@ async def stream_quotes(
     ctx: tractor.Context,  # marks this as a streaming func
     symbols: List[str],
     feed_type: str = 'stock',
-    diff_cached: bool = True,
     rate: int = 3,
     loglevel: str = None,
     # feed_type: str = 'stock',
 ) -> AsyncGenerator[str, Dict[str, Any]]:
     # XXX: required to propagate ``tractor`` loglevel to piker logging
-    get_console_log(tractor.current_actor().loglevel)
+    get_console_log(loglevel)

     async with get_cached_client('questrade') as client:
         if feed_type == 'stock':
@@ -1145,17 +1222,25 @@ async def stream_quotes(
             # do a smoke quote (note this mutates the input list and filters
             # out bad symbols for now)
-            payload = await smoke_quote(get_quotes, list(symbols))
+            first_quotes = await smoke_quote(get_quotes, list(symbols))
         else:
             formatter = format_option_quote
             get_quotes = await option_quoter(client, symbols)
             # packetize
-            payload = {
+            first_quotes = {
                 quote['symbol']: quote
                 for quote in await get_quotes(symbols)
             }

+        # update global symbol data state
         sd = await client.symbol_info(symbols)
+        _symbol_info_cache.update(sd)
+
+        # pre-process first set of quotes
+        payload = {}
+        for sym, quote in first_quotes.items():
+            fquote, _ = formatter(quote, sd)
+            payload[sym] = fquote

         # push initial smoke quote response for client initialization
         await ctx.send_yield(payload)
@@ -1168,15 +1253,11 @@ async def stream_quotes(
             task_name=feed_type,
             ctx=ctx,
             topics=symbols,
-            packetizer=partial(
-                packetizer,
-                formatter=formatter,
-                symbol_data=sd,
-            ),
+            packetizer=packetizer,

             # actual target "streaming func" args
             get_quotes=get_quotes,
-            diff_cached=diff_cached,
+            normalizer=partial(normalize, formatter=formatter),
             rate=rate,
         )
         log.info("Terminating stream quoter task")
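For reference, after the initial full payload each subsequent packet a subscriber sees is keyed by symbol and carries only the fields that moved since the last poll, plus the extras computed in ``normalize()``. A rough illustration of one such packet (field names follow the questrade keymaps; the values are invented):

    packet = {
        'AAPL': {
            'key': 'AAPL',
            'symbol': 'AAPL',
            'last': 441.00,        # lastTradePrice (or the formatter's remap)
            'tick': 'Up',          # lastTradeTick -> 'tick' per _qt_stock_keys
            'volume_delta': 150,   # computed vs. the previous sample
            'size': 50,            # lastTradeSize, only set when volume moved
        },
    }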