diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 20067f82..065d338c 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -68,7 +68,7 @@ class Sampler: This non-instantiated type is meant to be a singleton within a `samplerd` actor-service spawned once by the user wishing to - time-step sample real-time quote feeds, see + time-step-sample (real-time) quote feeds, see ``._daemon.maybe_open_samplerd()`` and the below ``register_with_sampler()``. @@ -87,7 +87,6 @@ class Sampler: # holds all the ``tractor.Context`` remote subscriptions for # a particular sample period increment event: all subscribers are # notified on a step. - # subscribers: dict[int, list[tractor.MsgStream]] = {} subscribers: defaultdict[ float, list[ @@ -240,8 +239,11 @@ class Sampler: subscribers for a given sample period. ''' + pair: list[float, set] pair = self.subscribers[period_s] + last_ts: float + subs: set last_ts, subs = pair task = trio.lowlevel.current_task() @@ -281,7 +283,7 @@ class Sampler: for stream in borked: try: subs.remove(stream) - except ValueError: + except KeyError: log.warning( f'{stream._ctx.chan.uid} sub already removed!?' ) @@ -429,7 +431,7 @@ async def maybe_open_samplerd( loglevel: str | None = None, **kwargs, -) -> tractor._portal.Portal: # noqa +) -> tractor.Portal: # noqa ''' Client-side helper to maybe startup the ``samplerd`` service under the ``pikerd`` tree. @@ -619,6 +621,14 @@ async def sample_and_broadcast( fqsn = f'{broker_symbol}.{brokername}' lags: int = 0 + # TODO: speed up this loop in an AOT compiled lang (like + # rust or nim or zig) and/or instead of doing a fan out to + # TCP sockets here, we add a shm-style tick queue which + # readers can pull from instead of placing the burden of + # broadcast on solely on this `brokerd` actor. see issues: + # - https://github.com/pikers/piker/issues/98 + # - https://github.com/pikers/piker/issues/107 + for (stream, tick_throttle) in subs.copy(): try: with trio.move_on_after(0.2) as cs: @@ -748,9 +758,6 @@ def frame_ticks( ticks_by_type[ttype].append(tick) -# TODO: a less naive throttler, here's some snippets: -# token bucket by njs: -# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 async def uniform_rate_send( rate: float, @@ -760,8 +767,22 @@ async def uniform_rate_send( task_status: TaskStatus = trio.TASK_STATUS_IGNORED, ) -> None: + ''' + Throttle a real-time (presumably tick event) stream to a uniform + transmissiom rate, normally for the purposes of throttling a data + flow being consumed by a graphics rendering actor which itself is limited + by a fixed maximum display rate. - # try not to error-out on overruns of the subscribed (chart) client + Though this function isn't documented (nor was intentially written + to be) a token-bucket style algo, it effectively operates as one (we + think?). + + TODO: a less naive throttler, here's some snippets: + token bucket by njs: + https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 + + ''' + # try not to error-out on overruns of the subscribed client stream._ctx._backpressure = True # TODO: compute the approx overhead latency per cycle