From 3646fb4a233afc6f8ba19f4c1221b1df16698414 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 May 2018 22:48:06 -0400 Subject: [PATCH] Filter out bad symbols before adding client subscription Event if a broker client is already spawned new clients should still receive a detailed symbol data packet as the first response. Avoid exposing the new client's queue to the broker (i.e. subscribing it for quotes) until after first pushing this packet with all bad symbols filtered out. --- piker/brokers/core.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index e521477e..90b9420c 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -306,15 +306,11 @@ async def start_quoter( get_quotes = await brokermod.quoter(client, tickers) clients[broker] = ( brokermod, client, client_cntxmng, get_quotes) - tickers2qs = broker2tickersubs.setdefault( - broker, {}.fromkeys(tickers, {queue, })) + tickers2qs = broker2tickersubs.setdefault(broker, {}) else: log.info(f"Subscribing with existing `{broker}` daemon") brokermod, client, _, get_quotes = clients[broker] tickers2qs = broker2tickersubs[broker] - # update map from each symbol to requesting client's queue - for ticker in tickers: - tickers2qs.setdefault(ticker, set()).add(queue) # beginning of section to be trimmed out with #37 ################################################# @@ -324,19 +320,13 @@ async def start_quoter( # since the new client needs to know what symbols are accepted log.warn(f"Retrieving smoke quote for {queue.peer}") quotes = await get_quotes(tickers) - # pop any tickers that aren't returned in the first quote - valid_tickers = set(tickers) - set(quotes) - for ticker in valid_tickers: + # report any tickers that aren't returned in the first quote + invalid_tickers = set(tickers) - set(quotes) + for symbol in invalid_tickers: + tickers.remove(symbol) log.warn( - f"Symbol `{ticker}` not found by broker `{brokermod.name}`" + f"Symbol `{symbol}` not found by broker `{brokermod.name}`" ) - tickers2qs.pop(ticker, None) - - # first respond with symbol data for all tickers (allows - # clients to receive broker specific setup info) - sd = await client.symbol_data(tickers) - assert sd, "No symbol data could be found?" - await queue.put(sd) # pop any tickers that return "empty" quotes payload = {} @@ -345,16 +335,26 @@ async def start_quoter( log.warn( f"Symbol `{symbol}` not found by broker" f" `{brokermod.name}`") - tickers2qs.pop(symbol, None) + tickers.remove(symbol) continue payload[symbol] = quote - # push initial quotes response for client initialization - await queue.put(payload) - # end of section to be trimmed out with #37 ########################################### + # first respond with symbol data for all tickers (allows + # clients to receive broker specific setup info) + sd = await client.symbol_data(tickers) + assert sd, "No symbol data could be found?" + await queue.put(sd) + + # update map from each symbol to requesting client's queue + for ticker in tickers: + tickers2qs.setdefault(ticker, set()).add(queue) + + # push initial quotes response for client initialization + await queue.put(payload) + if broker not in dtasks: # no quoter task yet # task should begin on the next checkpoint/iteration log.info(f"Spawning quoter task for {brokermod.name}")