diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 7bb0231d..e5b87a2a 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -95,6 +95,12 @@ class Sampler: # history loading. incr_task_cs: trio.CancelScope | None = None + bcast_errors: tuple[Exception] = ( + trio.BrokenResourceError, + trio.ClosedResourceError, + trio.EndOfChannel, + ) + # holds all the ``tractor.Context`` remote subscriptions for # a particular sample period increment event: all subscribers are # notified on a step. @@ -258,14 +264,15 @@ class Sampler: subs: set last_ts, subs = pair - task = trio.lowlevel.current_task() - log.debug( - f'SUBS {self.subscribers}\n' - f'PAIR {pair}\n' - f'TASK: {task}: {id(task)}\n' - f'broadcasting {period_s} -> {last_ts}\n' - # f'consumers: {subs}' - ) + # NOTE, for debugging pub-sub issues + # task = trio.lowlevel.current_task() + # log.debug( + # f'AlL-SUBS@{period_s!r}: {self.subscribers}\n' + # f'PAIR: {pair}\n' + # f'TASK: {task}: {id(task)}\n' + # f'broadcasting {period_s} -> {last_ts}\n' + # f'consumers: {subs}' + # ) borked: set[MsgStream] = set() sent: set[MsgStream] = set() while True: @@ -282,12 +289,11 @@ class Sampler: await stream.send(msg) sent.add(stream) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): + except self.bcast_errors as err: log.error( - f'{stream._ctx.chan.uid} dropped connection' + f'Connection dropped for IPC ctx\n' + f'{stream._ctx}\n\n' + f'Due to {type(err)}' ) borked.add(stream) else: @@ -394,7 +400,8 @@ async def register_with_sampler( finally: if ( sub_for_broadcasts - and subs + and + subs ): try: subs.remove(stream) @@ -561,8 +568,7 @@ async def open_sample_stream( async def sample_and_broadcast( - - bus: _FeedsBus, # noqa + bus: _FeedsBus, rt_shm: ShmArray, hist_shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, @@ -582,11 +588,33 @@ async def sample_and_broadcast( overruns = Counter() + # NOTE, only used for debugging live-data-feed issues, though + # this should be 
resolved more correctly in the future using the + # new typed-msgspec feats of `tractor`! + # + # XXX, a multiline nested `dict` formatter (since rn quote-msgs + # are just that). + # pfmt: Callable[[str], str] = mk_repr() + # iterate stream delivered by broker async for quotes in quote_stream: # print(quotes) - # TODO: ``numba`` this! + # XXX WARNING XXX only enable for debugging bc ow can cost + # ALOT of perf with HF-feedz!!! + # + # log.info( + # 'Rx live quotes:\n' + # f'{pfmt(quotes)}' + # ) + + # TODO, + # -[ ] `numba` or `cython`-nize this loop possibly? + # |_alternatively could we do it in rust somehow by unpacking + # arrow msgs instead of using `msgspec`? + # -[ ] use `msgspec.Struct` support in new typed-msging from + # `tractor` to ensure only allowed msgs are transmitted? + # for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that @@ -659,6 +687,21 @@ async def sample_and_broadcast( sub_key: str = broker_symbol.lower() subs: set[Sub] = bus.get_subs(sub_key) + # TODO, figure out how to make this useful whilst + # incorporating feed "pausing" .. + # + # if not subs: + # all_bs_fqmes: list[str] = list( + # bus._subscribers.keys() + # ) + # log.warning( + # f'No subscribers for {brokername!r} live-quote ??\n' + # f'broker_symbol: {broker_symbol}\n\n' + + # f'Maybe the backend-sys symbol does not match one of,\n' + # f'{pfmt(all_bs_fqmes)}\n' + # ) + # NOTE: by default the broker backend doesn't append # it's own "name" into the fqme schema (but maybe it # should?) 
so we have to manually generate the correct @@ -728,18 +771,14 @@ async def sample_and_broadcast( if lags > 10: await tractor.pause() - except ( - trio.BrokenResourceError, - trio.ClosedResourceError, - trio.EndOfChannel, - ): + except Sampler.bcast_errors as ipc_err: ctx: Context = ipc._ctx chan: Channel = ctx.chan if ctx: log.warning( - 'Dropped `brokerd`-quotes-feed connection:\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' + f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n' + f'x>) {ctx.cid}@{chan.uid}' + f'|_{ipc_err!r}\n\n' ) if sub.throttle_rate: assert ipc._closed @@ -756,12 +795,11 @@ async def sample_and_broadcast( async def uniform_rate_send( - rate: float, quote_stream: trio.abc.ReceiveChannel, stream: MsgStream, - task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -779,13 +817,16 @@ async def uniform_rate_send( https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 ''' - # TODO: compute the approx overhead latency per cycle - left_to_sleep = throttle_period = 1/rate - 0.000616 + # ?TODO? dynamically compute the **actual** approx overhead latency per cycle + # instead of this magic # bidinezz? 
+ throttle_period: float = 1/rate - 0.000616 + left_to_sleep: float = throttle_period # send cycle state + first_quote: dict|None first_quote = last_quote = None - last_send = time.time() - diff = 0 + last_send: float = time.time() + diff: float = 0 task_status.started() ticks_by_type: dict[ @@ -796,22 +837,28 @@ async def uniform_rate_send( clear_types = _tick_groups['clears'] while True: - # compute the remaining time to sleep for this throttled cycle - left_to_sleep = throttle_period - diff + left_to_sleep: float = throttle_period - diff if left_to_sleep > 0: + cs: trio.CancelScope with trio.move_on_after(left_to_sleep) as cs: + sym: str + last_quote: dict try: sym, last_quote = await quote_stream.receive() except trio.EndOfChannel: - log.exception(f"feed for {stream} ended?") + log.exception( + f'Live stream for feed for ended?\n' + f'<=c\n' + f' |_[{stream!r}\n' + ) break - diff = time.time() - last_send + diff: float = time.time() - last_send if not first_quote: - first_quote = last_quote + first_quote: float = last_quote # first_quote['tbt'] = ticks_by_type if (throttle_period - diff) > 0: @@ -872,7 +919,9 @@ async def uniform_rate_send( # TODO: now if only we could sync this to the display # rate timing exactly lul try: - await stream.send({sym: first_quote}) + await stream.send({ + sym: first_quote + }) except tractor.RemoteActorError as rme: if rme.type is not tractor._exceptions.StreamOverrun: raise @@ -883,19 +932,28 @@ async def uniform_rate_send( f'{sym}:{ctx.cid}@{chan.uid}' ) + # NOTE: any of these can be raised by `tractor`'s IPC + # transport-layer and we want to be highly resilient + # to consumers which crash or lose network connection. + # I.e. we **DO NOT** want to crash and propagate up to + # ``pikerd`` these kinds of errors! except ( - # NOTE: any of these can be raised by ``tractor``'s IPC - # transport-layer and we want to be highly resilient - # to consumers which crash or lose network connection. - # I.e. 
we **DO NOT** want to crash and propagate up to - # ``pikerd`` these kinds of errors! - trio.ClosedResourceError, - trio.BrokenResourceError, ConnectionResetError, - ): - # if the feed consumer goes down then drop - # out of this rate limiter - log.warning(f'{stream} closed') + ) + Sampler.bcast_errors as ipc_err: + match ipc_err: + case trio.EndOfChannel(): + log.info( + f'{stream} terminated by peer,\n' + f'{ipc_err!r}' + ) + case _: + # if the feed consumer goes down then drop + # out of this rate limiter + log.warning( + f'{stream} closed due to,\n' + f'{ipc_err!r}' + ) + await stream.aclose() return diff --git a/piker/log.py b/piker/log.py index 56776e1e..dc5cfc59 100644 --- a/piker/log.py +++ b/piker/log.py @@ -19,6 +19,10 @@ Log like a forester! """ import logging import json +import reprlib +from typing import ( + Callable, +) import tractor from pygments import ( @@ -84,3 +88,29 @@ def colorize_json( # likeable styles: algol_nu, tango, monokai formatters.TerminalTrueColorFormatter(style=style) ) + + +# TODO, eventually defer to the version in `modden` once +# it becomes a dep! +def mk_repr( + **repr_kws, +) -> Callable[[str], str]: + ''' + Allocate and deliver a `repr.Repr` instance with provided input + settings using the std-lib's `reprlib` mod, + * https://docs.python.org/3/library/reprlib.html + + ------ Ex. ------ + An up to 6-layer-nested `dict` as multi-line: + - https://stackoverflow.com/a/79102479 + - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel + + ''' + def_kws: dict[str, int] = dict( + indent=2, + maxlevel=6, # recursion levels + maxstring=66, # match editor line-len limit + ) + def_kws |= repr_kws + reprr = reprlib.Repr(**def_kws) + return reprr.repr