From e4b670f6f0d737c954b3b0130f7b7a27be378c51 Mon Sep 17 00:00:00 2001
From: Nelson Torres <nelson.torres.alvarado1@gmail.com>
Date: Wed, 29 Jan 2025 23:49:09 +0000
Subject: [PATCH] `.data._sampling`: warn about subscriber-less msgs

Since it usually means the data-provider backend is keying the msgs
incorrectly (not using the equivalent `MktPair.bs_fqme` which as
would be rendered from the delivered `FeedInit.mkt` instance..) and
reporting the subs list should make it clear how the fqme matching is
off.

Deats,
- use the new `.log.mk_repr()` for a formatter.
- add a commented info emission that can be unmasked to help debug any
  such cases as mentioned in the summary ^^.
---
 piker/data/_sampling.py | 32 +++++++++++++++++++++++++++++---
 piker/data/feed.py      |  5 ++++-
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py
index 83cd363d..74e6410a 100644
--- a/piker/data/_sampling.py
+++ b/piker/data/_sampling.py
@@ -30,6 +30,7 @@ import time
 from typing import (
     Any,
     AsyncIterator,
+    Callable,
     TYPE_CHECKING,
 )
 
@@ -54,6 +55,9 @@ from ._util import (
     get_console_log,
 )
 from ..service import maybe_spawn_daemon
+from piker.log import (
+    mk_repr,
+)
 
 if TYPE_CHECKING:
     from ._sharedmem import (
@@ -575,7 +579,6 @@ async def open_sample_stream(
 
 
 async def sample_and_broadcast(
-
     bus: _FeedsBus,  # noqa
     rt_shm: ShmArray,
     hist_shm: ShmArray,
@@ -596,11 +599,22 @@ async def sample_and_broadcast(
 
     overruns = Counter()
 
+    # multiline nested `dict` formatter (since rn quote-msgs are
+    # just that).
+    pfmt: Callable[[str], str] = mk_repr()
+
     # iterate stream delivered by broker
     async for quotes in quote_stream:
-        # print(quotes)
 
-        # TODO: ``numba`` this!
+        # XXX WARNING XXX only enable for debugging bc ow can cost
+        # ALOT of perf with HF-feedz!!!
+        #
+        # log.info(
+        #     'Rx live quotes:\n'
+        #     f'{pfmt(quotes)}'
+        # )
+
+        # TODO: `numba` this!
         for broker_symbol, quote in quotes.items():
             # TODO: in theory you can send the IPC msg *before* writing
             # to the sharedmem array to decrease latency, however, that
@@ -673,6 +687,18 @@ async def sample_and_broadcast(
             sub_key: str = broker_symbol.lower()
             subs: set[Sub] = bus.get_subs(sub_key)
 
+            if not subs:
+                all_bs_fqmes: list[str] = list(
+                    bus._subscribers.keys()
+                )
+                log.warning(
+                    f'No subscribers for {brokername!r} live-quote ??\n'
+                    f'broker_symbol: {broker_symbol}\n\n'
+
+                    f'Maybe the backend-sys symbol does not match one of,\n'
+                    f'{pfmt(all_bs_fqmes)}\n'
+                )
+
             # NOTE: by default the broker backend doesn't append
             # it's own "name" into the fqme schema (but maybe it
             # should?) so we have to manually generate the correct
diff --git a/piker/data/feed.py b/piker/data/feed.py
index 7264c8e6..20f7da30 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -540,7 +540,10 @@ async def open_feed_bus(
         # subscription since the backend isn't (yet) expected to
         # append it's own name to the fqme, so we filter on keys
         # which *do not* include that name (e.g .ib) .
-        bus._subscribers.setdefault(bs_fqme, set())
+        bus._subscribers.setdefault(
+            bs_fqme,
+            set(),
+        )
 
     # sync feed subscribers with flume handles
     await ctx.started(