diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 7a05c070..15bf798b 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -75,6 +75,7 @@ async def stream_quotes( tickers2chans: {str: Channel}, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue + cid: str = None, ) -> None: """Stream quotes for a sequence of tickers at the given ``rate`` per second. @@ -82,6 +83,7 @@ async def stream_quotes( A broker-client ``quoter`` async context manager must be provided which returns an async quote function. """ + packet = {'yield': {}, 'cid': cid} broker_limit = getattr(brokermod, '_rate_limit', float('inf')) if broker_limit < rate: rate = broker_limit @@ -108,7 +110,7 @@ async def stream_quotes( quotes = await wait_for_network(partial(get_quotes, tickers)) postquote_start = time.time() - q_payloads = {} + chan_payloads = {} for symbol, quote in quotes.items(): if diff_cached: # if cache is enabled then only deliver "new" changes @@ -119,14 +121,15 @@ async def stream_quotes( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote for chan in tickers2chans[symbol]: - q_payloads.setdefault(chan, {})[symbol] = quote + chan_payloads.setdefault( + chan, packet.copy())['yield'][symbol] = quote else: for chan in tickers2chans[symbol]: - q_payloads.setdefault(chan, {})[symbol] = quote - + chan_payloads.setdefault( + chan, packet.copy())['yield'][symbol] = quote # deliver to each subscriber - if q_payloads: - for chan, payload in q_payloads.items(): + if chan_payloads: + for chan, payload in chan_payloads.items(): try: await chan.send(payload) except ( @@ -134,7 +137,7 @@ async def stream_quotes( trio.ClosedStreamError, ConnectionResetError, ConnectionRefusedError, ): - log.warn(f"{chan.addr} went down?") + log.warn(f"{chan.raddr} went down?") for qset in tickers2chans.values(): qset.discard(chan) @@ -275,13 +278,13 @@ async def _brokerd_main(host) -> None: start_quoter, broker2tickersubs, clients, dtasks, nursery ), - 1616, host=host, + 1617, host=host, ) ) log.debug(f"Spawned {listeners}") -async def _test_price_stream(broker, symbols, *, chan=None): +async def _test_price_stream(broker, symbols, *, chan=None, cid=None): """Test function for initial tractor draft. """ brokermod = get_brokermod(broker) @@ -292,14 +295,8 @@ async def _test_price_stream(broker, symbols, *, chan=None): assert chan tickers2chans = {}.fromkeys(symbols, {chan, }) - async def terminate_on_None(chan, nursery): - val = await chan.recv() - if val is None: - log.info("Got terminate sentinel!") - nursery.cancel_scope.cancel() - async with trio.open_nursery() as nursery: nursery.start_soon( - stream_quotes, brokermod, get_quotes, tickers2chans) - # nursery.start_soon( - # terminate_on_None, chan, nursery) + partial( + stream_quotes, brokermod, get_quotes, tickers2chans, cid=cid) + )