From 8069bbe105663cac3ba6d635c23903868e044565 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 3 Apr 2021 12:21:50 -0400 Subject: [PATCH] Drop old incrementer func --- piker/fsp/__init__.py | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index c798bf28..0b432e5b 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -28,7 +28,7 @@ from ..log import get_logger from .. import data from ._momo import _rsi, _wma from ._volume import _tina_vwap -from ..data import attach_shm_array, Feed +from ..data import attach_shm_array log = get_logger(__name__) @@ -62,23 +62,6 @@ async def latency( yield value -async def increment_signals( - feed: Feed, - dst_shm: 'SharedArray', # noqa -) -> None: - """Increment the underlying shared memory buffer on every "increment" - msg received from the underlying data feed. - - """ - async for msg in await feed.index_stream(): - array = dst_shm.array - last = array[-1:].copy() - - # write new slot to the buffer - dst_shm.push(last) - len(dst_shm.array) - - @tractor.stream async def cascade( ctx: tractor.Context, @@ -150,7 +133,6 @@ async def cascade( ) history[fsp_func_name] = history_output - # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) if diff >= 0: @@ -182,8 +164,8 @@ async def cascade( cs = await n.start(fsp_compute) - # Increment the underlying shared memory buffer on every "increment" - # msg received from the underlying data feed. + # Increment the underlying shared memory buffer on every + # "increment" msg received from the underlying data feed. async for msg in await feed.index_stream(): @@ -198,10 +180,11 @@ async def cascade( # TODO: adopt an incremental update engine/approach # where possible here eventually! + # read out last shm row array = dst.array last = array[-1:].copy() - # write new slot to the buffer + # write new row to the shm buffer dst.push(last) last_len = new_len