Add options query and data feed recording commands

Add `contracts` and `optsquote` commands for querying option contracts
info and market quotes respectively. Add a `record` command for
streaming real-time data feed quotes to disk. Port `monitor` to the
new `piker.brokers.data` module. Forward loglevel flags through to
`tractor` for relevant commands.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-11-22 16:21:15 -05:00
parent c23982393d
commit b9a9b7a9a3
1 changed files with 148 additions and 52 deletions

View File

@ -9,9 +9,10 @@ import click
import pandas as pd import pandas as pd
import trio import trio
import tractor import tractor
from async_generator import asynccontextmanager
from . import watchlists as wl from . import watchlists as wl
from .brokers import core, get_brokermod from .brokers import core, get_brokermod, data
from .log import get_console_log, colorize_json, get_logger from .log import get_console_log, colorize_json, get_logger
log = get_logger('cli') log = get_logger('cli')
@ -23,9 +24,10 @@ _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
@click.command() @click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
def pikerd(loglevel, host): def pikerd(loglevel, host, tl):
"""Spawn the piker daemon. """Spawn the piker broker-daemon.
""" """
get_console_log(loglevel) get_console_log(loglevel)
tractor.run_daemon( tractor.run_daemon(
@ -36,7 +38,7 @@ def pikerd(loglevel, host):
'dtasks': set(), 'dtasks': set(),
}, },
name='brokerd', name='brokerd',
loglevel=loglevel, loglevel=loglevel if tl else None,
) )
@ -116,45 +118,40 @@ def quote(loglevel, broker, tickers, df_output):
click.echo(colorize_json(quotes)) click.echo(colorize_json(quotes))
@cli.command() @asynccontextmanager
@click.option('--broker', '-b', default=DEFAULT_BROKER, async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None):
help='Broker backend to use') """If no ``brokerd`` daemon-actor can be found spawn one in a
@click.option('--loglevel', '-l', default='warning', help='Logging level') local subactor.
@click.option('--df-output', '-df', flag_value=True,
help='Output in `pandas.DataFrame` format')
@click.argument('symbol', required=True)
def option_chain_quote(loglevel, broker, symbol, df_output):
"""Retreive symbol quotes on the console in either json or dataframe
format.
""" """
brokermod = get_brokermod(broker) async with tractor.open_nursery() as nursery:
get_console_log(loglevel) async with tractor.find_actor('brokerd') as portal:
quotes = trio.run(partial(core.option_chain, brokermod, symbol))[symbol] if not portal:
if not quotes: log.info(
log.error(f"No quotes could be found for {symbol}?") "No broker daemon could be found, spawning brokerd..")
return portal = await nursery.start_actor(
'brokerd',
if df_output: statespace={
cols = next(filter(bool, quotes.values())).copy() 'broker2tickersubs': {},
df = pd.DataFrame( 'clients': {},
(quote.values() for contract, quote in quotes.items()), 'dtasks': set(),
index=quotes.keys(), },
columns=cols.keys(), rpc_module_paths=['piker.brokers.data'],
) loglevel=loglevel,
click.echo(df) )
else: yield portal
click.echo(colorize_json(quotes))
@cli.command() @cli.command()
@click.option('--broker', '-b', default=DEFAULT_BROKER, @click.option('--broker', '-b', default=DEFAULT_BROKER,
help='Broker backend to use') help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--rate', '-r', default=5, help='Logging level') @click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--rate', '-r', default=5, help='Quote rate limit')
@click.option('--test', '-t', help='Test quote stream file')
@click.option('--dhost', '-dh', default='127.0.0.1', @click.option('--dhost', '-dh', default='127.0.0.1',
help='Daemon host address to connect to') help='Daemon host address to connect to')
@click.argument('name', nargs=1, required=True) @click.argument('name', nargs=1, required=True)
def monitor(loglevel, broker, rate, name, dhost): def monitor(loglevel, broker, rate, name, dhost, test, tl):
"""Spawn a real-time watchlist. """Spawn a real-time watchlist.
""" """
from .ui.monitor import _async_main from .ui.monitor import _async_main
@ -167,28 +164,71 @@ def monitor(loglevel, broker, rate, name, dhost):
log.error(f"No symbols found for watchlist `{name}`?") log.error(f"No symbols found for watchlist `{name}`?")
return return
async def launch_client(sleep=0.5, tries=10): async def main(tries):
async with maybe_spawn_brokerd_as_subactor(
tries=tries, loglevel=loglevel
) as portal:
if test:
# stream from a local test file
agen = await portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
)
# agen = data.stream_from_file(test)
else:
# start live streaming from broker daemon
agen = await portal.run(
"piker.brokers.data", 'start_quote_stream',
broker=brokermod.name, tickers=tickers)
async with tractor.open_nursery() as nursery: # run app "main"
async with tractor.find_actor('brokerd') as portal: await _async_main(
if not portal: name, portal, tickers,
log.warn("No broker daemon could be found") brokermod, rate, agen,
log.warning("Spawning local brokerd..") )
portal = await nursery.start_actor(
'brokerd',
statespace={
'broker2tickersubs': {},
'clients': {},
'dtasks': set(),
},
rpc_module_paths=['piker.brokers.data'],
loglevel=loglevel,
)
# run kivy app tractor.run(
await _async_main(name, portal, tickers, brokermod, rate) partial(main, tries=1),
name='kivy-monitor',
loglevel=loglevel if tl else None,
)
tractor.run(partial(launch_client, tries=1), name='kivy-watchlist')
@cli.command()
@click.option('--broker', '-b', default=DEFAULT_BROKER,
help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--rate', '-r', default=5, help='Logging level')
@click.option('--filename', '-f', default='quotestream.jsonstream',
help='Logging level')
@click.option('--dhost', '-dh', default='127.0.0.1',
help='Daemon host address to connect to')
@click.argument('name', nargs=1, required=True)
def record(loglevel, broker, rate, name, dhost, filename):
"""Record client side quotes to file
"""
log = get_console_log(loglevel) # activate console logging
brokermod = get_brokermod(broker)
watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path)
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
tickers = watchlists[name]
if not tickers:
log.error(f"No symbols found for watchlist `{name}`?")
return
async def main(tries):
async with maybe_spawn_brokerd_as_subactor(
tries=tries, loglevel=loglevel
) as portal:
# run app "main"
return await data.stream_to_file(
name, filename,
portal, tickers,
brokermod, rate,
)
filename = tractor.run(partial(main, tries=1), name='data-feed-recorder')
click.echo(f"Data feed recording saved to {filename}")
@cli.group() @cli.group()
@ -277,3 +317,59 @@ def merge(ctx, watchlist_to_merge):
@click.pass_context @click.pass_context
def dump(ctx, name): def dump(ctx, name):
click.echo(json.dumps(ctx.obj['watchlist'])) click.echo(json.dumps(ctx.obj['watchlist']))
# options utils
@cli.command()
@click.option('--broker', '-b', default=DEFAULT_BROKER,
help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--ids', flag_value=True, help='Include numeric ids in output')
@click.argument('symbol', required=True)
def contracts(loglevel, broker, symbol, ids):
brokermod = get_brokermod(broker)
get_console_log(loglevel)
quotes = trio.run(partial(core.contracts, brokermod, symbol))
if not ids:
# just print out expiry dates which can be used with
# the option_chain_quote cmd
id, contracts = next(iter(quotes.items()))
quotes = list(contracts)
click.echo(colorize_json(quotes))
@cli.command()
@click.option('--broker', '-b', default=DEFAULT_BROKER,
help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--df-output', '-df', flag_value=True,
help='Output in `pandas.DataFrame` format')
@click.option('--date', '-d', help='Contracts expiry date')
@click.argument('symbol', required=True)
def optsquote(loglevel, broker, symbol, df_output, date):
"""Retreive symbol quotes on the console in either
json or dataframe format.
"""
brokermod = get_brokermod(broker)
get_console_log(loglevel)
quotes = trio.run(
partial(
core.option_chain, brokermod, symbol, date
)
)[symbol]
if not quotes:
log.error(f"No quotes could be found for {symbol}?")
return
if df_output:
cols = next(filter(bool, quotes.values())).copy()
df = pd.DataFrame(
(quote.values() for contract, quote in quotes.items()),
index=quotes.keys(),
columns=cols.keys(),
)
click.echo(df)
else:
click.echo(colorize_json(quotes))