diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index aaded701..1170fcc9 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -22,8 +22,15 @@ from tractor.trionics._broadcast import AsyncReceiver from ._api import fsp from ..data._normalize import iterticks from ..data._sharedmem import ShmArray +from ._momo import _wma +from ..log import get_logger + +log = get_logger(__name__) +# NOTE: is the same as our `wma` fsp, and if so which one is faster? +# Ohhh, this is an IIR style i think? So it has an anchor point +# effectively instead of a moving window/FIR style? def wap( signal: np.ndarray, @@ -131,8 +138,6 @@ async def dolla_vlm( chl3 = (a['close'] + a['high'] + a['low']) / 3 v = a['volume'] - from ._momo import wma - # on first iteration yield history yield { 'dolla_vlm': chl3 * v, @@ -203,11 +208,11 @@ async def dolla_vlm( # our own instantaneous rate calcs which are all # parameterized by a samples count (bars) period - # 'trade_rate', - # 'dark_trade_rate', + 'trade_rate', + 'dark_trade_rate', - # 'dvlm_rate', - # 'dark_dvlm_rate', + 'dvlm_rate', + 'dark_dvlm_rate', ), curve_style='line', ) @@ -236,13 +241,17 @@ async def flow_rates( ) -> AsyncIterator[ tuple[str, Union[np.ndarray, float]], ]: - - # dvlm_shm = dolla_vlm.get_shm(ohlcv) - # generally no history available prior to real-time calcs yield { + # from ib '1m_trade_rate': None, '1m_vlm_rate': None, + + 'trade_rate': None, + 'dark_trade_rate': None, + + 'dvlm_rate': None, + 'dark_dvlm_rate': None, } ltr = 0 @@ -255,19 +264,50 @@ async def flow_rates( vr = quote.get('volumeRate') yield '1m_vlm_rate', vr or 0 + # NOTE: in theory we could dynamically allocate a cascade based on + # this call but not sure if that's too "dynamic" in terms of + # validating cascade flows from message typing perspective. + + # attach to ``dolla_vlm`` fsp running + # on this same source flow. + dvlm_shm = dolla_vlm.get_shm(ohlcv) + + # precompute arithmetic mean weights (all ones) + seq = np.full((period,), 1) + weights = seq / seq.sum() + async for quote in source: - if quote: + if not quote: + log.error("OH WTF NO QUOTE IN FSP") + continue - # XXX: ib specific schema we should - # probably pre-pack ourselves. - tr = quote.get('tradeRate') - if tr is not None and tr != ltr: - # print(f'trade rate: {tr}') - yield '1m_trade_rate', tr - ltr = tr + dvlm_wma = _wma( + dvlm_shm.array['dolla_vlm'], + period, + weights=weights, + ) + yield 'dvlm_rate', dvlm_wma[-1] - vr = quote.get('volumeRate') - if vr is not None and vr != lvr: - # print(f'vlm rate: {vr}') - yield '1m_vlm_rate', vr - lvr = vr + # TODO: skip this if no dark vlm is declared + # by symbol info (eg. in crypto$) + dark_dvlm_wma = _wma( + dvlm_shm.array['dark_vlm'], + period, + weights=weights, + ) + yield 'dark_dvlm_rate', dark_dvlm_wma[-1] + + # XXX: ib specific schema we should + # probably pre-pack ourselves. + # tr = quote.get('tradeRate') + # if tr is not None and tr != ltr: + # # print(f'trade rate: {tr}') + # yield '1m_trade_rate', tr + # ltr = tr + + # # TODO: we *could* do an ohlc3 + # vr = quote.get('volumeRate') + # if vr is not None and vr != lvr: + # # print(f'vlm rate: {vr}') + # yield '1m_vlm_rate', vr + # lvr = vr