Add profiling to fsp engine

Litter the engine code with `pyqtgraph` profiling to see if we can
improve startup times; likely it'll mean pre-allocating a small fsp
daemon cluster at startup.
fsp_drunken_alignment
Tyler Goodlet 2021-09-30 07:41:09 -04:00
parent d4b00d74f8
commit 2cd594ed35
1 changed file with 50 additions and 26 deletions

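For context on the hooks added below: `pg.debug.Profiler` is pyqtgraph's checkpoint-style profiler, where each call on the profiler instance marks the time elapsed since the previous checkpoint and `.finish()` closes out the report. A minimal sketch of the pattern, assuming a Qt backend is installed so `pyqtgraph` imports, with sleeps and step names as hypothetical stand-ins for the real feed/fsp startup work:

```python
import time

import pyqtgraph as pg


def startup() -> None:
    # delayed=False prints each checkpoint as it is hit instead of
    # buffering the report; flipping disabled=True makes every call a
    # no-op so the call sites can stay littered through the code.
    profiler = pg.debug.Profiler(delayed=False, disabled=False)

    time.sleep(0.1)  # stand-in for bringing up a data feed
    profiler('feed up')  # mark time since the profiler was created

    time.sleep(0.25)  # stand-in for spawning an fsp daemon
    profiler('fsp up')  # mark time since the previous checkpoint

    profiler.finish()  # close out and flush this profiler's report


startup()
```

The `disabled` flag is why the calls can be left in permanently: the commit below enables the profiler in `cascade` but leaves it disabled in `fsp_compute`.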

@@ -21,10 +21,11 @@ core task logic for processing chains
 from functools import partial
 from typing import AsyncIterator, Callable, Optional
 
+import numpy as np
+import pyqtgraph as pg
 import trio
 from trio_typing import TaskStatus
 import tractor
-import numpy as np
 
 from ..log import get_logger, get_console_log
 from .. import data
@ -61,8 +62,6 @@ async def filter_quotes_by_sym(
quote = quotes.get(sym) quote = quotes.get(sym)
if quote: if quote:
yield quote yield quote
# for symbol, quote in quotes.items():
# if symbol == sym:
async def fsp_compute( async def fsp_compute(
@ -78,11 +77,15 @@ async def fsp_compute(
func_name: str, func_name: str,
func: Callable, func: Callable,
attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# TODO: load appropriate fsp with input args profiler = pg.debug.Profiler(
delayed=False,
disabled=True
)
out_stream = func( out_stream = func(
@@ -94,6 +97,21 @@ async def fsp_compute(
         feed.shm,
     )
 
+    # Conduct a single iteration of fsp with historical bars input
+    # and get historical output
+    history_output = await out_stream.__anext__()
+
+    # await tractor.breakpoint()
+    profiler(f'{func_name} generated history')
+
+    # build a struct array which includes an 'index' field to push
+    # as history
+    history = np.array(
+        np.arange(len(history_output)),
+        dtype=dst.array.dtype
+    )
+    history[func_name] = history_output
+
     # TODO: XXX:
     # THERE'S A BIG BUG HERE WITH THE `index` field since we're
     # prepending a copy of the first value a few times to make
@@ -108,31 +126,25 @@ async def fsp_compute(
     dst._first.value = src._first.value
     dst._last.value = src._first.value
 
-    # Conduct a single iteration of fsp with historical bars input
-    # and get historical output
-    history_output = await out_stream.__anext__()
-
-    # build a struct array which includes an 'index' field to push
-    # as history
-    history = np.array(
-        np.arange(len(history_output)),
-        dtype=dst.array.dtype
-    )
-    history[func_name] = history_output
+    # compare with source signal and time align
 
     # check for data length mis-allignment and fill missing values
    diff = len(src.array) - len(history)
    if diff > 0:
-        log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}")
+        log.warning(f"WTF DIFF fsp to ohlc history {diff}")
        for _ in range(diff):
            dst.push(history[:1])
 
-    # compare with source signal and time align
-    index = dst.push(history)
+    # TODO: can we use this `start` flag instead of the manual
+    # setting above?
+    index = dst.push(history)  #, start=src._first.value)
+    profiler(f'{func_name} pushed history')
+    profiler.finish()
 
     # setup a respawn handle
     with trio.CancelScope() as cs:
         task_status.started((cs, index))
+        profiler(f'{func_name} yield last index')
 
         import time
         last = time.time()
@@ -140,20 +152,21 @@ async def fsp_compute(
         # rt stream
         async for processed in out_stream:
-            period = time.time() - last
-            hz = 1/period if period else float('nan')
-            if hz > 60:
-                log.info(f'FSP quote too fast: {hz}')
-
 
             log.debug(f"{func_name}: {processed}")
             index = src.index
             dst.array[-1][func_name] = processed
 
+            # NOTE: for now we aren't streaming this to the consumer
             # stream latest array index entry which basically just acts
             # as trigger msg to tell the consumer to read from shm
-            await stream.send(index)
+            if attach_stream:
+                await stream.send(index)
 
-            last = time.time()
+            # period = time.time() - last
+            # hz = 1/period if period else float('nan')
+            # if hz > 60:
+            #     log.info(f'FSP quote too fast: {hz}')
+            # last = time.time()
 
 
 @tractor.context
@@ -175,6 +188,8 @@ async def cascade(
     destination mem buf.
 
     '''
+    profiler = pg.debug.Profiler(delayed=False, disabled=False)
+
     if loglevel:
         get_console_log(loglevel)
 
@@ -199,6 +214,8 @@ async def cascade(
     ) as (feed, quote_stream):
 
+        profiler(f'{func_name}: feed up')
+
         assert src.token == feed.shm.token
         last_len = new_len = len(src.array)
 
@ -225,11 +242,16 @@ async def cascade(
cs, index = await n.start(fsp_target) cs, index = await n.start(fsp_target)
await ctx.started(index) await ctx.started(index)
profiler(f'{func_name}: fsp up')
# Increment the underlying shared memory buffer on every # Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async with feed.index_stream() as stream: async with feed.index_stream() as stream:
profiler(f'{func_name}: sample stream up')
profiler.finish()
async for msg in stream: async for msg in stream:
new_len = len(src.array) new_len = len(src.array)
@@ -246,6 +268,8 @@ async def cascade(
                     # read out last shm row, copy and write new row
                     array = dst.array
+                    # TODO: some signals, like vlm should be reset to
+                    # zero every step.
                     last = array[-1:].copy()
                     dst.push(last)
                     last_len = new_len
 
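
An aside on the alignment logic the middle hunks touch: when the fsp's history output comes up shorter than the source ohlc buffer, the first row is pushed repeatedly until the lengths match before the full history is pushed. A standalone numpy sketch of that idea, assuming toy dtypes, lengths, and an `rsi` field in place of the real shm buffer layouts:

```python
import numpy as np

# toy struct arrays standing in for the real shm buffers: a source
# ohlc feed and an fsp output with a hypothetical 'rsi' field
src = np.zeros(100, dtype=[('index', int), ('close', float)])
history = np.ones(97, dtype=[('index', int), ('rsi', float)])

# the fsp history can come up short of the source; prepend copies of
# its first row so both buffers stay index-aligned once pushed to shm
diff = len(src) - len(history)
if diff > 0:
    history = np.concatenate([np.repeat(history[:1], diff), history])

assert len(history) == len(src)
```

This is only a length fix; as the `TODO: XXX` comment in the diff notes, duplicating the first value still leaves the `index` field itself wrong for the prepended rows.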