From 147207a0ad7d41633ab32b0c58b2994af0529e77 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Sep 2021 16:39:11 -0400 Subject: [PATCH] Add first draft of "dollar volume" fsp --- piker/fsp/_volume.py | 65 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 30397920..8dd93d4f 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -14,16 +14,20 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import AsyncIterator, Optional +from typing import AsyncIterator, Optional, Union import numpy as np +from tractor._broadcast import AsyncReceiver from ..data._normalize import iterticks +from ..data._sharedmem import ShmArray def wap( + signal: np.ndarray, weights: np.ndarray, + ) -> np.ndarray: """Weighted average price from signal and weights. @@ -47,15 +51,22 @@ def wap( async def _tina_vwap( - source, #: AsyncStream[np.ndarray], - ohlcv: np.ndarray, # price time-frame "aware" + + source: AsyncReceiver[dict], + ohlcv: ShmArray, # OHLC sampled history + + # TODO: anchor logic (eg. to session start) anchors: Optional[np.ndarray] = None, -) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? - """Streaming volume weighted moving average. + +) -> Union[ + AsyncIterator[np.ndarray], + float +]: + '''Streaming volume weighted moving average. Calling this "tina" for now since we're using HLC3 instead of tick. - """ + ''' if anchors is None: # TODO: # anchor to session start of data if possible @@ -75,7 +86,6 @@ async def _tina_vwap( # vwap_tot = h_vwap[-1] async for quote in source: - for tick in iterticks(quote, types=['trade']): # c, h, l, v = ohlcv.array[-1][ @@ -91,3 +101,44 @@ async def _tina_vwap( # yield ((((o + h + l) / 3) * v) weights_tot) / v_tot yield w_tot / v_tot + + +async def dolla_vlm( + source: AsyncReceiver[dict], + ohlcv: ShmArray, # OHLC sampled history + +) -> Union[ + AsyncIterator[np.ndarray], + float +]: + a = ohlcv.array + chl3 = (a['close'] + a['high'] + a['low']) / 3 + v = a['volume'] + + # history + yield chl3 * v + + i = ohlcv.index + lvlm = 0 + + async for quote in source: + for tick in iterticks(quote): + + # this computes tick-by-tick weightings from here forward + size = tick['size'] + price = tick['price'] + + li = ohlcv.index + if li > i: + i = li + lvlm = 0 + + c, h, l, v = ohlcv.last()[ + ['close', 'high', 'low', 'volume'] + ] + + lvlm += price * size + tina_lvlm = c+h+l/3 * v + # print(f' tinal vlm: {tina_lvlm}') + + yield lvlm