Port broker daemon to tractor

Drop all channel/connection handling from the core and break up all the
start up steps into compact and useful functions. The main difference is
the daemon now only needs to worry about spawning per broker streaming
tasks and handling symbol list subscription requests.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-06-26 17:55:52 -04:00
parent c0d8d4fd99
commit 8019296c67
1 changed files with 142 additions and 118 deletions

View File

@ -10,9 +10,12 @@ from typing import Coroutine, Callable
import trio import trio
from .. import tractor
from ..log import get_logger from ..log import get_logger
from ..ipc import StreamQueue, Channel from ..ipc import Channel
from . import get_brokermod from . import get_brokermod
log = get_logger('broker.core') log = get_logger('broker.core')
@ -138,8 +141,8 @@ async def stream_quotes(
ConnectionRefusedError, ConnectionRefusedError,
): ):
log.warn(f"{chan.raddr} went down?") log.warn(f"{chan.raddr} went down?")
for qset in tickers2chans.values(): for chanset in tickers2chans.values():
qset.discard(chan) chanset.discard(chan)
req_time = round(postquote_start - prequote_start, 3) req_time = round(postquote_start - prequote_start, 3)
proc_time = round(time.time() - postquote_start, 3) proc_time = round(time.time() - postquote_start, 3)
@ -155,12 +158,97 @@ async def stream_quotes(
await trio.sleep(delay) await trio.sleep(delay)
async def start_quoter( async def get_cached_client(broker, tickers):
broker2tickersubs: dict, """Get the current actor's cached broker client if available or create a
clients: dict, new one.
dtasks: set, # daemon task registry """
nursery: 'Nursery', # check if a cached client is in the local actor's statespace
stream: trio.SocketStream, clients = tractor.current_actor().statespace.setdefault('clients', {})
if clients:
return clients[broker]
else:
log.info(f"Creating new client for broker {broker}")
brokermod = get_brokermod(broker)
# TODO: move to AsyncExitStack in 3.7
client_cntxmng = brokermod.get_client()
client = await client_cntxmng.__aenter__()
get_quotes = await brokermod.quoter(client, tickers)
clients[broker] = (
brokermod, client, client_cntxmng, get_quotes)
return brokermod, client, client_cntxmng, get_quotes
async def symbol_data(broker, tickers):
"""Retrieve baseline symbol info from broker.
"""
_, client, _, get_quotes = await get_cached_client(broker, tickers)
return await client.symbol_data(tickers)
async def smoke_quote(get_quotes, tickers, broker):
"""Do an initial "smoke" request for symbols in ``tickers`` filtering
oout any symbols not supported by the broker queried in the call to
``get_quotes()``.
"""
# TODO: trim out with #37
#################################################
# get a single quote filtering out any bad tickers
# NOTE: this code is always run for every new client
# subscription even when a broker quoter task is already running
# since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for symbols {tickers}")
quotes = await get_quotes(tickers)
# report any tickers that aren't returned in the first quote
invalid_tickers = set(tickers) - set(quotes)
for symbol in invalid_tickers:
tickers.remove(symbol)
log.warn(
f"Symbol `{symbol}` not found by broker `{broker}`"
)
# pop any tickers that return "empty" quotes
payload = {}
for symbol, quote in quotes.items():
if quote is None:
log.warn(
f"Symbol `{symbol}` not found by broker"
f" `{broker}`")
# XXX: not this mutates the input list (for now)
tickers.remove(symbol)
continue
payload[symbol] = quote
return payload
# end of section to be trimmed out with #37
###########################################
def modify_quote_stream(broker, tickers, chan=None, cid=None):
"""Absolute symbol subscription list for each quote stream.
"""
log.info(f"{chan} changed symbol subscription to {tickers}")
ss = tractor.current_actor().statespace
broker2tickersubs = ss['broker2tickersubs']
tickers2chans = broker2tickersubs.get(broker)
# update map from each symbol to requesting client's chan
for ticker in tickers:
chanset = tickers2chans.setdefault(ticker, set())
if chan not in chanset:
chanset.add(chan)
for ticker in filter(lambda ticker: ticker not in tickers, tickers2chans):
chanset = tickers2chans.get(ticker)
if chanset and chan in chanset:
chanset.discard(chan)
async def start_quote_stream(
broker: str,
tickers: [str],
chan: 'Channel' = None,
cid: str = None,
) -> None: ) -> None:
"""Handle per-broker quote stream subscriptions. """Handle per-broker quote stream subscriptions.
@ -168,120 +256,56 @@ async def start_quoter(
Since most brokers seems to support batch quote requests we Since most brokers seems to support batch quote requests we
limit to one task per process for now. limit to one task per process for now.
""" """
chan = Channel(stream=stream) # pull global vars from local actor
log.info(f"Accepted new connection from {chan.raddr}") ss = tractor.current_actor().statespace
async with chan.squeue.stream: broker2tickersubs = ss['broker2tickersubs']
async for broker, tickers in chan: clients = ss['clients']
tickers = list(tickers) dtasks = ss['dtasks']
log.info( tickers = list(tickers)
f"{chan.raddr} subscribed to {broker} for tickers {tickers}") log.info(
f"{chan.uid} subscribed to {broker} for tickers {tickers}")
if broker not in broker2tickersubs: brokermod, client, _, get_quotes = await get_cached_client(broker, tickers)
brokermod = get_brokermod(broker) if broker not in broker2tickersubs:
tickers2chans = broker2tickersubs.setdefault(broker, {})
else:
log.info(f"Subscribing with existing `{broker}` daemon")
tickers2chans = broker2tickersubs[broker]
# TODO: move to AsyncExitStack in 3.7 # do a smoke quote (not this mutates the input list and filters out bad
client_cntxmng = brokermod.get_client() # symbols for now)
client = await client_cntxmng.__aenter__() payload = await smoke_quote(get_quotes, tickers, broker)
get_quotes = await brokermod.quoter(client, tickers) # push initial smoke quote response for client initialization
clients[broker] = ( await chan.send({'yield': payload, 'cid': cid})
brokermod, client, client_cntxmng, get_quotes)
tickers2chans = broker2tickersubs.setdefault(broker, {})
else:
log.info(f"Subscribing with existing `{broker}` daemon")
brokermod, client, _, get_quotes = clients[broker]
tickers2chans = broker2tickersubs[broker]
# beginning of section to be trimmed out with #37 # update map from each symbol to requesting client's chan
################################################# modify_quote_stream(broker, tickers, chan=chan, cid=cid)
# get a single quote filtering out any bad tickers
# NOTE: this code is always run for every new client
# subscription even when a broker quoter task is already running
# since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for {chan.raddr}")
quotes = await get_quotes(tickers)
# report any tickers that aren't returned in the first quote
invalid_tickers = set(tickers) - set(quotes)
for symbol in invalid_tickers:
tickers.remove(symbol)
log.warn(
f"Symbol `{symbol}` not found by broker `{brokermod.name}`"
)
# pop any tickers that return "empty" quotes if broker not in dtasks: # no quoter task yet
payload = {} # task should begin on the next checkpoint/iteration
for symbol, quote in quotes.items(): log.info(f"Spawning quoter task for {brokermod.name}")
if quote is None: async with trio.open_nursery() as nursery:
log.warn( nursery.start_soon(partial(
f"Symbol `{symbol}` not found by broker" stream_quotes, brokermod, get_quotes, tickers2chans,
f" `{brokermod.name}`") cid=cid)
tickers.remove(symbol)
continue
payload[symbol] = quote
# end of section to be trimmed out with #37
###########################################
# first respond with symbol data for all tickers (allows
# clients to receive broker specific setup info)
sd = await client.symbol_data(tickers)
assert sd, "No symbol data could be found?"
await chan.send(sd)
# update map from each symbol to requesting client's chan
for ticker in tickers:
tickers2chans.setdefault(ticker, set()).add(chan)
# push initial quotes response for client initialization
await chan.send(payload)
if broker not in dtasks: # no quoter task yet
# task should begin on the next checkpoint/iteration
log.info(f"Spawning quoter task for {brokermod.name}")
nursery.start_soon(
stream_quotes, brokermod, get_quotes, tickers2chans)
dtasks.add(broker)
log.debug("Waiting on subscription request")
else:
log.info(f"client @ {chan.raddr} disconnected")
# drop any lingering subscriptions
for ticker, qset in tickers2chans.items():
qset.discard(chan)
# if there are no more subscriptions with this broker
# drop from broker subs dict
if not any(tickers2chans.values()):
log.info(f"No more subscriptions for {broker}")
broker2tickersubs.pop(broker, None)
dtasks.discard(broker)
# TODO: move to AsyncExitStack in 3.7
for _, _, cntxmng, _ in clients.values():
# FIXME: yes I know it's totally wrong...
await cntxmng.__aexit__(None, None, None)
async def _brokerd_main(host) -> None:
"""Entry point for the broker daemon which waits for connections
before spawning micro-services.
"""
# global space for broker-daemon subscriptions
broker2tickersubs = {}
clients = {}
dtasks = set()
async with trio.open_nursery() as nursery:
listeners = await nursery.start(
partial(
trio.serve_tcp,
partial(
start_quoter, broker2tickersubs, clients,
dtasks, nursery
),
1617, host=host,
) )
) dtasks.add(broker)
log.debug(f"Spawned {listeners}") # unblocks when no more symbols subscriptions exist and the
# quote streamer task terminates (usually because another call
# was made to `modify_quoter` to unsubscribe from streaming
# symbols)
# if there are truly no more subscriptions with this broker
# drop from broker subs dict
if not any(tickers2chans.values()):
log.info(f"No more subscriptions for {broker}")
broker2tickersubs.pop(broker, None)
dtasks.discard(broker)
# TODO: move to AsyncExitStack in 3.7
for _, _, cntxmng, _ in clients.values():
# FIXME: yes I know there's no error handling..
await cntxmng.__aexit__(None, None, None)
async def _test_price_stream(broker, symbols, *, chan=None, cid=None): async def _test_price_stream(broker, symbols, *, chan=None, cid=None):