diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 6d62d14a..90b9420c 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -306,15 +306,11 @@ async def start_quoter( get_quotes = await brokermod.quoter(client, tickers) clients[broker] = ( brokermod, client, client_cntxmng, get_quotes) - tickers2qs = broker2tickersubs.setdefault( - broker, {}.fromkeys(tickers, {queue, })) + tickers2qs = broker2tickersubs.setdefault(broker, {}) else: log.info(f"Subscribing with existing `{broker}` daemon") brokermod, client, _, get_quotes = clients[broker] tickers2qs = broker2tickersubs[broker] - # update map from each symbol to requesting client's queue - for ticker in tickers: - tickers2qs.setdefault(ticker, set()).add(queue) # beginning of section to be trimmed out with #37 ################################################# @@ -324,13 +320,13 @@ async def start_quoter( # since the new client needs to know what symbols are accepted log.warn(f"Retrieving smoke quote for {queue.peer}") quotes = await get_quotes(tickers) - # pop any tickers that aren't returned in the first quote - tickers = set(tickers) - set(quotes) - for ticker in tickers: + # report any tickers that aren't returned in the first quote + invalid_tickers = set(tickers) - set(quotes) + for symbol in invalid_tickers: + tickers.remove(symbol) log.warn( - f"Symbol `{ticker}` not found by broker `{brokermod.name}`" + f"Symbol `{symbol}` not found by broker `{brokermod.name}`" ) - tickers2qs.pop(ticker, None) # pop any tickers that return "empty" quotes payload = {} @@ -339,16 +335,26 @@ async def start_quoter( log.warn( f"Symbol `{symbol}` not found by broker" f" `{brokermod.name}`") - tickers2qs.pop(symbol, None) + tickers.remove(symbol) continue payload[symbol] = quote - # push initial quotes response for client initialization - await queue.put(payload) - # end of section to be trimmed out with #37 ########################################### + # first respond with symbol data for all tickers (allows + # clients to receive broker specific setup info) + sd = await client.symbol_data(tickers) + assert sd, "No symbol data could be found?" + await queue.put(sd) + + # update map from each symbol to requesting client's queue + for ticker in tickers: + tickers2qs.setdefault(ticker, set()).add(queue) + + # push initial quotes response for client initialization + await queue.put(payload) + if broker not in dtasks: # no quoter task yet # task should begin on the next checkpoint/iteration log.info(f"Spawning quoter task for {brokermod.name}") @@ -376,7 +382,7 @@ async def start_quoter( await cntxmng.__aexit__(None, None, None) -async def _daemon_main() -> None: +async def _daemon_main(host) -> None: """Entry point for the broker daemon which waits for connections before spawning micro-services. """ @@ -393,7 +399,7 @@ async def _daemon_main() -> None: start_quoter, broker2tickersubs, clients, dtasks, nursery ), - 1616, host='127.0.0.1' + 1616, host=host, ) ) log.debug(f"Spawned {listeners}") diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 3f0ebbd7..feea6396 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -414,7 +414,8 @@ def format_quote( # convert values to a displayble format using available formatting func if isinstance(new_key, tuple): new_key, func = new_key - display_value = func(value) + display_value = func(value) if value else value + new[new_key] = value displayable[new_key] = display_value diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index 5a220251..bd70c108 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -8,7 +8,7 @@ from async_generator import asynccontextmanager import asks from ..log import get_logger -from ._util import resproc +from ._util import resproc, BrokerError from ..calc import percent_change asks.init('trio') @@ -51,10 +51,12 @@ class Client: async def quote(self, symbols: [str]): """Retrieve quotes for a list of ``symbols``. """ - return self._zip_in_order( - symbols, - (await self.api.quotes(','.join(symbols)))['results'] - ) + try: + resp = await self.api.quotes(','.join(symbols)) + except BrokerError: + resp = {'results': [None] * len(symbols)} + + return self._zip_in_order(symbols, resp['results']) async def symbol_data(self, symbols: [str]): """Retrieve symbol data via the ``fundmentals`` endpoint. diff --git a/piker/calc.py b/piker/calc.py index 0ed811fc..c4847696 100644 --- a/piker/calc.py +++ b/piker/calc.py @@ -27,4 +27,6 @@ def percent_change(init, new): """Calcuate the percentage change of some ``new`` value from some initial value, ``init``. """ + if not (init and new): + return 0 return (new - init) / init * 100. diff --git a/piker/cli.py b/piker/cli.py index cb637f71..0ae1f5ce 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -23,23 +23,17 @@ _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') def run(main, loglevel='info'): - log = get_console_log(loglevel) - - # main sandwich - try: - return trio.run(main) - except Exception as err: - log.exception(err) - finally: - log.debug("Exiting piker") + get_console_log(loglevel) + return trio.run(main) @click.command() @click.option('--loglevel', '-l', default='warning', help='Logging level') -def pikerd(loglevel): +@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') +def pikerd(loglevel, host): """Spawn the piker daemon. """ - run(_daemon_main, loglevel) + run(partial(_daemon_main, host), loglevel) @click.group() @@ -102,9 +96,9 @@ def quote(loglevel, broker, tickers, df_output): log.error(f"No quotes could be found for {tickers}?") return - cols = next(filter(bool, quotes.values())).copy() - cols.pop('symbol') if df_output: + cols = next(filter(bool, quotes.values())).copy() + cols.pop('symbol') df = pd.DataFrame( (quote or {} for quote in quotes.values()), index=quotes.keys(), @@ -120,8 +114,10 @@ def quote(loglevel, broker, tickers, df_output): help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--rate', '-r', default=5, help='Logging level') +@click.option('--dhost', '-dh', default='127.0.0.1', + help='Daemon host address to connect to') @click.argument('name', nargs=1, required=True) -def watch(loglevel, broker, rate, name): +def watch(loglevel, broker, rate, name, dhost): """Spawn a real-time watchlist. """ from .ui.watchlist import _async_main @@ -131,17 +127,20 @@ def watch(loglevel, broker, rate, name): watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) tickers = watchlists[name] - async def main(timeout=1): + async def launch_client(sleep=0.5, tries=10): async def subscribe(client): # initial request for symbols price streams await client.send((brokermod.name, tickers)) - client = Client(('127.0.0.1', 1616), subscribe) - try: - await client.connect() - except OSError as oserr: - await trio.sleep(0.5) + client = Client((dhost, 1616), subscribe) + for _ in range(tries): # try for 5 seconds + try: + await client.connect() + break + except OSError as oserr: + await trio.sleep(sleep) + else: # will raise indicating child proc should be spawned await client.connect() @@ -155,18 +154,18 @@ def watch(loglevel, broker, rate, name): await client.aclose() try: - trio.run(main) + trio.run(partial(launch_client, tries=1)) except OSError as oserr: log.warn("No broker daemon could be found") log.warn(oserr) log.warning("Spawning local broker-daemon...") child = Process( target=run, - args=(_daemon_main, loglevel), + args=(partial(_daemon_main, dhost), loglevel), daemon=True, ) child.start() - trio.run(main, 5) + trio.run(launch_client, 5) child.join() diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index a0944ffb..b94cc9fb 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -387,10 +387,9 @@ async def _async_main(name, client, tickers, brokermod, rate): This is started with cli command `piker watch`. ''' - # get initial symbol data - async with brokermod.get_client() as bclient: - # get long term data including last days close price - sd = await bclient.symbol_data(tickers) + # get initial symbol data (long term data including last days close price) + # TODO: need something better this this toy protocol + sd = await client.recv() async with trio.open_nursery() as nursery: # get first quotes response