`.data._sampling`: warn about subscriber-less msgs
Since it usually means the data-provider backend is keying the msgs incorrectly (not using the equivalent `MktPair.bs_fqme` which as would be rendered from the delivered `FeedInit.mkt` instance..) and reporting the subs list should make it clear how the fqme matching is off. Deats, - use the new `.log.mk_repr()` for a formatter. - add a commented info emission that can be unmasked to help debug any such cases as mentioned in the summary ^^.pull/20/head
							parent
							
								
									fe422bfcb0
								
							
						
					
					
						commit
						e4b670f6f0
					
				|  | @ -30,6 +30,7 @@ import time | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     AsyncIterator, |     AsyncIterator, | ||||||
|  |     Callable, | ||||||
|     TYPE_CHECKING, |     TYPE_CHECKING, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -54,6 +55,9 @@ from ._util import ( | ||||||
|     get_console_log, |     get_console_log, | ||||||
| ) | ) | ||||||
| from ..service import maybe_spawn_daemon | from ..service import maybe_spawn_daemon | ||||||
|  | from piker.log import ( | ||||||
|  |     mk_repr, | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| if TYPE_CHECKING: | if TYPE_CHECKING: | ||||||
|     from ._sharedmem import ( |     from ._sharedmem import ( | ||||||
|  | @ -575,7 +579,6 @@ async def open_sample_stream( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def sample_and_broadcast( | async def sample_and_broadcast( | ||||||
| 
 |  | ||||||
|     bus: _FeedsBus,  # noqa |     bus: _FeedsBus,  # noqa | ||||||
|     rt_shm: ShmArray, |     rt_shm: ShmArray, | ||||||
|     hist_shm: ShmArray, |     hist_shm: ShmArray, | ||||||
|  | @ -596,11 +599,22 @@ async def sample_and_broadcast( | ||||||
| 
 | 
 | ||||||
|     overruns = Counter() |     overruns = Counter() | ||||||
| 
 | 
 | ||||||
|  |     # multiline nested `dict` formatter (since rn quote-msgs are | ||||||
|  |     # just that). | ||||||
|  |     pfmt: Callable[[str], str] = mk_repr() | ||||||
|  | 
 | ||||||
|     # iterate stream delivered by broker |     # iterate stream delivered by broker | ||||||
|     async for quotes in quote_stream: |     async for quotes in quote_stream: | ||||||
|         # print(quotes) |  | ||||||
| 
 | 
 | ||||||
|         # TODO: ``numba`` this! |         # XXX WARNING XXX only enable for debugging bc ow can cost | ||||||
|  |         # ALOT of perf with HF-feedz!!! | ||||||
|  |         # | ||||||
|  |         # log.info( | ||||||
|  |         #     'Rx live quotes:\n' | ||||||
|  |         #     f'{pfmt(quotes)}' | ||||||
|  |         # ) | ||||||
|  | 
 | ||||||
|  |         # TODO: `numba` this! | ||||||
|         for broker_symbol, quote in quotes.items(): |         for broker_symbol, quote in quotes.items(): | ||||||
|             # TODO: in theory you can send the IPC msg *before* writing |             # TODO: in theory you can send the IPC msg *before* writing | ||||||
|             # to the sharedmem array to decrease latency, however, that |             # to the sharedmem array to decrease latency, however, that | ||||||
|  | @ -673,6 +687,18 @@ async def sample_and_broadcast( | ||||||
|             sub_key: str = broker_symbol.lower() |             sub_key: str = broker_symbol.lower() | ||||||
|             subs: set[Sub] = bus.get_subs(sub_key) |             subs: set[Sub] = bus.get_subs(sub_key) | ||||||
| 
 | 
 | ||||||
|  |             if not subs: | ||||||
|  |                 all_bs_fqmes: list[str] = list( | ||||||
|  |                     bus._subscribers.keys() | ||||||
|  |                 ) | ||||||
|  |                 log.warning( | ||||||
|  |                     f'No subscribers for {brokername!r} live-quote ??\n' | ||||||
|  |                     f'broker_symbol: {broker_symbol}\n\n' | ||||||
|  | 
 | ||||||
|  |                     f'Maybe the backend-sys symbol does not match one of,\n' | ||||||
|  |                     f'{pfmt(all_bs_fqmes)}\n' | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|             # NOTE: by default the broker backend doesn't append |             # NOTE: by default the broker backend doesn't append | ||||||
|             # it's own "name" into the fqme schema (but maybe it |             # it's own "name" into the fqme schema (but maybe it | ||||||
|             # should?) so we have to manually generate the correct |             # should?) so we have to manually generate the correct | ||||||
|  |  | ||||||
|  | @ -540,7 +540,10 @@ async def open_feed_bus( | ||||||
|         # subscription since the backend isn't (yet) expected to |         # subscription since the backend isn't (yet) expected to | ||||||
|         # append it's own name to the fqme, so we filter on keys |         # append it's own name to the fqme, so we filter on keys | ||||||
|         # which *do not* include that name (e.g .ib) . |         # which *do not* include that name (e.g .ib) . | ||||||
|         bus._subscribers.setdefault(bs_fqme, set()) |         bus._subscribers.setdefault( | ||||||
|  |             bs_fqme, | ||||||
|  |             set(), | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|     # sync feed subscribers with flume handles |     # sync feed subscribers with flume handles | ||||||
|     await ctx.started( |     await ctx.started( | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue