diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 74e6410a..18d7b542 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -30,7 +30,6 @@ import time from typing import ( Any, AsyncIterator, - Callable, TYPE_CHECKING, ) @@ -101,6 +100,12 @@ class Sampler: # history loading. incr_task_cs: trio.CancelScope | None = None + bcast_errors: tuple[Exception] = ( + trio.BrokenResourceError, + trio.ClosedResourceError, + trio.EndOfChannel, + ) + # holds all the ``tractor.Context`` remote subscriptions for # a particular sample period increment event: all subscribers are # notified on a step. @@ -264,14 +269,15 @@ class Sampler: subs: set last_ts, subs = pair - task = trio.lowlevel.current_task() - log.debug( - f'SUBS {self.subscribers}\n' - f'PAIR {pair}\n' - f'TASK: {task}: {id(task)}\n' - f'broadcasting {period_s} -> {last_ts}\n' - # f'consumers: {subs}' - ) + # NOTE, for debugging pub-sub issues + # task = trio.lowlevel.current_task() + # log.debug( + # f'AlL-SUBS@{period_s!r}: {self.subscribers}\n' + # f'PAIR: {pair}\n' + # f'TASK: {task}: {id(task)}\n' + # f'broadcasting {period_s} -> {last_ts}\n' + # f'consumers: {subs}' + # ) borked: set[MsgStream] = set() sent: set[MsgStream] = set() while True: @@ -288,12 +294,11 @@ class Sampler: await stream.send(msg) sent.add(stream) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): + except self.bcast_errors as err: log.error( - f'{stream._ctx.chan.uid} dropped connection' + f'Connection dropped for IPC ctx\n' + f'{stream._ctx}\n\n' + f'Due to {type(err)}' ) borked.add(stream) else: @@ -579,7 +584,7 @@ async def open_sample_stream( async def sample_and_broadcast( - bus: _FeedsBus, # noqa + bus: _FeedsBus, rt_shm: ShmArray, hist_shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, @@ -599,9 +604,13 @@ async def sample_and_broadcast( overruns = Counter() - # multiline nested `dict` formatter (since rn quote-msgs are - # just that). - pfmt: Callable[[str], str] = mk_repr() + # NOTE, only used for debugging live-data-feed issues, though + # this should be resolved more correctly in the future using the + # new typed-msgspec feats of `tractor`! + # + # XXX, a multiline nested `dict` formatter (since rn quote-msgs + # are just that). + # pfmt: Callable[[str], str] = mk_repr() # iterate stream delivered by broker async for quotes in quote_stream: @@ -614,7 +623,13 @@ async def sample_and_broadcast( # f'{pfmt(quotes)}' # ) - # TODO: `numba` this! + # TODO, + # -[ ] `numba` or `cython`-nize this loop possibly? + # |_alternatively could we do it in rust somehow by upacking + # arrow msgs instead of using `msgspec`? + # -[ ] use `msgspec.Struct` support in new typed-msging from + # `tractor` to ensure only allowed msgs are transmitted? + # for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that @@ -687,17 +702,20 @@ async def sample_and_broadcast( sub_key: str = broker_symbol.lower() subs: set[Sub] = bus.get_subs(sub_key) - if not subs: - all_bs_fqmes: list[str] = list( - bus._subscribers.keys() - ) - log.warning( - f'No subscribers for {brokername!r} live-quote ??\n' - f'broker_symbol: {broker_symbol}\n\n' + # TODO, figure out how to make this useful whilst + # incoporating feed "pausing" .. + # + # if not subs: + # all_bs_fqmes: list[str] = list( + # bus._subscribers.keys() + # ) + # log.warning( + # f'No subscribers for {brokername!r} live-quote ??\n' + # f'broker_symbol: {broker_symbol}\n\n' - f'Maybe the backend-sys symbol does not match one of,\n' - f'{pfmt(all_bs_fqmes)}\n' - ) + # f'Maybe the backend-sys symbol does not match one of,\n' + # f'{pfmt(all_bs_fqmes)}\n' + # ) # NOTE: by default the broker backend doesn't append # it's own "name" into the fqme schema (but maybe it @@ -796,7 +814,6 @@ async def sample_and_broadcast( async def uniform_rate_send( - rate: float, quote_stream: trio.abc.ReceiveChannel, stream: MsgStream, diff --git a/piker/log.py b/piker/log.py index 7f554f16..8db15644 100644 --- a/piker/log.py +++ b/piker/log.py @@ -90,6 +90,8 @@ def colorize_json( ) +# TODO, eventually defer to the version in `modden` once +# it becomes a dep! def mk_repr( **repr_kws, ) -> Callable[[str], str]: