Prep for manual rate calcs, handle non-ib backends XD

windows_fixes_yo
Tyler Goodlet 2022-02-04 11:54:44 -05:00
parent ebf3e00438
commit 1fc6429f75
1 changed files with 53 additions and 6 deletions
piker/fsp

View File

@ -30,9 +30,10 @@ def wap(
weights: np.ndarray,
) -> np.ndarray:
"""Weighted average price from signal and weights.
'''
Weighted average price from signal and weights.
"""
'''
cum_weights = np.cumsum(weights)
cum_weighted_input = np.cumsum(signal * weights)
@ -130,6 +131,8 @@ async def dolla_vlm(
chl3 = (a['close'] + a['high'] + a['low']) / 3
v = a['volume']
from ._momo import wma
# on first iteration yield history
yield {
'dolla_vlm': chl3 * v,
@ -189,9 +192,22 @@ async def dolla_vlm(
@fsp(
# TODO: eventually I guess we should support some kinda declarative
# graphics config syntax per output yah? That seems like a clean way
# to let users configure things? Not sure how exactly to offer that
# api as well as how to expose such a thing *inside* the body?
outputs=(
# pulled verbatim from `ib` for now
'1m_trade_rate',
'1m_vlm_rate',
# our own instantaneous rate calcs which are all
# parameterized by a samples count (bars) period
# 'trade_rate',
# 'dark_trade_rate',
# 'dvlm_rate',
# 'dark_dvlm_rate',
),
curve_style='line',
)
@ -199,9 +215,30 @@ async def flow_rates(
source: AsyncReceiver[dict],
ohlcv: ShmArray, # OHLC sampled history
# TODO (idea): a dynamic generic / boxing type that can be updated by other
# FSPs, user input, and possibly any general event stream in
# real-time. Hint: ideally implemented with caching until mutated
# ;)
period: 'Param[int]' = 16, # noqa
# TODO (idea): a generic for declaring boxed fsps much like ``pytest``
# fixtures? This probably needs a lot of thought if we want to offer
# a higher level composition syntax eventually (oh right gotta make
# an issue for that).
# ideas for how to allow composition / intercalling:
# - offer a `Fsp.get_history()` to do the first yield output?
# * err wait can we just have shm access directly?
# - how would it work if some consumer fsp wanted to dynamically
# change params which are input to the callee fsp? i guess we could
# lazy copy in that case?
# dvlm: 'Fsp[dolla_vlm]'
) -> AsyncIterator[
tuple[str, Union[np.ndarray, float]],
]:
# dvlm_shm = dolla_vlm.get_shm(ohlcv)
# generally no history available prior to real-time calcs
yield {
'1m_trade_rate': None,
@ -210,17 +247,27 @@ async def flow_rates(
ltr = 0
lvr = 0
# TODO: 3.10 do ``anext()``
quote = await source.__anext__()
tr = quote.get('tradeRate')
yield '1m_trade_rate', tr or 0
vr = quote.get('volumeRate')
yield '1m_vlm_rate', vr or 0
async for quote in source:
if quote:
tr = quote['tradeRate']
if tr != ltr:
# XXX: ib specific schema we should
# probably pre-pack ourselves.
tr = quote.get('tradeRate')
if tr is not None and tr != ltr:
# print(f'trade rate: {tr}')
yield '1m_trade_rate', tr
ltr = tr
vr = quote['volumeRate']
if vr != lvr:
vr = quote.get('volumeRate')
if vr is not None and vr != lvr:
# print(f'vlm rate: {vr}')
yield '1m_vlm_rate', vr
lvr = vr