diff --git a/piker/cli.py b/piker/cli.py index 3c426d24..3ea0eaac 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -9,9 +9,10 @@ import click import pandas as pd import trio import tractor +from async_generator import asynccontextmanager from . import watchlists as wl -from .brokers import core, get_brokermod +from .brokers import core, get_brokermod, data from .log import get_console_log, colorize_json, get_logger log = get_logger('cli') @@ -23,9 +24,10 @@ _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') @click.command() @click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') -def pikerd(loglevel, host): - """Spawn the piker daemon. +def pikerd(loglevel, host, tl): + """Spawn the piker broker-daemon. """ get_console_log(loglevel) tractor.run_daemon( @@ -36,7 +38,7 @@ def pikerd(loglevel, host): 'dtasks': set(), }, name='brokerd', - loglevel=loglevel, + loglevel=loglevel if tl else None, ) @@ -116,45 +118,40 @@ def quote(loglevel, broker, tickers, df_output): click.echo(colorize_json(quotes)) -@cli.command() -@click.option('--broker', '-b', default=DEFAULT_BROKER, - help='Broker backend to use') -@click.option('--loglevel', '-l', default='warning', help='Logging level') -@click.option('--df-output', '-df', flag_value=True, - help='Output in `pandas.DataFrame` format') -@click.argument('symbol', required=True) -def option_chain_quote(loglevel, broker, symbol, df_output): - """Retreive symbol quotes on the console in either json or dataframe - format. +@asynccontextmanager +async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): + """If no ``brokerd`` daemon-actor can be found spawn one in a + local subactor. """ - brokermod = get_brokermod(broker) - get_console_log(loglevel) - quotes = trio.run(partial(core.option_chain, brokermod, symbol))[symbol] - if not quotes: - log.error(f"No quotes could be found for {symbol}?") - return - - if df_output: - cols = next(filter(bool, quotes.values())).copy() - df = pd.DataFrame( - (quote.values() for contract, quote in quotes.items()), - index=quotes.keys(), - columns=cols.keys(), - ) - click.echo(df) - else: - click.echo(colorize_json(quotes)) + async with tractor.open_nursery() as nursery: + async with tractor.find_actor('brokerd') as portal: + if not portal: + log.info( + "No broker daemon could be found, spawning brokerd..") + portal = await nursery.start_actor( + 'brokerd', + statespace={ + 'broker2tickersubs': {}, + 'clients': {}, + 'dtasks': set(), + }, + rpc_module_paths=['piker.brokers.data'], + loglevel=loglevel, + ) + yield portal @cli.command() @click.option('--broker', '-b', default=DEFAULT_BROKER, help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') -@click.option('--rate', '-r', default=5, help='Logging level') +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--rate', '-r', default=5, help='Quote rate limit') +@click.option('--test', '-t', help='Test quote stream file') @click.option('--dhost', '-dh', default='127.0.0.1', help='Daemon host address to connect to') @click.argument('name', nargs=1, required=True) -def monitor(loglevel, broker, rate, name, dhost): +def monitor(loglevel, broker, rate, name, dhost, test, tl): """Spawn a real-time watchlist. """ from .ui.monitor import _async_main @@ -167,28 +164,71 @@ def monitor(loglevel, broker, rate, name, dhost): log.error(f"No symbols found for watchlist `{name}`?") return - async def launch_client(sleep=0.5, tries=10): + async def main(tries): + async with maybe_spawn_brokerd_as_subactor( + tries=tries, loglevel=loglevel + ) as portal: + if test: + # stream from a local test file + agen = await portal.run( + "piker.brokers.data", 'stream_from_file', + filename=test + ) + # agen = data.stream_from_file(test) + else: + # start live streaming from broker daemon + agen = await portal.run( + "piker.brokers.data", 'start_quote_stream', + broker=brokermod.name, tickers=tickers) - async with tractor.open_nursery() as nursery: - async with tractor.find_actor('brokerd') as portal: - if not portal: - log.warn("No broker daemon could be found") - log.warning("Spawning local brokerd..") - portal = await nursery.start_actor( - 'brokerd', - statespace={ - 'broker2tickersubs': {}, - 'clients': {}, - 'dtasks': set(), - }, - rpc_module_paths=['piker.brokers.data'], - loglevel=loglevel, - ) + # run app "main" + await _async_main( + name, portal, tickers, + brokermod, rate, agen, + ) - # run kivy app - await _async_main(name, portal, tickers, brokermod, rate) + tractor.run( + partial(main, tries=1), + name='kivy-monitor', + loglevel=loglevel if tl else None, + ) - tractor.run(partial(launch_client, tries=1), name='kivy-watchlist') + +@cli.command() +@click.option('--broker', '-b', default=DEFAULT_BROKER, + help='Broker backend to use') +@click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--rate', '-r', default=5, help='Logging level') +@click.option('--filename', '-f', default='quotestream.jsonstream', + help='Logging level') +@click.option('--dhost', '-dh', default='127.0.0.1', + help='Daemon host address to connect to') +@click.argument('name', nargs=1, required=True) +def record(loglevel, broker, rate, name, dhost, filename): + """Record client side quotes to file + """ + log = get_console_log(loglevel) # activate console logging + brokermod = get_brokermod(broker) + watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) + watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) + tickers = watchlists[name] + if not tickers: + log.error(f"No symbols found for watchlist `{name}`?") + return + + async def main(tries): + async with maybe_spawn_brokerd_as_subactor( + tries=tries, loglevel=loglevel + ) as portal: + # run app "main" + return await data.stream_to_file( + name, filename, + portal, tickers, + brokermod, rate, + ) + + filename = tractor.run(partial(main, tries=1), name='data-feed-recorder') + click.echo(f"Data feed recording saved to {filename}") @cli.group() @@ -277,3 +317,59 @@ def merge(ctx, watchlist_to_merge): @click.pass_context def dump(ctx, name): click.echo(json.dumps(ctx.obj['watchlist'])) + + +# options utils + +@cli.command() +@click.option('--broker', '-b', default=DEFAULT_BROKER, + help='Broker backend to use') +@click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--ids', flag_value=True, help='Include numeric ids in output') +@click.argument('symbol', required=True) +def contracts(loglevel, broker, symbol, ids): + brokermod = get_brokermod(broker) + get_console_log(loglevel) + quotes = trio.run(partial(core.contracts, brokermod, symbol)) + if not ids: + # just print out expiry dates which can be used with + # the option_chain_quote cmd + id, contracts = next(iter(quotes.items())) + quotes = list(contracts) + + click.echo(colorize_json(quotes)) + + +@cli.command() +@click.option('--broker', '-b', default=DEFAULT_BROKER, + help='Broker backend to use') +@click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--df-output', '-df', flag_value=True, + help='Output in `pandas.DataFrame` format') +@click.option('--date', '-d', help='Contracts expiry date') +@click.argument('symbol', required=True) +def optsquote(loglevel, broker, symbol, df_output, date): + """Retreive symbol quotes on the console in either + json or dataframe format. + """ + brokermod = get_brokermod(broker) + get_console_log(loglevel) + quotes = trio.run( + partial( + core.option_chain, brokermod, symbol, date + ) + )[symbol] + if not quotes: + log.error(f"No quotes could be found for {symbol}?") + return + + if df_output: + cols = next(filter(bool, quotes.values())).copy() + df = pd.DataFrame( + (quote.values() for contract, quote in quotes.items()), + index=quotes.keys(), + columns=cols.keys(), + ) + click.echo(df) + else: + click.echo(colorize_json(quotes))