Add a normalizer routine which emits quote differentials/ticks

bar_select
Tyler Goodlet 2020-08-09 00:03:09 -04:00
parent 81fb327fe1
commit 9bbf0e0d7a
1 changed file with 98 additions and 17 deletions

View File

@ -9,6 +9,7 @@ from datetime import datetime
from functools import partial
import itertools
import configparser
from pprint import pformat
from typing import (
List, Tuple, Dict, Any, Iterator, NamedTuple,
AsyncGenerator,
@ -837,7 +838,8 @@ _qt_stock_keys = {
# 'low52w': 'low52w', # put in info widget
# 'high52w': 'high52w',
# "lastTradePriceTrHrs": 7.99,
'lastTradeTime': ('fill_time', datetime.fromisoformat),
# 'lastTradeTime': ('fill_time', datetime.fromisoformat),
'lastTradeTime': 'fill_time',
"lastTradeTick": 'tick', # ("Equal", "Up", "Down")
# "symbolId": 3575753,
# "tier": "",
@ -913,6 +915,7 @@ def format_stock_quote(
new[new_key] = value
displayable[new_key] = display_value
new['displayable'] = displayable
return new, displayable
@ -973,6 +976,7 @@ def format_option_quote(
quote: dict,
symbol_data: dict,
keymap: dict = _qt_option_keys,
include_displayables: bool = True,
) -> Tuple[dict, dict]:
"""Remap a list of quote dicts ``quotes`` using the mapping of old keys
-> new keys ``keymap`` returning 2 dicts: one with raw data and the other
@ -1060,7 +1064,10 @@ async def get_cached_client(
await client._exit_stack.aclose()
async def smoke_quote(get_quotes, tickers): # , broker):
async def smoke_quote(
get_quotes,
tickers
):
"""Do an initial "smoke" request for symbols in ``tickers`` filtering
out any symbols not supported by the broker queried in the call to
``get_quotes()``.
@ -1099,6 +1106,7 @@ async def smoke_quote(get_quotes, tickers): # , broker):
log.error(
f"{symbol} seems to be defunct")
quote['symbol'] = symbol
payload[symbol] = quote
return payload
@ -1107,20 +1115,90 @@ async def smoke_quote(get_quotes, tickers): # , broker):
###########################################
# unbounded, shared between streaming tasks
_symbol_info_cache = {}
# function to format packets delivered to subscribers
# function to format packets delivered to subscribers
def packetizer(
    topic: str,
    quotes: Dict[str, Any],
    formatter: Callable = None,
    symbol_data: Dict[str, Any] = None,
) -> Dict[str, Any]:
    """Repack a sequence of quote dicts into a single dict keyed by
    each quote's ``'symbol'`` field.

    Per-quote normalization now happens upstream in ``normalize()``,
    so this only reshapes the payload for subscribers; ``formatter``
    and ``symbol_data`` are accepted (and ignored) only for backward
    compatibility with callers that still pass them.
    """
    # NOTE: the previous version also ran every quote through
    # ``formatter`` here and then discarded the result — that dead
    # work has been removed.
    return {q['symbol']: q for q in quotes}
def normalize(
    quotes: Dict[str, Any],
    _cache: Dict[str, Any],  # dict held in scope of the streaming loop
    formatter: Callable,
) -> Dict[str, Any]:
    """Deliver normalized quotes by name into dicts using
    broker-specific processing; only emit changes differing from the
    last quote sample creating a pseudo-tick type datum.

    Returns a dict keyed by symbol containing only the fields that
    changed since the previous sample for that symbol (plus computed
    ``volume_delta``/``size``/``last`` fields); symbols with no changes
    are omitted entirely.
    """
    new = {}
    # XXX: this is effectively emitting "sampled ticks"
    # useful for polling setups but obviously should be
    # disabled if you're already rx-ing per-tick data.
    for quote in quotes:
        symbol = quote['symbol']

        # look up last quote from cache
        last = _cache.setdefault(symbol, {})
        # swap the cache entry to the fresh quote *before* diffing;
        # ``last`` still references the prior dict so the set
        # difference below is unaffected.
        _cache[symbol] = quote

        # compute volume difference
        # NOTE(review): assumes every quote carries a 'volume' key —
        # a KeyError here would mean the broker omitted it; confirm
        # against the questrade payload schema.
        last_volume = last.get('volume', 0)
        current_volume = quote['volume']
        volume_diff = current_volume - last_volume

        # find all keys that have match to a new value compared
        # to the last quote received (requires hashable values)
        changed = set(quote.items()) - set(last.items())
        if changed:
            log.info(f"New quote {symbol}:\n{changed}")
            # TODO: can we reduce the # of iterations here and in
            # called funcs?
            # re-read changed values from ``quote`` (the ``v`` from the
            # diffed pairs is intentionally unused)
            payload = {k: quote[k] for k, v in changed}
            payload['symbol'] = symbol  # required by formatter

            # TODO: we should probably do the "computed" fields
            # processing found inside this func in a downstream actor?
            fquote, _ = formatter(payload, _symbol_info_cache)
            fquote['key'] = fquote['symbol'] = symbol

            # if there was volume likely the last size of
            # shares traded is useful info and it's possible
            # that the set difference from above will disregard
            # a "size" value since the same # of shares were traded
            # volume = payload.get('volume')
            if volume_diff:
                if volume_diff < 0:
                    # daily volume should be monotonically increasing
                    log.error(f"Uhhh {symbol} volume: {volume_diff} ?")

                fquote['volume_delta'] = volume_diff

                # TODO: We can emit 2 ticks here:
                # - one for the volume differential
                # - one for the last known trade size
                # The first in theory can be unwound and
                # interpolated assuming the broker passes an
                # accurate daily VWAP value.
                # To make this work we need a universal ``size``
                # field that is normalized before hitting this logic.
                fquote['size'] = quote.get('lastTradeSize', 0)
                if 'last' not in fquote:
                    fquote['last'] = quote.get('lastTradePrice', float('nan'))

            new[symbol] = fquote

    if new:
        log.info(f"New quotes:\n{pformat(new)}")
    return new
@ -1129,13 +1207,12 @@ async def stream_quotes(
ctx: tractor.Context, # marks this as a streaming func
symbols: List[str],
feed_type: str = 'stock',
diff_cached: bool = True,
rate: int = 3,
loglevel: str = None,
# feed_type: str = 'stock',
) -> AsyncGenerator[str, Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(tractor.current_actor().loglevel)
get_console_log(loglevel)
async with get_cached_client('questrade') as client:
if feed_type == 'stock':
@ -1144,17 +1221,25 @@ async def stream_quotes(
# do a smoke quote (note this mutates the input list and filters
# out bad symbols for now)
payload = await smoke_quote(get_quotes, list(symbols))
first_quotes = await smoke_quote(get_quotes, list(symbols))
else:
formatter = format_option_quote
get_quotes = await option_quoter(client, symbols)
# packetize
payload = {
first_quotes = {
quote['symbol']: quote
for quote in await get_quotes(symbols)
}
# update global symbol data state
sd = await client.symbol_info(symbols)
_symbol_info_cache.update(sd)
# pre-process first set of quotes
payload = {}
for sym, quote in first_quotes.items():
fquote, _ = formatter(quote, sd)
payload[sym] = fquote
# push initial smoke quote response for client initialization
await ctx.send_yield(payload)
@ -1167,15 +1252,11 @@ async def stream_quotes(
task_name=feed_type,
ctx=ctx,
topics=symbols,
packetizer=partial(
packetizer,
formatter=formatter,
symbol_data=sd,
),
packetizer=packetizer,
# actual target "streaming func" args
get_quotes=get_quotes,
diff_cached=diff_cached,
normalizer=partial(normalize, formatter=formatter),
rate=rate,
)
log.info("Terminating stream quoter task")