diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 233cf71a..506e1869 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -86,7 +86,6 @@ async def stream_quotes( A broker-client ``quoter`` async context manager must be provided which returns an async quote function. """ - packet = {'yield': {}, 'cid': cid} broker_limit = getattr(brokermod, '_rate_limit', float('inf')) if broker_limit < rate: rate = broker_limit @@ -123,13 +122,18 @@ async def stream_quotes( log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - for chan in tickers2chans[symbol]: + for chan, cid in tickers2chans.get(symbol, set()): chan_payloads.setdefault( - chan, packet.copy())['yield'][symbol] = quote + chan, + {'yield': {}, 'cid': cid} + )['yield'][symbol] = quote else: - for chan in tickers2chans[symbol]: + for chan, cid in tickers2chans[symbol]: chan_payloads.setdefault( - chan, packet.copy())['yield'][symbol] = quote + chan, + {'yield': {}, 'cid': cid} + )['yield'][symbol] = quote + # deliver to each subscriber if chan_payloads: for chan, payload in chan_payloads.items(): @@ -140,9 +144,9 @@ async def stream_quotes( trio.ClosedStreamError, ConnectionResetError, ConnectionRefusedError, ): - log.warn(f"{chan.raddr} went down?") + log.warn(f"{chan} went down?") for chanset in tickers2chans.values(): - chanset.discard(chan) + chanset.discard((chan, cid)) req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) @@ -157,6 +161,8 @@ async def stream_quotes( log.debug(f"Sleeping for {delay}") await trio.sleep(delay) + log.info(f"Terminating stream quoter task for {brokermod.name}") + async def get_cached_client(broker, tickers): """Get the current actor's cached broker client if available or create a @@ -164,9 +170,9 @@ async def get_cached_client(broker, tickers): """ # check if a cached client is in the local actor's statespace clients = tractor.current_actor().statespace.setdefault('clients', {}) - if clients: + try: return clients[broker] - else: + except KeyError: log.info(f"Creating new client for broker {broker}") brokermod = get_brokermod(broker) # TODO: move to AsyncExitStack in 3.7 @@ -188,7 +194,7 @@ async def symbol_data(broker, tickers): async def smoke_quote(get_quotes, tickers, broker): """Do an initial "smoke" request for symbols in ``tickers`` filtering - oout any symbols not supported by the broker queried in the call to + out any symbols not supported by the broker queried in the call to ``get_quotes()``. """ # TODO: trim out with #37 @@ -234,14 +240,18 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None): tickers2chans = broker2tickersubs.get(broker) # update map from each symbol to requesting client's chan for ticker in tickers: - chanset = tickers2chans.setdefault(ticker, set()) - if chan not in chanset: - chanset.add(chan) + tickers2chans.setdefault(ticker, set()).add((chan, cid)) - for ticker in filter(lambda ticker: ticker not in tickers, tickers2chans): + for ticker in filter( + lambda ticker: ticker not in tickers, tickers2chans.copy() + ): chanset = tickers2chans.get(ticker) - if chanset and chan in chanset: - chanset.discard(chan) + if chanset: + chanset.discard((chan, cid)) + + if not chanset: + # pop empty sets which will trigger bg quoter task termination + tickers2chans.pop(ticker) async def start_quote_stream( @@ -281,31 +291,36 @@ async def start_quote_stream( # update map from each symbol to requesting client's chan modify_quote_stream(broker, tickers, chan=chan, cid=cid) - if broker not in dtasks: # no quoter task yet - # task should begin on the next checkpoint/iteration - log.info(f"Spawning quoter task for {brokermod.name}") - async with trio.open_nursery() as nursery: - nursery.start_soon(partial( - stream_quotes, brokermod, get_quotes, tickers2chans, - cid=cid) - ) - dtasks.add(broker) - # unblocks when no more symbols subscriptions exist and the - # quote streamer task terminates (usually because another call - # was made to `modify_quoter` to unsubscribe from streaming - # symbols) + try: + if broker not in dtasks: # no quoter task yet + # task should begin on the next checkpoint/iteration + # with trio.open_cancel_scope(shield=True): + log.info(f"Spawning quoter task for {brokermod.name}") + # await actor._root_nursery.start(partial( + async with trio.open_nursery() as nursery: + nursery.start_soon(partial( + stream_quotes, brokermod, get_quotes, tickers2chans, + cid=cid) + ) + dtasks.add(broker) - # if there are truly no more subscriptions with this broker - # drop from broker subs dict - if not any(tickers2chans.values()): - log.info(f"No more subscriptions for {broker}") - broker2tickersubs.pop(broker, None) - dtasks.discard(broker) + # unblocks when no more symbols subscriptions exist and the + # quote streamer task terminates (usually because another call + # was made to `modify_quoter` to unsubscribe from streaming + # symbols) + log.info(f"Terminated quoter task for {brokermod.name}") - # TODO: move to AsyncExitStack in 3.7 - for _, _, cntxmng, _ in clients.values(): - # FIXME: yes I know there's no error handling.. - await cntxmng.__aexit__(None, None, None) + # TODO: move to AsyncExitStack in 3.7 + for _, _, cntxmng, _ in clients.values(): + # FIXME: yes I know there's no error handling.. + await cntxmng.__aexit__(None, None, None) + finally: + # if there are truly no more subscriptions with this broker + # drop from broker subs dict + if not any(tickers2chans.values()): + log.info(f"No more subscriptions for {broker}") + broker2tickersubs.pop(broker, None) + dtasks.discard(broker) async def _test_price_stream(broker, symbols, *, chan=None, cid=None): @@ -317,7 +332,7 @@ async def _test_price_stream(broker, symbols, *, chan=None, cid=None): get_quotes = await brokermod.quoter(client, symbols) log.info(f"Spawning quoter task for {brokermod.name}") assert chan - tickers2chans = {}.fromkeys(symbols, {chan, }) + tickers2chans = {}.fromkeys(symbols, {(chan, cid), }) async with trio.open_nursery() as nursery: nursery.start_soon(