From ef2be16d9703e8d7da5428c54eee3e2b6b9d58c0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Nov 2020 20:16:24 -0500 Subject: [PATCH 01/10] Add initial tina (ohl3) vwap fsp --- piker/fsp/_volume.py | 60 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 piker/fsp/_volume.py diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py new file mode 100644 index 00000000..138effed --- /dev/null +++ b/piker/fsp/_volume.py @@ -0,0 +1,60 @@ +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import AsyncIterator + +import numpy as np + +from ..data._normalize import iterticks + + +async def _tina_vwap( + source, #: AsyncStream[np.ndarray], + ohlcv: np.ndarray, # price time-frame "aware" +) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? + """Streaming volume weighted moving average. + + + Calling this "tina" for now since we're using OHL3 instead of tick. + + """ + # TODO: anchor to session start + + a = ohlcv.array + ohl3 = (a['open'] + a['high'] + a['low']) / 3 + + v = a['volume'] + cum_v = np.cumsum(v) + cum_weights = np.cumsum(ohl3 * v) + + vwap = cum_weights / cum_v + + # deliver historical output as "first yield" + yield vwap + + weights_tot = cum_weights[-1] + v_tot = cum_v[-1] + + async for quote in source: + + for tick in iterticks(quote, types=['trade']): + + o, h, l, v = ohlcv.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + v_tot += v + + yield ((((o + h + l) / 3) * v) + weights_tot) / v_tot From 33515f45cc8fb9c1ca5cd09b23d0680c3cc0297b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Nov 2020 20:18:16 -0500 Subject: [PATCH 02/10] Add vwap to exposed fsp map --- piker/fsp/__init__.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 6e56c667..4b77db0b 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -25,13 +25,18 @@ import numpy as np from ..log import get_logger from .. import data -from ._momo import _rsi +from ._momo import _rsi, _wma +from ._volume import _tina_vwap from ..data import attach_shm_array, Feed log = get_logger(__name__) -_fsps = {'rsi': _rsi} +_fsps = { + 'rsi': _rsi, + 'wma': _wma, + 'vwap': _tina_vwap, +} async def latency( @@ -72,7 +77,6 @@ async def increment_signals( dst_shm.push(last) - @tractor.stream async def cascade( ctx: tractor.Context, @@ -112,7 +116,6 @@ async def cascade( # THERE'S A BIG BUG HERE WITH THE `index` field since we're # prepending a copy of the first value a few times to make # sub-curves align with the parent bar chart. - # # This likely needs to be fixed either by, # - manually assigning the index and historical data # seperately to the shm array (i.e. 
not using .push()) @@ -125,7 +128,7 @@ async def cascade( # and get historical output history_output = await out_stream.__anext__() - # build a struct array which includes an 'index' field to push + # build a struct array which includes an 'index' field to push # as history history = np.array( np.arange(len(history_output)), @@ -133,15 +136,10 @@ async def cascade( ) history[fsp_func_name] = history_output - # TODO: talk to ``pyqtgraph`` core about proper way to solve this: - # XXX: hack to get curves aligned with bars graphics: prepend - # a copy of the first datum.. - # dst.push(history[:1]) - # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) if diff >= 0: - print(f"WTF DIFFZZZ {diff}") + print(f"WTF DIFF SIGNAL to HISTORY {diff}") for _ in range(diff): dst.push(history[:1]) From 7c7b31ebbe8b9b7f1ed2cfffc9f54e50c46b85a3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Nov 2020 12:31:07 -0500 Subject: [PATCH 03/10] Break hist calc into wap func, use hlc3. --- piker/fsp/_volume.py | 51 +++++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 138effed..735f3d8b 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -14,47 +14,68 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import AsyncIterator +from typing import AsyncIterator, Optional import numpy as np from ..data._normalize import iterticks +def wap( + signal: np.ndarray, + weights: np.ndarray, +) -> np.ndarray: + """Weighted average price from signal and weights. + + """ + cum_weights = np.cumsum(weights) + cum_weighted_input = np.cumsum(signal * weights) + return cum_weighted_input / cum_weights, cum_weighted_input, cum_weights + + async def _tina_vwap( source, #: AsyncStream[np.ndarray], ohlcv: np.ndarray, # price time-frame "aware" + anchors: Optional[np.ndarray] = None, ) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? """Streaming volume weighted moving average. - Calling this "tina" for now since we're using OHL3 instead of tick. + Calling this "tina" for now since we're using HLC3 instead of tick. 
""" - # TODO: anchor to session start + if anchors is None: + # TODO: + # anchor to session start of data if possible + pass a = ohlcv.array - ohl3 = (a['open'] + a['high'] + a['low']) / 3 - + chl3 = (a['close'] + a['high'] + a['low']) / 3 v = a['volume'] - cum_v = np.cumsum(v) - cum_weights = np.cumsum(ohl3 * v) - vwap = cum_weights / cum_v + h_vwap, cum_wp, cum_v = wap(chl3, v) # deliver historical output as "first yield" - yield vwap + yield h_vwap - weights_tot = cum_weights[-1] + w_tot = cum_wp[-1] v_tot = cum_v[-1] + # vwap_tot = h_vwap[-1] async for quote in source: for tick in iterticks(quote, types=['trade']): - o, h, l, v = ohlcv.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - v_tot += v + # c, h, l, v = ohlcv.array[-1][ + # ['closes', 'high', 'low', 'volume'] + # ] - yield ((((o + h + l) / 3) * v) + weights_tot) / v_tot + # this computes tick-by-tick weightings from here forward + size = tick['size'] + price = tick['price'] + + v_tot += size + w_tot += price * size + + # yield ((((o + h + l) / 3) * v) weights_tot) / v_tot + yield w_tot / v_tot From e89d3f956047f6eca5b5a8965303f22cd4bb50ce Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Dec 2020 17:14:28 -0500 Subject: [PATCH 04/10] Use `numpy.divide()` to avoid divide-by-zero --- piker/fsp/_volume.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 735f3d8b..30397920 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -30,7 +30,20 @@ def wap( """ cum_weights = np.cumsum(weights) cum_weighted_input = np.cumsum(signal * weights) - return cum_weighted_input / cum_weights, cum_weighted_input, cum_weights + + # cum_weighted_input / cum_weights + # but, avoid divide by zero errors + avg = np.divide( + cum_weighted_input, + cum_weights, + where=cum_weights != 0 + ) + + return ( + avg, + cum_weighted_input, + cum_weights, + ) async def _tina_vwap( @@ -40,7 +53,6 @@ async def _tina_vwap( ) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? """Streaming volume weighted moving average. - Calling this "tina" for now since we're using HLC3 instead of tick. """ From c01382294e22e3e3e00d12ce64d48159bd7e9ef3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Dec 2020 12:30:40 -0500 Subject: [PATCH 05/10] Add signal backfilling via trio task respawn --- piker/fsp/__init__.py | 141 ++++++++++++++++++++++++++++-------------- piker/fsp/_momo.py | 8 +-- piker/ui/_chart.py | 24 +++++-- 3 files changed, 119 insertions(+), 54 deletions(-) diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 4b77db0b..ec169c42 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -20,6 +20,7 @@ Financial signal processing for the peeps. 
from typing import AsyncIterator, Callable, Tuple import trio +from trio_typing import TaskStatus import tractor import numpy as np @@ -75,6 +76,7 @@ async def increment_signals( # write new slot to the buffer dst_shm.push(last) + len(dst_shm.array) @tractor.stream @@ -99,60 +101,107 @@ async def cascade( async with data.open_feed(brokername, [symbol]) as feed: assert src.token == feed.shm.token - # TODO: load appropriate fsp with input args - async def filter_by_sym(sym, stream): - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes + async def fsp_compute( + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + ) -> None: - out_stream = func( - filter_by_sym(symbol, feed.stream), - feed.shm, - ) + # TODO: load appropriate fsp with input args - # TODO: XXX: - # THERE'S A BIG BUG HERE WITH THE `index` field since we're - # prepending a copy of the first value a few times to make - # sub-curves align with the parent bar chart. - # This likely needs to be fixed either by, - # - manually assigning the index and historical data - # seperately to the shm array (i.e. not using .push()) - # - developing some system on top of the shared mem array that - # is `index` aware such that historical data can be indexed - # relative to the true first datum? Not sure if this is sane - # for derivatives. + async def filter_by_sym( + sym: str, + stream, + ): + # task cancellation won't kill the channel + async with stream.shield(): + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes - # Conduct a single iteration of fsp with historical bars input - # and get historical output - history_output = await out_stream.__anext__() + out_stream = func( + filter_by_sym(symbol, feed.stream), + feed.shm, + ) - # build a struct array which includes an 'index' field to push - # as history - history = np.array( - np.arange(len(history_output)), - dtype=dst.array.dtype - ) - history[fsp_func_name] = history_output + # TODO: XXX: + # THERE'S A BIG BUG HERE WITH THE `index` field since we're + # prepending a copy of the first value a few times to make + # sub-curves align with the parent bar chart. + # This likely needs to be fixed either by, + # - manually assigning the index and historical data + # seperately to the shm array (i.e. not using .push()) + # - developing some system on top of the shared mem array that + # is `index` aware such that historical data can be indexed + # relative to the true first datum? Not sure if this is sane + # for incremental compuations. 
+ dst._first.value = src._first.value + dst._last.value = src._first.value - # check for data length mis-allignment and fill missing values - diff = len(src.array) - len(history) - if diff >= 0: - print(f"WTF DIFF SIGNAL to HISTORY {diff}") - for _ in range(diff): - dst.push(history[:1]) + # Conduct a single iteration of fsp with historical bars input + # and get historical output + history_output = await out_stream.__anext__() - # compare with source signal and time align - index = dst.push(history) + # build a struct array which includes an 'index' field to push + # as history + history = np.array( + np.arange(len(history_output)), + dtype=dst.array.dtype + ) + history[fsp_func_name] = history_output - yield index + + # check for data length mis-allignment and fill missing values + diff = len(src.array) - len(history) + if diff >= 0: + print(f"WTF DIFF SIGNAL to HISTORY {diff}") + for _ in range(diff): + dst.push(history[:1]) + + # compare with source signal and time align + index = dst.push(history) + + await ctx.send_yield(index) + + # setup a respawn handle + with trio.CancelScope() as cs: + task_status.started(cs) + + # rt stream + async for processed in out_stream: + log.debug(f"{fsp_func_name}: {processed}") + index = src.index + dst.array[-1][fsp_func_name] = processed + + # stream latest shm array index entry + await ctx.send_yield(index) + + last_len = new_len = len(src.array) async with trio.open_nursery() as n: - n.start_soon(increment_signals, feed, dst) - async for processed in out_stream: - log.debug(f"{fsp_func_name}: {processed}") - index = src.index - dst.array[-1][fsp_func_name] = processed - await ctx.send_yield(index) + cs = await n.start(fsp_compute) + + # Increment the underlying shared memory buffer on every "increment" + # msg received from the underlying data feed. + + async for msg in await feed.index_stream(): + + new_len = len(src.array) + + if new_len > last_len + 1: + # respawn the signal compute task if the source + # signal has been updated + cs.cancel() + cs = await n.start(fsp_compute) + + # TODO: adopt an incremental update engine/approach + # where possible here eventually! + + array = dst.array + last = array[-1:].copy() + + # write new slot to the buffer + dst.push(last) + + last_len = new_len diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 13bad728..f8811afa 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -151,8 +151,8 @@ def wma( return np.convolve(signal, weights, 'valid') -# @piker.fsp( - # aggregates=[60, 60*5, 60*60, '4H', '1D'], +# @piker.fsp.signal( +# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'], # ) async def _rsi( source: 'QuoteStream[Dict[str, Any]]', # noqa @@ -171,8 +171,8 @@ async def _rsi( # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. 
rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed) - up_ema_last = last_up_ema_close - down_ema_last = last_down_ema_close + up_ema_last = last_up_ema_close + down_ema_last = last_down_ema_close # deliver history yield rsi_h diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index a708a82e..fedfdaa5 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -558,7 +558,9 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: see how this handles with custom ohlcv bars graphics # and/or if we can implement something similar for OHLC graphics - clipToView=True, + # clipToView=True, + autoDownsample=True, + downsampleMethod='subsample', **pdi_kwargs, ) @@ -1221,9 +1223,23 @@ async def update_signals( # update chart graphics async for value in stream: - # read last - array = shm.array - value = array[-1][fsp_func_name] + # TODO: provide a read sync mechanism to avoid this polling. + # the underlying issue is that a backfill and subsequent shm + # array first/last index update could result in an empty array + # read here since the stream is never torn down on the + # re-compute steps. + read_tries = 2 + while read_tries > 0: + + try: + # read last + array = shm.array + value = array[-1][fsp_func_name] + break + + except IndexError: + read_tries -= 1 + continue if last_val_sticky: last_val_sticky.update_from_data(-1, value) From 48da729364ad5bcd9e94a38aab305cf398fe3204 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 17 Dec 2020 09:27:18 -0500 Subject: [PATCH 06/10] Use .shield() meth name from tractor --- piker/fsp/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index ec169c42..c798bf28 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -113,7 +113,7 @@ async def cascade( stream, ): # task cancellation won't kill the channel - async with stream.shield(): + with stream.shield(): async for quotes in stream: for symbol, quotes in quotes.items(): if symbol == sym: From 714c757e3ebf6e12199e52fafdbbe0ea28771d50 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Dec 2020 17:01:57 -0500 Subject: [PATCH 07/10] Add vwap back to fsp conf on feature branch --- piker/ui/_chart.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index fedfdaa5..ca127eac 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -841,10 +841,10 @@ async def _async_main( # eventually we'll support some kind of n-compose syntax fsp_conf = { - # 'vwap': { - # 'overlay': True, - # 'anchor': 'session', - # }, + 'vwap': { + 'overlay': True, + 'anchor': 'session', + }, 'rsi': { 'period': 14, 'chart_kwargs': { From bc96c5847c059fd7acf16562e4df494aaa9c7a88 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Dec 2020 13:02:16 -0500 Subject: [PATCH 08/10] Port to new tractor api --- piker/ui/_chart.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index ca127eac..c27f57e0 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -1101,11 +1101,11 @@ async def spawn_fsps( print(f'FSP NAME: {fsp_name}') portal = await n.run_in_actor( - # name as title of sub-chart - display_name, - # subactor entrypoint fsp.cascade, + + # name as title of sub-chart + name=display_name, brokername=brokermod.name, src_shm_token=src_shm.token, dst_shm_token=conf['shm'].token, From 42761438f69ae796f788df56925c1b1c9dbcdffe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 26 Dec 2020 
18:04:32 -0500 Subject: [PATCH 09/10] Pass backfiller explicit symbol str, don't require volume ticks --- piker/brokers/ib.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 9731e37f..34f2b17d 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -192,6 +192,7 @@ class Client: # barSizeSetting='5 secs', durationStr='{count} S'.format(count=period_count), + # barSizeSetting='5 secs', barSizeSetting='1 secs', # barSizeSetting='1 min', @@ -328,8 +329,9 @@ class Client: exch = 'SMART' if not exch else exch contract = (await self.ib.qualifyContractsAsync(con))[0] - head = await self.get_head_time(contract) - print(head) + # head = await self.get_head_time(contract) + # print(head) + except IndexError: raise ValueError(f"No contract could be found {con}") return contract @@ -617,9 +619,11 @@ async def activate_writer(key: str) -> (bool, trio.Nursery): async def fill_bars( - first_bars, - shm, + sym: str, + first_bars: list, + shm: 'ShmArray', # type: ignore # noqa count: int = 21, + # count: int = 1, ) -> None: """Fill historical bars into shared mem / storage afap. @@ -635,9 +639,7 @@ async def fill_bars( try: bars, bars_array = await _trio_run_client_method( method='bars', - symbol='.'.join( - (first_bars.contract.symbol, first_bars.contract.exchange) - ), + symbol=sym, end_dt=next_dt, ) @@ -723,7 +725,7 @@ async def stream_quotes( # TODO: generalize this for other brokers # start bar filler task in bg - ln.start_soon(fill_bars, bars, shm) + ln.start_soon(fill_bars, sym, bars, shm) times = shm.array['time'] delay_s = times[-1] - times[times != times[-1]][-1] @@ -808,7 +810,7 @@ async def stream_quotes( ['open', 'high', 'low', 'volume'] ] - new_v = tick['size'] + new_v = tick.get('size', 0) if v == 0 and new_v: # no trades for this bar yet so the open From bd73d02cfb1e0ab2a0520fa92775edf8936ea0a3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Dec 2020 13:31:15 -0500 Subject: [PATCH 10/10] Avoid loading volume FSPs on symbols without any data --- piker/ui/_chart.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index c27f57e0..7ba23f5b 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -841,10 +841,6 @@ async def _async_main( # eventually we'll support some kind of n-compose syntax fsp_conf = { - 'vwap': { - 'overlay': True, - 'anchor': 'session', - }, 'rsi': { 'period': 14, 'chart_kwargs': { @@ -854,6 +850,24 @@ async def _async_main( } + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and derivatives) + volm = ohlcv.array['volume'] + if ( + np.all(np.isin(volm, -1)) or + np.all(np.isnan(volm)) + ): + log.warning( + f"{sym} does not seem to have volume info," + " dropping volume signals") + else: + fsp_conf.update({ + 'vwap': { + 'overlay': True, + 'anchor': 'session', + }, + }) + async with trio.open_nursery() as n: # load initial fsp chain (otherwise known as "indicators")
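
The following is a standalone sketch of the guarded cumulative weighted average that `wap()` computes after patch 04. The `out=` buffer is an addition not present in the patch: `np.divide` with only a `where=` mask leaves the unselected slots uninitialized, so pre-filling an output array with zeros keeps any leading zero-volume bars well defined.

import numpy as np

def wap(signal: np.ndarray, weights: np.ndarray):
    """Cumulative weighted average of ``signal`` weighted by ``weights``."""
    cum_weights = np.cumsum(weights)
    cum_weighted_input = np.cumsum(signal * weights)

    # avoid divide-by-zero; the zero-filled ``out=`` buffer keeps slots
    # where ``cum_weights == 0`` defined (numpy leaves them uninitialized
    # when only ``where=`` is passed)
    avg = np.divide(
        cum_weighted_input,
        cum_weights,
        out=np.zeros_like(cum_weighted_input, dtype=float),
        where=cum_weights != 0,
    )
    return avg, cum_weighted_input, cum_weights

hlc3 = np.array([10.0, 10.5, 11.0, 10.8])
volume = np.array([0.0, 100.0, 50.0, 25.0])  # leading zero-volume bar
vwap, _, _ = wap(hlc3, volume)
print(vwap)  # first slot stays 0.0 rather than reading arbitrary memory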
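
The per-tick path in `_tina_vwap` reduces to two running totals seeded from the historical bars and folded forward on every trade tick. A framework-free sketch of that bookkeeping (the class and method names here are illustrative, not part of piker):

import numpy as np

class RunningVWAP:
    """Seed from historical bars, then fold in live trade ticks."""

    def __init__(self, hlc3: np.ndarray, volume: np.ndarray) -> None:
        self.w_tot = float(np.sum(hlc3 * volume))  # cumulative price * size
        self.v_tot = float(np.sum(volume))         # cumulative size

    def update(self, price: float, size: float) -> float:
        self.w_tot += price * size
        self.v_tot += size
        return self.w_tot / self.v_tot if self.v_tot else 0.0

vwap = RunningVWAP(
    hlc3=np.array([10.0, 10.5, 11.0]),
    volume=np.array([100.0, 50.0, 25.0]),
)
print(vwap.update(price=10.9, size=10.0))  # vwap after one live trade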
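
The backfill handling added to `cascade()` in patch 05 boils down to: watch the source buffer length on every increment and, if it jumps by more than one slot (a backfill landed), cancel and respawn the compute task so it re-reads history from scratch. A minimal trio-only sketch of that respawn pattern, stripped of the shm/tractor plumbing, with a fake increment stream standing in for `feed.index_stream()` (all names here are illustrative):

import trio

async def compute(src: list, *, task_status=trio.TASK_STATUS_IGNORED) -> None:
    """Stand-in fsp: (re)process whatever history is present, then idle."""
    with trio.CancelScope() as cs:
        task_status.started(cs)
        print(f'(re)computing over {len(src)} slots')
        await trio.sleep_forever()

async def increments(src: list):
    """Stand-in for ``feed.index_stream()``: grow the buffer and notify."""
    for step in range(5):
        await trio.sleep(0.1)
        src.append(step)
        if step == 2:
            src.extend(range(100))  # simulate a history backfill landing
        yield step

async def main() -> None:
    src: list = [0]
    last_len = len(src)

    async with trio.open_nursery() as n:
        cs = await n.start(compute, src)

        async for _ in increments(src):
            new_len = len(src)
            if new_len > last_len + 1:
                # the source history jumped by more than one slot:
                # respawn the compute task over the backfilled buffer
                cs.cancel()
                cs = await n.start(compute, src)
            last_len = new_len

        n.cancel_scope.cancel()

trio.run(main)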