piker/piker/fsp/_volume.py

274 lines
7.4 KiB
Python
Raw Normal View History

2020-11-11 01:16:24 +00:00
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of pikers)
2020-11-11 01:16:24 +00:00
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2021-09-28 20:39:11 +00:00
from typing import AsyncIterator, Optional, Union
2020-11-11 01:16:24 +00:00
import numpy as np
2021-10-22 16:13:08 +00:00
from tractor.trionics._broadcast import AsyncReceiver
2020-11-11 01:16:24 +00:00
from ._api import fsp
2020-11-11 01:16:24 +00:00
from ..data._normalize import iterticks
2021-09-28 20:39:11 +00:00
from ..data._sharedmem import ShmArray
2020-11-11 01:16:24 +00:00
def wap(

    signal: np.ndarray,
    weights: np.ndarray,

) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    '''
    Weighted average price from signal and weights.

    Returns a 3-tuple ``(avg, cum_weighted_input, cum_weights)``:

    - ``avg``: running weighted average, i.e. the cumulative sum of
      ``signal * weights`` divided by the cumulative sum of ``weights``.
      Entries where the cumulative weight is zero are reported as
      ``0.0`` (the average is undefined there).
    - ``cum_weighted_input``: cumulative sum of ``signal * weights``.
    - ``cum_weights``: cumulative sum of ``weights``.

    '''
    cum_weights = np.cumsum(weights)
    cum_weighted_input = np.cumsum(signal * weights)

    # cum_weighted_input / cum_weights
    # but, avoid divide by zero errors.
    # NOTE: ``where=`` on its own leaves the masked-out slots of the
    # result *uninitialized*, so a pre-filled ``out=`` buffer is
    # required to get deterministic values at zero-weight entries.
    avg = np.divide(
        cum_weighted_input,
        cum_weights,
        out=np.zeros_like(cum_weighted_input, dtype=np.float64),
        where=cum_weights != 0,
    )

    return (
        avg,
        cum_weighted_input,
        cum_weights,
    )
@fsp
async def tina_vwap(

    source: AsyncReceiver[dict],
    ohlcv: ShmArray,  # OHLC sampled history

    # TODO: anchor logic (eg. to session start); when ``anchors`` is
    # ``None`` we should anchor to the session start of the data if
    # possible. The argument is currently not used by the body.
    anchors: Optional[np.ndarray] = None,

) -> Union[
    AsyncIterator[np.ndarray],
    float
]:
    '''
    Streaming volume weighted moving average.

    Calling this "tina" for now since we're using HLC3 instead of tick.

    The first value yielded is the historical vwap array computed by
    ``wap()`` from the ``(close + high + low) / 3`` price and the
    ``volume`` field of ``ohlcv.array``. After that, every tick
    delivered by ``iterticks(quote, types=['trade'])`` updates the
    running totals and a ``('tina_vwap', value)`` pair is yielded.

    '''
    history = ohlcv.array
    hlc3 = (history['close'] + history['high'] + history['low']) / 3
    hist_vwap, cum_weighted, cum_volume = wap(hlc3, history['volume'])

    # deliver historical output as "first yield"
    yield hist_vwap

    # seed the running totals from the last cumulative history values
    weighted_total, volume_total = cum_weighted[-1], cum_volume[-1]

    async for quote in source:
        for trade in iterticks(quote, types=['trade']):

            # this computes tick-by-tick weightings from here forward
            trade_size = trade['size']
            trade_price = trade['price']

            volume_total = volume_total + trade_size
            weighted_total = weighted_total + trade_price * trade_size

            # running (sum of price * size) / (sum of size)
            yield 'tina_vwap', weighted_total / volume_total
