From 63e705bab01d50b22ab08d7236299584c8990a0f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 25 Jan 2023 09:11:34 -0500 Subject: [PATCH] Better handle dynamic registry sampler broadcasts In situations where clients are (dynamically) subscribing *while* broadcasts are starting to taking place we need to handle the `set`-modified-during-iteration case. This scenario seems to be more common during races on concurrent startup of multiple symbols. The solution here is to use another set to take note of subscribers which are successfully sent-to and then skipping them on re-try. This also contains an attempt to exception-handle throttled stream overruns caused by higher frequency feeds (like binance) pushing more quotes then can be handled during (UI) client startup. --- piker/data/_sampling.py | 46 +++++++++++++++++++++++++++++------------ piker/data/feed.py | 3 +++ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index a5df96cc..20067f82 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -253,20 +253,30 @@ class Sampler: # f'consumers: {subs}' ) borked: set[tractor.MsgStream] = set() - for stream in subs: + sent: set[tractor.MsgStream] = set() + while True: try: - await stream.send({ - 'index': time_stamp or last_ts, - 'period': period_s, - }) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error( - f'{stream._ctx.chan.uid} dropped connection' - ) - borked.add(stream) + for stream in (subs - sent): + try: + await stream.send({ + 'index': time_stamp or last_ts, + 'period': period_s, + }) + sent.add(stream) + + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error( + f'{stream._ctx.chan.uid} dropped connection' + ) + borked.add(stream) + else: + break + except RuntimeError: + log.warning(f'Client subs {subs} changed while broadcasting') + continue for stream in borked: try: @@ -848,6 +858,16 @@ async def uniform_rate_send( # rate timing exactly lul try: await stream.send({sym: first_quote}) + except tractor.RemoteActorError as rme: + if rme.type is not tractor._exceptions.StreamOverrun: + raise + ctx = stream._ctx + chan = ctx.chan + log.warning( + 'Throttled quote-stream overrun!\n' + f'{sym}:{ctx.cid}@{chan.uid}' + ) + except ( # NOTE: any of these can be raised by ``tractor``'s IPC # transport-layer and we want to be highly resilient diff --git a/piker/data/feed.py b/piker/data/feed.py index 906f4bb4..c0ae2df9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1589,6 +1589,9 @@ async def open_feed( (brokermod, bfqsns), ) in zip(ctxs, providers.items()): + # NOTE: do it asap to avoid overruns during multi-feed setup? + ctx._backpressure = backpressure + for fqsn, flume_msg in flumes_msg_dict.items(): flume = Flume.from_msg(flume_msg) assert flume.symbol.fqsn == fqsn