diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 7e6fef54..0bb9a247 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -95,6 +95,12 @@ class Sampler: # history loading. incr_task_cs: trio.CancelScope | None = None + bcast_errors: tuple[Exception] = ( + trio.BrokenResourceError, + trio.ClosedResourceError, + trio.EndOfChannel, + ) + # holds all the ``tractor.Context`` remote subscriptions for # a particular sample period increment event: all subscribers are # notified on a step. @@ -258,14 +264,15 @@ class Sampler: subs: set last_ts, subs = pair - task = trio.lowlevel.current_task() - log.debug( - f'SUBS {self.subscribers}\n' - f'PAIR {pair}\n' - f'TASK: {task}: {id(task)}\n' - f'broadcasting {period_s} -> {last_ts}\n' - # f'consumers: {subs}' - ) + # NOTE, for debugging pub-sub issues + # task = trio.lowlevel.current_task() + # log.debug( + # f'AlL-SUBS@{period_s!r}: {self.subscribers}\n' + # f'PAIR: {pair}\n' + # f'TASK: {task}: {id(task)}\n' + # f'broadcasting {period_s} -> {last_ts}\n' + # f'consumers: {subs}' + # ) borked: set[MsgStream] = set() sent: set[MsgStream] = set() while True: @@ -282,12 +289,11 @@ class Sampler: await stream.send(msg) sent.add(stream) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): + except self.bcast_errors as err: log.error( - f'{stream._ctx.chan.uid} dropped connection' + f'Connection dropped for IPC ctx\n' + f'{stream._ctx}\n\n' + f'Due to {type(err)}' ) borked.add(stream) else: @@ -562,8 +568,7 @@ async def open_sample_stream( async def sample_and_broadcast( - - bus: _FeedsBus, # noqa + bus: _FeedsBus, rt_shm: ShmArray, hist_shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, @@ -583,11 +588,33 @@ async def sample_and_broadcast( overruns = Counter() + # NOTE, only used for debugging live-data-feed issues, though + # this should be resolved more correctly in the future using the + # new typed-msgspec feats of `tractor`! + # + # XXX, a multiline nested `dict` formatter (since rn quote-msgs + # are just that). + # pfmt: Callable[[str], str] = mk_repr() + # iterate stream delivered by broker async for quotes in quote_stream: # print(quotes) - # TODO: ``numba`` this! + # XXX WARNING XXX only enable for debugging bc ow can cost + # ALOT of perf with HF-feedz!!! + # + # log.info( + # 'Rx live quotes:\n' + # f'{pfmt(quotes)}' + # ) + + # TODO, + # -[ ] `numba` or `cython`-nize this loop possibly? + # |_alternatively could we do it in rust somehow by upacking + # arrow msgs instead of using `msgspec`? + # -[ ] use `msgspec.Struct` support in new typed-msging from + # `tractor` to ensure only allowed msgs are transmitted? + # for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that @@ -660,6 +687,21 @@ async def sample_and_broadcast( sub_key: str = broker_symbol.lower() subs: set[Sub] = bus.get_subs(sub_key) + # TODO, figure out how to make this useful whilst + # incoporating feed "pausing" .. + # + # if not subs: + # all_bs_fqmes: list[str] = list( + # bus._subscribers.keys() + # ) + # log.warning( + # f'No subscribers for {brokername!r} live-quote ??\n' + # f'broker_symbol: {broker_symbol}\n\n' + + # f'Maybe the backend-sys symbol does not match one of,\n' + # f'{pfmt(all_bs_fqmes)}\n' + # ) + # NOTE: by default the broker backend doesn't append # it's own "name" into the fqme schema (but maybe it # should?) so we have to manually generate the correct @@ -757,7 +799,6 @@ async def sample_and_broadcast( async def uniform_rate_send( - rate: float, quote_stream: trio.abc.ReceiveChannel, stream: MsgStream, diff --git a/piker/log.py b/piker/log.py index 56776e1e..dc5cfc59 100644 --- a/piker/log.py +++ b/piker/log.py @@ -19,6 +19,10 @@ Log like a forester! """ import logging import json +import reprlib +from typing import ( + Callable, +) import tractor from pygments import ( @@ -84,3 +88,29 @@ def colorize_json( # likeable styles: algol_nu, tango, monokai formatters.TerminalTrueColorFormatter(style=style) ) + + +# TODO, eventually defer to the version in `modden` once +# it becomes a dep! +def mk_repr( + **repr_kws, +) -> Callable[[str], str]: + ''' + Allocate and deliver a `repr.Repr` instance with provided input + settings using the std-lib's `reprlib` mod, + * https://docs.python.org/3/library/reprlib.html + + ------ Ex. ------ + An up to 6-layer-nested `dict` as multi-line: + - https://stackoverflow.com/a/79102479 + - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel + + ''' + def_kws: dict[str, int] = dict( + indent=2, + maxlevel=6, # recursion levels + maxstring=66, # match editor line-len limit + ) + def_kws |= repr_kws + reprr = reprlib.Repr(**def_kws) + return reprr.repr