Allow broker specific subscriptions

Allow client connections to subscribe for quote streams from specific
brokers and spawn broker-client quoter tasks on-demand according
to client connection demands. Support multiple subscribers to a
single daemon process.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-04-17 16:53:29 -04:00
parent f80735121c
commit 6359623019
1 changed files with 120 additions and 99 deletions

View File

@ -12,6 +12,7 @@ from typing import AsyncContextManager
import trio
from ..log import get_logger
from . import get_brokermod
log = get_logger('broker.core')
@ -126,10 +127,8 @@ class StreamQueue:
async def poll_tickers(
client: 'Client',
quoter: AsyncContextManager,
tickers: [str],
queue: StreamQueue,
brokermod: ModuleType,
tickers2qs: {str: StreamQueue},
rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None:
@ -142,107 +141,129 @@ async def poll_tickers(
sleeptime = round(1. / rate, 3)
_cache = {} # ticker to quote caching
async with quoter(client, tickers) as get_quotes:
# run a first quote smoke test filtering out any bad tickers
first_quotes_dict = await get_quotes(tickers)
# FIXME: oh god it's so hideous
tickers[:] = list(first_quotes_dict.keys())[:]
while True: # use an event here to trigger exit?
prequote_start = time.time()
with trio.move_on_after(3) as cancel_scope:
quotes = await get_quotes(tickers)
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warn("Quote query timed out after 3 seconds, retrying...")
# handle network outages by idling until response is received
quotes = await wait_for_network(partial(get_quotes, tickers))
postquote_start = time.time()
payload = {}
for symbol, quote in quotes.items():
# FIXME: None is returned if a symbol can't be found.
# Consider filtering out such symbols before starting poll loop
if quote is None:
continue
if diff_cached:
# if cache is enabled then only deliver "new" changes
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
payload[symbol] = quote
else:
payload[symbol] = quote
if payload:
await queue.put(payload)
req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3)
tot = req_time + proc_time
log.debug(f"Request + processing took {tot}")
delay = sleeptime - tot
if delay <= 0:
log.warn(
f"Took {req_time} (request) + {proc_time} (processing) "
f"= {tot} secs (> {sleeptime}) for processing quotes?")
else:
log.debug(f"Sleeping for {delay}")
await trio.sleep(delay)
async def _handle_subs(
queue,
stream2tickers,
nursery,
task_status=trio.TASK_STATUS_IGNORED
):
"""Handle quote stream subscriptions.
"""
async with queue.stream:
async for tickers in queue:
task_status.started(tickers)
log.info(f"{queue.peer} subscribed for tickers {tickers}")
stream2tickers[queue.peer] = tickers
else:
log.info(f"{queue.peer} was disconnected")
nursery.cancel_scope.cancel()
async def _daemon_main(brokermod):
"""Main entry point for the piker daemon.
"""
rate = 5
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
stream2tickers = {}
tickers = list(tickers2qs.keys())
async with brokermod.get_client() as client:
async with brokermod.quoter(client, tickers) as get_quotes:
# run a first quote smoke test filtering out any bad tickers
first_quotes_dict = await get_quotes(tickers)
valid_symbols = list(first_quotes_dict.keys())[:]
async def start_quoter(stream):
queue = StreamQueue(stream) # wrap in a shabby queue-like api
log.debug(f"Accepted new connection from {queue.peer}")
for ticker in set(tickers) - set(valid_symbols):
tickers2qs.pop(ticker)
# spawn request handler
async with trio.open_nursery() as nursery:
await nursery.start(
_handle_subs, queue, stream2tickers, nursery)
nursery.start_soon(
partial(
poll_tickers, client, brokermod.quoter,
stream2tickers[queue.peer], queue, rate=rate)
)
# push intial quotes
q_payloads = {}
for symbol, quote in first_quotes_dict.items():
if quote is None:
tickers2qs.pop(symbol)
continue
for queue in tickers2qs[symbol]:
q_payloads.setdefault(queue, {})[symbol] = quote
async with trio.open_nursery() as nursery:
listeners = await nursery.start(
partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1')
)
log.debug(f"Spawned {listeners}")
if q_payloads:
for queue, payload in q_payloads.items():
await queue.put(payload)
# assign valid symbol set
tickers = list(tickers2qs.keys())
while True: # use an event here to trigger exit?
prequote_start = time.time()
with trio.move_on_after(3) as cancel_scope:
quotes = await get_quotes(valid_symbols)
cancelled = cancel_scope.cancelled_caught
if cancelled:
log.warn("Quote query timed out after 3 seconds, retrying...")
# handle network outages by idling until response is received
quotes = await wait_for_network(partial(get_quotes, tickers))
postquote_start = time.time()
q_payloads = {}
for symbol, quote in quotes.items():
# FIXME: None is returned if a symbol can't be found.
# Consider filtering out such symbols before starting poll loop
if quote is None:
continue
if diff_cached:
# if cache is enabled then only deliver "new" changes
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(
f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote
for queue in tickers2qs[symbol]:
q_payloads.setdefault(queue, {})[symbol] = quote
else:
for queue in tickers2qs[symbol]:
q_payloads[queue] = {symbol: quote}
# deliver to each subscriber
if q_payloads:
for queue, payload in q_payloads.items():
await queue.put(payload)
req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3)
tot = req_time + proc_time
log.debug(f"Request + processing took {tot}")
delay = sleeptime - tot
if delay <= 0:
log.warn(
f"Took {req_time} (request) + {proc_time} (processing) "
f"= {tot} secs (> {sleeptime}) for processing quotes?")
else:
log.debug(f"Sleeping for {delay}")
await trio.sleep(delay)
async def start_quoter(stream):
"""Handle per-broker quote stream subscriptions.
"""
broker2tickersubs = {}
tickers2qs = {}
queue = StreamQueue(stream) # wrap in a shabby queue-like api
log.debug(f"Accepted new connection from {queue.peer}")
async with trio.open_nursery() as nursery:
async with queue.stream:
async for (broker, tickers) in queue:
log.info(
f"{queue.peer} subscribed to {broker} for tickers {tickers}")
if broker not in broker2tickersubs: # spawn quote streamer
tickers2qs = broker2tickersubs.setdefault(broker, {})
brokermod = get_brokermod(broker)
log.info(f"Spawning quote streamer for broker {broker}")
# task should begin on the next checkpoint/iteration
nursery.start_soon(poll_tickers, brokermod, tickers2qs)
# create map from each symbol to consuming client queues
for ticker in tickers:
tickers2qs.setdefault(ticker, set()).add(queue)
# remove queue from any ticker subscriptions it no longer wants
for ticker in set(tickers2qs) - set(tickers):
tickers2qs[ticker].remove(queue)
else:
log.info(f"{queue.peer} was disconnected")
nursery.cancel_scope.cancel()
async def _daemon_main(brokermod):
"""Entry point for the piker daemon.
"""
async with trio.open_nursery() as nursery:
listeners = await nursery.start(
partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1')
)
log.debug(f"Spawned {listeners}")