Port `DataFeed` api to broker specific normalizer routine
parent
5fe8e420b8
commit
37607d61ca
|
@ -81,10 +81,10 @@ class BrokerFeed:
|
||||||
|
|
||||||
@tractor.msg.pub(tasks=['stock', 'option'])
|
@tractor.msg.pub(tasks=['stock', 'option'])
|
||||||
async def stream_poll_requests(
|
async def stream_poll_requests(
|
||||||
get_topics: typing.Callable,
|
get_topics: Callable,
|
||||||
get_quotes: Coroutine,
|
get_quotes: Coroutine,
|
||||||
|
normalizer: Callable,
|
||||||
rate: int = 3, # delay between quote requests
|
rate: int = 3, # delay between quote requests
|
||||||
diff_cached: bool = True, # only deliver "new" quotes to the queue
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Stream requests for quotes for a set of symbols at the given
|
"""Stream requests for quotes for a set of symbols at the given
|
||||||
``rate`` (per second).
|
``rate`` (per second).
|
||||||
|
@ -129,59 +129,15 @@ async def stream_poll_requests(
|
||||||
quotes = await wait_for_network(request_quotes)
|
quotes = await wait_for_network(request_quotes)
|
||||||
|
|
||||||
new_quotes = {}
|
new_quotes = {}
|
||||||
if diff_cached:
|
|
||||||
# If cache is enabled then only deliver "new" changes.
|
|
||||||
# Useful for polling setups but obviously should be
|
|
||||||
# disabled if you're rx-ing per-tick data.
|
|
||||||
for quote in quotes:
|
|
||||||
symbol = quote['symbol']
|
|
||||||
last = _cache.setdefault(symbol, {})
|
|
||||||
last_volume = last.get('volume', 0)
|
|
||||||
|
|
||||||
# find all keys that have match to a new value compared
|
normalized = normalizer(quotes, _cache)
|
||||||
# to the last quote received
|
for symbol, quote in normalized.items():
|
||||||
new = set(quote.items()) - set(last.items())
|
# XXX: we append to a list for the options case where the
|
||||||
if new:
|
# subscription topic (key) is the same for all
|
||||||
log.info(f"New quote {symbol}:\n{new}")
|
# expiries even though this is uncessary for the
|
||||||
_cache[symbol] = quote
|
# stock case (different topic [i.e. symbol] for each
|
||||||
|
# quote).
|
||||||
# only ship diff updates and other required fields
|
new_quotes.setdefault(quote['key'], []).append(quote)
|
||||||
payload = {k: quote[k] for k, v in new}
|
|
||||||
payload['symbol'] = symbol
|
|
||||||
|
|
||||||
# if there was volume likely the last size of
|
|
||||||
# shares traded is useful info and it's possible
|
|
||||||
# that the set difference from above will disregard
|
|
||||||
# a "size" value since the same # of shares were traded
|
|
||||||
volume = payload.get('volume')
|
|
||||||
if volume:
|
|
||||||
volume_since_last_quote = volume - last_volume
|
|
||||||
assert volume_since_last_quote > 0
|
|
||||||
payload['volume_delta'] = volume_since_last_quote
|
|
||||||
|
|
||||||
# TODO: We can emit 2 ticks here:
|
|
||||||
# - one for the volume differential
|
|
||||||
# - one for the last known trade size
|
|
||||||
# The first in theory can be unwound and
|
|
||||||
# interpolated assuming the broker passes an
|
|
||||||
# accurate daily VWAP value.
|
|
||||||
# To make this work we need a universal ``size``
|
|
||||||
# field that is normalized before hitting this logic.
|
|
||||||
# XXX: very questrade specific
|
|
||||||
payload['size'] = quote['lastTradeSize']
|
|
||||||
|
|
||||||
log.info(f"New paylod {symbol}:\n{payload}")
|
|
||||||
|
|
||||||
# XXX: we append to a list for the options case where the
|
|
||||||
# subscription topic (key) is the same for all
|
|
||||||
# expiries even though this is uncessary for the
|
|
||||||
# stock case (different topic [i.e. symbol] for each
|
|
||||||
# quote).
|
|
||||||
new_quotes.setdefault(quote['key'], []).append(payload)
|
|
||||||
else:
|
|
||||||
# log.debug(f"Delivering quotes:\n{quotes}")
|
|
||||||
for quote in quotes:
|
|
||||||
new_quotes.setdefault(quote['key'], []).append(quote)
|
|
||||||
|
|
||||||
if new_quotes:
|
if new_quotes:
|
||||||
yield new_quotes
|
yield new_quotes
|
||||||
|
@ -208,53 +164,6 @@ async def symbol_data(broker: str, tickers: List[str]):
|
||||||
return await feed.client.symbol_info(tickers)
|
return await feed.client.symbol_info(tickers)
|
||||||
|
|
||||||
|
|
||||||
async def smoke_quote(get_quotes, tickers, broker):
|
|
||||||
"""Do an initial "smoke" request for symbols in ``tickers`` filtering
|
|
||||||
out any symbols not supported by the broker queried in the call to
|
|
||||||
``get_quotes()``.
|
|
||||||
"""
|
|
||||||
# TODO: trim out with #37
|
|
||||||
#################################################
|
|
||||||
# get a single quote filtering out any bad tickers
|
|
||||||
# NOTE: this code is always run for every new client
|
|
||||||
# subscription even when a broker quoter task is already running
|
|
||||||
# since the new client needs to know what symbols are accepted
|
|
||||||
log.warn(f"Retrieving smoke quote for symbols {tickers}")
|
|
||||||
quotes = await get_quotes(tickers)
|
|
||||||
|
|
||||||
# report any tickers that aren't returned in the first quote
|
|
||||||
invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes))
|
|
||||||
for symbol in invalid_tickers:
|
|
||||||
tickers.remove(symbol)
|
|
||||||
log.warn(
|
|
||||||
f"Symbol `{symbol}` not found by broker `{broker}`"
|
|
||||||
)
|
|
||||||
|
|
||||||
# pop any tickers that return "empty" quotes
|
|
||||||
payload = {}
|
|
||||||
for quote in quotes:
|
|
||||||
symbol = quote['symbol']
|
|
||||||
if quote is None:
|
|
||||||
log.warn(
|
|
||||||
f"Symbol `{symbol}` not found by broker"
|
|
||||||
f" `{broker}`")
|
|
||||||
# XXX: not this mutates the input list (for now)
|
|
||||||
tickers.remove(symbol)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# report any unknown/invalid symbols (QT specific)
|
|
||||||
if quote.get('low52w', False) is None:
|
|
||||||
log.error(
|
|
||||||
f"{symbol} seems to be defunct")
|
|
||||||
|
|
||||||
payload[symbol] = quote
|
|
||||||
|
|
||||||
return payload
|
|
||||||
|
|
||||||
# end of section to be trimmed out with #37
|
|
||||||
###########################################
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def get_cached_feed(
|
async def get_cached_feed(
|
||||||
brokername: str,
|
brokername: str,
|
||||||
|
@ -297,7 +206,6 @@ async def start_quote_stream(
|
||||||
broker: str,
|
broker: str,
|
||||||
symbols: List[Any],
|
symbols: List[Any],
|
||||||
feed_type: str = 'stock',
|
feed_type: str = 'stock',
|
||||||
diff_cached: bool = True,
|
|
||||||
rate: int = 3,
|
rate: int = 3,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Handle per-broker quote stream subscriptions using a "lazy" pub-sub
|
"""Handle per-broker quote stream subscriptions using a "lazy" pub-sub
|
||||||
|
@ -316,8 +224,6 @@ async def start_quote_stream(
|
||||||
f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}")
|
f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}")
|
||||||
# another actor task may have already created it
|
# another actor task may have already created it
|
||||||
async with get_cached_feed(broker) as feed:
|
async with get_cached_feed(broker) as feed:
|
||||||
# function to format packets delivered to subscribers
|
|
||||||
packetizer = None
|
|
||||||
|
|
||||||
if feed_type == 'stock':
|
if feed_type == 'stock':
|
||||||
get_quotes = feed.quoters.setdefault(
|
get_quotes = feed.quoters.setdefault(
|
||||||
|
@ -326,7 +232,7 @@ async def start_quote_stream(
|
||||||
)
|
)
|
||||||
# do a smoke quote (note this mutates the input list and filters
|
# do a smoke quote (note this mutates the input list and filters
|
||||||
# out bad symbols for now)
|
# out bad symbols for now)
|
||||||
payload = await smoke_quote(get_quotes, symbols, broker)
|
first_quotes = await feed.mod.smoke_quote(get_quotes, symbols)
|
||||||
formatter = feed.mod.format_stock_quote
|
formatter = feed.mod.format_stock_quote
|
||||||
|
|
||||||
elif feed_type == 'option':
|
elif feed_type == 'option':
|
||||||
|
@ -338,22 +244,27 @@ async def start_quote_stream(
|
||||||
await feed.mod.option_quoter(feed.client, symbols)
|
await feed.mod.option_quoter(feed.client, symbols)
|
||||||
)
|
)
|
||||||
# packetize
|
# packetize
|
||||||
payload = {
|
first_quotes = {
|
||||||
quote['symbol']: quote
|
quote['symbol']: quote
|
||||||
for quote in await get_quotes(symbols)
|
for quote in await get_quotes(symbols)
|
||||||
}
|
}
|
||||||
formatter = feed.mod.format_option_quote
|
formatter = feed.mod.format_option_quote
|
||||||
|
|
||||||
sd = await feed.client.symbol_info(symbols)
|
sd = await feed.client.symbol_info(symbols)
|
||||||
# formatter = partial(formatter, symbol_data=sd)
|
feed.mod._symbol_info_cache.update(sd)
|
||||||
|
|
||||||
packetizer = partial(
|
normalize = partial(
|
||||||
feed.mod.packetizer,
|
feed.mod.normalize,
|
||||||
formatter=formatter,
|
formatter=formatter,
|
||||||
symbol_data=sd,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# push initial smoke quote response for client initialization
|
# pre-process first set of quotes
|
||||||
|
payload = {}
|
||||||
|
for sym, quote in first_quotes.items():
|
||||||
|
fquote, _ = formatter(quote, sd)
|
||||||
|
assert fquote['displayable']
|
||||||
|
payload[sym] = fquote
|
||||||
|
|
||||||
await ctx.send_yield(payload)
|
await ctx.send_yield(payload)
|
||||||
|
|
||||||
await stream_poll_requests(
|
await stream_poll_requests(
|
||||||
|
@ -362,11 +273,11 @@ async def start_quote_stream(
|
||||||
task_name=feed_type,
|
task_name=feed_type,
|
||||||
ctx=ctx,
|
ctx=ctx,
|
||||||
topics=symbols,
|
topics=symbols,
|
||||||
packetizer=packetizer,
|
packetizer=feed.mod.packetizer,
|
||||||
|
|
||||||
# actual func args
|
# actual func args
|
||||||
get_quotes=get_quotes,
|
get_quotes=get_quotes,
|
||||||
diff_cached=diff_cached,
|
normalizer=normalize,
|
||||||
rate=rate,
|
rate=rate,
|
||||||
)
|
)
|
||||||
log.info(
|
log.info(
|
||||||
|
@ -401,7 +312,6 @@ class DataFeed:
|
||||||
symbols: Sequence[str],
|
symbols: Sequence[str],
|
||||||
feed_type: str,
|
feed_type: str,
|
||||||
rate: int = 1,
|
rate: int = 1,
|
||||||
diff_cached: bool = True,
|
|
||||||
test: str = '',
|
test: str = '',
|
||||||
) -> (AsyncGenerator, dict):
|
) -> (AsyncGenerator, dict):
|
||||||
if feed_type not in self._allowed:
|
if feed_type not in self._allowed:
|
||||||
|
@ -445,7 +355,6 @@ class DataFeed:
|
||||||
broker=self.brokermod.name,
|
broker=self.brokermod.name,
|
||||||
symbols=symbols,
|
symbols=symbols,
|
||||||
feed_type=feed_type,
|
feed_type=feed_type,
|
||||||
diff_cached=diff_cached,
|
|
||||||
rate=rate,
|
rate=rate,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue