Use `Channel` throughout cli entry point
parent
f71f986dae
commit
28eff7122d
18
piker/cli.py
18
piker/cli.py
|
@ -13,7 +13,7 @@ import trio
|
||||||
from . import watchlists as wl
|
from . import watchlists as wl
|
||||||
from .brokers import core, get_brokermod
|
from .brokers import core, get_brokermod
|
||||||
from .brokers.core import _brokerd_main
|
from .brokers.core import _brokerd_main
|
||||||
from .ipc import Client
|
from .ipc import Channel
|
||||||
from .log import get_console_log, colorize_json, get_logger
|
from .log import get_console_log, colorize_json, get_logger
|
||||||
|
|
||||||
log = get_logger('cli')
|
log = get_logger('cli')
|
||||||
|
@ -133,32 +133,32 @@ def watch(loglevel, broker, rate, name, dhost):
|
||||||
|
|
||||||
async def launch_client(sleep=0.5, tries=10):
|
async def launch_client(sleep=0.5, tries=10):
|
||||||
|
|
||||||
async def subscribe(client):
|
async def subscribe(channel):
|
||||||
# initial subs request for symbols
|
# initial subs request for symbols
|
||||||
await client.send((brokermod.name, tickers))
|
await channel.send((brokermod.name, tickers))
|
||||||
# symbol data is returned in first response which we'll
|
# symbol data is returned in first response which we'll
|
||||||
# ignore on reconnect
|
# ignore on reconnect
|
||||||
await client.recv()
|
await channel.recv()
|
||||||
|
|
||||||
client = Client((dhost, 1616), on_reconnect=subscribe)
|
channel = Channel((dhost, 1616), on_reconnect=subscribe)
|
||||||
for _ in range(tries): # try for 5 seconds
|
for _ in range(tries): # try for 5 seconds
|
||||||
try:
|
try:
|
||||||
await client.connect()
|
await channel.connect()
|
||||||
break
|
break
|
||||||
except OSError as oserr:
|
except OSError as oserr:
|
||||||
await trio.sleep(sleep)
|
await trio.sleep(sleep)
|
||||||
else:
|
else:
|
||||||
# will raise indicating child proc should be spawned
|
# will raise indicating child proc should be spawned
|
||||||
await client.connect()
|
await channel.connect()
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
nursery.start_soon(
|
nursery.start_soon(
|
||||||
_async_main, name, client, tickers,
|
_async_main, name, channel, tickers,
|
||||||
brokermod, rate
|
brokermod, rate
|
||||||
)
|
)
|
||||||
|
|
||||||
# signal exit of stream handler task
|
# signal exit of stream handler task
|
||||||
await client.aclose()
|
await channel.aclose()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
trio.run(partial(launch_client, tries=1))
|
trio.run(partial(launch_client, tries=1))
|
||||||
|
|
Loading…
Reference in New Issue