Allow wl app to spawn a broker daemon in a subprocess

kivy_mainline_and_py3.8
Tyler Goodlet 2018-04-19 01:32:21 -04:00
parent 90e8dd911c
commit 2973b40946
2 changed files with 118 additions and 82 deletions

View File

@ -1,20 +1,23 @@
"""
Console interface to broker client/daemons.
"""
from collections import defaultdict
from functools import partial
from importlib import import_module
import os
from collections import defaultdict
from multiprocessing import Process
import json
import os
import signal
import time
import click
import trio
import pandas as pd
import trio
from .log import get_console_log, colorize_json, get_logger
from . import watchlists as wl
from .brokers import core, get_brokermod
from .brokers.core import _daemon_main
from .log import get_console_log, colorize_json, get_logger
log = get_logger('cli')
DEFAULT_BROKER = 'robinhood'
@ -36,15 +39,11 @@ def run(main, loglevel='info'):
@click.command()
@click.option('--broker', '-b', default=DEFAULT_BROKER,
help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level')
def pikerd(broker, loglevel):
def pikerd(loglevel):
"""Spawn the piker daemon.
"""
from piker.brokers.core import _daemon_main
brokermod = get_brokermod(broker)
run(partial(_daemon_main, brokermod), loglevel)
run(_daemon_main, loglevel)
@click.group()
@ -134,7 +133,53 @@ def watch(loglevel, broker, rate, name):
brokermod = get_brokermod(broker)
watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path)
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
trio.run(_async_main, name, watchlists[name], brokermod, rate)
tickers = watchlists[name]
# setup ticker stream
from .brokers.core import Client
async def main(timeout=1):
async def subscribe(client):
# initial request for symbols price streams
await client.send((brokermod.name, tickers))
client = Client(('127.0.0.1', 1616), subscribe)
start = time.time()
while True:
try:
await client.connect()
break
except OSError as oserr:
log.info("Waiting on daemon to come up...")
await trio.sleep(0.1)
if time.time() - start > timeout:
raise
continue
async with trio.open_nursery() as nursery:
nursery.start_soon(
_async_main, name, client, tickers,
brokermod, rate
)
try:
trio.run(main)
except OSError as oserr:
log.exception(oserr)
answer = input(
"\nWould you like to spawn a broker daemon locally? [Y/n]")
if answer is not 'n':
child = Process(
target=run,
args=(_daemon_main, loglevel),
daemon=True,
)
child.daemon = True
child.start()
trio.run(main, 5)
# trio dies with a keyboard interrupt
os.kill(child.pid, signal.SIGINT)
child.join()
@cli.group()

View File

@ -382,20 +382,11 @@ async def run_kivy(root, nursery):
nursery.cancel_scope.cancel() # cancel all other tasks that may be running
async def _async_main(name, tickers, brokermod, rate):
async def _async_main(name, client, tickers, brokermod, rate):
'''Launch kivy app + all other related tasks.
This is started with cli command `piker watch`.
'''
# setup ticker stream
from ..brokers.core import Client
async def subscribe(client):
# initial request for symbols price streams
await client.send((brokermod.name, tickers))
async with Client(('127.0.0.1', 1616), subscribe) as client:
# get initial symbol data
async with brokermod.get_client() as bclient:
# get long term data including last days close price