Expose dollar volume to fsp engine

It can now be declared inside an fsp config dict under the name
`dolla_vlm`. We still need to offer an engine control that zeros
the newest sample value instead of copying from the previous.
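
For context, dollar volume is just a per-bar price multiplied by traded volume. A
minimal standalone sketch (the structured-array field names and the config dict
shape are illustrative assumptions, not the real shm schema or engine API; only
the `dolla_vlm` key name comes from this commit):

    import numpy as np

    # hypothetical structured ohlc bars; field names are assumptions
    ohlc = np.array(
        [(10.0, 100.0), (11.0, 50.0)],
        dtype=[('close', 'f8'), ('volume', 'f8')],
    )

    # dollar volume per bar: a bar price (close here) times traded volume
    dolla_vlm = ohlc['close'] * ohlc['volume']
    print(dolla_vlm)  # [1000.  550.]

    # how it might be requested via an fsp config dict (shape is
    # illustrative only)
    fsp_conf = {
        'dolla_vlm': {},
    }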

This also litters the engine code with `pyqtgraph` profiling to see if
we can improve startup times - likely it'll mean pre-allocating a small
fsp daemon cluster at startup.
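
The profiling hooks use `pyqtgraph`'s built-in `Profiler`, following the same
pattern the diff below threads through `fsp_compute()` and `cascade()`: mark
named checkpoints as startup progresses, then call `finish()` to emit the
timing report. A minimal sketch (the sleeps are stand-ins for real work):

    import time
    import pyqtgraph as pg

    def timed_startup() -> None:
        # delayed=False prints each checkpoint as it is hit; flipping
        # disabled=True turns the profiler into a no-op so the calls
        # can be left littered through the code
        profiler = pg.debug.Profiler(delayed=False, disabled=False)

        time.sleep(0.01)  # stand-in for bringing the data feed up
        profiler('feed up')  # record time elapsed since the last mark

        time.sleep(0.02)  # stand-in for spawning the fsp task
        profiler('fsp up')

        profiler.finish()  # close out and emit the report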
vlm_plotz_backup
Tyler Goodlet 2021-09-30 07:41:09 -04:00
parent 614bb1717b
commit 2a723ac994
1 changed file with 52 additions and 27 deletions

@@ -21,10 +21,11 @@ core task logic for processing chains
 from functools import partial
 from typing import AsyncIterator, Callable, Optional

+import numpy as np
+import pyqtgraph as pg
 import trio
 from trio_typing import TaskStatus
 import tractor
-import numpy as np

 from ..log import get_logger, get_console_log
 from .. import data
@@ -32,7 +33,7 @@ from ..data import attach_shm_array
 from ..data.feed import Feed
 from ..data._sharedmem import ShmArray
 from ._momo import _rsi, _wma
-from ._volume import _tina_vwap
+from ._volume import _tina_vwap, dolla_vlm

 log = get_logger(__name__)
@@ -40,6 +41,7 @@ _fsp_builtins = {
     'rsi': _rsi,
     'wma': _wma,
     'vwap': _tina_vwap,
+    'dolla_vlm': dolla_vlm,
 }
@@ -61,8 +63,6 @@ async def filter_quotes_by_sym(
         quote = quotes.get(sym)
         if quote:
             yield quote
-        # for symbol, quote in quotes.items():
-        #     if symbol == sym:


 async def fsp_compute(
@@ -78,11 +78,15 @@ async def fsp_compute(
     func_name: str,
     func: Callable,

+    attach_stream: bool = False,
     task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,

 ) -> None:

-    # TODO: load appropriate fsp with input args
+    profiler = pg.debug.Profiler(
+        delayed=False,
+        disabled=True
+    )

     out_stream = func(
@@ -94,6 +98,21 @@ async def fsp_compute(
         feed.shm,
     )

+    # Conduct a single iteration of fsp with historical bars input
+    # and get historical output
+    history_output = await out_stream.__anext__()
+    # await tractor.breakpoint()
+    profiler(f'{func_name} generated history')
+
+    # build a struct array which includes an 'index' field to push
+    # as history
+    history = np.array(
+        np.arange(len(history_output)),
+        dtype=dst.array.dtype
+    )
+    history[func_name] = history_output
+
     # TODO: XXX:
     # THERE'S A BIG BUG HERE WITH THE `index` field since we're
     # prepending a copy of the first value a few times to make
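
As an aside, the history staging in the hunk above leans on numpy structured
arrays: building from the destination buffer's dtype fills every named field
with the range values, and the fsp's column is then overwritten with the real
output. A standalone illustration mirroring that construction (the field names
here are examples, not the actual shm dtype):

    import numpy as np

    # a dtype standing in for dst.array.dtype: an 'index' field plus
    # the fsp's named output field
    dtype = np.dtype([('index', 'i8'), ('dolla_vlm', 'f8')])

    history_output = np.array([10.0, 20.0, 30.0])  # fake fsp history

    # np.array(np.arange(n), dtype=...) broadcasts each range value
    # into every field of that row; the fsp column is then replaced
    history = np.array(np.arange(len(history_output)), dtype=dtype)
    history['dolla_vlm'] = history_output

    print(history['index'])      # [0 1 2]
    print(history['dolla_vlm'])  # [10. 20. 30.]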
@@ -108,31 +127,25 @@ async def fsp_compute(
     dst._first.value = src._first.value
     dst._last.value = src._first.value

-    # Conduct a single iteration of fsp with historical bars input
-    # and get historical output
-    history_output = await out_stream.__anext__()
-
-    # build a struct array which includes an 'index' field to push
-    # as history
-    history = np.array(
-        np.arange(len(history_output)),
-        dtype=dst.array.dtype
-    )
-    history[func_name] = history_output
+    # compare with source signal and time align

     # check for data length mis-allignment and fill missing values
     diff = len(src.array) - len(history)
     if diff > 0:
-        log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}")
+        log.warning(f"WTF DIFF fsp to ohlc history {diff}")
         for _ in range(diff):
             dst.push(history[:1])

-    # compare with source signal and time align
-    index = dst.push(history)
+    # TODO: can we use this `start` flag instead of the manual
+    # setting above?
+    index = dst.push(history)  #, start=src._first.value)
+    profiler(f'{func_name} pushed history')
+    profiler.finish()

     # setup a respawn handle
     with trio.CancelScope() as cs:
         task_status.started((cs, index))
+        profiler(f'{func_name} yield last index')

         import time
         last = time.time()
@@ -140,20 +153,21 @@ async def fsp_compute(
         # rt stream
         async for processed in out_stream:
-            period = time.time() - last
-            hz = 1/period if period else float('nan')
-            if hz > 60:
-                log.info(f'FSP quote too fast: {hz}')

             log.debug(f"{func_name}: {processed}")
             index = src.index
             dst.array[-1][func_name] = processed

+            # NOTE: for now we aren't streaming this to the consumer
             # stream latest array index entry which basically just acts
             # as trigger msg to tell the consumer to read from shm
-            await stream.send(index)
+            if attach_stream:
+                await stream.send(index)

-            last = time.time()
+            # period = time.time() - last
+            # hz = 1/period if period else float('nan')
+            # if hz > 60:
+            #     log.info(f'FSP quote too fast: {hz}')
+            # last = time.time()


 @tractor.context
@@ -175,6 +189,8 @@ async def cascade(
     destination mem buf.

     '''
+    profiler = pg.debug.Profiler(delayed=False, disabled=False)
+
     if loglevel:
         get_console_log(loglevel)
@@ -199,6 +215,8 @@ async def cascade(
     ) as (feed, quote_stream):

+        profiler(f'{func_name}: feed up')
+
         assert src.token == feed.shm.token

         last_len = new_len = len(src.array)
@@ -225,11 +243,16 @@ async def cascade(
         cs, index = await n.start(fsp_target)
         await ctx.started(index)
+        profiler(f'{func_name}: fsp up')

         # Increment the underlying shared memory buffer on every
         # "increment" msg received from the underlying data feed.
         async with feed.index_stream() as stream:

+            profiler(f'{func_name}: sample stream up')
+            profiler.finish()
+
             async for msg in stream:

                 new_len = len(src.array)
@@ -246,6 +269,8 @@ async def cascade(
                 # read out last shm row, copy and write new row
                 array = dst.array
+
+                # TODO: some signals, like vlm should be reset to
+                # zero every step.
                 last = array[-1:].copy()
                 dst.push(last)
                 last_len = new_len