Port `piker watch` to tractor api

kivy_mainline_and_py3.8
Tyler Goodlet 2018-06-27 11:52:56 -04:00
parent e22f17bfe9
commit 4ecfcdc354
1 changed files with 43 additions and 54 deletions

View File

@ -2,7 +2,6 @@
Console interface to broker client/daemons. Console interface to broker client/daemons.
""" """
from functools import partial from functools import partial
from multiprocessing import Process
import json import json
import os import os
@ -12,9 +11,8 @@ import trio
from . import watchlists as wl from . import watchlists as wl
from .brokers import core, get_brokermod from .brokers import core, get_brokermod
from .brokers.core import _brokerd_main
from .ipc import Channel
from .log import get_console_log, colorize_json, get_logger from .log import get_console_log, colorize_json, get_logger
from . import tractor
log = get_logger('cli') log = get_logger('cli')
DEFAULT_BROKER = 'robinhood' DEFAULT_BROKER = 'robinhood'
@ -23,18 +21,24 @@ _config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
def run(main, loglevel='info'):
get_console_log(loglevel)
return trio.run(main)
@click.command() @click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
def pikerd(loglevel, host): def pikerd(loglevel, host):
"""Spawn the piker daemon. """Spawn the piker daemon.
""" """
run(partial(_brokerd_main, host), loglevel) get_console_log(loglevel)
tractor.run(
None, # no main task
statespace={
'boker2tickersubs': {},
'clients': {},
'dtasks': set(),
},
outlive_main=True, # run daemon forever
rpc_module_paths=['piker.broker.core'],
name='brokerd',
)
@click.group() @click.group()
@ -53,7 +57,7 @@ def cli():
def api(meth, kwargs, loglevel, broker, keys): def api(meth, kwargs, loglevel, broker, keys):
"""client for testing broker API methods with pretty printing of output. """client for testing broker API methods with pretty printing of output.
""" """
log = get_console_log(loglevel) get_console_log(loglevel)
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
_kwargs = {} _kwargs = {}
@ -64,8 +68,9 @@ def api(meth, kwargs, loglevel, broker, keys):
key, _, value = kwarg.partition('=') key, _, value = kwarg.partition('=')
_kwargs[key] = value _kwargs[key] = value
data = run( data = trio.run(
partial(core.api, brokermod, meth, **_kwargs), loglevel=loglevel) partial(core.api, brokermod, meth, **_kwargs)
)
if keys: if keys:
# filter to requested keys # filter to requested keys
@ -89,10 +94,12 @@ def api(meth, kwargs, loglevel, broker, keys):
help='Ouput in `pandas.DataFrame` format') help='Ouput in `pandas.DataFrame` format')
@click.argument('tickers', nargs=-1, required=True) @click.argument('tickers', nargs=-1, required=True)
def quote(loglevel, broker, tickers, df_output): def quote(loglevel, broker, tickers, df_output):
"""client for testing broker API methods with pretty printing of output. """Retreive symbol quotes on the console in either json or dataframe
format.
""" """
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
quotes = run(partial(core.quote, brokermod, tickers), loglevel=loglevel) get_console_log(loglevel)
quotes = trio.run(partial(core.quote, brokermod, tickers))
if not quotes: if not quotes:
log.error(f"No quotes could be found for {tickers}?") log.error(f"No quotes could be found for {tickers}?")
return return
@ -133,48 +140,30 @@ def watch(loglevel, broker, rate, name, dhost):
async def launch_client(sleep=0.5, tries=10): async def launch_client(sleep=0.5, tries=10):
async def subscribe(channel): async with tractor.find_actor('brokerd') as portals:
# initial subs request for symbols async with tractor.open_nursery() as nursery:
await channel.send((brokermod.name, tickers)) if not portals:
# symbol data is returned in first response which we'll log.warn("No broker daemon could be found")
# ignore on reconnect log.warning("Spawning local brokerd..")
await channel.recv() portal = await nursery.start_actor(
'brokerd',
main=None, # no main task
statespace={
'broker2tickersubs': {},
'clients': {},
'dtasks': set(),
},
outlive_main=True, # run daemon forever
rpc_module_paths=['piker.brokers.core'],
loglevel=loglevel,
)
else:
portal = portals[0]
channel = Channel((dhost, 1616), on_reconnect=subscribe) # run kivy app
for _ in range(tries): # try for 5 seconds await _async_main(name, portal, tickers, brokermod, rate)
try:
await channel.connect()
break
except OSError as oserr:
await trio.sleep(sleep)
else:
# will raise indicating child proc should be spawned
await channel.connect()
async with trio.open_nursery() as nursery: tractor.run(partial(launch_client, tries=1))
nursery.start_soon(
_async_main, name, channel, tickers,
brokermod, rate
)
# signal exit of stream handler task
await channel.aclose()
try:
trio.run(partial(launch_client, tries=1))
except OSError as oserr:
log.warn("No broker daemon could be found")
log.warn(oserr)
log.warning("Spawning local broker-daemon...")
child = Process(
target=run,
args=(partial(_brokerd_main, dhost), loglevel),
daemon=True,
name='pikerd',
)
child.start()
trio.run(partial(launch_client, tries=5))
child.join()
@cli.group() @cli.group()