Merge pull request #94 from pikers/marketstore_integration

Marketstore integration
bar_select
goodboy 2020-09-04 10:36:53 -04:00 committed by GitHub
commit f8487e250e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 972 additions and 185 deletions

View File

@ -9,8 +9,8 @@ trading and financial analysis targetted at hardcore Linux users.
It tries to use as much bleeding edge tech as possible including (but not limited to):
- Python 3.7+ for glue and business logic
- trio_ and asyncio_ for async
- Python 3.7+ for glue_ and business logic
- trio_ for async
- tractor_ as the underlying actor model
- marketstore_ for historical and real-time tick data persistence and sharing
- techtonicdb_ for L2 book storage
@ -23,6 +23,7 @@ It tries to use as much bleeding edge tech as possible including (but not limite
.. _marketstore: https://github.com/alpacahq/marketstore
.. _techtonicdb: https://github.com/0b01/tectonicdb
.. _Qt: https://www.qt.io/
.. _glue: https://numpy.org/doc/stable/user/c-info.python-as-glue.html#using-python-as-glue
Focus and Features:

View File

@ -14,7 +14,7 @@ import tractor
from ..cli import cli
from .. import watchlists as wl
from ..log import get_console_log, colorize_json, get_logger
from ..brokers.core import maybe_spawn_brokerd_as_subactor
from ..data import maybe_spawn_brokerd
from ..brokers import core, get_brokermod, data
log = get_logger('cli')
@ -99,7 +99,7 @@ def quote(config, tickers, df_output):
@cli.command()
@click.option('--df-output', '-df', flag_value=True,
help='Output in `pandas.DataFrame` format')
@click.option('--count', '-c', default=100,
@click.option('--count', '-c', default=1000,
help='Number of bars to retrieve')
@click.argument('symbol', required=True)
@click.pass_obj
@ -117,10 +117,11 @@ def bars(config, symbol, count, df_output):
brokermod,
symbol,
count=count,
as_np=df_output
)
)
if not bars:
if not len(bars):
log.error(f"No quotes could be found for {symbol}?")
return
@ -130,49 +131,6 @@ def bars(config, symbol, count, df_output):
click.echo(colorize_json(bars))
@cli.command()
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--rate', '-r', default=3, help='Quote rate limit')
@click.option('--test', '-t', help='Test quote stream file')
@click.option('--dhost', '-dh', default='127.0.0.1',
help='Daemon host address to connect to')
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def monitor(config, rate, name, dhost, test, tl):
"""Start a real-time watchlist UI
"""
# global opts
brokermod = config['brokermod']
loglevel = config['loglevel']
log = config['log']
watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path)
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
tickers = watchlists[name]
if not tickers:
log.error(f"No symbols found for watchlist `{name}`?")
return
from ..ui.monitor import _async_main
async def main(tries):
async with maybe_spawn_brokerd_as_subactor(
tries=tries, loglevel=loglevel
) as portal:
# run app "main"
await _async_main(
name, portal, tickers,
brokermod, rate, test=test,
)
tractor.run(
partial(main, tries=1),
name='monitor',
loglevel=loglevel if tl else None,
rpc_module_paths=['piker.ui.monitor'],
start_method='forkserver',
)
@cli.command()
@click.option('--rate', '-r', default=5, help='Logging level')
@ -198,7 +156,7 @@ def record(config, rate, name, dhost, filename):
return
async def main(tries):
async with maybe_spawn_brokerd_as_subactor(
async with maybe_spawn_brokerd(
tries=tries, loglevel=loglevel
) as portal:
# run app "main"
@ -271,37 +229,40 @@ def optsquote(config, symbol, df_output, date):
@cli.command()
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--date', '-d', help='Contracts expiry date')
@click.option('--test', '-t', help='Test quote stream file')
@click.option('--rate', '-r', default=1, help='Logging level')
@click.argument('symbol', required=True)
@click.argument('tickers', nargs=-1, required=True)
@click.pass_obj
def optschain(config, symbol, date, tl, rate, test):
"""Start an option chain UI
def symbol_info(config, tickers):
"""Print symbol quotes to the console
"""
# global opts
loglevel = config['loglevel']
brokername = config['broker']
brokermod = config['brokermod']
from ..ui.option_chain import _async_main
quotes = trio.run(partial(core.symbol_info, brokermod, tickers))
if not quotes:
log.error(f"No quotes could be found for {tickers}?")
return
async def main(tries):
async with maybe_spawn_brokerd_as_subactor(
tries=tries, loglevel=loglevel
):
# run app "main"
await _async_main(
symbol,
brokername,
rate=rate,
loglevel=loglevel,
test=test,
)
if len(quotes) < len(tickers):
syms = tuple(map(itemgetter('symbol'), quotes))
for ticker in tickers:
if ticker not in syms:
brokermod.log.warn(f"Could not find symbol {ticker}?")
tractor.run(
partial(main, tries=1),
name='kivy-options-chain',
loglevel=loglevel if tl else None,
start_method='forkserver',
)
click.echo(colorize_json(quotes))
@cli.command()
@click.argument('pattern', required=True)
@click.pass_obj
def search(config, pattern):
"""Search for symbols from broker backend(s).
"""
# global opts
brokermod = config['brokermod']
quotes = trio.run(partial(core.symbol_search, brokermod, pattern))
if not quotes:
log.error(f"No matches could be found for {pattern}?")
return
click.echo(colorize_json(quotes))

View File

@ -1,23 +1,18 @@
"""
Broker high level API layer.
Broker high level cross-process API layer.
This API should be kept "remote service compatible" meaning inputs to
routines should be primitive data types where possible.
"""
import inspect
from types import ModuleType
from typing import List, Dict, Any, Optional
from async_generator import asynccontextmanager
import tractor
from ..log import get_logger
from .data import DataFeed
from . import get_brokermod
log = get_logger('broker.core')
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
]
log = get_logger(__name__)
async def api(brokername: str, methname: str, **kwargs) -> dict:
@ -25,12 +20,11 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
"""
brokermod = get_brokermod(brokername)
async with brokermod.get_client() as client:
meth = getattr(client.api, methname, None)
meth = getattr(client, methname, None)
if meth is None:
log.warning(
log.debug(
f"Couldn't find API method {methname} looking up on client")
meth = getattr(client, methname, None)
meth = getattr(client.api, methname, None)
if meth is None:
log.error(f"No api method `{methname}` could be found?")
@ -48,24 +42,6 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
return await meth(**kwargs)
@asynccontextmanager
async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None):
"""If no ``brokerd`` daemon-actor can be found spawn one in a
local subactor.
"""
async with tractor.open_nursery() as nursery:
async with tractor.find_actor('brokerd') as portal:
if not portal:
log.info(
"No broker daemon could be found, spawning brokerd..")
portal = await nursery.start_actor(
'brokerd',
rpc_module_paths=_data_mods,
loglevel=loglevel,
)
yield portal
async def stocks_quote(
brokermod: ModuleType,
tickers: List[str]
@ -121,3 +97,26 @@ async def bars(
"""
async with brokermod.get_client() as client:
return await client.bars(symbol, **kwargs)
async def symbol_info(
brokermod: ModuleType,
symbol: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return symbol info from broker.
"""
async with brokermod.get_client() as client:
return await client.symbol_info(symbol, **kwargs)
async def symbol_search(
brokermod: ModuleType,
symbol: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return symbol info from broker.
"""
async with brokermod.get_client() as client:
# TODO: support multiple asset type concurrent searches.
return await client.search_stocks(symbol, **kwargs)

View File

@ -12,7 +12,7 @@ import typing
from typing import (
Coroutine, Callable, Dict,
List, Any, Tuple, AsyncGenerator,
Sequence,
Sequence
)
import contextlib
from operator import itemgetter
@ -25,7 +25,7 @@ from ..log import get_logger, get_console_log
from . import get_brokermod
log = get_logger('broker.data')
log = get_logger(__name__)
async def wait_for_network(
@ -47,7 +47,7 @@ async def wait_for_network(
continue
except socket.gaierror:
if not down: # only report/log network down once
log.warn(f"Network is down waiting for re-establishment...")
log.warn("Network is down waiting for re-establishment...")
down = True
await trio.sleep(sleep)
@ -80,24 +80,30 @@ class BrokerFeed:
@tractor.msg.pub(tasks=['stock', 'option'])
async def stream_requests(
async def stream_poll_requests(
get_topics: typing.Callable,
get_quotes: Coroutine,
feed: BrokerFeed,
rate: int = 3, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None:
"""Stream requests for quotes for a set of symbols at the given
``rate`` (per second).
This routine is built for brokers who support quote polling for multiple
symbols per request. The ``get_topics()`` func is called to retreive the
set of symbols each iteration and ``get_quotes()`` is to retreive
the quotes.
A stock-broker client ``get_quotes()`` async function must be
provided which returns an async quote retrieval function.
"""
broker_limit = getattr(feed.mod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec")
.. note::
This code is mostly tailored (for now) to the questrade backend.
It is currently the only broker that doesn't support streaming without
paying for data. See the note in the diffing section regarding volume
differentials which needs to be addressed in order to get cross-broker
support.
"""
sleeptime = round(1. / rate, 3)
_cache = {} # ticker to quote caching
@ -130,17 +136,47 @@ async def stream_requests(
for quote in quotes:
symbol = quote['symbol']
last = _cache.setdefault(symbol, {})
last_volume = last.get('volume', 0)
# find all keys that have match to a new value compared
# to the last quote received
new = set(quote.items()) - set(last.items())
if new:
log.info(
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
# only ship diff updates and other required fields
payload = {k: quote[k] for k, v in new}
payload['symbol'] = symbol
# if there was volume likely the last size of
# shares traded is useful info and it's possible
# that the set difference from above will disregard
# a "size" value since the same # of shares were traded
volume = payload.get('volume')
if volume:
volume_since_last_quote = volume - last_volume
assert volume_since_last_quote > 0
payload['volume_delta'] = volume_since_last_quote
# TODO: We can emit 2 ticks here:
# - one for the volume differential
# - one for the last known trade size
# The first in theory can be unwound and
# interpolated assuming the broker passes an
# accurate daily VWAP value.
# To make this work we need a universal ``size``
# field that is normalized before hitting this logic.
# XXX: very questrade specific
payload['size'] = quote['lastTradeSize']
# XXX: we append to a list for the options case where the
# subscription topic (key) is the same for all
# expiries even though this is uncessary for the
# stock case (different topic [i.e. symbol] for each
# quote).
new_quotes.setdefault(quote['key'], []).append(quote)
new_quotes.setdefault(quote['key'], []).append(payload)
else:
# log.debug(f"Delivering quotes:\n{quotes}")
for quote in quotes:
@ -168,7 +204,7 @@ async def symbol_data(broker: str, tickers: List[str]):
"""Retrieve baseline symbol info from broker.
"""
async with get_cached_feed(broker) as feed:
return await feed.client.symbol_data(tickers)
return await feed.client.symbol_info(tickers)
async def smoke_quote(get_quotes, tickers, broker):
@ -290,6 +326,7 @@ async def start_quote_stream(
# do a smoke quote (note this mutates the input list and filters
# out bad symbols for now)
payload = await smoke_quote(get_quotes, symbols, broker)
formatter = feed.mod.format_stock_quote
elif feed_type == 'option':
# FIXME: yeah we need maybe a more general way to specify
@ -304,14 +341,21 @@ async def start_quote_stream(
quote['symbol']: quote
for quote in await get_quotes(symbols)
}
formatter = feed.mod.format_option_quote
def packetizer(topic, quotes):
return {quote['symbol']: quote for quote in quotes}
sd = await feed.client.symbol_info(symbols)
# formatter = partial(formatter, symbol_data=sd)
packetizer = partial(
feed.mod.packetizer,
formatter=formatter,
symbol_data=sd,
)
# push initial smoke quote response for client initialization
await ctx.send_yield(payload)
await stream_requests(
await stream_poll_requests(
# ``msg.pub`` required kwargs
task_name=feed_type,
@ -320,7 +364,6 @@ async def start_quote_stream(
packetizer=packetizer,
# actual func args
feed=feed,
get_quotes=get_quotes,
diff_cached=diff_cached,
rate=rate,
@ -378,15 +421,19 @@ class DataFeed:
# subscribe for tickers (this performs a possible filtering
# where invalid symbols are discarded)
sd = await self.portal.run(
"piker.brokers.data", 'symbol_data',
broker=self.brokermod.name, tickers=symbols)
"piker.brokers.data",
'symbol_data',
broker=self.brokermod.name,
tickers=symbols
)
self._symbol_data_cache.update(sd)
if test:
# stream from a local test file
quote_gen = await self.portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
"piker.brokers.data",
'stream_from_file',
filename=test,
)
else:
log.info(f"Starting new stream for {symbols}")

View File

@ -3,14 +3,21 @@ Questrade API backend.
"""
from __future__ import annotations
import inspect
import contextlib
import time
from datetime import datetime
from functools import partial
import itertools
import configparser
from typing import List, Tuple, Dict, Any, Iterator, NamedTuple
from typing import (
List, Tuple, Dict, Any, Iterator, NamedTuple,
AsyncGenerator,
Callable,
)
import arrow
import trio
import tractor
from async_generator import asynccontextmanager
import pandas as pd
import numpy as np
@ -20,8 +27,9 @@ import asks
from ..calc import humanize, percent_change
from . import config
from ._util import resproc, BrokerError, SymbolNotFound
from ..log import get_logger, colorize_json
from ..log import get_logger, colorize_json, get_console_log
from .._async_utils import async_lifo_cache
from . import get_brokermod
log = get_logger(__name__)
@ -407,10 +415,10 @@ class Client:
return symbols2ids
async def symbol_data(self, tickers: List[str]):
"""Return symbol data for ``tickers``.
async def symbol_info(self, symbols: List[str]):
"""Return symbol data for ``symbols``.
"""
t2ids = await self.tickers2ids(tickers)
t2ids = await self.tickers2ids(symbols)
ids = ','.join(t2ids.values())
symbols = {}
for pkt in (await self.api.symbols(ids=ids))['symbols']:
@ -418,6 +426,9 @@ class Client:
return symbols
# TODO: deprecate
symbol_data = symbol_info
async def quote(self, tickers: [str]):
"""Return stock quotes for each ticker in ``tickers``.
"""
@ -598,6 +609,24 @@ class Client:
f"Took {time.time() - start} seconds to retreive {len(bars)} bars")
return bars
async def search_stocks(
self,
pattern: str,
# how many contracts to return
upto: int = 10,
) -> Dict[str, str]:
details = {}
results = await self.api.search(prefix=pattern)
for result in results['symbols']:
sym = result['symbol']
if '.' not in sym:
sym = f"{sym}.{result['listingExchange']}"
details[sym] = result
if len(details) == upto:
return details
# marketstore TSD compatible numpy dtype for bar
_qt_bars_dt = [
@ -839,29 +868,43 @@ def format_stock_quote(
and the second is the same but with all values converted to a
"display-friendly" string format.
"""
last = quote['lastTradePrice']
symbol = quote['symbol']
previous = symbol_data[symbol]['prevDayClosePrice']
change = percent_change(previous, last)
share_count = symbol_data[symbol].get('outstandingShares', None)
mktcap = share_count * last if (last and share_count) else 0
computed = {
'symbol': quote['symbol'],
'%': round(change, 3),
'MC': mktcap,
# why QT do you have to be an asshole shipping null values!!!
'$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3),
'close': previous,
}
computed = {'symbol': symbol}
last = quote.get('lastTradePrice')
if last:
change = percent_change(previous, last)
share_count = symbol_data[symbol].get('outstandingShares', None)
mktcap = share_count * last if (last and share_count) else 0
computed.update({
# 'symbol': quote['symbol'],
'%': round(change, 3),
'MC': mktcap,
# why questrade do you have to be shipping null values!!!
# '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3),
'close': previous,
})
vwap = quote.get('VWAP')
volume = quote.get('volume')
if volume is not None: # could be 0
# why questrade do you have to be an asshole shipping null values!!!
computed['$ vol'] = round((vwap or 0) * (volume or 0), 3)
new = {}
displayable = {}
for key, new_key in keymap.items():
display_value = value = computed.get(key) or quote.get(key)
for key, value in itertools.chain(quote.items(), computed.items()):
new_key = keymap.get(key)
if not new_key:
continue
# API servers can return `None` vals when markets are closed (weekend)
value = 0 if value is None else value
display_value = value
# convert values to a displayble format using available formatting func
if isinstance(new_key, tuple):
new_key, func = new_key
@ -891,7 +934,8 @@ _qt_option_keys = {
# "theta": ('theta', partial(round, ndigits=3)),
# "vega": ('vega', partial(round, ndigits=3)),
'$ vol': ('$ vol', humanize),
'volume': ('vol', humanize),
# XXX: required key to trigger trade execution datum msg
'volume': ('volume', humanize),
# "2021-01-15T00:00:00.000000-05:00",
# "isHalted": false,
# "key": [
@ -939,7 +983,7 @@ def format_option_quote(
"display-friendly" string format.
"""
# TODO: need historical data..
# (cause why would QT keep their quote structure consistent across
# (cause why would questrade keep their quote structure consistent across
# assets..)
# previous = symbol_data[symbol]['prevDayClosePrice']
# change = percent_change(previous, last)
@ -968,3 +1012,164 @@ def format_option_quote(
displayable[new_key] = display_value
return new, displayable
@asynccontextmanager
async def get_cached_client(
brokername: str,
*args,
**kwargs,
) -> 'Client':
"""Get a cached broker client from the current actor's local vars.
If one has not been setup do it and cache it.
"""
# check if a cached client is in the local actor's statespace
ss = tractor.current_actor().statespace
clients = ss.setdefault('clients', {'_lock': trio.Lock()})
lock = clients['_lock']
client = None
try:
log.info(f"Loading existing `{brokername}` daemon")
async with lock:
client = clients[brokername]
client._consumers += 1
yield client
except KeyError:
log.info(f"Creating new client for broker {brokername}")
async with lock:
brokermod = get_brokermod(brokername)
exit_stack = contextlib.AsyncExitStack()
client = await exit_stack.enter_async_context(
brokermod.get_client()
)
client._consumers = 0
client._exit_stack = exit_stack
clients[brokername] = client
yield client
finally:
client._consumers -= 1
if client._consumers <= 0:
# teardown the client
await client._exit_stack.aclose()
async def smoke_quote(get_quotes, tickers): # , broker):
"""Do an initial "smoke" request for symbols in ``tickers`` filtering
out any symbols not supported by the broker queried in the call to
``get_quotes()``.
"""
from operator import itemgetter
# TODO: trim out with #37
#################################################
# get a single quote filtering out any bad tickers
# NOTE: this code is always run for every new client
# subscription even when a broker quoter task is already running
# since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for symbols {tickers}")
quotes = await get_quotes(tickers)
# report any tickers that aren't returned in the first quote
invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes))
for symbol in invalid_tickers:
tickers.remove(symbol)
log.warn(
f"Symbol `{symbol}` not found") # by broker `{broker}`"
# )
# pop any tickers that return "empty" quotes
payload = {}
for quote in quotes:
symbol = quote['symbol']
if quote is None:
log.warn(
f"Symbol `{symbol}` not found")
# XXX: not this mutates the input list (for now)
tickers.remove(symbol)
continue
# report any unknown/invalid symbols (QT specific)
if quote.get('low52w', False) is None:
log.error(
f"{symbol} seems to be defunct")
payload[symbol] = quote
return payload
# end of section to be trimmed out with #37
###########################################
# function to format packets delivered to subscribers
def packetizer(
topic: str,
quotes: Dict[str, Any],
formatter: Callable,
symbol_data: Dict[str, Any],
) -> Dict[str, Any]:
"""Normalize quotes by name into dicts using broker-specific
processing.
"""
new = {}
for quote in quotes:
new[quote['symbol']], _ = formatter(quote, symbol_data)
return new
@tractor.stream
async def stream_quotes(
ctx: tractor.Context, # marks this as a streaming func
symbols: List[str],
feed_type: str = 'stock',
diff_cached: bool = True,
rate: int = 3,
loglevel: str = None,
# feed_type: str = 'stock',
) -> AsyncGenerator[str, Dict[str, Any]]:
# XXX: why do we need this again?
get_console_log(tractor.current_actor().loglevel)
async with get_cached_client('questrade') as client:
if feed_type == 'stock':
formatter = format_stock_quote
get_quotes = await stock_quoter(client, symbols)
# do a smoke quote (note this mutates the input list and filters
# out bad symbols for now)
payload = await smoke_quote(get_quotes, list(symbols))
else:
formatter = format_option_quote
get_quotes = await option_quoter(client, symbols)
# packetize
payload = {
quote['symbol']: quote
for quote in await get_quotes(symbols)
}
sd = await client.symbol_info(symbols)
# push initial smoke quote response for client initialization
await ctx.send_yield(payload)
from .data import stream_poll_requests
await stream_poll_requests(
# ``msg.pub`` required kwargs
task_name=feed_type,
ctx=ctx,
topics=symbols,
packetizer=partial(
packetizer,
formatter=formatter,
symboal_data=sd,
),
# actual func args
get_quotes=get_quotes,
diff_cached=diff_cached,
rate=rate,
)
log.info("Terminating stream quoter task")

View File

@ -8,7 +8,6 @@ import tractor
from ..log import get_console_log, get_logger
from ..brokers import get_brokermod, config
from ..brokers.core import _data_mods
log = get_logger('cli')
DEFAULT_BROKER = 'questrade'
@ -17,6 +16,7 @@ _config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
_context_defaults = dict(
default_map={
# Questrade specific quote poll rates
'monitor': {
'rate': 3,
},
@ -34,6 +34,7 @@ _context_defaults = dict(
def pikerd(loglevel, host, tl):
"""Spawn the piker broker-daemon.
"""
from ..data import _data_mods
get_console_log(loglevel)
tractor.run_daemon(
rpc_module_paths=_data_mods,
@ -64,7 +65,12 @@ def cli(ctx, broker, loglevel, configdir):
})
def _load_clis() -> None:
from ..data import marketstore as _
from ..brokers import cli as _ # noqa
from ..ui import cli as _ # noqa
from ..watchlists import cli as _ # noqa
# load downstream cli modules
from ..brokers import cli as _
from ..watchlists import cli as _
from ..data import marketstore as _
_load_clis()

View File

@ -0,0 +1,118 @@
"""
Data feed apis and infra.
We provide tsdb integrations for retrieving
and storing data from your brokers as well as
sharing your feeds with other fellow pikers.
"""
from contextlib import asynccontextmanager
from importlib import import_module
from types import ModuleType
from typing import (
Dict, List, Any,
Sequence, AsyncIterator, Optional
)
import trio
import tractor
from ..brokers import get_brokermod
from ..log import get_logger, get_console_log
log = get_logger(__name__)
__ingestors__ = [
'marketstore',
]
def get_ingestor(name: str) -> ModuleType:
"""Return the imported ingestor module by name.
"""
module = import_module('.' + name, 'piker.data')
# we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1]
return module
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
]
@asynccontextmanager
async def maybe_spawn_brokerd(
brokername: str,
sleep: float = 0.5,
loglevel: Optional[str] = None,
expose_mods: List = [],
**tractor_kwargs,
) -> tractor._portal.Portal:
"""If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
"""
if loglevel:
get_console_log(loglevel)
tractor_kwargs['loglevel'] = loglevel
brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'
async with tractor.find_actor(dname) as portal:
# WTF: why doesn't this work?
log.info(f"YOYOYO {__name__}")
if portal is not None:
yield portal
else:
log.info(f"Spawning {brokername} broker daemon")
tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
async with tractor.open_nursery() as nursery:
try:
# spawn new daemon
portal = await nursery.start_actor(
dname,
rpc_module_paths=_data_mods + [brokermod.__name__],
loglevel=loglevel,
**tractor_kwargs
)
async with tractor.wait_for_actor(dname) as portal:
yield portal
finally:
# client code may block indefinitely so cancel when
# teardown is invoked
await nursery.cancel()
@asynccontextmanager
async def open_feed(
name: str,
symbols: Sequence[str],
loglevel: str = 'info',
) -> AsyncIterator[Dict[str, Any]]:
"""Open a "data feed" which provides streamed real-time quotes.
"""
try:
mod = get_brokermod(name)
except ImportError:
mod = get_ingestormod(name)
async with maybe_spawn_brokerd(
mod.name,
loglevel=loglevel,
) as portal:
stream = await portal.run(
mod.__name__,
'stream_quotes',
symbols=symbols,
)
# Feed is required to deliver an initial quote asap.
# TODO: should we timeout and raise a more explicit error?
# with trio.fail_after(5):
with trio.fail_after(float('inf')):
# Retreive initial quote for each symbol
# such that consumer code can know the data layout
first_quote = await stream.__anext__()
log.info(f"Received first quote {first_quote}")
yield (first_quote, stream)

View File

@ -0,0 +1,326 @@
"""
``marketstore`` integration.
- client management routines
- ticK data ingest routines
- websocket client for subscribing to write triggers
- todo: tick sequence stream-cloning for testing
- todo: docker container management automation
"""
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Callable, Tuple
import time
from math import isnan
import msgpack
import numpy as np
import pandas as pd
import pymarketstore as pymkts
from trio_websocket import open_websocket_url
from ..log import get_logger, get_console_log
from ..data import open_feed
log = get_logger(__name__)
_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
_url: str = 'http://localhost:5993/rpc'
_quote_dt = [
# these two are required for as a "primary key"
('Epoch', 'i8'),
('Nanoseconds', 'i4'),
('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask)
# ('fill_time', 'f4'),
('Last', 'f4'),
('Bid', 'f4'),
('Bsize', 'i8'),
('Asize', 'i8'),
('Ask', 'f4'),
('Size', 'i8'),
('Volume', 'i8'),
# ('brokerd_ts', 'i64'),
# ('VWAP', 'f4')
]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
_tick_map = {
'Up': 1,
'Equal': 0,
'Down': -1,
None: np.nan,
}
class MarketStoreError(Exception):
"Generic marketstore client error"
def err_on_resp(response: dict) -> None:
"""Raise any errors found in responses from client request.
"""
responses = response['responses']
if responses is not None:
for r in responses:
err = r['error']
if err:
raise MarketStoreError(err)
def quote_to_marketstore_structarray(
quote: Dict[str, Any],
last_fill: str,
) -> np.array:
"""Return marketstore writeable structarray from quote ``dict``.
"""
if last_fill:
# new fill bby
now = timestamp(last_fill)
else:
# this should get inserted upstream by the broker-client to
# subtract from IPC latency
now = time.time_ns()
secs, ns = now / 10**9, now % 10**9
# pack into List[Tuple[str, Any]]
array_input = []
# insert 'Epoch' entry first and then 'Nanoseconds'.
array_input.append(int(secs))
array_input.append(int(ns))
# append remaining fields
for name, dt in _quote_dt[2:]:
if 'f' in dt:
none = np.nan
else:
# for ``np.int`` we use 0 as a null value
none = 0
# casefold? see https://github.com/alpacahq/marketstore/issues/324
val = quote.get(name.casefold(), none)
array_input.append(val)
return np.array([tuple(array_input)], dtype=_quote_dt)
def timestamp(datestr: str) -> int:
"""Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str.
"""
return int(pd.Timestamp(datestr).value)
def mk_tbk(keys: Tuple[str, str, str]) -> str:
"""Generate a marketstore table key from a tuple.
Converts,
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
"""
return '{}/' + '/'.join(keys)
class Client:
"""Async wrapper around the alpaca ``pymarketstore`` sync client.
This will server as the shell for building out a proper async client
that isn't horribly documented and un-tested..
"""
def __init__(self, url: str):
self._client = pymkts.Client(url)
async def _invoke(
self,
meth: Callable,
*args,
**kwargs,
) -> Any:
return err_on_resp(meth(*args, **kwargs))
async def destroy(
self,
tbk: Tuple[str, str, str],
) -> None:
return await self._invoke(self._client.destroy, mk_tbk(tbk))
async def list_symbols(
self,
tbk: str,
) -> List[str]:
return await self._invoke(self._client.list_symbols, mk_tbk(tbk))
async def write(
self,
symbol: str,
array: np.ndarray,
) -> None:
start = time.time()
await self._invoke(
self._client.write,
array,
_tick_tbk.format(symbol),
isvariablelength=True
)
log.debug(f"{symbol} write time (s): {time.time() - start}")
def query(
self,
symbol,
tbk: Tuple[str, str] = _tick_tbk_ids,
) -> pd.DataFrame:
# XXX: causes crash
# client.query(pymkts.Params(symbol, '*', 'OHCLV'
result = self._client.query(
pymkts.Params(symbol, *tbk),
)
return result.first().df()
@asynccontextmanager
async def get_client(
url: str = _url,
) -> Client:
yield Client(url)
async def ingest_quote_stream(
symbols: List[str],
brokername: str,
tries: int = 1,
loglevel: str = None,
) -> None:
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
"""
async with open_feed(
brokername,
symbols,
loglevel=loglevel,
) as (first_quotes, qstream):
quote_cache = first_quotes.copy()
async with get_client() as ms_client:
# start ingest to marketstore
async for quotes in qstream:
log.info(quotes)
for symbol, quote in quotes.items():
# remap tick strs to ints
quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
# check for volume update (i.e. did trades happen
# since last quote)
new_vol = quote.get('volume', None)
if new_vol is None:
log.debug(f"No fills for {symbol}")
if new_vol == quote_cache.get('volume'):
# should never happen due to field diffing
# on sender side
log.error(
f"{symbol}: got same volume as last quote?")
quote_cache.update(quote)
a = quote_to_marketstore_structarray(
quote,
# TODO: check this closer to the broker query api
last_fill=quote.get('fill_time', '')
)
await ms_client.write(symbol, a)
async def stream_quotes(
symbols: List[str],
host: str = 'localhost',
port: int = 5993,
diff_cached: bool = True,
loglevel: str = None,
) -> None:
"""Open a symbol stream from a running instance of marketstore and
log to console.
"""
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server
resp = await ws.send_message(
msgpack.dumps({'streams': list(tbks.values())})
)
log.info(resp)
async def recv() -> Dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8')
streams = (await recv())['streams']
log.info(f"Subscribed to {streams}")
_cache = {}
while True:
msg = await recv()
# unpack symbol and quote data
# key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
symbol = msg['key'].split('/')[0]
data = msg['data']
# calc time stamp(s)
s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
ts = s * 10**9 + ns
data['broker_fill_time_ns'] = ts
quote = {}
for k, v in data.items():
if isnan(v):
continue
quote[k.lower()] = v
quote['symbol'] = symbol
quotes = {}
if diff_cached:
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(f"New quote {quote['symbol']}:\n{new}")
# only ship diff updates and other required fields
payload = {k: quote[k] for k, v in new}
payload['symbol'] = symbol
# if there was volume likely the last size of
# shares traded is useful info and it's possible
# that the set difference from above will disregard
# a "size" value since the same # of shares were traded
size = quote.get('size')
volume = quote.get('volume')
if size and volume:
new_volume_since_last = max(
volume - last.get('volume', 0), 0)
log.warning(
f"NEW VOLUME {symbol}:{new_volume_since_last}")
payload['size'] = size
payload['last'] = quote.get('last')
# XXX: we append to a list for the options case where the
# subscription topic (key) is the same for all
# expiries even though this is uncessary for the
# stock case (different topic [i.e. symbol] for each
# quote).
quotes.setdefault(symbol, []).append(payload)
# update cache
_cache[symbol].update(quote)
else:
quotes = {symbol: [{key.lower(): val for key, val in quote.items()}]}
if quotes:
yield quotes

View File

@ -1,15 +0,0 @@
"""
Stuff for your eyes.
"""
import os
import sys
# XXX clear all flags at import to avoid upsetting
# ol' kivy see: https://github.com/kivy/kivy/issues/4225
# though this is likely a ``click`` problem
sys.argv[1:] = []
# use the trio async loop
os.environ['KIVY_EVENTLOOP'] = 'trio'
import kivy
kivy.require('1.10.0')

105
piker/ui/cli.py 100644
View File

@ -0,0 +1,105 @@
"""
Console interface to UI components.
"""
from functools import partial
import os
import click
import tractor
from ..cli import cli
from .. import watchlists as wl
from ..data import maybe_spawn_brokerd
_config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
def _kivy_import_hack():
# Command line hacks to make it work.
# See the pkg mod.
from .kivy import kivy # noqa
@cli.command()
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--rate', '-r', default=3, help='Quote rate limit')
@click.option('--test', '-t', help='Test quote stream file')
@click.option('--dhost', '-dh', default='127.0.0.1',
help='Daemon host address to connect to')
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def monitor(config, rate, name, dhost, test, tl):
"""Start a real-time watchlist UI
"""
# global opts
brokermod = config['brokermod']
loglevel = config['loglevel']
log = config['log']
watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path)
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
tickers = watchlists[name]
if not tickers:
log.error(f"No symbols found for watchlist `{name}`?")
return
_kivy_import_hack()
from .kivy.monitor import _async_main
async def main(tries):
async with maybe_spawn_brokerd(
brokername=brokermod.name,
tries=tries, loglevel=loglevel
) as portal:
# run app "main"
await _async_main(
name, portal, tickers,
brokermod, rate, test=test,
)
tractor.run(
partial(main, tries=1),
name='monitor',
loglevel=loglevel if tl else None,
rpc_module_paths=['piker.ui.kivy.monitor'],
start_method='forkserver',
)
@cli.command()
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--date', '-d', help='Contracts expiry date')
@click.option('--test', '-t', help='Test quote stream file')
@click.option('--rate', '-r', default=1, help='Logging level')
@click.argument('symbol', required=True)
@click.pass_obj
def optschain(config, symbol, date, tl, rate, test):
"""Start an option chain UI
"""
# global opts
loglevel = config['loglevel']
brokername = config['broker']
_kivy_import_hack()
from .kivy.option_chain import _async_main
async def main(tries):
async with maybe_spawn_brokerd(
tries=tries, loglevel=loglevel
):
# run app "main"
await _async_main(
symbol,
brokername,
rate=rate,
loglevel=loglevel,
test=test,
)
tractor.run(
partial(main, tries=1),
name='kivy-options-chain',
loglevel=loglevel if tl else None,
start_method='forkserver',
)

View File

@ -0,0 +1,15 @@
"""
Legacy kivy components.
"""
import os
import sys
# XXX clear all flags at import to avoid upsetting
# ol' kivy see: https://github.com/kivy/kivy/issues/4225
# though this is likely a ``click`` problem
sys.argv[1:] = []
# use the trio async loop
os.environ['KIVY_EVENTLOOP'] = 'trio'
import kivy
kivy.require('1.10.0')

View File

@ -51,23 +51,25 @@ async def update_quotes(
chngcell = row.get_cell('%')
# determine daily change color
color = colorcode('gray')
percent_change = record.get('%')
if percent_change:
daychange = float(record['%'])
if percent_change is not None and percent_change != chngcell:
daychange = float(percent_change)
if daychange < 0.:
color = colorcode('red2')
elif daychange > 0.:
color = colorcode('forestgreen')
else:
color = colorcode('gray')
# update row header and '%' cell text color
if chngcell:
chngcell.color = color
hdrcell.color = color
# if the cell has been "highlighted" make sure to change its color
if hdrcell.background_color != [0]*4:
hdrcell.background_color = color
# update row header and '%' cell text color
chngcell.color = color
hdrcell.color = color
# briefly highlight bg of certain cells on each trade execution
unflash = set()
tick_color = None
@ -123,10 +125,13 @@ async def update_quotes(
record, displayable = formatter(
quote, symbol_data=symbol_data)
# don't red/green the header cell in ``row.update()``
record.pop('symbol')
# determine if sorting should happen
sort_key = table.sort_key
new = record[sort_key]
last = row.get_field(sort_key)
new = record.get(sort_key, last)
if new != last:
to_sort.add(row.widget)

View File

@ -340,6 +340,7 @@ class Row(HoverBehavior, GridLayout):
gray = colorcode('gray')
fgreen = colorcode('forestgreen')
red = colorcode('red2')
for key, val in record.items():
last = self.get_field(key)
color = gray
@ -361,7 +362,7 @@ class Row(HoverBehavior, GridLayout):
if color != gray:
cells[key] = cell
self._last_record = record
self._last_record.update(record)
return cells
# mouse over handlers

View File

@ -30,7 +30,6 @@ setup(
license='AGPLv3',
author='Tyler Goodlet',
maintainer='Tyler Goodlet',
maintainer_email='tgoodlet@gmail.com',
url='https://github.com/pikers/piker',
platforms=['linux'],
packages=[
@ -46,13 +45,27 @@ setup(
]
},
install_requires=[
'click', 'colorlog', 'trio', 'attrs', 'async_generator',
'pygments', 'cython', 'asks', 'pandas', 'msgpack',
'click',
'colorlog',
'trio',
'attrs',
'async_generator',
'pygments',
# brokers
'asks',
'ib_insync',
# numerics
'arrow', # better datetimes
'cython',
'numpy',
'pandas',
# tsdbs
'pymarketstore',
#'kivy', see requirement.txt; using a custom branch atm
],
extras_require={
'questrade': ['asks'],
},
tests_require=['pytest'],
python_requires=">=3.7", # literally for ``datetime.datetime.fromisoformat``...
keywords=["async", "trading", "finance", "quant", "charting"],
@ -61,7 +74,7 @@ setup(
'License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)',
'Operating System :: POSIX :: Linux',
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
# "Programming Language :: Python :: Implementation :: PyPy",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",