Merge pull request #47 from pikers/remote_connect

Remote connect
kivy_mainline_and_py3.8
goodboy 2018-05-09 18:18:35 -04:00 committed by GitHub
commit 09ae9f5ef1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 58 additions and 49 deletions

View File

@ -306,15 +306,11 @@ async def start_quoter(
get_quotes = await brokermod.quoter(client, tickers) get_quotes = await brokermod.quoter(client, tickers)
clients[broker] = ( clients[broker] = (
brokermod, client, client_cntxmng, get_quotes) brokermod, client, client_cntxmng, get_quotes)
tickers2qs = broker2tickersubs.setdefault( tickers2qs = broker2tickersubs.setdefault(broker, {})
broker, {}.fromkeys(tickers, {queue, }))
else: else:
log.info(f"Subscribing with existing `{broker}` daemon") log.info(f"Subscribing with existing `{broker}` daemon")
brokermod, client, _, get_quotes = clients[broker] brokermod, client, _, get_quotes = clients[broker]
tickers2qs = broker2tickersubs[broker] tickers2qs = broker2tickersubs[broker]
# update map from each symbol to requesting client's queue
for ticker in tickers:
tickers2qs.setdefault(ticker, set()).add(queue)
# beginning of section to be trimmed out with #37 # beginning of section to be trimmed out with #37
################################################# #################################################
@ -324,13 +320,13 @@ async def start_quoter(
# since the new client needs to know what symbols are accepted # since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for {queue.peer}") log.warn(f"Retrieving smoke quote for {queue.peer}")
quotes = await get_quotes(tickers) quotes = await get_quotes(tickers)
# pop any tickers that aren't returned in the first quote # report any tickers that aren't returned in the first quote
tickers = set(tickers) - set(quotes) invalid_tickers = set(tickers) - set(quotes)
for ticker in tickers: for symbol in invalid_tickers:
tickers.remove(symbol)
log.warn( log.warn(
f"Symbol `{ticker}` not found by broker `{brokermod.name}`" f"Symbol `{symbol}` not found by broker `{brokermod.name}`"
) )
tickers2qs.pop(ticker, None)
# pop any tickers that return "empty" quotes # pop any tickers that return "empty" quotes
payload = {} payload = {}
@ -339,16 +335,26 @@ async def start_quoter(
log.warn( log.warn(
f"Symbol `{symbol}` not found by broker" f"Symbol `{symbol}` not found by broker"
f" `{brokermod.name}`") f" `{brokermod.name}`")
tickers2qs.pop(symbol, None) tickers.remove(symbol)
continue continue
payload[symbol] = quote payload[symbol] = quote
# push initial quotes response for client initialization
await queue.put(payload)
# end of section to be trimmed out with #37 # end of section to be trimmed out with #37
########################################### ###########################################
# first respond with symbol data for all tickers (allows
# clients to receive broker specific setup info)
sd = await client.symbol_data(tickers)
assert sd, "No symbol data could be found?"
await queue.put(sd)
# update map from each symbol to requesting client's queue
for ticker in tickers:
tickers2qs.setdefault(ticker, set()).add(queue)
# push initial quotes response for client initialization
await queue.put(payload)
if broker not in dtasks: # no quoter task yet if broker not in dtasks: # no quoter task yet
# task should begin on the next checkpoint/iteration # task should begin on the next checkpoint/iteration
log.info(f"Spawning quoter task for {brokermod.name}") log.info(f"Spawning quoter task for {brokermod.name}")
@ -376,7 +382,7 @@ async def start_quoter(
await cntxmng.__aexit__(None, None, None) await cntxmng.__aexit__(None, None, None)
async def _daemon_main() -> None: async def _daemon_main(host) -> None:
"""Entry point for the broker daemon which waits for connections """Entry point for the broker daemon which waits for connections
before spawning micro-services. before spawning micro-services.
""" """
@ -393,7 +399,7 @@ async def _daemon_main() -> None:
start_quoter, broker2tickersubs, clients, start_quoter, broker2tickersubs, clients,
dtasks, nursery dtasks, nursery
), ),
1616, host='127.0.0.1' 1616, host=host,
) )
) )
log.debug(f"Spawned {listeners}") log.debug(f"Spawned {listeners}")

View File

