Define "packetizer" in specific broker mod

Allows for formatting published quotes using a broker specific
formatting callback.
marketstore_integration
Tyler Goodlet 2020-07-28 13:47:18 -04:00
parent 312169e790
commit 702c63f607
3 changed files with 107 additions and 44 deletions

View File

@ -12,7 +12,7 @@ import typing
from typing import ( from typing import (
Coroutine, Callable, Dict, Coroutine, Callable, Dict,
List, Any, Tuple, AsyncGenerator, List, Any, Tuple, AsyncGenerator,
Sequence, Sequence
) )
import contextlib import contextlib
from operator import itemgetter from operator import itemgetter
@ -47,7 +47,7 @@ async def wait_for_network(
continue continue
except socket.gaierror: except socket.gaierror:
if not down: # only report/log network down once if not down: # only report/log network down once
log.warn(f"Network is down waiting for re-establishment...") log.warn("Network is down waiting for re-establishment...")
down = True down = True
await trio.sleep(sleep) await trio.sleep(sleep)
@ -83,7 +83,6 @@ class BrokerFeed:
async def stream_poll_requests( async def stream_poll_requests(
get_topics: typing.Callable, get_topics: typing.Callable,
get_quotes: Coroutine, get_quotes: Coroutine,
feed: BrokerFeed,
rate: int = 3, # delay between quote requests rate: int = 3, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None: ) -> None:
@ -95,15 +94,16 @@ async def stream_poll_requests(
set of symbols each iteration and ``get_quotes()`` is to retreive set of symbols each iteration and ``get_quotes()`` is to retreive
the quotes. the quotes.
A stock-broker client ``get_quotes()`` async function must be A stock-broker client ``get_quotes()`` async function must be
provided which returns an async quote retrieval function. provided which returns an async quote retrieval function.
"""
broker_limit = getattr(feed.mod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec")
.. note::
This code is mostly tailored (for now) to the questrade backend.
It is currently the only broker that doesn't support streaming without
paying for data. See the note in the diffing section regarding volume
differentials which needs to be addressed in order to get cross-broker
support.
"""
sleeptime = round(1. / rate, 3) sleeptime = round(1. / rate, 3)
_cache = {} # ticker to quote caching _cache = {} # ticker to quote caching
@ -136,6 +136,7 @@ async def stream_poll_requests(
for quote in quotes: for quote in quotes:
symbol = quote['symbol'] symbol = quote['symbol']
last = _cache.setdefault(symbol, {}) last = _cache.setdefault(symbol, {})
last_volume = last.get('volume', 0)
# find all keys that have match to a new value compared # find all keys that have match to a new value compared
# to the last quote received # to the last quote received
@ -153,9 +154,22 @@ async def stream_poll_requests(
# shares traded is useful info and it's possible # shares traded is useful info and it's possible
# that the set difference from above will disregard # that the set difference from above will disregard
# a "size" value since the same # of shares were traded # a "size" value since the same # of shares were traded
size = quote.get('size') volume = payload.get('volume')
if size and 'volume' in payload: if volume:
payload['size'] = size volume_since_last_quote = volume - last_volume
assert volume_since_last_quote > 0
payload['volume_delta'] = volume_since_last_quote
# TODO: We can emit 2 ticks here:
# - one for the volume differential
# - one for the last known trade size
# The first in theory can be unwound and
# interpolated assuming the broker passes an
# accurate daily VWAP value.
# To make this work we need a universal ``size``
# field that is normalized before hitting this logic.
# XXX: very questrade specific
payload['size'] = quote['lastTradeSize']
# XXX: we append to a list for the options case where the # XXX: we append to a list for the options case where the
# subscription topic (key) is the same for all # subscription topic (key) is the same for all
@ -312,6 +326,7 @@ async def start_quote_stream(
# do a smoke quote (note this mutates the input list and filters # do a smoke quote (note this mutates the input list and filters
# out bad symbols for now) # out bad symbols for now)
payload = await smoke_quote(get_quotes, symbols, broker) payload = await smoke_quote(get_quotes, symbols, broker)
formatter = feed.mod.format_stock_quote
elif feed_type == 'option': elif feed_type == 'option':
# FIXME: yeah we need maybe a more general way to specify # FIXME: yeah we need maybe a more general way to specify
@ -326,9 +341,16 @@ async def start_quote_stream(
quote['symbol']: quote quote['symbol']: quote
for quote in await get_quotes(symbols) for quote in await get_quotes(symbols)
} }
formatter = feed.mod.format_option_quote
def packetizer(topic, quotes): sd = await feed.client.symbol_info(symbols)
return {quote['symbol']: quote for quote in quotes} # formatter = partial(formatter, symbol_data=sd)
packetizer = partial(
feed.mod.packetizer,
formatter=formatter,
symbol_data=sd,
)
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
await ctx.send_yield(payload) await ctx.send_yield(payload)
@ -342,7 +364,6 @@ async def start_quote_stream(
packetizer=packetizer, packetizer=packetizer,
# actual func args # actual func args
feed=feed,
get_quotes=get_quotes, get_quotes=get_quotes,
diff_cached=diff_cached, diff_cached=diff_cached,
rate=rate, rate=rate,

View File

@ -12,6 +12,7 @@ import configparser
from typing import ( from typing import (
List, Tuple, Dict, Any, Iterator, NamedTuple, List, Tuple, Dict, Any, Iterator, NamedTuple,
AsyncGenerator, AsyncGenerator,
Callable,
) )
import arrow import arrow
@ -26,7 +27,7 @@ import asks
from ..calc import humanize, percent_change from ..calc import humanize, percent_change
from . import config from . import config
from ._util import resproc, BrokerError, SymbolNotFound from ._util import resproc, BrokerError, SymbolNotFound
from ..log import get_logger, colorize_json from ..log import get_logger, colorize_json, get_console_log
from .._async_utils import async_lifo_cache from .._async_utils import async_lifo_cache
from . import get_brokermod from . import get_brokermod
@ -933,7 +934,8 @@ _qt_option_keys = {
# "theta": ('theta', partial(round, ndigits=3)), # "theta": ('theta', partial(round, ndigits=3)),
# "vega": ('vega', partial(round, ndigits=3)), # "vega": ('vega', partial(round, ndigits=3)),
'$ vol': ('$ vol', humanize), '$ vol': ('$ vol', humanize),
'volume': ('vol', humanize), # XXX: required key to trigger trade execution datum msg
'volume': ('volume', humanize),
# "2021-01-15T00:00:00.000000-05:00", # "2021-01-15T00:00:00.000000-05:00",
# "isHalted": false, # "isHalted": false,
# "key": [ # "key": [
@ -1031,18 +1033,20 @@ async def get_cached_client(
log.info(f"Loading existing `{brokername}` daemon") log.info(f"Loading existing `{brokername}` daemon")
async with lock: async with lock:
client = clients[brokername] client = clients[brokername]
client._consumers += 1
yield client
except KeyError: except KeyError:
log.info(f"Creating new client for broker {brokername}") log.info(f"Creating new client for broker {brokername}")
async with lock: async with lock:
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
exit_stack = contextlib.AsyncExitStack() exit_stack = contextlib.AsyncExitStack()
client = await exit_stack.enter_async_context( client = await exit_stack.enter_async_context(
brokermod.get_client()) brokermod.get_client()
)
client._consumers = 0
client._exit_stack = exit_stack client._exit_stack = exit_stack
clients[brokername] = client clients[brokername] = client
else: yield client
client._consumers += 1
yield client
finally: finally:
client._consumers -= 1 client._consumers -= 1
if client._consumers <= 0: if client._consumers <= 0:
@ -1097,6 +1101,23 @@ async def smoke_quote(get_quotes, tickers): # , broker):
########################################### ###########################################
# function to format packets delivered to subscribers
def packetizer(
topic: str,
quotes: Dict[str, Any],
formatter: Callable,
symbol_data: Dict[str, Any],
) -> Dict[str, Any]:
"""Normalize quotes by name into dicts using broker-specific
processing.
"""
new = {}
for quote in quotes:
new[quote['symbol']], _ = formatter(quote, symbol_data)
return new
@tractor.stream @tractor.stream
async def stream_quotes( async def stream_quotes(
ctx: tractor.Context, # marks this as a streaming func ctx: tractor.Context, # marks this as a streaming func
@ -1104,8 +1125,11 @@ async def stream_quotes(
feed_type: str = 'stock', feed_type: str = 'stock',
diff_cached: bool = True, diff_cached: bool = True,
rate: int = 3, rate: int = 3,
loglevel: str = None,
# feed_type: str = 'stock', # feed_type: str = 'stock',
) -> AsyncGenerator[str, Dict[str, Any]]: ) -> AsyncGenerator[str, Dict[str, Any]]:
# XXX: why do we need this again?
get_console_log(tractor.current_actor().loglevel)
async with get_cached_client('questrade') as client: async with get_cached_client('questrade') as client:
if feed_type == 'stock': if feed_type == 'stock':
@ -1124,20 +1148,7 @@ async def stream_quotes(
for quote in await get_quotes(symbols) for quote in await get_quotes(symbols)
} }
symbol_data = await client.symbol_info(symbols) sd = await client.symbol_info(symbols)
# function to format packets delivered to subscribers
def packetizer(
topic: str,
quotes: Dict[str, Any]
) -> Dict[str, Any]:
"""Normalize quotes by name into dicts.
"""
new = {}
for quote in quotes:
new[quote['symbol']], _ = formatter(quote, symbol_data)
return new
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
await ctx.send_yield(payload) await ctx.send_yield(payload)
@ -1150,7 +1161,11 @@ async def stream_quotes(
task_name=feed_type, task_name=feed_type,
ctx=ctx, ctx=ctx,
topics=symbols, topics=symbols,
packetizer=packetizer, packetizer=partial(
packetizer,
formatter=formatter,
symboal_data=sd,
),
# actual func args # actual func args
get_quotes=get_quotes, get_quotes=get_quotes,

View File

@ -6,19 +6,35 @@ and storing data from your brokers as well as
sharing your feeds with other fellow pikers. sharing your feeds with other fellow pikers.
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from importlib import import_module
from types import ModuleType
from typing import ( from typing import (
Dict, List, Any, Dict, List, Any,
Sequence, AsyncIterator, Optional Sequence, AsyncIterator, Optional
) )
import trio
import tractor import tractor
from ..brokers import get_brokermod from ..brokers import get_brokermod
from ..log import get_logger from ..log import get_logger, get_console_log
log = get_logger(__name__) log = get_logger(__name__)
__ingestors__ = [
'marketstore',
]
def get_ingestor(name: str) -> ModuleType:
"""Return the imported ingestor module by name.
"""
module = import_module('.' + name, 'piker.data')
# we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1]
return module
_data_mods = [ _data_mods = [
'piker.brokers.core', 'piker.brokers.core',
@ -30,7 +46,6 @@ _data_mods = [
async def maybe_spawn_brokerd( async def maybe_spawn_brokerd(
brokername: str, brokername: str,
sleep: float = 0.5, sleep: float = 0.5,
tries: int = 10,
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
expose_mods: List = [], expose_mods: List = [],
**tractor_kwargs, **tractor_kwargs,
@ -38,6 +53,11 @@ async def maybe_spawn_brokerd(
"""If no ``brokerd.{brokername}`` daemon-actor can be found, """If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it. spawn one in a local subactor and return a portal to it.
""" """
if loglevel:
get_console_log(loglevel)
tractor_kwargs['loglevel'] = loglevel
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}' dname = f'brokerd.{brokername}'
async with tractor.find_actor(dname) as portal: async with tractor.find_actor(dname) as portal:
@ -69,21 +89,28 @@ async def maybe_spawn_brokerd(
async def open_feed( async def open_feed(
name: str, name: str,
symbols: Sequence[str], symbols: Sequence[str],
loglevel: str = 'info',
) -> AsyncIterator[Dict[str, Any]]: ) -> AsyncIterator[Dict[str, Any]]:
try: try:
mod = get_brokermod(name) mod = get_brokermod(name)
except ImportError: except ImportError:
# TODO: try to pull up ingest feeds mod = get_ingestormod(name)
# - market store
# - influx
raise
async with maybe_spawn_brokerd( async with maybe_spawn_brokerd(
mod.name, mod.name,
loglevel=loglevel,
) as portal: ) as portal:
stream = await portal.run( stream = await portal.run(
mod.__name__, mod.__name__,
'stream_quotes', 'stream_quotes',
symbols=symbols, symbols=symbols,
) )
yield stream # Feed is required to deliver an initial quote asap.
# TODO: should we timeout and raise a more explicit error?
# with trio.fail_after(5):
with trio.fail_after(float('inf')):
# Retreive initial quote for each symbol
# such that consumer code can know the data layout
first_quote = await stream.__anext__()
log.info(f"Received first quote {first_quote}")
yield (first_quote, stream)