diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 7cf7d7b4..e4928dd1 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -108,7 +108,6 @@ async def tina_vwap( @fsp( outputs=('dolla_vlm', 'dark_vlm'), - ohlc=False, curve_style='step', ) async def dolla_vlm( @@ -132,14 +131,24 @@ async def dolla_vlm( v = a['volume'] # on first iteration yield history - yield chl3 * v + yield { + 'dolla_vlm': chl3 * v, + 'dark_vlm': None, + } i = ohlcv.index output = vlm = 0 dvlm = 0 async for quote in source: - for tick in iterticks(quote): + for tick in iterticks( + quote, + types=( + 'trade', + 'dark_trade', + ), + deduplicate_darks=True, + ): # this computes tick-by-tick weightings from here forward size = tick['size'] @@ -156,13 +165,15 @@ async def dolla_vlm( # is reported in the sym info. ttype = tick.get('type') + if ttype == 'dark_trade': - print(f'dark_trade: {tick}') + # print(f'dark_trade: {tick}') key = 'dark_vlm' dvlm += price * size output = dvlm else: + # print(f'vlm: {tick}') key = 'dolla_vlm' vlm += price * size output = vlm @@ -175,3 +186,41 @@ async def dolla_vlm( # print(f' tinal vlm: {tina_lvlm}') yield key, output + + +@fsp( + outputs=( + '1m_trade_rate', + '1m_vlm_rate', + ), + curve_style='step', +) +async def flow_rates( + source: AsyncReceiver[dict], + ohlcv: ShmArray, # OHLC sampled history + +) -> AsyncIterator[ + tuple[str, Union[np.ndarray, float]], +]: + # generally no history available prior to real-time calcs + yield { + '1m_trade_rate': None, + '1m_vlm_rate': None, + } + + ltr = 0 + lvr = 0 + async for quote in source: + if quote: + + tr = quote['tradeRate'], + if tr != ltr: + print(f'trade rate: {tr}') + yield '1m_trade_rate', tr + ltr = tr + + vr = quote['volumeRate'], + if vr != lvr: + print(f'vlm rate: {tr}') + yield '1m_vlm_rate', tr + ltr = tr