From efb52f22921d688ed686b270704ec96bf6dde9ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Sep 2020 14:30:50 -0400 Subject: [PATCH] Make shared array buffer incrementer a message pub Drop ctx manager api and use `tractor.msg.pub`. --- piker/data/_buffer.py | 81 ++++++++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 39 deletions(-) diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py index 0c25309f..43be7dc0 100644 --- a/piker/data/_buffer.py +++ b/piker/data/_buffer.py @@ -1,18 +1,19 @@ """ Data buffers for fast shared humpy. """ +from typing import Tuple, Callable import time import tractor import trio -from ._sharedmem import attach_shared_array +from ._sharedmem import attach_shm_array -@tractor.stream +@tractor.msg.pub async def incr_buffer( - ctx: tractor.Context, - shm_token: str, + shm_token: dict, + get_topics: Callable[..., Tuple[str]], # delay_s: Optional[float] = None, ): """Task which inserts new bars into the provide shared memory array @@ -23,49 +24,51 @@ async def incr_buffer( # Likely the best way to solve this is to make this task # aware of the instrument's tradable hours? - with attach_shared_array( + shm = attach_shm_array( token=shm_token, readonly=False, - ) as shm: + ) - # determine ohlc delay between bars - # to determine time step between datums - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + # determine ohlc delay between bars + # to determine time step between datums + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] - # adjust delay to compensate for trio processing time - ad = delay_s - 0.002 + # adjust delay to compensate for trio processing time + ad = delay_s - 0.002 - async def sleep(): - """Sleep until next time frames worth has passed from last bar. - """ - last_ts = shm.array[-1]['time'] - delay = max((last_ts + ad) - time.time(), 0) - await trio.sleep(delay) - # await trio.sleep(ad) + async def sleep(): + """Sleep until next time frames worth has passed from last bar. + """ + # last_ts = shm.array[-1]['time'] + # delay = max((last_ts + ad) - time.time(), 0) + # await trio.sleep(delay) + await trio.sleep(ad) - while True: - # sleep for duration of current bar - await sleep() + while True: + # sleep for duration of current bar + await sleep() - # TODO: in theory we could make this faster by copying the - # "last" readable value into the underlying larger buffer's - # next value and then incrementing the counter instead of - # using ``.push()``? + # TODO: in theory we could make this faster by copying the + # "last" readable value into the underlying larger buffer's + # next value and then incrementing the counter instead of + # using ``.push()``? - # append new entry to buffer thus "incrementing" the bar - array = shm.array - last = array[-1:].copy() - (index, t, close) = last[0][['index', 'time', 'close']] + # append new entry to buffer thus "incrementing" the bar + array = shm.array + last = array[-1:].copy() + (index, t, close) = last[0][['index', 'time', 'close']] - # this copies non-std fields (eg. vwap) from the last datum - last[ - ['index', 'time', 'volume', 'open', 'high', 'low', 'close'] - ][0] = (index + 1, t + delay_s, 0, close, close, close, close) + # this copies non-std fields (eg. vwap) from the last datum + last[ + ['index', 'time', 'volume', 'open', 'high', 'low', 'close'] + ][0] = (index + 1, t + delay_s, 0, close, close, close, close) - # write to the buffer - shm.push(last) - # print('incrementing array') + # write to the buffer + shm.push(last) + # print('incrementing array') - # yield the new buffer index value - await ctx.send_yield(shm._i.value) + # print(get_topics()) + + # broadcast the buffer index step + yield {'index': shm._i.value}