Define "packetizer" in specific broker mod

Allows for formatting published quotes using a broker specific
formatting callback.
its_happening
Tyler Goodlet 2020-07-28 13:47:18 -04:00
parent 12655f87fd
commit b16bc9b42d
3 changed files with 107 additions and 44 deletions

View File

@ -12,7 +12,7 @@ import typing
from typing import (
Coroutine, Callable, Dict,
List, Any, Tuple, AsyncGenerator,
Sequence,
Sequence
)
import contextlib
from operator import itemgetter
@ -47,7 +47,7 @@ async def wait_for_network(
continue
except socket.gaierror:
if not down: # only report/log network down once
log.warn(f"Network is down waiting for re-establishment...")
log.warn("Network is down waiting for re-establishment...")
down = True
await trio.sleep(sleep)
@ -83,7 +83,6 @@ class BrokerFeed:
async def stream_poll_requests(
get_topics: typing.Callable,
get_quotes: Coroutine,
feed: BrokerFeed,
rate: int = 3, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None:
@ -93,17 +92,18 @@ async def stream_poll_requests(
This routine is built for brokers who support quote polling for multiple
symbols per request. The ``get_topics()`` func is called to retreive the
set of symbols each iteration and ``get_quotes()`` is to retreive
the quotes.
the quotes.
A stock-broker client ``get_quotes()`` async function must be
provided which returns an async quote retrieval function.
"""
broker_limit = getattr(feed.mod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec")
.. note::
This code is mostly tailored (for now) to the questrade backend.
It is currently the only broker that doesn't support streaming without
paying for data. See the note in the diffing section regarding volume
differentials which needs to be addressed in order to get cross-broker
support.
"""
sleeptime = round(1. / rate, 3)
_cache = {} # ticker to quote caching
@ -136,6 +136,7 @@ async def stream_poll_requests(
for quote in quotes:
symbol = quote['symbol']
last = _cache.setdefault(symbol, {})
last_volume = last.get('volume', 0)
# find all keys that have match to a new value compared
# to the last quote received
@ -153,9 +154,22 @@ async def stream_poll_requests(
# shares traded is useful info and it's possible
# that the set difference from above will disregard
# a "size" value since the same # of shares were traded
size = quote.get('size')
if size and 'volume' in payload:
payload['size'] = size
volume = payload.get('volume')
if volume:
volume_since_last_quote = volume - last_volume
assert volume_since_last_quote > 0
payload['volume_delta'] = volume_since_last_quote
# TODO: We can emit 2 ticks here:
# - one for the volume differential
# - one for the last known trade size
# The first in theory can be unwound and
# interpolated assuming the broker passes an
# accurate daily VWAP value.
# To make this work we need a universal ``size``
# field that is normalized before hitting this logic.
# XXX: very questrade specific
payload['size'] = quote['lastTradeSize']
# XXX: we append to a list for the options case where the
# subscription topic (key) is the same for all
@ -312,6 +326,7 @@ async def start_quote_stream(
# do a smoke quote (note this mutates the input list and filters
# out bad symbols for now)
payload = await smoke_quote(get_quotes, symbols, broker)
formatter = feed.mod.format_stock_quote
elif feed_type == 'option':
# FIXME: yeah we need maybe a more general way to specify
@ -326,9 +341,16 @@ async def start_quote_stream(
quote['symbol']: quote
for quote in await get_quotes(symbols)
}
formatter = feed.mod.format_option_quote
def packetizer(topic, quotes):
return {quote['symbol']: quote for quote in quotes}
sd = await feed.client.symbol_info(symbols)
# formatter = partial(formatter, symbol_data=sd)
packetizer = partial(
feed.mod.packetizer,
formatter=formatter,
symbol_data=sd,
)
# push initial smoke quote response for client initialization
await ctx.send_yield(payload)
@ -342,7 +364,6 @@ async def start_quote_stream(
packetizer=packetizer,
# actual func args
feed=feed,
get_quotes=get_quotes,
diff_cached=diff_cached,
rate=rate,

View File

@ -12,6 +12,7 @@ import configparser
from typing import (
List, Tuple, Dict, Any, Iterator, NamedTuple,
AsyncGenerator,
Callable,
)
import arrow
@ -26,7 +27,7 @@ import asks
from ..calc import humanize, percent_change
from . import config
from ._util import resproc, BrokerError, SymbolNotFound
from ..log import get_logger, colorize_json
from ..log import get_logger, colorize_json, get_console_log
from .._async_utils import async_lifo_cache
from . import get_brokermod
@ -934,7 +935,8 @@ _qt_option_keys = {
# "theta": ('theta', partial(round, ndigits=3)),
# "vega": ('vega', partial(round, ndigits=3)),
'$ vol': ('$ vol', humanize),
'volume': ('vol', humanize),
# XXX: required key to trigger trade execution datum msg
'volume': ('volume', humanize),
# "2021-01-15T00:00:00.000000-05:00",
# "isHalted": false,
# "key": [
@ -1038,18 +1040,20 @@ async def get_cached_client(
log.info(f"Loading existing `{brokername}` daemon")
async with lock:
client = clients[brokername]
client._consumers += 1
yield client
except KeyError:
log.info(f"Creating new client for broker {brokername}")
async with lock:
brokermod = get_brokermod(brokername)
exit_stack = contextlib.AsyncExitStack()
client = await exit_stack.enter_async_context(
brokermod.get_client())
brokermod.get_client()
)
client._consumers = 0
client._exit_stack = exit_stack
clients[brokername] = client
else:
client._consumers += 1
yield client
yield client
finally:
client._consumers -= 1
if client._consumers <= 0:
@ -1104,6 +1108,23 @@ async def smoke_quote(get_quotes, tickers): # , broker):
###########################################
# function to format packets delivered to subscribers
def packetizer(
topic: str,
quotes: Dict[str, Any],
formatter: Callable,
symbol_data: Dict[str, Any],
) -> Dict[str, Any]:
"""Normalize quotes by name into dicts using broker-specific
processing.
"""
new = {}
for quote in quotes:
new[quote['symbol']], _ = formatter(quote, symbol_data)
return new
@tractor.stream
async def stream_quotes(
ctx: tractor.Context, # marks this as a streaming func
@ -1111,8 +1132,11 @@ async def stream_quotes(
feed_type: str = 'stock',
diff_cached: bool = True,
rate: int = 3,
loglevel: str = None,
# feed_type: str = 'stock',
) -> AsyncGenerator[str, Dict[str, Any]]:
# XXX: why do we need this again?
get_console_log(tractor.current_actor().loglevel)
async with get_cached_client('questrade') as client:
if feed_type == 'stock':
@ -1131,20 +1155,7 @@ async def stream_quotes(
for quote in await get_quotes(symbols)
}
symbol_data = await client.symbol_info(symbols)
# function to format packets delivered to subscribers
def packetizer(
topic: str,
quotes: Dict[str, Any]
) -> Dict[str, Any]:
"""Normalize quotes by name into dicts.
"""
new = {}
for quote in quotes:
new[quote['symbol']], _ = formatter(quote, symbol_data)
return new
sd = await client.symbol_info(symbols)
# push initial smoke quote response for client initialization
await ctx.send_yield(payload)
@ -1157,7 +1168,11 @@ async def stream_quotes(
task_name=feed_type,
ctx=ctx,
topics=symbols,
packetizer=packetizer,
packetizer=partial(
packetizer,
formatter=formatter,
symboal_data=sd,
),
# actual func args
get_quotes=get_quotes,

View File

@ -6,19 +6,35 @@ and storing data from your brokers as well as
sharing your feeds with other fellow pikers.
"""
from contextlib import asynccontextmanager
from importlib import import_module
from types import ModuleType
from typing import (
Dict, List, Any,
Sequence, AsyncIterator, Optional
)
import trio
import tractor
from ..brokers import get_brokermod
from ..log import get_logger
from ..log import get_logger, get_console_log
log = get_logger(__name__)
__ingestors__ = [
'marketstore',
]
def get_ingestor(name: str) -> ModuleType:
"""Return the imported ingestor module by name.
"""
module = import_module('.' + name, 'piker.data')
# we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1]
return module
_data_mods = [
'piker.brokers.core',
@ -30,7 +46,6 @@ _data_mods = [
async def maybe_spawn_brokerd(
brokername: str,
sleep: float = 0.5,
tries: int = 10,
loglevel: Optional[str] = None,
expose_mods: List = [],
**tractor_kwargs,
@ -38,6 +53,11 @@ async def maybe_spawn_brokerd(
"""If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
"""
if loglevel:
get_console_log(loglevel)
tractor_kwargs['loglevel'] = loglevel
brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'
async with tractor.find_actor(dname) as portal:
@ -69,21 +89,28 @@ async def maybe_spawn_brokerd(
async def open_feed(
name: str,
symbols: Sequence[str],
loglevel: str = 'info',
) -> AsyncIterator[Dict[str, Any]]:
try:
mod = get_brokermod(name)
except ImportError:
# TODO: try to pull up ingest feeds
# - market store
# - influx
raise
mod = get_ingestormod(name)
async with maybe_spawn_brokerd(
mod.name,
loglevel=loglevel,
) as portal:
stream = await portal.run(
mod.__name__,
'stream_quotes',
symbols=symbols,
)
yield stream
# Feed is required to deliver an initial quote asap.
# TODO: should we timeout and raise a more explicit error?
# with trio.fail_after(5):
with trio.fail_after(float('inf')):
# Retreive initial quote for each symbol
# such that consumer code can know the data layout
first_quote = await stream.__anext__()
log.info(f"Received first quote {first_quote}")
yield (first_quote, stream)