@ -414,7 +414,8 @@ def format_quote(
# convert values to a displayble format using available formatting func # convert values to a displayble format using available formatting func
if isinstance(new_key, tuple): if isinstance(new_key, tuple):
new_key, func = new_key new_key, func = new_key
display_value = func(value) display_value = func(value) if value else value
new[new_key] = value new[new_key] = value
displayable[new_key] = display_value displayable[new_key] = display_value

View File

@ -8,7 +8,7 @@ from async_generator import asynccontextmanager
import asks import asks
from ..log import get_logger from ..log import get_logger
from ._util import resproc from ._util import resproc, BrokerError
from ..calc import percent_change from ..calc import percent_change
asks.init('trio') asks.init('trio')
@ -51,10 +51,12 @@ class Client:
async def quote(self, symbols: [str]): async def quote(self, symbols: [str]):
"""Retrieve quotes for a list of ``symbols``. """Retrieve quotes for a list of ``symbols``.
""" """
return self._zip_in_order( try:
symbols, resp = await self.api.quotes(','.join(symbols))
(await self.api.quotes(','.join(symbols)))['results'] except BrokerError:
) resp = {'results': [None] * len(symbols)}
return self._zip_in_order(symbols, resp['results'])
async def symbol_data(self, symbols: [str]): async def symbol_data(self, symbols: [str]):
"""Retrieve symbol data via the ``fundmentals`` endpoint. """Retrieve symbol data via the ``fundmentals`` endpoint.

View File

@ -27,4 +27,6 @@ def percent_change(init, new):
"""Calcuate the percentage change of some ``new`` value """Calcuate the percentage change of some ``new`` value
from some initial value, ``init``. from some initial value, ``init``.
""" """
if not (init and new):
return 0
return (new - init) / init * 100. return (new - init) / init * 100.

View File

@ -23,23 +23,17 @@ _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
def run(main, loglevel='info'): def run(main, loglevel='info'):
log = get_console_log(loglevel) get_console_log(loglevel)
return trio.run(main)
# main sandwich
try:
return trio.run(main)
except Exception as err:
log.exception(err)
finally:
log.debug("Exiting piker")
@click.command() @click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
def pikerd(loglevel): @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
def pikerd(loglevel, host):
"""Spawn the piker daemon. """Spawn the piker daemon.
""" """
run(_daemon_main, loglevel) run(partial(_daemon_main, host), loglevel)
@click.group() @click.group()
@ -102,9 +96,9 @@ def quote(loglevel, broker, tickers, df_output):
log.error(f"No quotes could be found for {tickers}?") log.error(f"No quotes could be found for {tickers}?")
return return
cols = next(filter(bool, quotes.values())).copy()
cols.pop('symbol')
if df_output: if df_output:
cols = next(filter(bool, quotes.values())).copy()
cols.pop('symbol')
df = pd.DataFrame( df = pd.DataFrame(
(quote or {} for quote in quotes.values()), (quote or {} for quote in quotes.values()),
index=quotes.keys(), index=quotes.keys(),
@ -120,8 +114,10 @@ def quote(loglevel, broker, tickers, df_output):
help='Broker backend to use') help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--rate', '-r', default=5, help='Logging level') @click.option('--rate', '-r', default=5, help='Logging level')
@click.option('--dhost', '-dh', default='127.0.0.1',
help='Daemon host address to connect to')
@click.argument('name', nargs=1, required=True) @click.argument('name', nargs=1, required=True)
def watch(loglevel, broker, rate, name): def watch(loglevel, broker, rate, name, dhost):
"""Spawn a real-time watchlist. """Spawn a real-time watchlist.
""" """
from .ui.watchlist import _async_main from .ui.watchlist import _async_main
@ -131,17 +127,20 @@ def watch(loglevel, broker, rate, name):
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
tickers = watchlists[name] tickers = watchlists[name]
async def main(timeout=1): async def launch_client(sleep=0.5, tries=10):
async def subscribe(client): async def subscribe(client):
# initial request for symbols price streams # initial request for symbols price streams
await client.send((brokermod.name, tickers)) await client.send((brokermod.name, tickers))
client = Client(('127.0.0.1', 1616), subscribe) client = Client((dhost, 1616), subscribe)
try: for _ in range(tries): # try for 5 seconds
await client.connect() try:
except OSError as oserr: await client.connect()
await trio.sleep(0.5) break
except OSError as oserr:
await trio.sleep(sleep)
else:
# will raise indicating child proc should be spawned # will raise indicating child proc should be spawned
await client.connect() await client.connect()
@ -155,18 +154,18 @@ def watch(loglevel, broker, rate, name):
await client.aclose() await client.aclose()
try: try:
trio.run(main) trio.run(partial(launch_client, tries=1))
except OSError as oserr: except OSError as oserr:
log.warn("No broker daemon could be found") log.warn("No broker daemon could be found")
log.warn(oserr) log.warn(oserr)
log.warning("Spawning local broker-daemon...") log.warning("Spawning local broker-daemon...")
child = Process( child = Process(
target=run, target=run,
args=(_daemon_main, loglevel), args=(partial(_daemon_main, dhost), loglevel),
daemon=True, daemon=True,
) )
child.start() child.start()
trio.run(main, 5) trio.run(launch_client, 5)
child.join() child.join()

View File

@ -387,10 +387,9 @@ async def _async_main(name, client, tickers, brokermod, rate):
This is started with cli command `piker watch`. This is started with cli command `piker watch`.
''' '''
# get initial symbol data # get initial symbol data (long term data including last days close price)
async with brokermod.get_client() as bclient: # TODO: need something better this this toy protocol
# get long term data including last days close price sd = await client.recv()
sd = await bclient.symbol_data(tickers)
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
# get first quotes response # get first quotes response