Make shared array buffer incrementer a message pub

Drop ctx manager api and use `tractor.msg.pub`.
bar_select
Tyler Goodlet 2020-09-22 14:30:50 -04:00
parent 373ff90229
commit efb52f2292
1 changed files with 42 additions and 39 deletions

View File

@ -1,18 +1,19 @@
""" """
Data buffers for fast shared humpy. Data buffers for fast shared humpy.
""" """
from typing import Tuple, Callable
import time import time
import tractor import tractor
import trio import trio
from ._sharedmem import attach_shared_array from ._sharedmem import attach_shm_array
@tractor.stream @tractor.msg.pub
async def incr_buffer( async def incr_buffer(
ctx: tractor.Context, shm_token: dict,
shm_token: str, get_topics: Callable[..., Tuple[str]],
# delay_s: Optional[float] = None, # delay_s: Optional[float] = None,
): ):
"""Task which inserts new bars into the provide shared memory array """Task which inserts new bars into the provide shared memory array
@ -23,49 +24,51 @@ async def incr_buffer(
# Likely the best way to solve this is to make this task # Likely the best way to solve this is to make this task
# aware of the instrument's tradable hours? # aware of the instrument's tradable hours?
with attach_shared_array( shm = attach_shm_array(
token=shm_token, token=shm_token,
readonly=False, readonly=False,
) as shm: )
# determine ohlc delay between bars # determine ohlc delay between bars
# to determine time step between datums # to determine time step between datums
times = shm.array['time'] times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
# adjust delay to compensate for trio processing time # adjust delay to compensate for trio processing time
ad = delay_s - 0.002 ad = delay_s - 0.002
async def sleep(): async def sleep():
"""Sleep until next time frames worth has passed from last bar. """Sleep until next time frames worth has passed from last bar.
""" """
last_ts = shm.array[-1]['time'] # last_ts = shm.array[-1]['time']
delay = max((last_ts + ad) - time.time(), 0) # delay = max((last_ts + ad) - time.time(), 0)
await trio.sleep(delay) # await trio.sleep(delay)
# await trio.sleep(ad) await trio.sleep(ad)
while True: while True:
# sleep for duration of current bar # sleep for duration of current bar
await sleep() await sleep()
# TODO: in theory we could make this faster by copying the # TODO: in theory we could make this faster by copying the
# "last" readable value into the underlying larger buffer's # "last" readable value into the underlying larger buffer's
# next value and then incrementing the counter instead of # next value and then incrementing the counter instead of
# using ``.push()``? # using ``.push()``?
# append new entry to buffer thus "incrementing" the bar # append new entry to buffer thus "incrementing" the bar
array = shm.array array = shm.array
last = array[-1:].copy() last = array[-1:].copy()
(index, t, close) = last[0][['index', 'time', 'close']] (index, t, close) = last[0][['index', 'time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum # this copies non-std fields (eg. vwap) from the last datum
last[ last[
['index', 'time', 'volume', 'open', 'high', 'low', 'close'] ['index', 'time', 'volume', 'open', 'high', 'low', 'close']
][0] = (index + 1, t + delay_s, 0, close, close, close, close) ][0] = (index + 1, t + delay_s, 0, close, close, close, close)
# write to the buffer # write to the buffer
shm.push(last) shm.push(last)
# print('incrementing array') # print('incrementing array')
# yield the new buffer index value # print(get_topics())
await ctx.send_yield(shm._i.value)
# broadcast the buffer index step
yield {'index': shm._i.value}