diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index a6324bb6..f3f2d5de 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -18,14 +18,14 @@ Financial signal processing for the peeps. """ from functools import partial -from typing import AsyncIterator, Callable, Tuple +from typing import AsyncIterator, Callable, Tuple, Optional import trio from trio_typing import TaskStatus import tractor import numpy as np -from ..log import get_logger +from ..log import get_logger, get_console_log from .. import data from ._momo import _rsi, _wma from ._volume import _tina_vwap @@ -134,7 +134,7 @@ async def fsp_compute( # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) if diff >= 0: - print(f"WTF DIFF SIGNAL to HISTORY {diff}") + log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}") for _ in range(diff): dst.push(history[:1]) @@ -149,6 +149,12 @@ async def fsp_compute( # rt stream async for processed in out_stream: + + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + log.debug(f"{fsp_func_name}: {processed}") index = src.index dst.array[-1][fsp_func_name] = processed @@ -165,12 +171,16 @@ async def cascade( dst_shm_token: Tuple[str, np.dtype], symbol: str, fsp_func_name: str, + loglevel: Optional[str] = None, ) -> None: - """Chain streaming signal processors and deliver output to + '''Chain streaming signal processors and deliver output to destination mem buf. - """ + ''' + if loglevel: + get_console_log(loglevel) + src = attach_shm_array(token=src_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token) @@ -180,6 +190,10 @@ async def cascade( async with data.feed.maybe_open_feed( brokername, [symbol], + + # TODO: + # tick_throttle=60, + ) as (feed, stream): assert src.token == feed.shm.token