From 6e574835c85a104dc6eb1fe5e41fb9d768d3ea98 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Sep 2022 11:27:39 -0400 Subject: [PATCH] Update history shm buffer in ohlc sampler loop --- piker/data/_sampling.py | 71 ++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 428540a8..015de05e 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -137,7 +137,7 @@ async def increment_ohlc_buffer( # this copies non-std fields (eg. vwap) from the last datum last[ ['time', 'volume', 'open', 'high', 'low', 'close'] - ][0] = (t + delay_s, 0, close, close, close, close) + ][0] = (t + this_delay_s, 0, close, close, close, close) # write to the buffer shm.push(last) @@ -227,7 +227,8 @@ async def iter_ohlc_periods( async def sample_and_broadcast( bus: _FeedsBus, # noqa - shm: ShmArray, + rt_shm: ShmArray, + hist_shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, brokername: str, sum_tick_vlm: bool = True, @@ -263,41 +264,45 @@ async def sample_and_broadcast( last = tick['price'] - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] + # more compact inline-way to do this assignment + # to both buffers? + for shm in [rt_shm, hist_shm]: + # update last entry + # benchmarked in the 4-5 us range + # for shm in [rt_shm, hist_shm]: + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] - new_v = tick.get('size', 0) + new_v = tick.get('size', 0) - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last - if sum_tick_vlm: - volume = v + new_v - else: - # presume backend takes care of summing - # it's own vlm - volume = quote['volume'] + if sum_tick_vlm: + volume = v + new_v + else: + # presume backend takes care of summing + # it's own vlm + volume = quote['volume'] - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'bar_wap', # can be optionally provided - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - quote.get('bar_wap', 0), - volume, - ) + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'bar_wap', # can be optionally provided + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + quote.get('bar_wap', 0), + volume, + ) # XXX: we need to be very cautious here that no # context-channel is left lingering which doesn't have