Port to new data apis

marketstore_integration
Tyler Goodlet 2020-07-15 08:28:13 -04:00
parent 57a8db8cba
commit 78784a4bf3
6 changed files with 187 additions and 48 deletions

View File

@ -14,7 +14,7 @@ import tractor
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from ..log import get_console_log, colorize_json, get_logger from ..log import get_console_log, colorize_json, get_logger
from ..brokers.core import maybe_spawn_brokerd_as_subactor from ..data import maybe_spawn_brokerd
from ..brokers import core, get_brokermod, data from ..brokers import core, get_brokermod, data
log = get_logger('cli') log = get_logger('cli')
@ -99,7 +99,7 @@ def quote(config, tickers, df_output):
@cli.command() @cli.command()
@click.option('--df-output', '-df', flag_value=True, @click.option('--df-output', '-df', flag_value=True,
help='Output in `pandas.DataFrame` format') help='Output in `pandas.DataFrame` format')
@click.option('--count', '-c', default=100, @click.option('--count', '-c', default=1000,
help='Number of bars to retrieve') help='Number of bars to retrieve')
@click.argument('symbol', required=True) @click.argument('symbol', required=True)
@click.pass_obj @click.pass_obj
@ -117,10 +117,11 @@ def bars(config, symbol, count, df_output):
brokermod, brokermod,
symbol, symbol,
count=count, count=count,
as_np=df_output
) )
) )
if not bars: if not len(bars):
log.error(f"No quotes could be found for {symbol}?") log.error(f"No quotes could be found for {symbol}?")
return return
@ -198,7 +199,7 @@ def record(config, rate, name, dhost, filename):
return return
async def main(tries): async def main(tries):
async with maybe_spawn_brokerd_as_subactor( async with maybe_spawn_brokerd(
tries=tries, loglevel=loglevel tries=tries, loglevel=loglevel
) as portal: ) as portal:
# run app "main" # run app "main"

View File

