Catch `KeyError` on bcast errors which pop the sub
Not sure how i missed this (and left in handling of `list.remove()` and
it ever worked for that?) after the `samplerd` impl in 5ec1a72
but, this
adjusts the remove-broken-subscriber loop to catch the correct
`set.remove()` exception type on a missing (likely already removed)
subscription entry.
storage_cli
parent
6f3a6bcb42
commit
5bf40ceb79
|
@ -68,7 +68,7 @@ class Sampler:
|
||||||
|
|
||||||
This non-instantiated type is meant to be a singleton within
|
This non-instantiated type is meant to be a singleton within
|
||||||
a `samplerd` actor-service spawned once by the user wishing to
|
a `samplerd` actor-service spawned once by the user wishing to
|
||||||
time-step sample real-time quote feeds, see
|
time-step-sample (real-time) quote feeds, see
|
||||||
``._daemon.maybe_open_samplerd()`` and the below
|
``._daemon.maybe_open_samplerd()`` and the below
|
||||||
``register_with_sampler()``.
|
``register_with_sampler()``.
|
||||||
|
|
||||||
|
@ -87,7 +87,6 @@ class Sampler:
|
||||||
# holds all the ``tractor.Context`` remote subscriptions for
|
# holds all the ``tractor.Context`` remote subscriptions for
|
||||||
# a particular sample period increment event: all subscribers are
|
# a particular sample period increment event: all subscribers are
|
||||||
# notified on a step.
|
# notified on a step.
|
||||||
# subscribers: dict[int, list[tractor.MsgStream]] = {}
|
|
||||||
subscribers: defaultdict[
|
subscribers: defaultdict[
|
||||||
float,
|
float,
|
||||||
list[
|
list[
|
||||||
|
@ -240,8 +239,11 @@ class Sampler:
|
||||||
subscribers for a given sample period.
|
subscribers for a given sample period.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
pair: list[float, set]
|
||||||
pair = self.subscribers[period_s]
|
pair = self.subscribers[period_s]
|
||||||
|
|
||||||
|
last_ts: float
|
||||||
|
subs: set
|
||||||
last_ts, subs = pair
|
last_ts, subs = pair
|
||||||
|
|
||||||
task = trio.lowlevel.current_task()
|
task = trio.lowlevel.current_task()
|
||||||
|
@ -281,7 +283,7 @@ class Sampler:
|
||||||
for stream in borked:
|
for stream in borked:
|
||||||
try:
|
try:
|
||||||
subs.remove(stream)
|
subs.remove(stream)
|
||||||
except ValueError:
|
except KeyError:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'{stream._ctx.chan.uid} sub already removed!?'
|
f'{stream._ctx.chan.uid} sub already removed!?'
|
||||||
)
|
)
|
||||||
|
@ -429,7 +431,7 @@ async def maybe_open_samplerd(
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> tractor._portal.Portal: # noqa
|
) -> tractor.Portal: # noqa
|
||||||
'''
|
'''
|
||||||
Client-side helper to maybe startup the ``samplerd`` service
|
Client-side helper to maybe startup the ``samplerd`` service
|
||||||
under the ``pikerd`` tree.
|
under the ``pikerd`` tree.
|
||||||
|
@ -619,6 +621,14 @@ async def sample_and_broadcast(
|
||||||
fqsn = f'{broker_symbol}.{brokername}'
|
fqsn = f'{broker_symbol}.{brokername}'
|
||||||
lags: int = 0
|
lags: int = 0
|
||||||
|
|
||||||
|
# TODO: speed up this loop in an AOT compiled lang (like
|
||||||
|
# rust or nim or zig) and/or instead of doing a fan out to
|
||||||
|
# TCP sockets here, we add a shm-style tick queue which
|
||||||
|
# readers can pull from instead of placing the burden of
|
||||||
|
# broadcast on solely on this `brokerd` actor. see issues:
|
||||||
|
# - https://github.com/pikers/piker/issues/98
|
||||||
|
# - https://github.com/pikers/piker/issues/107
|
||||||
|
|
||||||
for (stream, tick_throttle) in subs.copy():
|
for (stream, tick_throttle) in subs.copy():
|
||||||
try:
|
try:
|
||||||
with trio.move_on_after(0.2) as cs:
|
with trio.move_on_after(0.2) as cs:
|
||||||
|
@ -748,9 +758,6 @@ def frame_ticks(
|
||||||
ticks_by_type[ttype].append(tick)
|
ticks_by_type[ttype].append(tick)
|
||||||
|
|
||||||
|
|
||||||
# TODO: a less naive throttler, here's some snippets:
|
|
||||||
# token bucket by njs:
|
|
||||||
# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
|
|
||||||
async def uniform_rate_send(
|
async def uniform_rate_send(
|
||||||
|
|
||||||
rate: float,
|
rate: float,
|
||||||
|
@ -760,8 +767,22 @@ async def uniform_rate_send(
|
||||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''
|
||||||
|
Throttle a real-time (presumably tick event) stream to a uniform
|
||||||
|
transmissiom rate, normally for the purposes of throttling a data
|
||||||
|
flow being consumed by a graphics rendering actor which itself is limited
|
||||||
|
by a fixed maximum display rate.
|
||||||
|
|
||||||
# try not to error-out on overruns of the subscribed (chart) client
|
Though this function isn't documented (nor was intentially written
|
||||||
|
to be) a token-bucket style algo, it effectively operates as one (we
|
||||||
|
think?).
|
||||||
|
|
||||||
|
TODO: a less naive throttler, here's some snippets:
|
||||||
|
token bucket by njs:
|
||||||
|
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
|
||||||
|
|
||||||
|
'''
|
||||||
|
# try not to error-out on overruns of the subscribed client
|
||||||
stream._ctx._backpressure = True
|
stream._ctx._backpressure = True
|
||||||
|
|
||||||
# TODO: compute the approx overhead latency per cycle
|
# TODO: compute the approx overhead latency per cycle
|
||||||
|
|
Loading…
Reference in New Issue