From 1616cc0e8202e9e7aa931eab5139415e93430d65 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 21:09:09 -0500 Subject: [PATCH] `.data._sampling`: warn about subscriber-less msgs Since it usually means the data-provider backend is keying the msgs incorrectly (not using the equivalent `MktPair.bs_fqme` which as would be rendered from the delivered `FeedInit.mkt` instance..) and reporting the subs list should make it clear how the fqme matching is off. Deats, - use the new `.log.mk_repr()` for a formatter. - add a commented info emission that can be unmasked to help debug any such cases as mentioned in the summary ^^. --- piker/data/_sampling.py | 32 +++++++++++++++++++++++++++++--- piker/data/feed.py | 5 ++++- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 83cd363d..74e6410a 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -30,6 +30,7 @@ import time from typing import ( Any, AsyncIterator, + Callable, TYPE_CHECKING, ) @@ -54,6 +55,9 @@ from ._util import ( get_console_log, ) from ..service import maybe_spawn_daemon +from piker.log import ( + mk_repr, +) if TYPE_CHECKING: from ._sharedmem import ( @@ -575,7 +579,6 @@ async def open_sample_stream( async def sample_and_broadcast( - bus: _FeedsBus, # noqa rt_shm: ShmArray, hist_shm: ShmArray, @@ -596,11 +599,22 @@ async def sample_and_broadcast( overruns = Counter() + # multiline nested `dict` formatter (since rn quote-msgs are + # just that). + pfmt: Callable[[str], str] = mk_repr() + # iterate stream delivered by broker async for quotes in quote_stream: - # print(quotes) - # TODO: ``numba`` this! + # XXX WARNING XXX only enable for debugging bc ow can cost + # ALOT of perf with HF-feedz!!! + # + # log.info( + # 'Rx live quotes:\n' + # f'{pfmt(quotes)}' + # ) + + # TODO: `numba` this! for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that @@ -673,6 +687,18 @@ async def sample_and_broadcast( sub_key: str = broker_symbol.lower() subs: set[Sub] = bus.get_subs(sub_key) + if not subs: + all_bs_fqmes: list[str] = list( + bus._subscribers.keys() + ) + log.warning( + f'No subscribers for {brokername!r} live-quote ??\n' + f'broker_symbol: {broker_symbol}\n\n' + + f'Maybe the backend-sys symbol does not match one of,\n' + f'{pfmt(all_bs_fqmes)}\n' + ) + # NOTE: by default the broker backend doesn't append # it's own "name" into the fqme schema (but maybe it # should?) so we have to manually generate the correct diff --git a/piker/data/feed.py b/piker/data/feed.py index 7264c8e6..20f7da30 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -540,7 +540,10 @@ async def open_feed_bus( # subscription since the backend isn't (yet) expected to # append it's own name to the fqme, so we filter on keys # which *do not* include that name (e.g .ib) . - bus._subscribers.setdefault(bs_fqme, set()) + bus._subscribers.setdefault( + bs_fqme, + set(), + ) # sync feed subscribers with flume handles await ctx.started(