diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 5e144f58..aaded701 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -30,9 +30,10 @@ def wap( weights: np.ndarray, ) -> np.ndarray: - """Weighted average price from signal and weights. + ''' + Weighted average price from signal and weights. - """ + ''' cum_weights = np.cumsum(weights) cum_weighted_input = np.cumsum(signal * weights) @@ -130,6 +131,8 @@ async def dolla_vlm( chl3 = (a['close'] + a['high'] + a['low']) / 3 v = a['volume'] + from ._momo import wma + # on first iteration yield history yield { 'dolla_vlm': chl3 * v, @@ -189,9 +192,22 @@ async def dolla_vlm( @fsp( + # TODO: eventually I guess we should support some kinda declarative + # graphics config syntax per output yah? That seems like a clean way + # to let users configure things? Not sure how exactly to offer that + # api as well as how to expose such a thing *inside* the body? outputs=( + # pulled verbatim from `ib` for now '1m_trade_rate', '1m_vlm_rate', + + # our own instantaneous rate calcs which are all + # parameterized by a samples count (bars) period + # 'trade_rate', + # 'dark_trade_rate', + + # 'dvlm_rate', + # 'dark_dvlm_rate', ), curve_style='line', ) @@ -199,9 +215,30 @@ async def flow_rates( source: AsyncReceiver[dict], ohlcv: ShmArray, # OHLC sampled history + # TODO (idea): a dynamic generic / boxing type that can be updated by other + # FSPs, user input, and possibly any general event stream in + # real-time. Hint: ideally implemented with caching until mutated + # ;) + period: 'Param[int]' = 16, # noqa + + # TODO (idea): a generic for declaring boxed fsps much like ``pytest`` + # fixtures? This probably needs a lot of thought if we want to offer + # a higher level composition syntax eventually (oh right gotta make + # an issue for that). + # ideas for how to allow composition / intercalling: + # - offer a `Fsp.get_history()` to do the first yield output? + # * err wait can we just have shm access directly? + # - how would it work if some consumer fsp wanted to dynamically + # change params which are input to the callee fsp? i guess we could + # lazy copy in that case? + # dvlm: 'Fsp[dolla_vlm]' + ) -> AsyncIterator[ tuple[str, Union[np.ndarray, float]], ]: + + # dvlm_shm = dolla_vlm.get_shm(ohlcv) + # generally no history available prior to real-time calcs yield { '1m_trade_rate': None, @@ -210,17 +247,27 @@ async def flow_rates( ltr = 0 lvr = 0 + + # TODO: 3.10 do ``anext()`` + quote = await source.__anext__() + tr = quote.get('tradeRate') + yield '1m_trade_rate', tr or 0 + vr = quote.get('volumeRate') + yield '1m_vlm_rate', vr or 0 + async for quote in source: if quote: - tr = quote['tradeRate'] - if tr != ltr: + # XXX: ib specific schema we should + # probably pre-pack ourselves. + tr = quote.get('tradeRate') + if tr is not None and tr != ltr: # print(f'trade rate: {tr}') yield '1m_trade_rate', tr ltr = tr - vr = quote['volumeRate'] - if vr != lvr: + vr = quote.get('volumeRate') + if vr is not None and vr != lvr: # print(f'vlm rate: {vr}') yield '1m_vlm_rate', vr lvr = vr