Add first draft of "dollar volume" fsp

windows_testing_volume
Tyler Goodlet 2021-09-28 16:39:11 -04:00
parent dde8697f71
commit 4057209900
1 changed files with 58 additions and 7 deletions

View File

@ -14,16 +14,20 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import AsyncIterator, Optional from typing import AsyncIterator, Optional, Union
import numpy as np import numpy as np
from tractor._broadcast import AsyncReceiver
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data._sharedmem import ShmArray
def wap( def wap(
signal: np.ndarray, signal: np.ndarray,
weights: np.ndarray, weights: np.ndarray,
) -> np.ndarray: ) -> np.ndarray:
"""Weighted average price from signal and weights. """Weighted average price from signal and weights.
@ -47,15 +51,22 @@ def wap(
async def _tina_vwap( async def _tina_vwap(
source, #: AsyncStream[np.ndarray],
ohlcv: np.ndarray, # price time-frame "aware" source: AsyncReceiver[dict],
ohlcv: ShmArray, # OHLC sampled history
# TODO: anchor logic (eg. to session start)
anchors: Optional[np.ndarray] = None, anchors: Optional[np.ndarray] = None,
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming volume weighted moving average. ) -> Union[
AsyncIterator[np.ndarray],
float
]:
'''Streaming volume weighted moving average.
Calling this "tina" for now since we're using HLC3 instead of tick. Calling this "tina" for now since we're using HLC3 instead of tick.
""" '''
if anchors is None: if anchors is None:
# TODO: # TODO:
# anchor to session start of data if possible # anchor to session start of data if possible
@ -75,7 +86,6 @@ async def _tina_vwap(
# vwap_tot = h_vwap[-1] # vwap_tot = h_vwap[-1]
async for quote in source: async for quote in source:
for tick in iterticks(quote, types=['trade']): for tick in iterticks(quote, types=['trade']):
# c, h, l, v = ohlcv.array[-1][ # c, h, l, v = ohlcv.array[-1][
@ -91,3 +101,44 @@ async def _tina_vwap(
# yield ((((o + h + l) / 3) * v) weights_tot) / v_tot # yield ((((o + h + l) / 3) * v) weights_tot) / v_tot
yield w_tot / v_tot yield w_tot / v_tot
async def dolla_vlm(
source: AsyncReceiver[dict],
ohlcv: ShmArray, # OHLC sampled history
) -> Union[
AsyncIterator[np.ndarray],
float
]:
a = ohlcv.array
chl3 = (a['close'] + a['high'] + a['low']) / 3
v = a['volume']
# history
yield chl3 * v
i = ohlcv.index
lvlm = 0
async for quote in source:
for tick in iterticks(quote):
# this computes tick-by-tick weightings from here forward
size = tick['size']
price = tick['price']
li = ohlcv.index
if li > i:
i = li
lvlm = 0
c, h, l, v = ohlcv.last()[
['close', 'high', 'low', 'volume']
]
lvlm += price * size
tina_lvlm = c+h+l/3 * v
# print(f' tinal vlm: {tina_lvlm}')
yield lvlm