@ -1,23 +1,18 @@
""" """
Broker high level API layer. Broker high level cross-process API layer.
This API should be kept "remote service compatible" meaning inputs to
routines should be primitive data types where possible.
""" """
import inspect import inspect
from types import ModuleType from types import ModuleType
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from async_generator import asynccontextmanager
import tractor
from ..log import get_logger from ..log import get_logger
from .data import DataFeed
from . import get_brokermod from . import get_brokermod
log = get_logger('broker.core') log = get_logger(__name__)
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
]
async def api(brokername: str, methname: str, **kwargs) -> dict: async def api(brokername: str, methname: str, **kwargs) -> dict:
@ -25,12 +20,11 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
""" """
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
meth = getattr(client, methname, None)
meth = getattr(client.api, methname, None)
if meth is None: if meth is None:
log.warning( log.warning(
f"Couldn't find API method {methname} looking up on client") f"Couldn't find API method {methname} looking up on client")
meth = getattr(client, methname, None) meth = getattr(client.api, methname, None)
if meth is None: if meth is None:
log.error(f"No api method `{methname}` could be found?") log.error(f"No api method `{methname}` could be found?")
@ -48,24 +42,6 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
return await meth(**kwargs) return await meth(**kwargs)
@asynccontextmanager
async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None):
"""If no ``brokerd`` daemon-actor can be found spawn one in a
local subactor.
"""
async with tractor.open_nursery() as nursery:
async with tractor.find_actor('brokerd') as portal:
if not portal:
log.info(
"No broker daemon could be found, spawning brokerd..")
portal = await nursery.start_actor(
'brokerd',
rpc_module_paths=_data_mods,
loglevel=loglevel,
)
yield portal
async def stocks_quote( async def stocks_quote(
brokermod: ModuleType, brokermod: ModuleType,
tickers: List[str] tickers: List[str]
@ -121,3 +97,26 @@ async def bars(
""" """
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
return await client.bars(symbol, **kwargs) return await client.bars(symbol, **kwargs)
async def symbol_info(
brokermod: ModuleType,
symbol: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return symbol info from broker.
"""
async with brokermod.get_client() as client:
return await client.symbol_info(symbol, **kwargs)
async def symbol_search(
brokermod: ModuleType,
pattern: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return symbol info from broker.
"""
async with brokermod.get_client() as client:
# TODO: support multiple asset type concurrent searches.
return await client.search_stocks(pattern=pattern, **kwargs)

View File

@ -25,7 +25,7 @@ from ..log import get_logger, get_console_log
from . import get_brokermod from . import get_brokermod
log = get_logger('broker.data') log = get_logger(__name__)
async def wait_for_network( async def wait_for_network(
@ -80,7 +80,7 @@ class BrokerFeed:
@tractor.msg.pub(tasks=['stock', 'option']) @tractor.msg.pub(tasks=['stock', 'option'])
async def stream_requests( async def stream_poll_requests(
get_topics: typing.Callable, get_topics: typing.Callable,
get_quotes: Coroutine, get_quotes: Coroutine,
feed: BrokerFeed, feed: BrokerFeed,
@ -90,6 +90,12 @@ async def stream_requests(
"""Stream requests for quotes for a set of symbols at the given """Stream requests for quotes for a set of symbols at the given
``rate`` (per second). ``rate`` (per second).
This routine is built for brokers who support quote polling for multiple
symbols per request. The ``get_topics()`` func is called to retreive the
set of symbols each iteration and ``get_quotes()`` is to retreive
the quotes.
A stock-broker client ``get_quotes()`` async function must be A stock-broker client ``get_quotes()`` async function must be
provided which returns an async quote retrieval function. provided which returns an async quote retrieval function.
""" """
@ -327,7 +333,7 @@ async def start_quote_stream(
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
await ctx.send_yield(payload) await ctx.send_yield(payload)
await stream_requests( await stream_poll_requests(
# ``msg.pub`` required kwargs # ``msg.pub`` required kwargs
task_name=feed_type, task_name=feed_type,
@ -394,15 +400,19 @@ class DataFeed:
# subscribe for tickers (this performs a possible filtering # subscribe for tickers (this performs a possible filtering
# where invalid symbols are discarded) # where invalid symbols are discarded)
sd = await self.portal.run( sd = await self.portal.run(
"piker.brokers.data", 'symbol_data', "piker.brokers.data",
broker=self.brokermod.name, tickers=symbols) 'symbol_data',
broker=self.brokermod.name,
tickers=symbols
)
self._symbol_data_cache.update(sd) self._symbol_data_cache.update(sd)
if test: if test:
# stream from a local test file # stream from a local test file
quote_gen = await self.portal.run( quote_gen = await self.portal.run(
"piker.brokers.data", 'stream_from_file', "piker.brokers.data",
filename=test 'stream_from_file',
filename=test,
) )
else: else:
log.info(f"Starting new stream for {symbols}") log.info(f"Starting new stream for {symbols}")

View File

@ -8,7 +8,6 @@ import tractor
from ..log import get_console_log, get_logger from ..log import get_console_log, get_logger
from ..brokers import get_brokermod, config from ..brokers import get_brokermod, config
from ..brokers.core import _data_mods
log = get_logger('cli') log = get_logger('cli')
DEFAULT_BROKER = 'questrade' DEFAULT_BROKER = 'questrade'
@ -34,6 +33,7 @@ _context_defaults = dict(
def pikerd(loglevel, host, tl): def pikerd(loglevel, host, tl):
"""Spawn the piker broker-daemon. """Spawn the piker broker-daemon.
""" """
from ..data import _data_mods
get_console_log(loglevel) get_console_log(loglevel)
tractor.run_daemon( tractor.run_daemon(
rpc_module_paths=_data_mods, rpc_module_paths=_data_mods,
@ -64,7 +64,12 @@ def cli(ctx, broker, loglevel, configdir):
}) })
def _load_clis() -> None:
from ..data import marketstore as _
from ..brokers import cli as _ # noqa
from ..ui import cli as _ # noqa
from ..watchlists import cli as _ # noqa
# load downstream cli modules # load downstream cli modules
from ..brokers import cli as _ _load_clis()
from ..watchlists import cli as _
from ..data import marketstore as _

View File

@ -19,11 +19,11 @@ import trio
import tractor import tractor
from trio_websocket import open_websocket_url from trio_websocket import open_websocket_url
from . import maybe_spawn_brokerd
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from ..brokers.data import DataFeed from ..brokers.data import DataFeed
from ..log import get_logger from ..log import get_logger
from ..brokers.core import maybe_spawn_brokerd_as_subactor
log = get_logger(__name__) log = get_logger(__name__)
@ -138,7 +138,7 @@ def ingest(config, name, test_file, tl, url):
symbols = watchlists[name] symbols = watchlists[name]
async def main(tries): async def main(tries):
async with maybe_spawn_brokerd_as_subactor( async with maybe_spawn_brokerd(
tries=tries, tries=tries,
loglevel=loglevel loglevel=loglevel
) as portal: ) as portal:

124
piker/ui/cli.py 100644
View File

@ -0,0 +1,124 @@
"""
Console interface to UI components.
"""
from functools import partial
import os
import click
import tractor
from ..cli import cli
from .. import watchlists as wl
from ..data import maybe_spawn_brokerd
_config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
def _kivy_import_hack():
# Command line hacks to make it work.
# See the pkg mod.
from .kivy import kivy # noqa
@cli.command()
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--rate', '-r', default=3, help='Quote rate limit')
@click.option('--test', '-t', help='Test quote stream file')
@click.option('--dhost', '-dh', default='127.0.0.1',
help='Daemon host address to connect to')
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def monitor(config, rate, name, dhost, test, tl):
"""Start a real-time watchlist UI
"""
# global opts
brokermod = config['brokermod']
loglevel = config['loglevel']
log = config['log']
watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path)
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
tickers = watchlists[name]
if not tickers:
log.error(f"No symbols found for watchlist `{name}`?")
return
_kivy_import_hack()
from .kivy.monitor import _async_main
async def main(tries):
async with maybe_spawn_brokerd(
brokername=brokermod.name,
tries=tries, loglevel=loglevel
) as portal:
# run app "main"
await _async_main(
name, portal, tickers,
brokermod, rate, test=test,
)
tractor.run(
partial(main, tries=1),
name='monitor',
loglevel=loglevel if tl else None,
rpc_module_paths=['piker.ui.kivy.monitor'],
start_method='forkserver',
)
@cli.command()
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--date', '-d', help='Contracts expiry date')
@click.option('--test', '-t', help='Test quote stream file')
@click.option('--rate', '-r', default=1, help='Logging level')
@click.argument('symbol', required=True)
@click.pass_obj
def optschain(config, symbol, date, tl, rate, test):
"""Start an option chain UI
"""
# global opts
loglevel = config['loglevel']
brokername = config['broker']
_kivy_import_hack()
from .kivy.option_chain import _async_main
async def main(tries):
async with maybe_spawn_brokerd(
tries=tries, loglevel=loglevel
):
# run app "main"
await _async_main(
symbol,
brokername,
rate=rate,
loglevel=loglevel,
test=test,
)
tractor.run(
partial(main, tries=1),
name='kivy-options-chain',
loglevel=loglevel if tl else None,
start_method='forkserver',
)
@cli.command()
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--date', '-d', help='Contracts expiry date')
@click.option('--test', '-t', help='Test quote stream file')
@click.option('--rate', '-r', default=1, help='Logging level')
@click.argument('symbol', required=True)
@click.pass_obj
def chart(config, symbol, date, tl, rate, test):
"""Start an option chain UI
"""
from ._chart import main
# global opts
loglevel = config['loglevel']
brokername = config['broker']
main(sym=symbol, brokername=brokername, loglevel=loglevel)