Catch `KeyError` on bcast errors which pop the sub

Not sure how i missed this (and left in handling of `list.remove()` and
it ever worked for that?) after the `samplerd` impl in 5ec1a72 but, this
adjusts the remove-broken-subscriber loop to catch the correct
`set.remove()` exception type on a missing (likely already removed)
subscription entry.
log_linearized_curve_overlays
Tyler Goodlet 2023-03-07 15:42:06 -05:00
parent 8a87e5f390
commit 12e196a6f7
1 changed files with 28 additions and 7 deletions

View File

@ -87,7 +87,6 @@ class Sampler:
# holds all the ``tractor.Context`` remote subscriptions for # holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are # a particular sample period increment event: all subscribers are
# notified on a step. # notified on a step.
# subscribers: dict[int, list[tractor.MsgStream]] = {}
subscribers: defaultdict[ subscribers: defaultdict[
float, float,
list[ list[
@ -240,8 +239,11 @@ class Sampler:
subscribers for a given sample period. subscribers for a given sample period.
''' '''
pair: list[float, set]
pair = self.subscribers[period_s] pair = self.subscribers[period_s]
last_ts: float
subs: set
last_ts, subs = pair last_ts, subs = pair
task = trio.lowlevel.current_task() task = trio.lowlevel.current_task()
@ -281,7 +283,7 @@ class Sampler:
for stream in borked: for stream in borked:
try: try:
subs.remove(stream) subs.remove(stream)
except ValueError: except KeyError:
log.warning( log.warning(
f'{stream._ctx.chan.uid} sub already removed!?' f'{stream._ctx.chan.uid} sub already removed!?'
) )
@ -429,7 +431,7 @@ async def maybe_open_samplerd(
loglevel: str | None = None, loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal: # noqa ) -> tractor.Portal: # noqa
''' '''
Client-side helper to maybe startup the ``samplerd`` service Client-side helper to maybe startup the ``samplerd`` service
under the ``pikerd`` tree. under the ``pikerd`` tree.
@ -619,6 +621,14 @@ async def sample_and_broadcast(
fqsn = f'{broker_symbol}.{brokername}' fqsn = f'{broker_symbol}.{brokername}'
lags: int = 0 lags: int = 0
# TODO: speed up this loop in an AOT compiled lang (like
# rust or nim or zig) and/or instead of doing a fan out to
# TCP sockets here, we add a shm-style tick queue which
# readers can pull from instead of placing the burden of
# broadcast on solely on this `brokerd` actor. see issues:
# - https://github.com/pikers/piker/issues/98
# - https://github.com/pikers/piker/issues/107
for (stream, tick_throttle) in subs.copy(): for (stream, tick_throttle) in subs.copy():
try: try:
with trio.move_on_after(0.2) as cs: with trio.move_on_after(0.2) as cs:
@ -748,9 +758,6 @@ def frame_ticks(
ticks_by_type[ttype].append(tick) ticks_by_type[ttype].append(tick)
# TODO: a less naive throttler, here's some snippets:
# token bucket by njs:
# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
async def uniform_rate_send( async def uniform_rate_send(
rate: float, rate: float,
@ -760,8 +767,22 @@ async def uniform_rate_send(
task_status: TaskStatus = trio.TASK_STATUS_IGNORED, task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
'''
Throttle a real-time (presumably tick event) stream to a uniform
transmissiom rate, normally for the purposes of throttling a data
flow being consumed by a graphics rendering actor which itself is limited
by a fixed maximum display rate.
# try not to error-out on overruns of the subscribed (chart) client Though this function isn't documented (nor was intentially written
to be) a token-bucket style algo, it effectively operates as one (we
think?).
TODO: a less naive throttler, here's some snippets:
token bucket by njs:
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
'''
# try not to error-out on overruns of the subscribed client
stream._ctx._backpressure = True stream._ctx._backpressure = True
# TODO: compute the approx overhead latency per cycle # TODO: compute the approx overhead latency per cycle