From fcfc0f31f08ff5243e051e68609a4e367495c80c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Jan 2023 11:45:29 -0500 Subject: [PATCH] Enable backpressure in an effort to prevent bootup overruns --- piker/data/_sampling.py | 3 +++ piker/data/feed.py | 23 +++++++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f1bbc500..a5df96cc 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -751,6 +751,9 @@ async def uniform_rate_send( ) -> None: + # try not to error-out on overruns of the subscribed (chart) client + stream._ctx._backpressure = True + # TODO: compute the approx overhead latency per cycle left_to_sleep = throttle_period = 1/rate - 0.000616 diff --git a/piker/data/feed.py b/piker/data/feed.py index b714c77e..534aebc9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -283,8 +283,22 @@ async def start_backfill( if step_size_s == 60: inow = round(time.time()) - if (inow - times[-1]) > 60: - await tractor.breakpoint() + diff = inow - times[-1] + if abs(diff) > 60: + surr = array[-6:] + diff_in_mins = round(diff/60., ndigits=2) + log.warning( + f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n' + f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' + 'Surrounding 6 time stamps:\n' + f'{list(surr["time"])}\n' + 'Here is surrounding 6 samples:\n' + f'{surr}\nn' + ) + + # uncomment this for a hacker who wants to investigate + # this case manually.. + # await tractor.breakpoint() # frame's worth of sample-period-steps, in seconds frame_size_s = len(array) * step_size_s @@ -1104,6 +1118,10 @@ async def open_feed_bus( symbol. ''' + # ensure that a quote feed stream which is pushing too fast doesn't + # cause and overrun in the client. + ctx._backpressure = True + if loglevel is None: loglevel = tractor.current_actor().loglevel @@ -1215,6 +1233,7 @@ async def open_feed_bus( # a max ``tick_throttle`` instantaneous rate. send, recv = trio.open_memory_channel(2**10) + ctx._backpressure = False cs = await bus.start_task( uniform_rate_send, tick_throttle,