Merge pull request #36 from pikers/daemonize

Daemonize broker quote engine
kivy_mainline_and_py3.8
goodboy 2018-04-23 00:47:51 -04:00 committed by GitHub
commit b00b872414
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 471 additions and 144 deletions

View File

@ -34,6 +34,15 @@ broker quote query ``rate`` with ``-r``::
piker watch indexes -l info -r 10
It is also possible to run the broker-client micro service as a daemon::
pikerd -l info
Then start the client app as normal::
piker watch indexes -l info
.. _trio: https://github.com/python-trio/trio
.. _pipenv: https://docs.pipenv.org/

View File

@ -13,7 +13,10 @@ __brokers__ = [
def get_brokermod(brokername: str) -> ModuleType:
"""Return the imported broker module by name.
"""
return import_module('.' + brokername, 'piker.brokers')
module = import_module('.' + brokername, 'piker.brokers')
# we only allows monkeys because it's for internal keying
module.name = module.__name__.split('.')[-1]
return module
def iter_brokermods():

View File

@ -6,11 +6,13 @@ import inspect
from functools import partial
import socket
from types import ModuleType
from typing import AsyncContextManager
from typing import Coroutine, Callable
import msgpack
import trio
from ..log import get_logger
from . import get_brokermod
log = get_logger('broker.core')
@ -46,14 +48,14 @@ async def quote(brokermod: ModuleType, tickers: [str]) -> dict:
return results
async def wait_for_network(get_quotes, sleep=1):
async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
"""Wait until the network comes back up.
"""
down = False
while True:
try:
with trio.move_on_after(1) as cancel_scope:
quotes = await get_quotes()
quotes = await net_func()
if down:
log.warn("Network is back up")
return quotes
@ -67,11 +69,135 @@ async def wait_for_network(get_quotes, sleep=1):
await trio.sleep(sleep)
async def poll_tickers(
client: 'Client',
quoter: AsyncContextManager,
tickers: [str],
q: trio.Queue,
class StreamQueue:
"""Stream wrapped as a queue that delivers ``msgpack`` serialized objects.
"""
def __init__(self, stream):
self.stream = stream
self.peer = stream.socket.getpeername()
self._agen = self._iter_packets()
async def _iter_packets(self):
"""Yield packets from the underlying stream.
"""
unpacker = msgpack.Unpacker(raw=False)
while True:
try:
data = await self.stream.receive_some(2**10)
log.trace(f"Data is {data}")
except trio.BrokenStreamError:
log.error(f"Stream connection {self.peer} broke")
return
if data == b'':
log.debug("Stream connection was closed")
return
unpacker.feed(data)
for packet in unpacker:
yield packet
async def put(self, data):
return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True))
async def get(self):
return await self._agen.asend(None)
async def __aiter__(self):
return self._agen
class Client:
"""The most basic client.
Use this to talk to any micro-service daemon or other client(s) over a
TCP socket managed by ``trio``.
"""
def __init__(
self, sockaddr: tuple,
startup_seq: Coroutine,
auto_reconnect: bool = True,
):
self.sockaddr = sockaddr
self._startup_seq = startup_seq
self._autorecon = auto_reconnect
self.squeue = None
async def connect(self, sockaddr: tuple = None, **kwargs):
sockaddr = sockaddr or self.sockaddr
stream = await trio.open_tcp_stream(*sockaddr, **kwargs)
self.squeue = StreamQueue(stream)
await self._startup_seq(self)
return stream
async def send(self, item):
await self.squeue.put(item)
async def recv(self):
try:
return await self.squeue.get()
except trio.BrokenStreamError as err:
if self._autorecon:
await self._reconnect()
return await self.recv()
async def aclose(self, *args):
await self.squeue.stream.aclose()
async def __aenter__(self):
await self.connect(self.sockaddr)
return self
async def __aexit__(self, *args):
await self.aclose(*args)
async def _reconnect(self):
"""Handle connection failures by polling until a reconnect can be
established.
"""
down = False
while True:
try:
with trio.move_on_after(3) as cancel_scope:
await self.connect()
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warn(
"Reconnect timed out after 3 seconds, retrying...")
continue
else:
log.warn("Stream connection re-established!")
break
except (OSError, ConnectionRefusedError):
if not down:
down = True
log.warn(
f"Connection to {self.sockaddr} went down, waiting"
" for re-establishment")
await trio.sleep(1)
async def aiter_recv(self):
"""Async iterate items from underlying stream.
"""
while True:
try:
async for item in self.squeue:
yield item
except trio.BrokenStreamError as err:
if not self._autorecon:
raise
if self._autorecon: # attempt reconnect
await self._reconnect()
continue
else:
return
async def stream_quotes(
brokermod: ModuleType,
get_quotes: Coroutine,
tickers2qs: {str: StreamQueue},
rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None:
@ -81,13 +207,22 @@ async def poll_tickers(
A broker-client ``quoter`` async context manager must be provided which
returns an async quote function.
"""
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
sleeptime = round(1. / rate, 3)
_cache = {} # ticker to quote caching
async with quoter(client, tickers) as get_quotes:
while True: # use an event here to trigger exit?
prequote_start = time.time()
if not any(tickers2qs.values()):
log.warn(f"No subs left for broker {brokermod.name}, exiting task")
break
tickers = list(tickers2qs.keys())
with trio.move_on_after(3) as cancel_scope:
quotes = await get_quotes(tickers)
@ -98,13 +233,8 @@ async def poll_tickers(
quotes = await wait_for_network(partial(get_quotes, tickers))
postquote_start = time.time()
payload = {}
q_payloads = {}
for symbol, quote in quotes.items():
# FIXME: None is returned if a symbol can't be found.
# Consider filtering out such symbols before starting poll loop
if quote is None:
continue
if diff_cached:
# if cache is enabled then only deliver "new" changes
last = _cache.setdefault(symbol, {})
@ -113,12 +243,25 @@ async def poll_tickers(
log.info(
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
payload[symbol] = quote
for queue in tickers2qs[symbol]:
q_payloads.setdefault(queue, {})[symbol] = quote
else:
payload[symbol] = quote
for queue in tickers2qs[symbol]:
q_payloads.setdefault(queue, {})[symbol] = quote
if payload:
q.put_nowait(payload)
# deliver to each subscriber
if q_payloads:
for queue, payload in q_payloads.items():
try:
await queue.put(payload)
except (
# That's right, anything you can think of...
trio.ClosedStreamError, ConnectionResetError,
ConnectionRefusedError,
):
log.warn(f"{queue.peer} went down?")
for qset in tickers2qs.values():
qset.discard(queue)
req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3)
@ -132,3 +275,125 @@ async def poll_tickers(
else:
log.debug(f"Sleeping for {delay}")
await trio.sleep(delay)
async def start_quoter(
broker2tickersubs: dict,
clients: dict,
dtasks: set, # daemon task registry
nursery: "Nusery",
stream: trio.SocketStream,
) -> None:
"""Handle per-broker quote stream subscriptions.
Spawns new quoter tasks for each broker backend on-demand.
Since most brokers seems to support batch quote requests we
limit to one task per process for now.
"""
queue = StreamQueue(stream) # wrap in a shabby queue-like api
log.info(f"Accepted new connection from {queue.peer}")
async with queue.stream:
async for broker, tickers in queue:
log.info(
f"{queue.peer} subscribed to {broker} for tickers {tickers}")
if broker not in broker2tickersubs:
brokermod = get_brokermod(broker)
# TODO: move to AsyncExitStack in 3.7
client_cntxmng = brokermod.get_client()
client = await client_cntxmng.__aenter__()
get_quotes = await brokermod.quoter(client, tickers)
clients[broker] = (
brokermod, client, client_cntxmng, get_quotes)
tickers2qs = broker2tickersubs.setdefault(
broker, {}.fromkeys(tickers, {queue, }))
else:
log.info(f"Subscribing with existing `{broker}` daemon")
brokermod, client, _, get_quotes = clients[broker]
tickers2qs = broker2tickersubs[broker]
# update map from each symbol to requesting client's queue
for ticker in tickers:
tickers2qs.setdefault(ticker, set()).add(queue)
# beginning of section to be trimmed out with #37
#################################################
# get a single quote filtering out any bad tickers
# NOTE: this code is always run for every new client
# subscription even when a broker quoter task is already running
# since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for {queue.peer}")
quotes = await get_quotes(tickers)
# pop any tickers that aren't returned in the first quote
tickers = set(tickers) - set(quotes)
for ticker in tickers:
log.warn(
f"Symbol `{ticker}` not found by broker `{brokermod.name}`"
)
tickers2qs.pop(ticker, None)
# pop any tickers that return "empty" quotes
payload = {}
for symbol, quote in quotes.items():
if quote is None:
log.warn(
f"Symbol `{symbol}` not found by broker"
f" `{brokermod.name}`")
tickers2qs.pop(symbol, None)
continue
payload[symbol] = quote
# push initial quotes response for client initialization
await queue.put(payload)
# end of section to be trimmed out with #37
###########################################
if broker not in dtasks: # no quoter task yet
# task should begin on the next checkpoint/iteration
log.info(f"Spawning quoter task for {brokermod.name}")
nursery.start_soon(
stream_quotes, brokermod, get_quotes, tickers2qs)
dtasks.add(broker)
log.debug("Waiting on subscription request")
else:
log.info(f"client @ {queue.peer} disconnected")
# drop any lingering subscriptions
for ticker, qset in tickers2qs.items():
qset.discard(queue)
# if there are no more subscriptions with this broker
# drop from broker subs dict
if not any(tickers2qs.values()):
log.info(f"No more subscriptions for {broker}")
broker2tickersubs.pop(broker, None)
dtasks.discard(broker)
# TODO: move to AsyncExitStack in 3.7
for _, _, cntxmng, _ in clients.values():
# FIXME: yes I know it's totally wrong...
await cntxmng.__aexit__(None, None, None)
async def _daemon_main() -> None:
"""Entry point for the broker daemon which waits for connections
before spawning micro-services.
"""
# global space for broker-daemon subscriptions
broker2tickersubs = {}
clients = {}
dtasks = set()
async with trio.open_nursery() as nursery:
listeners = await nursery.start(
partial(
trio.serve_tcp,
partial(
start_quoter, broker2tickersubs, clients,
dtasks, nursery
),
1616, host='127.0.0.1'
)
)
log.debug(f"Spawned {listeners}")

View File

@ -284,16 +284,33 @@ async def get_client() -> Client:
write_conf(client)
@asynccontextmanager
async def quoter(client: Client, tickers: [str]):
"""Quoter context.
"""
t2ids = await client.tickers2ids(tickers)
ids = ','.join(map(str, t2ids.values()))
t2ids = {}
ids = ''
def filter_symbols(quotes_dict):
nonlocal t2ids
for symbol, quote in quotes_dict.items():
if quote['low52w'] is None:
log.warn(
f"{symbol} seems to be defunct discarding from tickers")
t2ids.pop(symbol)
async def get_quote(tickers):
"""Query for quotes using cached symbol ids.
"""
if not tickers:
return {}
nonlocal ids, t2ids
new, current = set(tickers), set(t2ids.keys())
if new != current:
# update ticker ids cache
log.debug(f"Tickers set changed {new - current}")
t2ids = await client.tickers2ids(tickers)
ids = ','.join(map(str, t2ids.values()))
try:
quotes_resp = await client.api.quotes(ids=ids)
except QuestradeError as qterr:
@ -310,20 +327,17 @@ async def quoter(client: Client, tickers: [str]):
quotes[quote['symbol']] = quote
if quote.get('delay', 0) > 0:
log.warning(f"Delayed quote:\n{quote}")
log.warn(f"Delayed quote:\n{quote}")
return quotes
first_quotes_dict = await get_quote(tickers)
for symbol, quote in first_quotes_dict.items():
if quote['low52w'] is None:
log.warn(f"{symbol} seems to be defunct discarding from tickers")
t2ids.pop(symbol)
filter_symbols(first_quotes_dict)
# re-save symbol ids cache
ids = ','.join(map(str, t2ids.values()))
yield get_quote
return get_quote
# Questrade key conversion / column order

View File

@ -72,11 +72,10 @@ async def get_client() -> Client:
yield Client()
@asynccontextmanager
async def quoter(client: Client, tickers: [str]):
"""Quoter context.
"""
yield client.quote
return client.quote
# Robinhood key conversion / column order

View File

@ -2,19 +2,18 @@
Console interface to broker client/daemons.
"""
from functools import partial
from importlib import import_module
import os
from collections import defaultdict
from multiprocessing import Process
import json
import os
import click
import trio
import pandas as pd
import trio
from .log import get_console_log, colorize_json, get_logger
from . import watchlists as wl
from .brokers import core, get_brokermod
from .brokers.core import _daemon_main, Client
from .log import get_console_log, colorize_json, get_logger
log = get_logger('cli')
DEFAULT_BROKER = 'robinhood'
@ -35,6 +34,14 @@ def run(main, loglevel='info'):
log.debug("Exiting piker")
@click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level')
def pikerd(loglevel):
"""Spawn the piker daemon.
"""
run(_daemon_main, loglevel)
@click.group()
def cli():
pass
@ -115,20 +122,52 @@ def quote(loglevel, broker, tickers, df_output):
@click.option('--rate', '-r', default=5, help='Logging level')
@click.argument('name', nargs=1, required=True)
def watch(loglevel, broker, rate, name):
"""Spawn a watchlist.
"""Spawn a real-time watchlist.
"""
from .ui.watchlist import _async_main
log = get_console_log(loglevel) # activate console logging
brokermod = get_brokermod(broker)
watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path)
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
tickers = watchlists[name]
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
async def main(timeout=1):
trio.run(_async_main, name, watchlists[name], brokermod, rate)
async def subscribe(client):
# initial request for symbols price streams
await client.send((brokermod.name, tickers))
client = Client(('127.0.0.1', 1616), subscribe)
try:
await client.connect()
except OSError as oserr:
await trio.sleep(0.5)
# will raise indicating child proc should be spawned
await client.connect()
async with trio.open_nursery() as nursery:
nursery.start_soon(
_async_main, name, client, tickers,
brokermod, rate
)
# signal exit of stream handler task
await client.aclose()
try:
trio.run(main)
except OSError as oserr:
log.warn("No broker daemon could be found")
log.warn(oserr)
log.warning("Spawning local broker-daemon...")
child = Process(
target=run,
args=(_daemon_main, loglevel),
daemon=True,
)
child.start()
trio.run(main, 5)
child.join()
@cli.group()

View File

@ -7,7 +7,6 @@ Launch with ``piker watch <watchlist name>``.
"""
from itertools import chain
from types import ModuleType
from functools import partial
import trio
from kivy.uix.boxlayout import BoxLayout
@ -21,7 +20,6 @@ from kivy.core.window import Window
from ..log import get_logger
from .pager import PagerView
from ..brokers.core import poll_tickers
log = get_logger('watchlist')
@ -49,7 +47,7 @@ _kv = (f'''
#:kivy 1.10.0
<Cell>
font_size: 18
font_size: 20
# text_size: self.size
size: self.texture_size
color: {colorcode('gray')}
@ -318,9 +316,10 @@ class TickerTable(GridLayout):
async def update_quotes(
nursery: 'Nursery',
brokermod: ModuleType,
widgets: dict,
queue: trio.Queue,
client: 'Client',
symbol_data: dict,
first_quotes: dict
):
@ -360,9 +359,7 @@ async def update_quotes(
grid.render_rows(cache)
# core cell update loop
while True:
log.debug("Waiting on quotes")
quotes = await queue.get() # new quotes data only
async for quotes in client.aiter_recv(): # new quotes data only
for symbol, quote in quotes.items():
record, displayable = brokermod.format_quote(
quote, symbol_data=symbol_data)
@ -372,6 +369,10 @@ async def update_quotes(
color_row(row, record)
grid.render_rows(cache)
log.debug("Waiting on quotes")
log.warn("Server connection dropped")
nursery.cancel_scope.cancel()
async def run_kivy(root, nursery):
@ -381,24 +382,20 @@ async def run_kivy(root, nursery):
nursery.cancel_scope.cancel() # cancel all other tasks that may be running
async def _async_main(name, tickers, brokermod, rate):
async def _async_main(name, client, tickers, brokermod, rate):
'''Launch kivy app + all other related tasks.
This is started with cli command `piker watch`.
'''
queue = trio.Queue(1000)
async with brokermod.get_client() as client:
async with trio.open_nursery() as nursery:
# get initial symbol data
async with brokermod.get_client() as bclient:
# get long term data including last days close price
sd = await client.symbol_data(tickers)
nursery.start_soon(
partial(poll_tickers, client, brokermod.quoter, tickers, queue,
rate=rate)
)
sd = await bclient.symbol_data(tickers)
async with trio.open_nursery() as nursery:
# get first quotes response
quotes = await queue.get()
log.debug("Waiting on first quote...")
quotes = await client.recv()
first_quotes = [
brokermod.format_quote(quote, symbol_data=sd)[0]
for quote in quotes.values()]
@ -459,4 +456,4 @@ async def _async_main(name, tickers, brokermod, rate):
}
nursery.start_soon(run_kivy, widgets['root'], nursery)
nursery.start_soon(
update_quotes, brokermod, widgets, queue, sd, quotes)
update_quotes, nursery, brokermod, widgets, client, sd, quotes)

View File

@ -31,11 +31,12 @@ setup(
entry_points={
'console_scripts': [
'piker = piker.cli:cli',
'pikerd = piker.cli:pikerd',
]
},
install_requires=[
'click', 'colorlog', 'trio', 'attrs', 'async_generator',
'pygments', 'cython', 'asks', 'pandas',
'pygments', 'cython', 'asks', 'pandas', 'msgpack',
#'kivy', see requirement.txt; using a custom branch atm
],
extras_require={