Passthrough loglevel to fsp actor

fsp_feeds
Tyler Goodlet 2021-09-20 16:36:11 -04:00
parent 4227b2e7a0
commit 547f6692d6
1 changed files with 19 additions and 5 deletions

View File

@ -18,14 +18,14 @@
Financial signal processing for the peeps. Financial signal processing for the peeps.
""" """
from functools import partial from functools import partial
from typing import AsyncIterator, Callable, Tuple from typing import AsyncIterator, Callable, Tuple, Optional
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
import numpy as np import numpy as np
from ..log import get_logger from ..log import get_logger, get_console_log
from .. import data from .. import data
from ._momo import _rsi, _wma from ._momo import _rsi, _wma
from ._volume import _tina_vwap from ._volume import _tina_vwap
@ -134,7 +134,7 @@ async def fsp_compute(
# check for data length mis-allignment and fill missing values # check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history) diff = len(src.array) - len(history)
if diff >= 0: if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}") log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff): for _ in range(diff):
dst.push(history[:1]) dst.push(history[:1])
@ -149,6 +149,12 @@ async def fsp_compute(
# rt stream # rt stream
async for processed in out_stream: async for processed in out_stream:
# period = time.time() - last
# hz = 1/period if period else float('nan')
# if hz > 60:
# log.info(f'FSP quote too fast: {hz}')
log.debug(f"{fsp_func_name}: {processed}") log.debug(f"{fsp_func_name}: {processed}")
index = src.index index = src.index
dst.array[-1][fsp_func_name] = processed dst.array[-1][fsp_func_name] = processed
@ -165,12 +171,16 @@ async def cascade(
dst_shm_token: Tuple[str, np.dtype], dst_shm_token: Tuple[str, np.dtype],
symbol: str, symbol: str,
fsp_func_name: str, fsp_func_name: str,
loglevel: Optional[str] = None,
) -> None: ) -> None:
"""Chain streaming signal processors and deliver output to '''Chain streaming signal processors and deliver output to
destination mem buf. destination mem buf.
""" '''
if loglevel:
get_console_log(loglevel)
src = attach_shm_array(token=src_shm_token) src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token)
@ -180,6 +190,10 @@ async def cascade(
async with data.feed.maybe_open_feed( async with data.feed.maybe_open_feed(
brokername, brokername,
[symbol], [symbol],
# TODO:
# tick_throttle=60,
) as (feed, stream): ) as (feed, stream):
assert src.token == feed.shm.token assert src.token == feed.shm.token