2021-09-28 20:39:11 +00:00
@fsp(
    outputs=('dolla_vlm', 'dark_vlm'),
    curve_style='step',
)
async def dolla_vlm(
    source: AsyncReceiver[dict],
    ohlcv: ShmArray,  # OHLC sampled history

) -> AsyncIterator[
    tuple[str, Union[np.ndarray, float]],
]:
    '''
    "Dollar Volume": the volume expressed in asset-currency-units
    (usually a fiat), i.e. some price function for the sample step
    *multiplied* (*) by the asset unit volume.

    Useful for comparing cross asset "money flow" in #s that are
    asset-currency-independent.

    The first yield is a ``dict`` holding the historical series
    (``(close + high + low) / 3 * volume``) under ``'dolla_vlm'`` and
    ``None`` under ``'dark_vlm'``. Afterwards each tick yields a
    ``(key, running_total)`` pair where the totals are accumulated
    separately for ``'dark_trade'`` ticks and all other ticks, and both
    are reset to zero whenever ``ohlcv.index`` advances.

    '''
    history = ohlcv.array
    hlc3 = (history['close'] + history['high'] + history['low']) / 3
    volume = history['volume']

    # NOTE(review): ``wma`` is not referenced anywhere in this function;
    # the import is kept in place only to preserve existing behaviour.
    from ._momo import wma

    # on first iteration yield history
    yield {
        'dolla_vlm': hlc3 * volume,
        'dark_vlm': None,
    }

    last_index = ohlcv.index
    lit_total = 0
    dark_total = 0

    tick_types = (
        'trade',
        'dark_trade',
    )

    async for quote in source:
        for tick in iterticks(
            quote,
            types=tick_types,
            deduplicate_darks=True,
        ):
            # this computes tick-by-tick weightings from here forward
            size = tick['size']
            price = tick['price']

            # the shm index moved on: start accumulating from zero again
            current_index = ohlcv.index
            if current_index > last_index:
                last_index = current_index
                lit_total = dark_total = 0

            # TODO: for margined instruments (futes, etfs?) we need to
            # show the margin $vlm by multiplying by whatever multiplier
            # is reported in the sym info.

            is_dark = tick.get('type') == 'dark_trade'
            notional = price * size

            # TODO: plot both to compare?
            # c, h, l, v = ohlcv.last()[
            #     ['close', 'high', 'low', 'volume']
            # ][0]
            # tina_lvlm = c+h+l/3 * v

            if is_dark:
                dark_total += notional
                yield 'dark_vlm', dark_total
            else:
                lit_total += notional
                yield 'dolla_vlm', lit_total
@fsp(
    # TODO: eventually I guess we should support some kinda declarative
    # graphics config syntax per output yah? That seems like a clean way
    # to let users configure things? Not sure how exactly to offer that
    # api as well as how to expose such a thing *inside* the body?
    outputs=(
        # pulled verbatim from `ib` for now
        '1m_trade_rate',
        '1m_vlm_rate',

        # our own instantaneous rate calcs which are all
        # parameterized by a samples count (bars) period
        # 'trade_rate',
        # 'dark_trade_rate',
        # 'dvlm_rate',
        # 'dark_dvlm_rate',
    ),
    curve_style='line',
)
async def flow_rates(
    source: AsyncReceiver[dict],
    ohlcv: ShmArray,  # OHLC sampled history

    # TODO (idea): a dynamic generic / boxing type that can be updated by other
    # FSPs, user input, and possibly any general event stream in
    # real-time. Hint: ideally implemented with caching until mutated
    # ;)
    period: 'Param[int]' = 16,  # noqa

    # TODO (idea): a generic for declaring boxed fsps much like ``pytest``
    # fixtures? This probably needs a lot of thought if we want to offer
    # a higher level composition syntax eventually (oh right gotta make
    # an issue for that).
    # ideas for how to allow composition / intercalling:
    # - offer a `Fsp.get_history()` to do the first yield output?
    #   * err wait can we just have shm access directly?
    # - how would it work if some consumer fsp wanted to dynamically
    # change params which are input to the callee fsp? i guess we could
    # lazy copy in that case?
    # dvlm: 'Fsp[dolla_vlm]'

) -> AsyncIterator[
    tuple[str, Union[np.ndarray, float]],
]:
    '''
    Relay the ``'tradeRate'`` and ``'volumeRate'`` fields found in
    incoming quotes as the ``'1m_trade_rate'`` and ``'1m_vlm_rate'``
    outputs.

    The first yield is a ``dict`` with both outputs set to ``None``
    (no history). The first quote's rates are then always emitted
    (a missing/falsy rate is emitted as ``0``); for every following
    non-empty quote a rate is emitted only when it is present and
    differs from the last value emitted for that output.

    ``ohlcv`` and ``period`` are accepted but not used by the body yet.

    '''
    # dvlm_shm = dolla_vlm.get_shm(ohlcv)

    # generally no history available prior to real-time calcs
    yield {
        '1m_trade_rate': None,
        '1m_vlm_rate': None,
    }

    # TODO: 3.10 do ``anext()``
    try:
        quote = await source.__anext__()
    except StopAsyncIteration:
        # the source ended before delivering a single quote; finish
        # cleanly since a ``StopAsyncIteration`` escaping an async
        # generator body is converted into a ``RuntimeError``.
        return

    # Seed the "last emitted" trackers with what we actually emit for
    # the first quote so that the change detection below compares
    # against the real previous value (not a hard-coded ``0``).
    ltr = quote.get('tradeRate') or 0
    yield '1m_trade_rate', ltr

    lvr = quote.get('volumeRate') or 0
    yield '1m_vlm_rate', lvr

    async for quote in source:
        if not quote:
            continue

        # XXX: ib specific schema we should
        # probably pre-pack ourselves.
        tr = quote.get('tradeRate')
        if tr is not None and tr != ltr:
            # print(f'trade rate: {tr}')
            yield '1m_trade_rate', tr
            ltr = tr

        vr = quote.get('volumeRate')
        if vr is not None and vr != lvr:
            # print(f'vlm rate: {vr}')
            yield '1m_vlm_rate', vr
            lvr = vr