Filter out bad symbols before adding client subscription

Event if a broker client is already spawned new clients should still
receive a detailed symbol data packet as the first response. Avoid
exposing the new client's queue to the broker (i.e. subscribing it for
quotes) until after first pushing this packet with all bad symbols
filtered out.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-05-08 22:48:06 -04:00
parent 3a40c2f8fe
commit 3646fb4a23
1 changed files with 20 additions and 20 deletions

View File

@ -306,15 +306,11 @@ async def start_quoter(
get_quotes = await brokermod.quoter(client, tickers) get_quotes = await brokermod.quoter(client, tickers)
clients[broker] = ( clients[broker] = (
brokermod, client, client_cntxmng, get_quotes) brokermod, client, client_cntxmng, get_quotes)
tickers2qs = broker2tickersubs.setdefault( tickers2qs = broker2tickersubs.setdefault(broker, {})
broker, {}.fromkeys(tickers, {queue, }))
else: else:
log.info(f"Subscribing with existing `{broker}` daemon") log.info(f"Subscribing with existing `{broker}` daemon")
brokermod, client, _, get_quotes = clients[broker] brokermod, client, _, get_quotes = clients[broker]
tickers2qs = broker2tickersubs[broker] tickers2qs = broker2tickersubs[broker]
# update map from each symbol to requesting client's queue
for ticker in tickers:
tickers2qs.setdefault(ticker, set()).add(queue)
# beginning of section to be trimmed out with #37 # beginning of section to be trimmed out with #37
################################################# #################################################
@ -324,19 +320,13 @@ async def start_quoter(
# since the new client needs to know what symbols are accepted # since the new client needs to know what symbols are accepted
log.warn(f"Retrieving smoke quote for {queue.peer}") log.warn(f"Retrieving smoke quote for {queue.peer}")
quotes = await get_quotes(tickers) quotes = await get_quotes(tickers)
# pop any tickers that aren't returned in the first quote # report any tickers that aren't returned in the first quote
valid_tickers = set(tickers) - set(quotes) invalid_tickers = set(tickers) - set(quotes)
for ticker in valid_tickers: for symbol in invalid_tickers:
tickers.remove(symbol)
log.warn( log.warn(
f"Symbol `{ticker}` not found by broker `{brokermod.name}`" f"Symbol `{symbol}` not found by broker `{brokermod.name}`"
) )
tickers2qs.pop(ticker, None)
# first respond with symbol data for all tickers (allows
# clients to receive broker specific setup info)
sd = await client.symbol_data(tickers)
assert sd, "No symbol data could be found?"
await queue.put(sd)
# pop any tickers that return "empty" quotes # pop any tickers that return "empty" quotes
payload = {} payload = {}
@ -345,16 +335,26 @@ async def start_quoter(
log.warn( log.warn(
f"Symbol `{symbol}` not found by broker" f"Symbol `{symbol}` not found by broker"
f" `{brokermod.name}`") f" `{brokermod.name}`")
tickers2qs.pop(symbol, None) tickers.remove(symbol)
continue continue
payload[symbol] = quote payload[symbol] = quote
# push initial quotes response for client initialization
await queue.put(payload)
# end of section to be trimmed out with #37 # end of section to be trimmed out with #37
########################################### ###########################################
# first respond with symbol data for all tickers (allows
# clients to receive broker specific setup info)
sd = await client.symbol_data(tickers)
assert sd, "No symbol data could be found?"
await queue.put(sd)
# update map from each symbol to requesting client's queue
for ticker in tickers:
tickers2qs.setdefault(ticker, set()).add(queue)
# push initial quotes response for client initialization
await queue.put(payload)
if broker not in dtasks: # no quoter task yet if broker not in dtasks: # no quoter task yet
# task should begin on the next checkpoint/iteration # task should begin on the next checkpoint/iteration
log.info(f"Spawning quoter task for {brokermod.name}") log.info(f"Spawning quoter task for {brokermod.name}")