diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 15bf798b..233cf71a 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -10,9 +10,12 @@ from typing import Coroutine, Callable import trio +from .. import tractor from ..log import get_logger -from ..ipc import StreamQueue, Channel +from ..ipc import Channel from . import get_brokermod + + log = get_logger('broker.core') @@ -138,8 +141,8 @@ async def stream_quotes( ConnectionRefusedError, ): log.warn(f"{chan.raddr} went down?") - for qset in tickers2chans.values(): - qset.discard(chan) + for chanset in tickers2chans.values(): + chanset.discard(chan) req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) @@ -155,12 +158,97 @@ async def stream_quotes( await trio.sleep(delay) -async def start_quoter( - broker2tickersubs: dict, - clients: dict, - dtasks: set, # daemon task registry - nursery: 'Nursery', - stream: trio.SocketStream, +async def get_cached_client(broker, tickers): + """Get the current actor's cached broker client if available or create a + new one. + """ + # check if a cached client is in the local actor's statespace + clients = tractor.current_actor().statespace.setdefault('clients', {}) + if clients: + return clients[broker] + else: + log.info(f"Creating new client for broker {broker}") + brokermod = get_brokermod(broker) + # TODO: move to AsyncExitStack in 3.7 + client_cntxmng = brokermod.get_client() + client = await client_cntxmng.__aenter__() + get_quotes = await brokermod.quoter(client, tickers) + clients[broker] = ( + brokermod, client, client_cntxmng, get_quotes) + + return brokermod, client, client_cntxmng, get_quotes + + +async def symbol_data(broker, tickers): + """Retrieve baseline symbol info from broker. + """ + _, client, _, get_quotes = await get_cached_client(broker, tickers) + return await client.symbol_data(tickers) + + +async def smoke_quote(get_quotes, tickers, broker): + """Do an initial "smoke" request for symbols in ``tickers`` filtering + oout any symbols not supported by the broker queried in the call to + ``get_quotes()``. + """ + # TODO: trim out with #37 + ################################################# + # get a single quote filtering out any bad tickers + # NOTE: this code is always run for every new client + # subscription even when a broker quoter task is already running + # since the new client needs to know what symbols are accepted + log.warn(f"Retrieving smoke quote for symbols {tickers}") + quotes = await get_quotes(tickers) + # report any tickers that aren't returned in the first quote + invalid_tickers = set(tickers) - set(quotes) + for symbol in invalid_tickers: + tickers.remove(symbol) + log.warn( + f"Symbol `{symbol}` not found by broker `{broker}`" + ) + + # pop any tickers that return "empty" quotes + payload = {} + for symbol, quote in quotes.items(): + if quote is None: + log.warn( + f"Symbol `{symbol}` not found by broker" + f" `{broker}`") + # XXX: not this mutates the input list (for now) + tickers.remove(symbol) + continue + payload[symbol] = quote + + return payload + + # end of section to be trimmed out with #37 + ########################################### + + +def modify_quote_stream(broker, tickers, chan=None, cid=None): + """Absolute symbol subscription list for each quote stream. + """ + log.info(f"{chan} changed symbol subscription to {tickers}") + ss = tractor.current_actor().statespace + broker2tickersubs = ss['broker2tickersubs'] + tickers2chans = broker2tickersubs.get(broker) + # update map from each symbol to requesting client's chan + for ticker in tickers: + chanset = tickers2chans.setdefault(ticker, set()) + if chan not in chanset: + chanset.add(chan) + + for ticker in filter(lambda ticker: ticker not in tickers, tickers2chans): + chanset = tickers2chans.get(ticker) + if chanset and chan in chanset: + chanset.discard(chan) + + +async def start_quote_stream( + broker: str, + tickers: [str], + chan: 'Channel' = None, + cid: str = None, ) -> None: """Handle per-broker quote stream subscriptions. @@ -168,120 +256,56 @@ async def start_quoter( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ - chan = Channel(stream=stream) - log.info(f"Accepted new connection from {chan.raddr}") - async with chan.squeue.stream: - async for broker, tickers in chan: - tickers = list(tickers) - log.info( - f"{chan.raddr} subscribed to {broker} for tickers {tickers}") + # pull global vars from local actor + ss = tractor.current_actor().statespace + broker2tickersubs = ss['broker2tickersubs'] + clients = ss['clients'] + dtasks = ss['dtasks'] + tickers = list(tickers) + log.info( + f"{chan.uid} subscribed to {broker} for tickers {tickers}") - if broker not in broker2tickersubs: - brokermod = get_brokermod(broker) + brokermod, client, _, get_quotes = await get_cached_client(broker, tickers) + if broker not in broker2tickersubs: + tickers2chans = broker2tickersubs.setdefault(broker, {}) + else: + log.info(f"Subscribing with existing `{broker}` daemon") + tickers2chans = broker2tickersubs[broker] - # TODO: move to AsyncExitStack in 3.7 - client_cntxmng = brokermod.get_client() - client = await client_cntxmng.__aenter__() - get_quotes = await brokermod.quoter(client, tickers) - clients[broker] = ( - brokermod, client, client_cntxmng, get_quotes) - tickers2chans = broker2tickersubs.setdefault(broker, {}) - else: - log.info(f"Subscribing with existing `{broker}` daemon") - brokermod, client, _, get_quotes = clients[broker] - tickers2chans = broker2tickersubs[broker] + # do a smoke quote (not this mutates the input list and filters out bad + # symbols for now) + payload = await smoke_quote(get_quotes, tickers, broker) + # push initial smoke quote response for client initialization + await chan.send({'yield': payload, 'cid': cid}) - # beginning of section to be trimmed out with #37 - ################################################# - # get a single quote filtering out any bad tickers - # NOTE: this code is always run for every new client - # subscription even when a broker quoter task is already running - # since the new client needs to know what symbols are accepted - log.warn(f"Retrieving smoke quote for {chan.raddr}") - quotes = await get_quotes(tickers) - # report any tickers that aren't returned in the first quote - invalid_tickers = set(tickers) - set(quotes) - for symbol in invalid_tickers: - tickers.remove(symbol) - log.warn( - f"Symbol `{symbol}` not found by broker `{brokermod.name}`" - ) + # update map from each symbol to requesting client's chan + modify_quote_stream(broker, tickers, chan=chan, cid=cid) - # pop any tickers that return "empty" quotes - payload = {} - for symbol, quote in quotes.items(): - if quote is None: - log.warn( - f"Symbol `{symbol}` not found by broker" - f" `{brokermod.name}`") - tickers.remove(symbol) - continue - payload[symbol] = quote - - # end of section to be trimmed out with #37 - ########################################### - - # first respond with symbol data for all tickers (allows - # clients to receive broker specific setup info) - sd = await client.symbol_data(tickers) - assert sd, "No symbol data could be found?" - await chan.send(sd) - - # update map from each symbol to requesting client's chan - for ticker in tickers: - tickers2chans.setdefault(ticker, set()).add(chan) - - # push initial quotes response for client initialization - await chan.send(payload) - - if broker not in dtasks: # no quoter task yet - # task should begin on the next checkpoint/iteration - log.info(f"Spawning quoter task for {brokermod.name}") - nursery.start_soon( - stream_quotes, brokermod, get_quotes, tickers2chans) - dtasks.add(broker) - - log.debug("Waiting on subscription request") - else: - log.info(f"client @ {chan.raddr} disconnected") - # drop any lingering subscriptions - for ticker, qset in tickers2chans.items(): - qset.discard(chan) - - # if there are no more subscriptions with this broker - # drop from broker subs dict - if not any(tickers2chans.values()): - log.info(f"No more subscriptions for {broker}") - broker2tickersubs.pop(broker, None) - dtasks.discard(broker) - - # TODO: move to AsyncExitStack in 3.7 - for _, _, cntxmng, _ in clients.values(): - # FIXME: yes I know it's totally wrong... - await cntxmng.__aexit__(None, None, None) - - -async def _brokerd_main(host) -> None: - """Entry point for the broker daemon which waits for connections - before spawning micro-services. - """ - # global space for broker-daemon subscriptions - broker2tickersubs = {} - clients = {} - dtasks = set() - - async with trio.open_nursery() as nursery: - listeners = await nursery.start( - partial( - trio.serve_tcp, - partial( - start_quoter, broker2tickersubs, clients, - dtasks, nursery - ), - 1617, host=host, + if broker not in dtasks: # no quoter task yet + # task should begin on the next checkpoint/iteration + log.info(f"Spawning quoter task for {brokermod.name}") + async with trio.open_nursery() as nursery: + nursery.start_soon(partial( + stream_quotes, brokermod, get_quotes, tickers2chans, + cid=cid) ) - ) - log.debug(f"Spawned {listeners}") + dtasks.add(broker) + # unblocks when no more symbols subscriptions exist and the + # quote streamer task terminates (usually because another call + # was made to `modify_quoter` to unsubscribe from streaming + # symbols) + + # if there are truly no more subscriptions with this broker + # drop from broker subs dict + if not any(tickers2chans.values()): + log.info(f"No more subscriptions for {broker}") + broker2tickersubs.pop(broker, None) + dtasks.discard(broker) + + # TODO: move to AsyncExitStack in 3.7 + for _, _, cntxmng, _ in clients.values(): + # FIXME: yes I know there's no error handling.. + await cntxmng.__aexit__(None, None, None) async def _test_price_stream(broker, symbols, *, chan=None, cid=None):