Merge pull request #131 from pikers/vwap_fsp

Vwap fsp
kraken_history
goodboy 2020-12-29 15:43:38 -05:00 committed by GitHub
commit 6166e5900e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 250 additions and 78 deletions

View File

@ -192,6 +192,7 @@ class Client:
# barSizeSetting='5 secs', # barSizeSetting='5 secs',
durationStr='{count} S'.format(count=period_count), durationStr='{count} S'.format(count=period_count),
# barSizeSetting='5 secs',
barSizeSetting='1 secs', barSizeSetting='1 secs',
# barSizeSetting='1 min', # barSizeSetting='1 min',
@ -328,8 +329,9 @@ class Client:
exch = 'SMART' if not exch else exch exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0] contract = (await self.ib.qualifyContractsAsync(con))[0]
head = await self.get_head_time(contract) # head = await self.get_head_time(contract)
print(head) # print(head)
except IndexError: except IndexError:
raise ValueError(f"No contract could be found {con}") raise ValueError(f"No contract could be found {con}")
return contract return contract
@ -617,9 +619,11 @@ async def activate_writer(key: str) -> (bool, trio.Nursery):
async def fill_bars( async def fill_bars(
first_bars, sym: str,
shm, first_bars: list,
shm: 'ShmArray', # type: ignore # noqa
count: int = 21, count: int = 21,
# count: int = 1,
) -> None: ) -> None:
"""Fill historical bars into shared mem / storage afap. """Fill historical bars into shared mem / storage afap.
@ -635,9 +639,7 @@ async def fill_bars(
try: try:
bars, bars_array = await _trio_run_client_method( bars, bars_array = await _trio_run_client_method(
method='bars', method='bars',
symbol='.'.join( symbol=sym,
(first_bars.contract.symbol, first_bars.contract.exchange)
),
end_dt=next_dt, end_dt=next_dt,
) )
@ -723,7 +725,7 @@ async def stream_quotes(
# TODO: generalize this for other brokers # TODO: generalize this for other brokers
# start bar filler task in bg # start bar filler task in bg
ln.start_soon(fill_bars, bars, shm) ln.start_soon(fill_bars, sym, bars, shm)
times = shm.array['time'] times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
@ -808,7 +810,7 @@ async def stream_quotes(
['open', 'high', 'low', 'volume'] ['open', 'high', 'low', 'volume']
] ]
new_v = tick['size'] new_v = tick.get('size', 0)
if v == 0 and new_v: if v == 0 and new_v:
# no trades for this bar yet so the open # no trades for this bar yet so the open

View File

@ -20,18 +20,24 @@ Financial signal processing for the peeps.
from typing import AsyncIterator, Callable, Tuple from typing import AsyncIterator, Callable, Tuple
import trio import trio
from trio_typing import TaskStatus
import tractor import tractor
import numpy as np import numpy as np
from ..log import get_logger from ..log import get_logger
from .. import data from .. import data
from ._momo import _rsi from ._momo import _rsi, _wma
from ._volume import _tina_vwap
from ..data import attach_shm_array, Feed from ..data import attach_shm_array, Feed
log = get_logger(__name__) log = get_logger(__name__)
_fsps = {'rsi': _rsi} _fsps = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
}
async def latency( async def latency(
@ -70,7 +76,7 @@ async def increment_signals(
# write new slot to the buffer # write new slot to the buffer
dst_shm.push(last) dst_shm.push(last)
len(dst_shm.array)
@tractor.stream @tractor.stream
@ -95,66 +101,107 @@ async def cascade(
async with data.open_feed(brokername, [symbol]) as feed: async with data.open_feed(brokername, [symbol]) as feed:
assert src.token == feed.shm.token assert src.token == feed.shm.token
# TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream): async def fsp_compute(
async for quotes in stream: task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
for symbol, quotes in quotes.items(): ) -> None:
if symbol == sym:
yield quotes
out_stream = func( # TODO: load appropriate fsp with input args
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# TODO: XXX: async def filter_by_sym(
# THERE'S A BIG BUG HERE WITH THE `index` field since we're sym: str,
# prepending a copy of the first value a few times to make stream,
# sub-curves align with the parent bar chart. ):
# # task cancellation won't kill the channel
# This likely needs to be fixed either by, with stream.shield():
# - manually assigning the index and historical data async for quotes in stream:
# seperately to the shm array (i.e. not using .push()) for symbol, quotes in quotes.items():
# - developing some system on top of the shared mem array that if symbol == sym:
# is `index` aware such that historical data can be indexed yield quotes
# relative to the true first datum? Not sure if this is sane
# for derivatives.
# Conduct a single iteration of fsp with historical bars input out_stream = func(
# and get historical output filter_by_sym(symbol, feed.stream),
history_output = await out_stream.__anext__() feed.shm,
)
# build a struct array which includes an 'index' field to push # TODO: XXX:
# as history # THERE'S A BIG BUG HERE WITH THE `index` field since we're
history = np.array( # prepending a copy of the first value a few times to make
np.arange(len(history_output)), # sub-curves align with the parent bar chart.
dtype=dst.array.dtype # This likely needs to be fixed either by,
) # - manually assigning the index and historical data
history[fsp_func_name] = history_output # seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# TODO: talk to ``pyqtgraph`` core about proper way to solve this: # Conduct a single iteration of fsp with historical bars input
# XXX: hack to get curves aligned with bars graphics: prepend # and get historical output
# a copy of the first datum.. history_output = await out_stream.__anext__()
# dst.push(history[:1])
# check for data length mis-allignment and fill missing values # build a struct array which includes an 'index' field to push
diff = len(src.array) - len(history) # as history
if diff >= 0: history = np.array(
print(f"WTF DIFFZZZ {diff}") np.arange(len(history_output)),
for _ in range(diff): dtype=dst.array.dtype
dst.push(history[:1]) )
history[fsp_func_name] = history_output
# compare with source signal and time align
index = dst.push(history)
yield index # check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
last_len = new_len = len(src.array)
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon(increment_signals, feed, dst)
async for processed in out_stream: cs = await n.start(fsp_compute)
log.debug(f"{fsp_func_name}: {processed}")
index = src.index # Increment the underlying shared memory buffer on every "increment"
dst.array[-1][fsp_func_name] = processed # msg received from the underlying data feed.
await ctx.send_yield(index)
async for msg in await feed.index_stream():
new_len = len(src.array)
if new_len > last_len + 1:
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_compute)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
array = dst.array
last = array[-1:].copy()
# write new slot to the buffer
dst.push(last)
last_len = new_len

View File

@ -151,8 +151,8 @@ def wma(
return np.convolve(signal, weights, 'valid') return np.convolve(signal, weights, 'valid')
# @piker.fsp( # @piker.fsp.signal(
# aggregates=[60, 60*5, 60*60, '4H', '1D'], # timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# ) # )
async def _rsi( async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa source: 'QuoteStream[Dict[str, Any]]', # noqa
@ -171,8 +171,8 @@ async def _rsi(
# TODO: the emas here should be seeded with a period SMA as per # TODO: the emas here should be seeded with a period SMA as per
# wilder's original formula.. # wilder's original formula..
rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed) rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed)
up_ema_last = last_up_ema_close up_ema_last = last_up_ema_close
down_ema_last = last_down_ema_close down_ema_last = last_down_ema_close
# deliver history # deliver history
yield rsi_h yield rsi_h

View File

@ -0,0 +1,93 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import AsyncIterator, Optional
import numpy as np
from ..data._normalize import iterticks
def wap(
signal: np.ndarray,
weights: np.ndarray,
) -> np.ndarray:
"""Weighted average price from signal and weights.
"""
cum_weights = np.cumsum(weights)
cum_weighted_input = np.cumsum(signal * weights)
# cum_weighted_input / cum_weights
# but, avoid divide by zero errors
avg = np.divide(
cum_weighted_input,
cum_weights,
where=cum_weights != 0
)
return (
avg,
cum_weighted_input,
cum_weights,
)
async def _tina_vwap(
source, #: AsyncStream[np.ndarray],
ohlcv: np.ndarray, # price time-frame "aware"
anchors: Optional[np.ndarray] = None,
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming volume weighted moving average.
Calling this "tina" for now since we're using HLC3 instead of tick.
"""
if anchors is None:
# TODO:
# anchor to session start of data if possible
pass
a = ohlcv.array
chl3 = (a['close'] + a['high'] + a['low']) / 3
v = a['volume']
h_vwap, cum_wp, cum_v = wap(chl3, v)
# deliver historical output as "first yield"
yield h_vwap
w_tot = cum_wp[-1]
v_tot = cum_v[-1]
# vwap_tot = h_vwap[-1]
async for quote in source:
for tick in iterticks(quote, types=['trade']):
# c, h, l, v = ohlcv.array[-1][
# ['closes', 'high', 'low', 'volume']
# ]
# this computes tick-by-tick weightings from here forward
size = tick['size']
price = tick['price']
v_tot += size
w_tot += price * size
# yield ((((o + h + l) / 3) * v) weights_tot) / v_tot
yield w_tot / v_tot

View File

@ -558,7 +558,9 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: see how this handles with custom ohlcv bars graphics # TODO: see how this handles with custom ohlcv bars graphics
# and/or if we can implement something similar for OHLC graphics # and/or if we can implement something similar for OHLC graphics
clipToView=True, # clipToView=True,
autoDownsample=True,
downsampleMethod='subsample',
**pdi_kwargs, **pdi_kwargs,
) )
@ -839,10 +841,6 @@ async def _async_main(
# eventually we'll support some kind of n-compose syntax # eventually we'll support some kind of n-compose syntax
fsp_conf = { fsp_conf = {
# 'vwap': {
# 'overlay': True,
# 'anchor': 'session',
# },
'rsi': { 'rsi': {
'period': 14, 'period': 14,
'chart_kwargs': { 'chart_kwargs': {
@ -852,6 +850,24 @@ async def _async_main(
} }
# make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and derivatives)
volm = ohlcv.array['volume']
if (
np.all(np.isin(volm, -1)) or
np.all(np.isnan(volm))
):
log.warning(
f"{sym} does not seem to have volume info,"
" dropping volume signals")
else:
fsp_conf.update({
'vwap': {
'overlay': True,
'anchor': 'session',
},
})
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
@ -1099,11 +1115,11 @@ async def spawn_fsps(
print(f'FSP NAME: {fsp_name}') print(f'FSP NAME: {fsp_name}')
portal = await n.run_in_actor( portal = await n.run_in_actor(
# name as title of sub-chart
display_name,
# subactor entrypoint # subactor entrypoint
fsp.cascade, fsp.cascade,
# name as title of sub-chart
name=display_name,
brokername=brokermod.name, brokername=brokermod.name,
src_shm_token=src_shm.token, src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token, dst_shm_token=conf['shm'].token,
@ -1221,9 +1237,23 @@ async def update_signals(
# update chart graphics # update chart graphics
async for value in stream: async for value in stream:
# read last # TODO: provide a read sync mechanism to avoid this polling.
array = shm.array # the underlying issue is that a backfill and subsequent shm
value = array[-1][fsp_func_name] # array first/last index update could result in an empty array
# read here since the stream is never torn down on the
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][fsp_func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky: if last_val_sticky:
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)