Drop old incrementer func

cached_feeds
Tyler Goodlet 2021-04-03 12:21:50 -04:00
parent ce4144aace
commit 8069bbe105
1 changed files with 5 additions and 22 deletions

View File

@ -28,7 +28,7 @@ from ..log import get_logger
from .. import data from .. import data
from ._momo import _rsi, _wma from ._momo import _rsi, _wma
from ._volume import _tina_vwap from ._volume import _tina_vwap
from ..data import attach_shm_array, Feed from ..data import attach_shm_array
log = get_logger(__name__) log = get_logger(__name__)
@ -62,23 +62,6 @@ async def latency(
yield value yield value
async def increment_signals(
feed: Feed,
dst_shm: 'SharedArray', # noqa
) -> None:
"""Increment the underlying shared memory buffer on every "increment"
msg received from the underlying data feed.
"""
async for msg in await feed.index_stream():
array = dst_shm.array
last = array[-1:].copy()
# write new slot to the buffer
dst_shm.push(last)
len(dst_shm.array)
@tractor.stream @tractor.stream
async def cascade( async def cascade(
ctx: tractor.Context, ctx: tractor.Context,
@ -150,7 +133,6 @@ async def cascade(
) )
history[fsp_func_name] = history_output history[fsp_func_name] = history_output
# check for data length mis-allignment and fill missing values # check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history) diff = len(src.array) - len(history)
if diff >= 0: if diff >= 0:
@ -182,8 +164,8 @@ async def cascade(
cs = await n.start(fsp_compute) cs = await n.start(fsp_compute)
# Increment the underlying shared memory buffer on every "increment" # Increment the underlying shared memory buffer on every
# msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async for msg in await feed.index_stream(): async for msg in await feed.index_stream():
@ -198,10 +180,11 @@ async def cascade(
# TODO: adopt an incremental update engine/approach # TODO: adopt an incremental update engine/approach
# where possible here eventually! # where possible here eventually!
# read out last shm row
array = dst.array array = dst.array
last = array[-1:].copy() last = array[-1:].copy()
# write new slot to the buffer # write new row to the shm buffer
dst.push(last) dst.push(last)
last_len = new_len last_len = new_len