Try to connect to daemon once on startup; don't poll
parent
a2c4f0c80b
commit
482f9531ca
27
piker/cli.py
27
piker/cli.py
|
@ -1,14 +1,10 @@
|
||||||
"""
|
"""
|
||||||
Console interface to broker client/daemons.
|
Console interface to broker client/daemons.
|
||||||
"""
|
"""
|
||||||
from collections import defaultdict
|
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from importlib import import_module
|
|
||||||
from multiprocessing import Process
|
from multiprocessing import Process
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import signal
|
|
||||||
import time
|
|
||||||
|
|
||||||
import click
|
import click
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
@ -16,7 +12,7 @@ import trio
|
||||||
|
|
||||||
from . import watchlists as wl
|
from . import watchlists as wl
|
||||||
from .brokers import core, get_brokermod
|
from .brokers import core, get_brokermod
|
||||||
from .brokers.core import _daemon_main
|
from .brokers.core import _daemon_main, Client
|
||||||
from .log import get_console_log, colorize_json, get_logger
|
from .log import get_console_log, colorize_json, get_logger
|
||||||
|
|
||||||
log = get_logger('cli')
|
log = get_logger('cli')
|
||||||
|
@ -135,29 +131,19 @@ def watch(loglevel, broker, rate, name):
|
||||||
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
|
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
|
||||||
tickers = watchlists[name]
|
tickers = watchlists[name]
|
||||||
|
|
||||||
# setup ticker stream
|
|
||||||
from .brokers.core import Client
|
|
||||||
|
|
||||||
async def main(timeout=1):
|
async def main(timeout=1):
|
||||||
|
|
||||||
async def subscribe(client):
|
async def subscribe(client):
|
||||||
# initial request for symbols price streams
|
# initial request for symbols price streams
|
||||||
await client.send((brokermod.name, tickers))
|
await client.send((brokermod.name, tickers))
|
||||||
|
|
||||||
client = Client(('127.0.0.1', 1616), subscribe)
|
client = Client(('127.0.0.1', 1616), subscribe)
|
||||||
start = time.time()
|
|
||||||
down = False
|
|
||||||
while True:
|
|
||||||
try:
|
try:
|
||||||
await client.connect()
|
await client.connect()
|
||||||
break
|
|
||||||
except OSError as oserr:
|
except OSError as oserr:
|
||||||
if not down:
|
await trio.sleep(0.5)
|
||||||
log.info("Waiting on daemon to come up...")
|
# will raise indicating child proc should be spawned
|
||||||
down = True
|
await client.connect()
|
||||||
await trio.sleep(0.1)
|
|
||||||
if time.time() - start > timeout:
|
|
||||||
raise
|
|
||||||
continue
|
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
nursery.start_soon(
|
nursery.start_soon(
|
||||||
|
@ -171,8 +157,9 @@ def watch(loglevel, broker, rate, name):
|
||||||
try:
|
try:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
except OSError as oserr:
|
except OSError as oserr:
|
||||||
|
log.warn("No broker daemon could be found")
|
||||||
log.warn(oserr)
|
log.warn(oserr)
|
||||||
log.info("Spawning local broker-daemon...")
|
log.warning("Spawning local broker-daemon...")
|
||||||
child = Process(
|
child = Process(
|
||||||
target=run,
|
target=run,
|
||||||
args=(_daemon_main, loglevel),
|
args=(_daemon_main, loglevel),
|
||||||
|
|
Loading…
Reference in New Issue