diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 9731e37f..34f2b17d 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -192,6 +192,7 @@ class Client: # barSizeSetting='5 secs', durationStr='{count} S'.format(count=period_count), + # barSizeSetting='5 secs', barSizeSetting='1 secs', # barSizeSetting='1 min', @@ -328,8 +329,9 @@ class Client: exch = 'SMART' if not exch else exch contract = (await self.ib.qualifyContractsAsync(con))[0] - head = await self.get_head_time(contract) - print(head) + # head = await self.get_head_time(contract) + # print(head) + except IndexError: raise ValueError(f"No contract could be found {con}") return contract @@ -617,9 +619,11 @@ async def activate_writer(key: str) -> (bool, trio.Nursery): async def fill_bars( - first_bars, - shm, + sym: str, + first_bars: list, + shm: 'ShmArray', # type: ignore # noqa count: int = 21, + # count: int = 1, ) -> None: """Fill historical bars into shared mem / storage afap. @@ -635,9 +639,7 @@ async def fill_bars( try: bars, bars_array = await _trio_run_client_method( method='bars', - symbol='.'.join( - (first_bars.contract.symbol, first_bars.contract.exchange) - ), + symbol=sym, end_dt=next_dt, ) @@ -723,7 +725,7 @@ async def stream_quotes( # TODO: generalize this for other brokers # start bar filler task in bg - ln.start_soon(fill_bars, bars, shm) + ln.start_soon(fill_bars, sym, bars, shm) times = shm.array['time'] delay_s = times[-1] - times[times != times[-1]][-1] @@ -808,7 +810,7 @@ async def stream_quotes( ['open', 'high', 'low', 'volume'] ] - new_v = tick['size'] + new_v = tick.get('size', 0) if v == 0 and new_v: # no trades for this bar yet so the open diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 6e56c667..c798bf28 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -20,18 +20,24 @@ Financial signal processing for the peeps. from typing import AsyncIterator, Callable, Tuple import trio +from trio_typing import TaskStatus import tractor import numpy as np from ..log import get_logger from .. import data -from ._momo import _rsi +from ._momo import _rsi, _wma +from ._volume import _tina_vwap from ..data import attach_shm_array, Feed log = get_logger(__name__) -_fsps = {'rsi': _rsi} +_fsps = { + 'rsi': _rsi, + 'wma': _wma, + 'vwap': _tina_vwap, +} async def latency( @@ -70,7 +76,7 @@ async def increment_signals( # write new slot to the buffer dst_shm.push(last) - + len(dst_shm.array) @tractor.stream @@ -95,66 +101,107 @@ async def cascade( async with data.open_feed(brokername, [symbol]) as feed: assert src.token == feed.shm.token - # TODO: load appropriate fsp with input args - async def filter_by_sym(sym, stream): - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes + async def fsp_compute( + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + ) -> None: - out_stream = func( - filter_by_sym(symbol, feed.stream), - feed.shm, - ) + # TODO: load appropriate fsp with input args - # TODO: XXX: - # THERE'S A BIG BUG HERE WITH THE `index` field since we're - # prepending a copy of the first value a few times to make - # sub-curves align with the parent bar chart. - # - # This likely needs to be fixed either by, - # - manually assigning the index and historical data - # seperately to the shm array (i.e. not using .push()) - # - developing some system on top of the shared mem array that - # is `index` aware such that historical data can be indexed - # relative to the true first datum? Not sure if this is sane - # for derivatives. + async def filter_by_sym( + sym: str, + stream, + ): + # task cancellation won't kill the channel + with stream.shield(): + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes - # Conduct a single iteration of fsp with historical bars input - # and get historical output - history_output = await out_stream.__anext__() + out_stream = func( + filter_by_sym(symbol, feed.stream), + feed.shm, + ) - # build a struct array which includes an 'index' field to push - # as history - history = np.array( - np.arange(len(history_output)), - dtype=dst.array.dtype - ) - history[fsp_func_name] = history_output + # TODO: XXX: + # THERE'S A BIG BUG HERE WITH THE `index` field since we're + # prepending a copy of the first value a few times to make + # sub-curves align with the parent bar chart. + # This likely needs to be fixed either by, + # - manually assigning the index and historical data + # seperately to the shm array (i.e. not using .push()) + # - developing some system on top of the shared mem array that + # is `index` aware such that historical data can be indexed + # relative to the true first datum? Not sure if this is sane + # for incremental compuations. + dst._first.value = src._first.value + dst._last.value = src._first.value - # TODO: talk to ``pyqtgraph`` core about proper way to solve this: - # XXX: hack to get curves aligned with bars graphics: prepend - # a copy of the first datum.. - # dst.push(history[:1]) + # Conduct a single iteration of fsp with historical bars input + # and get historical output + history_output = await out_stream.__anext__() - # check for data length mis-allignment and fill missing values - diff = len(src.array) - len(history) - if diff >= 0: - print(f"WTF DIFFZZZ {diff}") - for _ in range(diff): - dst.push(history[:1]) + # build a struct array which includes an 'index' field to push + # as history + history = np.array( + np.arange(len(history_output)), + dtype=dst.array.dtype + ) + history[fsp_func_name] = history_output - # compare with source signal and time align - index = dst.push(history) - yield index + # check for data length mis-allignment and fill missing values + diff = len(src.array) - len(history) + if diff >= 0: + print(f"WTF DIFF SIGNAL to HISTORY {diff}") + for _ in range(diff): + dst.push(history[:1]) + + # compare with source signal and time align + index = dst.push(history) + + await ctx.send_yield(index) + + # setup a respawn handle + with trio.CancelScope() as cs: + task_status.started(cs) + + # rt stream + async for processed in out_stream: + log.debug(f"{fsp_func_name}: {processed}") + index = src.index + dst.array[-1][fsp_func_name] = processed + + # stream latest shm array index entry + await ctx.send_yield(index) + + last_len = new_len = len(src.array) async with trio.open_nursery() as n: - n.start_soon(increment_signals, feed, dst) - async for processed in out_stream: - log.debug(f"{fsp_func_name}: {processed}") - index = src.index - dst.array[-1][fsp_func_name] = processed - await ctx.send_yield(index) + cs = await n.start(fsp_compute) + + # Increment the underlying shared memory buffer on every "increment" + # msg received from the underlying data feed. + + async for msg in await feed.index_stream(): + + new_len = len(src.array) + + if new_len > last_len + 1: + # respawn the signal compute task if the source + # signal has been updated + cs.cancel() + cs = await n.start(fsp_compute) + + # TODO: adopt an incremental update engine/approach + # where possible here eventually! + + array = dst.array + last = array[-1:].copy() + + # write new slot to the buffer + dst.push(last) + + last_len = new_len diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 13bad728..f8811afa 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -151,8 +151,8 @@ def wma( return np.convolve(signal, weights, 'valid') -# @piker.fsp( - # aggregates=[60, 60*5, 60*60, '4H', '1D'], +# @piker.fsp.signal( +# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'], # ) async def _rsi( source: 'QuoteStream[Dict[str, Any]]', # noqa @@ -171,8 +171,8 @@ async def _rsi( # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed) - up_ema_last = last_up_ema_close - down_ema_last = last_down_ema_close + up_ema_last = last_up_ema_close + down_ema_last = last_down_ema_close # deliver history yield rsi_h diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py new file mode 100644 index 00000000..30397920 --- /dev/null +++ b/piker/fsp/_volume.py @@ -0,0 +1,93 @@ +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import AsyncIterator, Optional + +import numpy as np + +from ..data._normalize import iterticks + + +def wap( + signal: np.ndarray, + weights: np.ndarray, +) -> np.ndarray: + """Weighted average price from signal and weights. + + """ + cum_weights = np.cumsum(weights) + cum_weighted_input = np.cumsum(signal * weights) + + # cum_weighted_input / cum_weights + # but, avoid divide by zero errors + avg = np.divide( + cum_weighted_input, + cum_weights, + where=cum_weights != 0 + ) + + return ( + avg, + cum_weighted_input, + cum_weights, + ) + + +async def _tina_vwap( + source, #: AsyncStream[np.ndarray], + ohlcv: np.ndarray, # price time-frame "aware" + anchors: Optional[np.ndarray] = None, +) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? + """Streaming volume weighted moving average. + + Calling this "tina" for now since we're using HLC3 instead of tick. + + """ + if anchors is None: + # TODO: + # anchor to session start of data if possible + pass + + a = ohlcv.array + chl3 = (a['close'] + a['high'] + a['low']) / 3 + v = a['volume'] + + h_vwap, cum_wp, cum_v = wap(chl3, v) + + # deliver historical output as "first yield" + yield h_vwap + + w_tot = cum_wp[-1] + v_tot = cum_v[-1] + # vwap_tot = h_vwap[-1] + + async for quote in source: + + for tick in iterticks(quote, types=['trade']): + + # c, h, l, v = ohlcv.array[-1][ + # ['closes', 'high', 'low', 'volume'] + # ] + + # this computes tick-by-tick weightings from here forward + size = tick['size'] + price = tick['price'] + + v_tot += size + w_tot += price * size + + # yield ((((o + h + l) / 3) * v) weights_tot) / v_tot + yield w_tot / v_tot diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index a708a82e..7ba23f5b 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -558,7 +558,9 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: see how this handles with custom ohlcv bars graphics # and/or if we can implement something similar for OHLC graphics - clipToView=True, + # clipToView=True, + autoDownsample=True, + downsampleMethod='subsample', **pdi_kwargs, ) @@ -839,10 +841,6 @@ async def _async_main( # eventually we'll support some kind of n-compose syntax fsp_conf = { - # 'vwap': { - # 'overlay': True, - # 'anchor': 'session', - # }, 'rsi': { 'period': 14, 'chart_kwargs': { @@ -852,6 +850,24 @@ async def _async_main( } + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and derivatives) + volm = ohlcv.array['volume'] + if ( + np.all(np.isin(volm, -1)) or + np.all(np.isnan(volm)) + ): + log.warning( + f"{sym} does not seem to have volume info," + " dropping volume signals") + else: + fsp_conf.update({ + 'vwap': { + 'overlay': True, + 'anchor': 'session', + }, + }) + async with trio.open_nursery() as n: # load initial fsp chain (otherwise known as "indicators") @@ -1099,11 +1115,11 @@ async def spawn_fsps( print(f'FSP NAME: {fsp_name}') portal = await n.run_in_actor( - # name as title of sub-chart - display_name, - # subactor entrypoint fsp.cascade, + + # name as title of sub-chart + name=display_name, brokername=brokermod.name, src_shm_token=src_shm.token, dst_shm_token=conf['shm'].token, @@ -1221,9 +1237,23 @@ async def update_signals( # update chart graphics async for value in stream: - # read last - array = shm.array - value = array[-1][fsp_func_name] + # TODO: provide a read sync mechanism to avoid this polling. + # the underlying issue is that a backfill and subsequent shm + # array first/last index update could result in an empty array + # read here since the stream is never torn down on the + # re-compute steps. + read_tries = 2 + while read_tries > 0: + + try: + # read last + array = shm.array + value = array[-1][fsp_func_name] + break + + except IndexError: + read_tries -= 1 + continue if last_val_sticky: last_val_sticky.update_from_data(-1, value)