diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 83cd363d..74e6410a 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -30,6 +30,7 @@ import time from typing import ( Any, AsyncIterator, + Callable, TYPE_CHECKING, ) @@ -54,6 +55,9 @@ from ._util import ( get_console_log, ) from ..service import maybe_spawn_daemon +from piker.log import ( + mk_repr, +) if TYPE_CHECKING: from ._sharedmem import ( @@ -575,7 +579,6 @@ async def open_sample_stream( async def sample_and_broadcast( - bus: _FeedsBus, # noqa rt_shm: ShmArray, hist_shm: ShmArray, @@ -596,11 +599,22 @@ async def sample_and_broadcast( overruns = Counter() + # multiline nested `dict` formatter (since rn quote-msgs are + # just that). + pfmt: Callable[[str], str] = mk_repr() + # iterate stream delivered by broker async for quotes in quote_stream: - # print(quotes) - # TODO: ``numba`` this! + # XXX WARNING XXX only enable for debugging bc ow can cost + # ALOT of perf with HF-feedz!!! + # + # log.info( + # 'Rx live quotes:\n' + # f'{pfmt(quotes)}' + # ) + + # TODO: `numba` this! for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that @@ -673,6 +687,18 @@ async def sample_and_broadcast( sub_key: str = broker_symbol.lower() subs: set[Sub] = bus.get_subs(sub_key) + if not subs: + all_bs_fqmes: list[str] = list( + bus._subscribers.keys() + ) + log.warning( + f'No subscribers for {brokername!r} live-quote ??\n' + f'broker_symbol: {broker_symbol}\n\n' + + f'Maybe the backend-sys symbol does not match one of,\n' + f'{pfmt(all_bs_fqmes)}\n' + ) + # NOTE: by default the broker backend doesn't append # it's own "name" into the fqme schema (but maybe it # should?) so we have to manually generate the correct diff --git a/piker/data/feed.py b/piker/data/feed.py index 7264c8e6..20f7da30 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -540,7 +540,10 @@ async def open_feed_bus( # subscription since the backend isn't (yet) expected to # append it's own name to the fqme, so we filter on keys # which *do not* include that name (e.g .ib) . - bus._subscribers.setdefault(bs_fqme, set()) + bus._subscribers.setdefault( + bs_fqme, + set(), + ) # sync feed subscribers with flume handles await ctx.started(