From 2973b40946d165922312aa5f45818ef41801c5ba Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Apr 2018 01:32:21 -0400 Subject: [PATCH] Allow wl app to spawn a broker daemon in a subprocess --- piker/cli.py | 69 ++++++++++++++++++---- piker/ui/watchlist.py | 131 ++++++++++++++++++++---------------------- 2 files changed, 118 insertions(+), 82 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index a8f9b924..906a89c8 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -1,20 +1,23 @@ """ Console interface to broker client/daemons. """ +from collections import defaultdict from functools import partial from importlib import import_module -import os -from collections import defaultdict +from multiprocessing import Process import json +import os +import signal +import time import click -import trio import pandas as pd +import trio - -from .log import get_console_log, colorize_json, get_logger from . import watchlists as wl from .brokers import core, get_brokermod +from .brokers.core import _daemon_main +from .log import get_console_log, colorize_json, get_logger log = get_logger('cli') DEFAULT_BROKER = 'robinhood' @@ -36,15 +39,11 @@ def run(main, loglevel='info'): @click.command() -@click.option('--broker', '-b', default=DEFAULT_BROKER, - help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') -def pikerd(broker, loglevel): +def pikerd(loglevel): """Spawn the piker daemon. """ - from piker.brokers.core import _daemon_main - brokermod = get_brokermod(broker) - run(partial(_daemon_main, brokermod), loglevel) + run(_daemon_main, loglevel) @click.group() @@ -134,7 +133,53 @@ def watch(loglevel, broker, rate, name): brokermod = get_brokermod(broker) watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - trio.run(_async_main, name, watchlists[name], brokermod, rate) + tickers = watchlists[name] + + # setup ticker stream + from .brokers.core import Client + + async def main(timeout=1): + async def subscribe(client): + # initial request for symbols price streams + await client.send((brokermod.name, tickers)) + + client = Client(('127.0.0.1', 1616), subscribe) + start = time.time() + while True: + try: + await client.connect() + break + except OSError as oserr: + log.info("Waiting on daemon to come up...") + await trio.sleep(0.1) + if time.time() - start > timeout: + raise + continue + + async with trio.open_nursery() as nursery: + nursery.start_soon( + _async_main, name, client, tickers, + brokermod, rate + ) + + try: + trio.run(main) + except OSError as oserr: + log.exception(oserr) + answer = input( + "\nWould you like to spawn a broker daemon locally? [Y/n]") + if answer is not 'n': + child = Process( + target=run, + args=(_daemon_main, loglevel), + daemon=True, + ) + child.daemon = True + child.start() + trio.run(main, 5) + # trio dies with a keyboard interrupt + os.kill(child.pid, signal.SIGINT) + child.join() @cli.group() diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 69e6e350..be21dd85 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -382,86 +382,77 @@ async def run_kivy(root, nursery): nursery.cancel_scope.cancel() # cancel all other tasks that may be running -async def _async_main(name, tickers, brokermod, rate): +async def _async_main(name, client, tickers, brokermod, rate): '''Launch kivy app + all other related tasks. This is started with cli command `piker watch`. ''' - # setup ticker stream - from ..brokers.core import Client + # get initial symbol data + async with brokermod.get_client() as bclient: + # get long term data including last days close price + sd = await bclient.symbol_data(tickers) - async def subscribe(client): - # initial request for symbols price streams - await client.send((brokermod.name, tickers)) + async with trio.open_nursery() as nursery: + # get first quotes response + quotes = await client.recv() + first_quotes = [ + brokermod.format_quote(quote, symbol_data=sd)[0] + for quote in quotes.values()] - async with Client(('127.0.0.1', 1616), subscribe) as client: + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") + nursery.cancel_scope.cancel() + return - # get initial symbol data - async with brokermod.get_client() as bclient: - # get long term data including last days close price - sd = await bclient.symbol_data(tickers) + # build out UI + Window.set_title(f"watchlist: {name}\t(press ? for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', padding=5, spacing=5) - async with trio.open_nursery() as nursery: - # get first quotes response - quotes = await client.recv() - first_quotes = [ - brokermod.format_quote(quote, symbol_data=sd)[0] - for quote in quotes.values()] + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._bidasks - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - nursery.cancel_scope.cancel() - return + # add header row + headers = first_quotes[0].keys() + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header_row=True, + size_hint=(1, None), + ) + box.add_widget(header) - # build out UI - Window.set_title(f"watchlist: {name}\t(press ? for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', padding=5, spacing=5) + # build grid + grid = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes: + grid.append_row(ticker_record, bidasks=bidasks) + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = grid - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._bidasks + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(grid.sort_key) + sort_cell.bold = sort_cell.underline = True + grid.last_clicked_col_cell = sort_cell - # add header row - headers = first_quotes[0].keys() - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header_row=True, - size_hint=(1, None), - ) - box.add_widget(header) + # set up a pager view for large ticker lists + grid.bind(minimum_height=grid.setter('height')) + pager = PagerView(box, grid, nursery) + box.add_widget(pager) - # build grid - grid = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes: - grid.append_row(ticker_record, bidasks=bidasks) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = grid - - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(grid.sort_key) - sort_cell.bold = sort_cell.underline = True - grid.last_clicked_col_cell = sort_cell - - # set up a pager view for large ticker lists - grid.bind(minimum_height=grid.setter('height')) - pager = PagerView(box, grid, nursery) - box.add_widget(pager) - - widgets = { - # 'anchor': anchor, - 'root': box, - 'grid': grid, - 'box': box, - 'header': header, - 'pager': pager, - } - nursery.start_soon(run_kivy, widgets['root'], nursery) - nursery.start_soon( - update_quotes, nursery, brokermod, widgets, client, sd, quotes) + widgets = { + # 'anchor': anchor, + 'root': box, + 'grid': grid, + 'box': box, + 'header': header, + 'pager': pager, + } + nursery.start_soon(run_kivy, widgets['root'], nursery) + nursery.start_soon( + update_quotes, nursery, brokermod, widgets, client, sd, quotes)