From efd93d058a69cf44c24c8459ba85122f003d2811 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 24 May 2021 08:47:30 -0400 Subject: [PATCH] Breakout fsp rt loop as non-closure for readability --- piker/fsp/__init__.py | 184 +++++++++++++++++++++++++----------------- 1 file changed, 109 insertions(+), 75 deletions(-) diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 2345b516..312a0cef 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -17,6 +17,7 @@ """ Financial signal processing for the peeps. """ +from functools import partial from typing import AsyncIterator, Callable, Tuple import trio @@ -29,6 +30,8 @@ from .. import data from ._momo import _rsi, _wma from ._volume import _tina_vwap from ..data import attach_shm_array +from ..data.feed import Feed +from ..data._sharedmem import ShmArray log = get_logger(__name__) @@ -62,6 +65,97 @@ async def latency( yield value +async def fsp_compute( + ctx: tractor.Context, + symbol: str, + feed: Feed, + + src: ShmArray, + dst: ShmArray, + + fsp_func_name: str, + func: Callable, + + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + +) -> None: + + # TODO: load appropriate fsp with input args + + async def filter_by_sym( + sym: str, + stream, + ): + + # TODO: make this the actualy first quote from feed + # XXX: this allows for a single iteration to run for history + # processing without waiting on the real-time feed for a new quote + yield {} + + # task cancellation won't kill the channel + with stream.shield(): + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes + + out_stream = func( + filter_by_sym(symbol, feed.stream), + feed.shm, + ) + + # TODO: XXX: + # THERE'S A BIG BUG HERE WITH THE `index` field since we're + # prepending a copy of the first value a few times to make + # sub-curves align with the parent bar chart. + # This likely needs to be fixed either by, + # - manually assigning the index and historical data + # seperately to the shm array (i.e. not using .push()) + # - developing some system on top of the shared mem array that + # is `index` aware such that historical data can be indexed + # relative to the true first datum? Not sure if this is sane + # for incremental compuations. + dst._first.value = src._first.value + dst._last.value = src._first.value + + # Conduct a single iteration of fsp with historical bars input + # and get historical output + history_output = await out_stream.__anext__() + + # build a struct array which includes an 'index' field to push + # as history + history = np.array( + np.arange(len(history_output)), + dtype=dst.array.dtype + ) + history[fsp_func_name] = history_output + + # check for data length mis-allignment and fill missing values + diff = len(src.array) - len(history) + if diff >= 0: + print(f"WTF DIFF SIGNAL to HISTORY {diff}") + for _ in range(diff): + dst.push(history[:1]) + + # compare with source signal and time align + index = dst.push(history) + + await ctx.send_yield(index) + + # setup a respawn handle + with trio.CancelScope() as cs: + task_status.started(cs) + + # rt stream + async for processed in out_stream: + log.debug(f"{fsp_func_name}: {processed}") + index = src.index + dst.array[-1][fsp_func_name] = processed + + # stream latest shm array index entry + await ctx.send_yield(index) + + @tractor.stream async def cascade( ctx: tractor.Context, @@ -85,84 +179,24 @@ async def cascade( assert src.token == feed.shm.token - async def fsp_compute( - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, - ) -> None: - - # TODO: load appropriate fsp with input args - - async def filter_by_sym( - sym: str, - stream, - ): - # task cancellation won't kill the channel - with stream.shield(): - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes - - out_stream = func( - filter_by_sym(symbol, feed.stream), - feed.shm, - ) - - # TODO: XXX: - # THERE'S A BIG BUG HERE WITH THE `index` field since we're - # prepending a copy of the first value a few times to make - # sub-curves align with the parent bar chart. - # This likely needs to be fixed either by, - # - manually assigning the index and historical data - # seperately to the shm array (i.e. not using .push()) - # - developing some system on top of the shared mem array that - # is `index` aware such that historical data can be indexed - # relative to the true first datum? Not sure if this is sane - # for incremental compuations. - dst._first.value = src._first.value - dst._last.value = src._first.value - - # Conduct a single iteration of fsp with historical bars input - # and get historical output - history_output = await out_stream.__anext__() - - # build a struct array which includes an 'index' field to push - # as history - history = np.array( - np.arange(len(history_output)), - dtype=dst.array.dtype - ) - history[fsp_func_name] = history_output - - # check for data length mis-allignment and fill missing values - diff = len(src.array) - len(history) - if diff >= 0: - print(f"WTF DIFF SIGNAL to HISTORY {diff}") - for _ in range(diff): - dst.push(history[:1]) - - # compare with source signal and time align - index = dst.push(history) - - await ctx.send_yield(index) - - # setup a respawn handle - with trio.CancelScope() as cs: - task_status.started(cs) - - # rt stream - async for processed in out_stream: - log.debug(f"{fsp_func_name}: {processed}") - index = src.index - dst.array[-1][fsp_func_name] = processed - - # stream latest shm array index entry - await ctx.send_yield(index) - last_len = new_len = len(src.array) + fsp_target = partial( + fsp_compute, + ctx=ctx, + symbol=symbol, + feed=feed, + + src=src, + dst=dst, + + fsp_func_name=fsp_func_name, + func=func + ) + async with trio.open_nursery() as n: - cs = await n.start(fsp_compute) + cs = await n.start(fsp_target) # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. @@ -176,7 +210,7 @@ async def cascade( # respawn the signal compute task if the source # signal has been updated cs.cancel() - cs = await n.start(fsp_compute) + cs = await n.start(fsp_target) # TODO: adopt an incremental update engine/approach # where possible here eventually!