diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 9e9f1370..e246e0b7 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -21,10 +21,11 @@ core task logic for processing chains from functools import partial from typing import AsyncIterator, Callable, Optional +import numpy as np +import pyqtgraph as pg import trio from trio_typing import TaskStatus import tractor -import numpy as np from ..log import get_logger, get_console_log from .. import data @@ -61,8 +62,6 @@ async def filter_quotes_by_sym( quote = quotes.get(sym) if quote: yield quote - # for symbol, quote in quotes.items(): - # if symbol == sym: async def fsp_compute( @@ -78,11 +77,15 @@ async def fsp_compute( func_name: str, func: Callable, + attach_stream: bool = False, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: - # TODO: load appropriate fsp with input args + profiler = pg.debug.Profiler( + delayed=False, + disabled=True + ) out_stream = func( @@ -94,6 +97,21 @@ async def fsp_compute( feed.shm, ) + # Conduct a single iteration of fsp with historical bars input + # and get historical output + history_output = await out_stream.__anext__() + + # await tractor.breakpoint() + profiler(f'{func_name} generated history') + + # build a struct array which includes an 'index' field to push + # as history + history = np.array( + np.arange(len(history_output)), + dtype=dst.array.dtype + ) + history[func_name] = history_output + # TODO: XXX: # THERE'S A BIG BUG HERE WITH THE `index` field since we're # prepending a copy of the first value a few times to make @@ -108,31 +126,25 @@ async def fsp_compute( dst._first.value = src._first.value dst._last.value = src._first.value - # Conduct a single iteration of fsp with historical bars input - # and get historical output - history_output = await out_stream.__anext__() - - # build a struct array which includes an 'index' field to push - # as history - history = np.array( - np.arange(len(history_output)), - dtype=dst.array.dtype - ) - history[func_name] = history_output - + # compare with source signal and time align # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) if diff > 0: - log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}") + log.warning(f"WTF DIFF fsp to ohlc history {diff}") for _ in range(diff): dst.push(history[:1]) - # compare with source signal and time align - index = dst.push(history) + # TODO: can we use this `start` flag instead of the manual + # setting above? + index = dst.push(history) #, start=src._first.value) + + profiler(f'{func_name} pushed history') + profiler.finish() # setup a respawn handle with trio.CancelScope() as cs: task_status.started((cs, index)) + profiler(f'{func_name} yield last index') import time last = time.time() @@ -140,20 +152,21 @@ async def fsp_compute( # rt stream async for processed in out_stream: - period = time.time() - last - hz = 1/period if period else float('nan') - if hz > 60: - log.info(f'FSP quote too fast: {hz}') - log.debug(f"{func_name}: {processed}") index = src.index dst.array[-1][func_name] = processed + # NOTE: for now we aren't streaming this to the consumer # stream latest array index entry which basically just acts # as trigger msg to tell the consumer to read from shm - await stream.send(index) + if attach_stream: + await stream.send(index) - last = time.time() + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + # last = time.time() @tractor.context @@ -175,6 +188,8 @@ async def cascade( destination mem buf. ''' + profiler = pg.debug.Profiler(delayed=False, disabled=False) + if loglevel: get_console_log(loglevel) @@ -199,6 +214,8 @@ async def cascade( ) as (feed, quote_stream): + profiler(f'{func_name}: feed up') + assert src.token == feed.shm.token last_len = new_len = len(src.array) @@ -225,11 +242,16 @@ async def cascade( cs, index = await n.start(fsp_target) await ctx.started(index) + profiler(f'{func_name}: fsp up') # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. async with feed.index_stream() as stream: + + profiler(f'{func_name}: sample stream up') + profiler.finish() + async for msg in stream: new_len = len(src.array) @@ -246,6 +268,8 @@ async def cascade( # read out last shm row, copy and write new row array = dst.array + # TODO: some signals, like vlm should be reset to + # zero every step. last = array[-1:].copy() dst.push(last) last_len = new_len