diff --git a/piker/cli.py b/piker/cli.py index 963ebe5c..13c7439e 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -2,7 +2,6 @@ Console interface to broker client/daemons. """ from functools import partial -from multiprocessing import Process import json import os @@ -12,9 +11,8 @@ import trio from . import watchlists as wl from .brokers import core, get_brokermod -from .brokers.core import _brokerd_main -from .ipc import Channel from .log import get_console_log, colorize_json, get_logger +from . import tractor log = get_logger('cli') DEFAULT_BROKER = 'robinhood' @@ -23,18 +21,24 @@ _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') -def run(main, loglevel='info'): - get_console_log(loglevel) - return trio.run(main) - - @click.command() @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') def pikerd(loglevel, host): """Spawn the piker daemon. """ - run(partial(_brokerd_main, host), loglevel) + get_console_log(loglevel) + tractor.run( + None, # no main task + statespace={ + 'boker2tickersubs': {}, + 'clients': {}, + 'dtasks': set(), + }, + outlive_main=True, # run daemon forever + rpc_module_paths=['piker.broker.core'], + name='brokerd', + ) @click.group() @@ -53,7 +57,7 @@ def cli(): def api(meth, kwargs, loglevel, broker, keys): """client for testing broker API methods with pretty printing of output. """ - log = get_console_log(loglevel) + get_console_log(loglevel) brokermod = get_brokermod(broker) _kwargs = {} @@ -64,8 +68,9 @@ def api(meth, kwargs, loglevel, broker, keys): key, _, value = kwarg.partition('=') _kwargs[key] = value - data = run( - partial(core.api, brokermod, meth, **_kwargs), loglevel=loglevel) + data = trio.run( + partial(core.api, brokermod, meth, **_kwargs) + ) if keys: # filter to requested keys @@ -89,10 +94,12 @@ def api(meth, kwargs, loglevel, broker, keys): help='Ouput in `pandas.DataFrame` format') @click.argument('tickers', nargs=-1, required=True) def quote(loglevel, broker, tickers, df_output): - """client for testing broker API methods with pretty printing of output. + """Retreive symbol quotes on the console in either json or dataframe + format. """ brokermod = get_brokermod(broker) - quotes = run(partial(core.quote, brokermod, tickers), loglevel=loglevel) + get_console_log(loglevel) + quotes = trio.run(partial(core.quote, brokermod, tickers)) if not quotes: log.error(f"No quotes could be found for {tickers}?") return @@ -133,48 +140,30 @@ def watch(loglevel, broker, rate, name, dhost): async def launch_client(sleep=0.5, tries=10): - async def subscribe(channel): - # initial subs request for symbols - await channel.send((brokermod.name, tickers)) - # symbol data is returned in first response which we'll - # ignore on reconnect - await channel.recv() + async with tractor.find_actor('brokerd') as portals: + async with tractor.open_nursery() as nursery: + if not portals: + log.warn("No broker daemon could be found") + log.warning("Spawning local brokerd..") + portal = await nursery.start_actor( + 'brokerd', + main=None, # no main task + statespace={ + 'broker2tickersubs': {}, + 'clients': {}, + 'dtasks': set(), + }, + outlive_main=True, # run daemon forever + rpc_module_paths=['piker.brokers.core'], + loglevel=loglevel, + ) + else: + portal = portals[0] - channel = Channel((dhost, 1616), on_reconnect=subscribe) - for _ in range(tries): # try for 5 seconds - try: - await channel.connect() - break - except OSError as oserr: - await trio.sleep(sleep) - else: - # will raise indicating child proc should be spawned - await channel.connect() + # run kivy app + await _async_main(name, portal, tickers, brokermod, rate) - async with trio.open_nursery() as nursery: - nursery.start_soon( - _async_main, name, channel, tickers, - brokermod, rate - ) - - # signal exit of stream handler task - await channel.aclose() - - try: - trio.run(partial(launch_client, tries=1)) - except OSError as oserr: - log.warn("No broker daemon could be found") - log.warn(oserr) - log.warning("Spawning local broker-daemon...") - child = Process( - target=run, - args=(partial(_brokerd_main, dhost), loglevel), - daemon=True, - name='pikerd', - ) - child.start() - trio.run(partial(launch_client, tries=5)) - child.join() + tractor.run(partial(launch_client, tries=1)) @cli.group()