Enable backpressure in an effort to prevent bootup overruns

samplerd_service
Tyler Goodlet 2023-01-30 11:45:29 -05:00
parent 69074f4fa5
commit fcfc0f31f0
2 changed files with 24 additions and 2 deletions

View File

@ -751,6 +751,9 @@ async def uniform_rate_send(
) -> None: ) -> None:
# try not to error-out on overruns of the subscribed (chart) client
stream._ctx._backpressure = True
# TODO: compute the approx overhead latency per cycle # TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616 left_to_sleep = throttle_period = 1/rate - 0.000616

View File

@ -283,8 +283,22 @@ async def start_backfill(
if step_size_s == 60: if step_size_s == 60:
inow = round(time.time()) inow = round(time.time())
if (inow - times[-1]) > 60: diff = inow - times[-1]
await tractor.breakpoint() if abs(diff) > 60:
surr = array[-6:]
diff_in_mins = round(diff/60., ndigits=2)
log.warning(
f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n'
f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
'Surrounding 6 time stamps:\n'
f'{list(surr["time"])}\n'
'Here is surrounding 6 samples:\n'
f'{surr}\nn'
)
# uncomment this for a hacker who wants to investigate
# this case manually..
# await tractor.breakpoint()
# frame's worth of sample-period-steps, in seconds # frame's worth of sample-period-steps, in seconds
frame_size_s = len(array) * step_size_s frame_size_s = len(array) * step_size_s
@ -1104,6 +1118,10 @@ async def open_feed_bus(
symbol. symbol.
''' '''
# ensure that a quote feed stream which is pushing too fast doesn't
# cause and overrun in the client.
ctx._backpressure = True
if loglevel is None: if loglevel is None:
loglevel = tractor.current_actor().loglevel loglevel = tractor.current_actor().loglevel
@ -1215,6 +1233,7 @@ async def open_feed_bus(
# a max ``tick_throttle`` instantaneous rate. # a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10) send, recv = trio.open_memory_channel(2**10)
ctx._backpressure = False
cs = await bus.start_task( cs = await bus.start_task(
uniform_rate_send, uniform_rate_send,
tick_throttle, tick_throttle,