208 lines
6.6 KiB
Python
208 lines
6.6 KiB
Python
# piker: trading gear for hackers
|
|
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
"""
|
|
Financial signal processing for the peeps.
|
|
"""
|
|
from typing import AsyncIterator, Callable, Tuple
|
|
|
|
import trio
|
|
from trio_typing import TaskStatus
|
|
import tractor
|
|
import numpy as np
|
|
|
|
from ..log import get_logger
|
|
from .. import data
|
|
from ._momo import _rsi, _wma
|
|
from ._volume import _tina_vwap
|
|
from ..data import attach_shm_array, Feed
|
|
|
|
log = get_logger(__name__)
|
|
|
|
|
|
_fsps = {
|
|
'rsi': _rsi,
|
|
'wma': _wma,
|
|
'vwap': _tina_vwap,
|
|
}
|
|
|
|
|
|
async def latency(
|
|
source: 'TickStream[Dict[str, float]]', # noqa
|
|
ohlcv: np.ndarray
|
|
) -> AsyncIterator[np.ndarray]:
|
|
"""Latency measurements, broker to piker.
|
|
"""
|
|
# TODO: do we want to offer yielding this async
|
|
# before the rt data connection comes up?
|
|
|
|
# deliver zeros for all prior history
|
|
yield np.zeros(len(ohlcv))
|
|
|
|
async for quote in source:
|
|
ts = quote.get('broker_ts')
|
|
if ts:
|
|
# This is codified in the per-broker normalization layer
|
|
# TODO: Add more measure points and diffs for full system
|
|
# stack tracing.
|
|
value = quote['brokerd_ts'] - quote['broker_ts']
|
|
yield value
|
|
|
|
|
|
async def increment_signals(
|
|
feed: Feed,
|
|
dst_shm: 'SharedArray', # noqa
|
|
) -> None:
|
|
"""Increment the underlying shared memory buffer on every "increment"
|
|
msg received from the underlying data feed.
|
|
|
|
"""
|
|
async for msg in await feed.index_stream():
|
|
array = dst_shm.array
|
|
last = array[-1:].copy()
|
|
|
|
# write new slot to the buffer
|
|
dst_shm.push(last)
|
|
len(dst_shm.array)
|
|
|
|
|
|
@tractor.stream
|
|
async def cascade(
|
|
ctx: tractor.Context,
|
|
brokername: str,
|
|
src_shm_token: dict,
|
|
dst_shm_token: Tuple[str, np.dtype],
|
|
symbol: str,
|
|
fsp_func_name: str,
|
|
) -> AsyncIterator[dict]:
|
|
"""Chain streaming signal processors and deliver output to
|
|
destination mem buf.
|
|
|
|
"""
|
|
src = attach_shm_array(token=src_shm_token)
|
|
dst = attach_shm_array(readonly=False, token=dst_shm_token)
|
|
|
|
func: Callable = _fsps[fsp_func_name]
|
|
|
|
# open a data feed stream with requested broker
|
|
async with data.open_feed(brokername, [symbol]) as feed:
|
|
|
|
assert src.token == feed.shm.token
|
|
|
|
async def fsp_compute(
|
|
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
|
) -> None:
|
|
|
|
# TODO: load appropriate fsp with input args
|
|
|
|
async def filter_by_sym(
|
|
sym: str,
|
|
stream,
|
|
):
|
|
# task cancellation won't kill the channel
|
|
with stream.shield():
|
|
async for quotes in stream:
|
|
for symbol, quotes in quotes.items():
|
|
if symbol == sym:
|
|
yield quotes
|
|
|
|
out_stream = func(
|
|
filter_by_sym(symbol, feed.stream),
|
|
feed.shm,
|
|
)
|
|
|
|
# TODO: XXX:
|
|
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
|
|
# prepending a copy of the first value a few times to make
|
|
# sub-curves align with the parent bar chart.
|
|
# This likely needs to be fixed either by,
|
|
# - manually assigning the index and historical data
|
|
# seperately to the shm array (i.e. not using .push())
|
|
# - developing some system on top of the shared mem array that
|
|
# is `index` aware such that historical data can be indexed
|
|
# relative to the true first datum? Not sure if this is sane
|
|
# for incremental compuations.
|
|
dst._first.value = src._first.value
|
|
dst._last.value = src._first.value
|
|
|
|
# Conduct a single iteration of fsp with historical bars input
|
|
# and get historical output
|
|
history_output = await out_stream.__anext__()
|
|
|
|
# build a struct array which includes an 'index' field to push
|
|
# as history
|
|
history = np.array(
|
|
np.arange(len(history_output)),
|
|
dtype=dst.array.dtype
|
|
)
|
|
history[fsp_func_name] = history_output
|
|
|
|
|
|
# check for data length mis-allignment and fill missing values
|
|
diff = len(src.array) - len(history)
|
|
if diff >= 0:
|
|
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
|
|
for _ in range(diff):
|
|
dst.push(history[:1])
|
|
|
|
# compare with source signal and time align
|
|
index = dst.push(history)
|
|
|
|
await ctx.send_yield(index)
|
|
|
|
# setup a respawn handle
|
|
with trio.CancelScope() as cs:
|
|
task_status.started(cs)
|
|
|
|
# rt stream
|
|
async for processed in out_stream:
|
|
log.debug(f"{fsp_func_name}: {processed}")
|
|
index = src.index
|
|
dst.array[-1][fsp_func_name] = processed
|
|
|
|
# stream latest shm array index entry
|
|
await ctx.send_yield(index)
|
|
|
|
last_len = new_len = len(src.array)
|
|
|
|
async with trio.open_nursery() as n:
|
|
|
|
cs = await n.start(fsp_compute)
|
|
|
|
# Increment the underlying shared memory buffer on every "increment"
|
|
# msg received from the underlying data feed.
|
|
|
|
async for msg in await feed.index_stream():
|
|
|
|
new_len = len(src.array)
|
|
|
|
if new_len > last_len + 1:
|
|
# respawn the signal compute task if the source
|
|
# signal has been updated
|
|
cs.cancel()
|
|
cs = await n.start(fsp_compute)
|
|
|
|
# TODO: adopt an incremental update engine/approach
|
|
# where possible here eventually!
|
|
|
|
array = dst.array
|
|
last = array[-1:].copy()
|
|
|
|
# write new slot to the buffer
|
|
dst.push(last)
|
|
|
|
last_len = new_len
|