diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 928b3694..16c6b017 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -181,6 +181,7 @@ async def sample_and_broadcast( bus: '_FeedsBus', # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, + brokername: str, sum_tick_vlm: bool = True, ) -> None: @@ -190,8 +191,7 @@ async def sample_and_broadcast( # iterate stream delivered by broker async for quotes in quote_stream: # TODO: ``numba`` this! - for sym, quote in quotes.items(): - + for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that # will require at least some way to prevent task switching @@ -255,7 +255,13 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus._subscribers[sym.lower()] + subs = bus._subscribers[broker_symbol.lower()] + + # NOTE: by default the broker backend doesn't append + # it's own "name" into the fqsn schema (but maybe it + # should?) so we have to manually generate the correct + # key here. + bsym = f'{broker_symbol}.{brokername}' lags = 0 for (stream, tick_throttle) in subs: @@ -266,7 +272,9 @@ async def sample_and_broadcast( # this is a send mem chan that likely # pushes to the ``uniform_rate_send()`` below. try: - stream.send_nowait((sym, quote)) + stream.send_nowait( + (bsym, quote) + ) except trio.WouldBlock: ctx = getattr(stream, '_ctx', None) if ctx: @@ -280,7 +288,9 @@ async def sample_and_broadcast( f'feed @ {tick_throttle} Hz' ) else: - await stream.send({sym: quote}) + await stream.send( + {bsym: quote} + ) if cs.cancelled_caught: lags += 1