Add signal backfilling via trio task respawn

vwap_fsp
Tyler Goodlet 2020-12-16 12:30:40 -05:00
parent e89d3f9560
commit c01382294e
3 changed files with 119 additions and 54 deletions

View File

@ -20,6 +20,7 @@ Financial signal processing for the peeps.
from typing import AsyncIterator, Callable, Tuple from typing import AsyncIterator, Callable, Tuple
import trio import trio
from trio_typing import TaskStatus
import tractor import tractor
import numpy as np import numpy as np
@ -75,6 +76,7 @@ async def increment_signals(
# write new slot to the buffer # write new slot to the buffer
dst_shm.push(last) dst_shm.push(last)
len(dst_shm.array)
@tractor.stream @tractor.stream
@ -99,60 +101,107 @@ async def cascade(
async with data.open_feed(brokername, [symbol]) as feed: async with data.open_feed(brokername, [symbol]) as feed:
assert src.token == feed.shm.token assert src.token == feed.shm.token
# TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream): async def fsp_compute(
async for quotes in stream: task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
for symbol, quotes in quotes.items(): ) -> None:
if symbol == sym:
yield quotes
out_stream = func( # TODO: load appropriate fsp with input args
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# TODO: XXX: async def filter_by_sym(
# THERE'S A BIG BUG HERE WITH THE `index` field since we're sym: str,
# prepending a copy of the first value a few times to make stream,
# sub-curves align with the parent bar chart. ):
# This likely needs to be fixed either by, # task cancellation won't kill the channel
# - manually assigning the index and historical data async with stream.shield():
# seperately to the shm array (i.e. not using .push()) async for quotes in stream:
# - developing some system on top of the shared mem array that for symbol, quotes in quotes.items():
# is `index` aware such that historical data can be indexed if symbol == sym:
# relative to the true first datum? Not sure if this is sane yield quotes
# for derivatives.
# Conduct a single iteration of fsp with historical bars input out_stream = func(
# and get historical output filter_by_sym(symbol, feed.stream),
history_output = await out_stream.__anext__() feed.shm,
)
# build a struct array which includes an 'index' field to push # TODO: XXX:
# as history # THERE'S A BIG BUG HERE WITH THE `index` field since we're
history = np.array( # prepending a copy of the first value a few times to make
np.arange(len(history_output)), # sub-curves align with the parent bar chart.
dtype=dst.array.dtype # This likely needs to be fixed either by,
) # - manually assigning the index and historical data
history[fsp_func_name] = history_output # seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# check for data length mis-allignment and fill missing values # Conduct a single iteration of fsp with historical bars input
diff = len(src.array) - len(history) # and get historical output
if diff >= 0: history_output = await out_stream.__anext__()
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align # build a struct array which includes an 'index' field to push
index = dst.push(history) # as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
yield index
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
last_len = new_len = len(src.array)
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon(increment_signals, feed, dst)
async for processed in out_stream: cs = await n.start(fsp_compute)
log.debug(f"{fsp_func_name}: {processed}")
index = src.index # Increment the underlying shared memory buffer on every "increment"
dst.array[-1][fsp_func_name] = processed # msg received from the underlying data feed.
await ctx.send_yield(index)
async for msg in await feed.index_stream():
new_len = len(src.array)
if new_len > last_len + 1:
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_compute)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
array = dst.array
last = array[-1:].copy()
# write new slot to the buffer
dst.push(last)
last_len = new_len

View File

@ -151,8 +151,8 @@ def wma(
return np.convolve(signal, weights, 'valid') return np.convolve(signal, weights, 'valid')
# @piker.fsp( # @piker.fsp.signal(
# aggregates=[60, 60*5, 60*60, '4H', '1D'], # timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# ) # )
async def _rsi( async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa source: 'QuoteStream[Dict[str, Any]]', # noqa
@ -171,8 +171,8 @@ async def _rsi(
# TODO: the emas here should be seeded with a period SMA as per # TODO: the emas here should be seeded with a period SMA as per
# wilder's original formula.. # wilder's original formula..
rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed) rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed)
up_ema_last = last_up_ema_close up_ema_last = last_up_ema_close
down_ema_last = last_down_ema_close down_ema_last = last_down_ema_close
# deliver history # deliver history
yield rsi_h yield rsi_h

View File

@ -558,7 +558,9 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: see how this handles with custom ohlcv bars graphics # TODO: see how this handles with custom ohlcv bars graphics
# and/or if we can implement something similar for OHLC graphics # and/or if we can implement something similar for OHLC graphics
clipToView=True, # clipToView=True,
autoDownsample=True,
downsampleMethod='subsample',
**pdi_kwargs, **pdi_kwargs,
) )
@ -1221,9 +1223,23 @@ async def update_signals(
# update chart graphics # update chart graphics
async for value in stream: async for value in stream:
# read last # TODO: provide a read sync mechanism to avoid this polling.
array = shm.array # the underlying issue is that a backfill and subsequent shm
value = array[-1][fsp_func_name] # array first/last index update could result in an empty array
# read here since the stream is never torn down on the
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][fsp_func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky: if last_val_sticky:
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)