From 3d6b14ec3f867927a8fbc81f64fcb7d819894064 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 15:07:13 -0400 Subject: [PATCH 1/9] Pass in the host addr --- piker/brokers/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 6d62d14a..2ec91fc7 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -376,7 +376,7 @@ async def start_quoter( await cntxmng.__aexit__(None, None, None) -async def _daemon_main() -> None: +async def _daemon_main(host) -> None: """Entry point for the broker daemon which waits for connections before spawning micro-services. """ @@ -393,7 +393,7 @@ async def _daemon_main() -> None: start_quoter, broker2tickersubs, clients, dtasks, nursery ), - 1616, host='127.0.0.1' + 1616, host=host, ) ) log.debug(f"Spawned {listeners}") From 5a9c079c10536c3de4c76654cab67bbb9c2340ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 15:10:10 -0400 Subject: [PATCH 2/9] Support specifying daemon host address --- piker/cli.py | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index cb637f71..95d429cf 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -23,23 +23,17 @@ _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') def run(main, loglevel='info'): - log = get_console_log(loglevel) - - # main sandwich - try: - return trio.run(main) - except Exception as err: - log.exception(err) - finally: - log.debug("Exiting piker") + get_console_log(loglevel) + return trio.run(main) @click.command() @click.option('--loglevel', '-l', default='warning', help='Logging level') -def pikerd(loglevel): +@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') +def pikerd(loglevel, host): """Spawn the piker daemon. """ - run(_daemon_main, loglevel) + run(partial(_daemon_main, host), loglevel) @click.group() @@ -120,8 +114,10 @@ def quote(loglevel, broker, tickers, df_output): help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--rate', '-r', default=5, help='Logging level') +@click.option('--dhost', '-dh', default='127.0.0.1', + help='Daemon host address to connect to') @click.argument('name', nargs=1, required=True) -def watch(loglevel, broker, rate, name): +def watch(loglevel, broker, rate, name, dhost): """Spawn a real-time watchlist. """ from .ui.watchlist import _async_main @@ -131,17 +127,20 @@ def watch(loglevel, broker, rate, name): watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) tickers = watchlists[name] - async def main(timeout=1): + async def launch_client(sleep=0.5, tries=10): async def subscribe(client): # initial request for symbols price streams await client.send((brokermod.name, tickers)) - client = Client(('127.0.0.1', 1616), subscribe) - try: - await client.connect() - except OSError as oserr: - await trio.sleep(0.5) + client = Client((dhost, 1616), subscribe) + for _ in range(tries): # try for 5 seconds + try: + await client.connect() + break + except OSError as oserr: + await trio.sleep(sleep) + else: # will raise indicating child proc should be spawned await client.connect() @@ -155,18 +154,18 @@ def watch(loglevel, broker, rate, name): await client.aclose() try: - trio.run(main) + trio.run(partial(launch_client, tries=1)) except OSError as oserr: log.warn("No broker daemon could be found") log.warn(oserr) log.warning("Spawning local broker-daemon...") child = Process( target=run, - args=(_daemon_main, loglevel), + args=(partial(_daemon_main, dhost), loglevel), daemon=True, ) child.start() - trio.run(main, 5) + trio.run(launch_client, 5) child.join() From fcaeeae6180f269976bc5f82cc93f85a41757ed3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 15:39:57 -0400 Subject: [PATCH 3/9] Acquire symbol data with daemon; push as first response --- piker/brokers/core.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 2ec91fc7..e521477e 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -325,13 +325,19 @@ async def start_quoter( log.warn(f"Retrieving smoke quote for {queue.peer}") quotes = await get_quotes(tickers) # pop any tickers that aren't returned in the first quote - tickers = set(tickers) - set(quotes) - for ticker in tickers: + valid_tickers = set(tickers) - set(quotes) + for ticker in valid_tickers: log.warn( f"Symbol `{ticker}` not found by broker `{brokermod.name}`" ) tickers2qs.pop(ticker, None) + # first respond with symbol data for all tickers (allows + # clients to receive broker specific setup info) + sd = await client.symbol_data(tickers) + assert sd, "No symbol data could be found?" + await queue.put(sd) + # pop any tickers that return "empty" quotes payload = {} for symbol, quote in quotes.items(): From 995851360d9b1875071a863faedcb82ea8680457 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 15:40:24 -0400 Subject: [PATCH 4/9] Rx symbol data from daemon as first response --- piker/ui/watchlist.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index a0944ffb..b94cc9fb 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -387,10 +387,9 @@ async def _async_main(name, client, tickers, brokermod, rate): This is started with cli command `piker watch`. ''' - # get initial symbol data - async with brokermod.get_client() as bclient: - # get long term data including last days close price - sd = await bclient.symbol_data(tickers) + # get initial symbol data (long term data including last days close price) + # TODO: need something better this this toy protocol + sd = await client.recv() async with trio.open_nursery() as nursery: # get first quotes response From 3a40c2f8feed7011fda121fb9497416ab4f8d6bd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 15:47:48 -0400 Subject: [PATCH 5/9] Zero bad fields --- piker/calc.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/piker/calc.py b/piker/calc.py index 0ed811fc..c4847696 100644 --- a/piker/calc.py +++ b/piker/calc.py @@ -27,4 +27,6 @@ def percent_change(init, new): """Calcuate the percentage change of some ``new`` value from some initial value, ``init``. """ + if not (init and new): + return 0 return (new - init) / init * 100. From 3646fb4a233afc6f8ba19f4c1221b1df16698414 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 22:48:06 -0400 Subject: [PATCH 6/9] Filter out bad symbols before adding client subscription Event if a broker client is already spawned new clients should still receive a detailed symbol data packet as the first response. Avoid exposing the new client's queue to the broker (i.e. subscribing it for quotes) until after first pushing this packet with all bad symbols filtered out. --- piker/brokers/core.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index e521477e..90b9420c 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -306,15 +306,11 @@ async def start_quoter( get_quotes = await brokermod.quoter(client, tickers) clients[broker] = ( brokermod, client, client_cntxmng, get_quotes) - tickers2qs = broker2tickersubs.setdefault( - broker, {}.fromkeys(tickers, {queue, })) + tickers2qs = broker2tickersubs.setdefault(broker, {}) else: log.info(f"Subscribing with existing `{broker}` daemon") brokermod, client, _, get_quotes = clients[broker] tickers2qs = broker2tickersubs[broker] - # update map from each symbol to requesting client's queue - for ticker in tickers: - tickers2qs.setdefault(ticker, set()).add(queue) # beginning of section to be trimmed out with #37 ################################################# @@ -324,19 +320,13 @@ async def start_quoter( # since the new client needs to know what symbols are accepted log.warn(f"Retrieving smoke quote for {queue.peer}") quotes = await get_quotes(tickers) - # pop any tickers that aren't returned in the first quote - valid_tickers = set(tickers) - set(quotes) - for ticker in valid_tickers: + # report any tickers that aren't returned in the first quote + invalid_tickers = set(tickers) - set(quotes) + for symbol in invalid_tickers: + tickers.remove(symbol) log.warn( - f"Symbol `{ticker}` not found by broker `{brokermod.name}`" + f"Symbol `{symbol}` not found by broker `{brokermod.name}`" ) - tickers2qs.pop(ticker, None) - - # first respond with symbol data for all tickers (allows - # clients to receive broker specific setup info) - sd = await client.symbol_data(tickers) - assert sd, "No symbol data could be found?" - await queue.put(sd) # pop any tickers that return "empty" quotes payload = {} @@ -345,16 +335,26 @@ async def start_quoter( log.warn( f"Symbol `{symbol}` not found by broker" f" `{brokermod.name}`") - tickers2qs.pop(symbol, None) + tickers.remove(symbol) continue payload[symbol] = quote - # push initial quotes response for client initialization - await queue.put(payload) - # end of section to be trimmed out with #37 ########################################### + # first respond with symbol data for all tickers (allows + # clients to receive broker specific setup info) + sd = await client.symbol_data(tickers) + assert sd, "No symbol data could be found?" + await queue.put(sd) + + # update map from each symbol to requesting client's queue + for ticker in tickers: + tickers2qs.setdefault(ticker, set()).add(queue) + + # push initial quotes response for client initialization + await queue.put(payload) + if broker not in dtasks: # no quoter task yet # task should begin on the next checkpoint/iteration log.info(f"Spawning quoter task for {brokermod.name}") From fd1fe0816e6b25de23506a1e0d9e390dcc6099a9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 23:50:24 -0400 Subject: [PATCH 7/9] Don't call formatting func on None values --- piker/brokers/questrade.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 3f0ebbd7..feea6396 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -414,7 +414,8 @@ def format_quote( # convert values to a displayble format using available formatting func if isinstance(new_key, tuple): new_key, func = new_key - display_value = func(value) + display_value = func(value) if value else value + new[new_key] = value displayable[new_key] = display_value From bcaef70612ed66fd66919eb5080dcc226aec5b52 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 23:52:35 -0400 Subject: [PATCH 8/9] Pack null results without raising --- piker/brokers/robinhood.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index 5a220251..bd70c108 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -8,7 +8,7 @@ from async_generator import asynccontextmanager import asks from ..log import get_logger -from ._util import resproc +from ._util import resproc, BrokerError from ..calc import percent_change asks.init('trio') @@ -51,10 +51,12 @@ class Client: async def quote(self, symbols: [str]): """Retrieve quotes for a list of ``symbols``. """ - return self._zip_in_order( - symbols, - (await self.api.quotes(','.join(symbols)))['results'] - ) + try: + resp = await self.api.quotes(','.join(symbols)) + except BrokerError: + resp = {'results': [None] * len(symbols)} + + return self._zip_in_order(symbols, resp['results']) async def symbol_data(self, symbols: [str]): """Retrieve symbol data via the ``fundmentals`` endpoint. From 9b34aac0fd6bbf64bacd3c767000fb812d1323ab Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 23:53:20 -0400 Subject: [PATCH 9/9] Build columns only for dataframe output --- piker/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index 95d429cf..0ae1f5ce 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -96,9 +96,9 @@ def quote(loglevel, broker, tickers, df_output): log.error(f"No quotes could be found for {tickers}?") return - cols = next(filter(bool, quotes.values())).copy() - cols.pop('symbol') if df_output: + cols = next(filter(bool, quotes.values())).copy() + cols.pop('symbol') df = pd.DataFrame( (quote or {} for quote in quotes.values()), index=quotes.keys(),