From 5dd69b22957c547c9286272e6e0e6107308a32b5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 25 Jan 2023 09:11:34 -0500 Subject: [PATCH] Better handle dynamic registry sampler broadcasts In situations where clients are (dynamically) subscribing *while* broadcasts are starting to taking place we need to handle the `set`-modified-during-iteration case. This scenario seems to be more common during races on concurrent startup of multiple symbols. The solution here is to use another set to take note of subscribers which are successfully sent-to and then skipping them on re-try. This also contains an attempt to exception-handle throttled stream overruns caused by higher frequency feeds (like binance) pushing more quotes then can be handled during (UI) client startup. --- piker/data/_sampling.py | 46 +++++++++++++++++++++++++++++------------ piker/data/feed.py | 3 +++ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f44304bf..ec29c6ae 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -253,20 +253,30 @@ class Sampler: # f'consumers: {subs}' ) borked: set[tractor.MsgStream] = set() - for stream in subs: + sent: set[tractor.MsgStream] = set() + while True: try: - await stream.send({ - 'index': time_stamp or last_ts, - 'period': period_s, - }) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error( - f'{stream._ctx.chan.uid} dropped connection' - ) - borked.add(stream) + for stream in (subs - sent): + try: + await stream.send({ + 'index': time_stamp or last_ts, + 'period': period_s, + }) + sent.add(stream) + + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error( + f'{stream._ctx.chan.uid} dropped connection' + ) + borked.add(stream) + else: + break + except RuntimeError: + log.warning(f'Client subs {subs} changed while broadcasting') + continue for stream in borked: try: @@ -848,6 +858,16 @@ async def uniform_rate_send( # rate timing exactly lul try: await stream.send({sym: first_quote}) + except tractor.RemoteActorError as rme: + if rme.type is not tractor._exceptions.StreamOverrun: + raise + ctx = stream._ctx + chan = ctx.chan + log.warning( + 'Throttled quote-stream overrun!\n' + f'{sym}:{ctx.cid}@{chan.uid}' + ) + except ( # NOTE: any of these can be raised by ``tractor``'s IPC # transport-layer and we want to be highly resilient diff --git a/piker/data/feed.py b/piker/data/feed.py index 69d5be7d..7efd5eb3 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1589,6 +1589,9 @@ async def open_feed( (brokermod, bfqsns), ) in zip(ctxs, providers.items()): + # NOTE: do it asap to avoid overruns during multi-feed setup? + ctx._backpressure = backpressure + for fqsn, flume_msg in flumes_msg_dict.items(): flume = Flume.from_msg(flume_msg) assert flume.symbol.fqsn == fqsn