From 92d7ffd332c2bb624e371116d6bbc776e98ab166 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Sep 2021 10:47:41 -0400 Subject: [PATCH 01/16] WIP fsp output throttling - not working yet --- piker/data/_sampling.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 4c5aaded..160f3fc9 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -246,7 +246,7 @@ async def sample_and_broadcast( if tick_throttle: # this is a send mem chan that likely # pushes to the ``uniform_rate_send()`` below. - await stream.send(quote) + await stream.send((sym, quote)) else: await stream.send({sym: quote}) @@ -285,10 +285,14 @@ async def uniform_rate_send( sleep_period = 1/rate - 0.000616 last_send = time.time() + aname = stream._ctx.chan.uid[0] + fsp = False + if 'fsp' in aname: + fsp = True while True: - first_quote = await quote_stream.receive() + sym, first_quote = await quote_stream.receive() start = time.time() # append quotes since last iteration into the last quote's @@ -301,7 +305,7 @@ async def uniform_rate_send( # while True: try: - next_quote = quote_stream.receive_nowait() + sym, next_quote = quote_stream.receive_nowait() ticks = next_quote.get('ticks') if ticks: @@ -312,12 +316,12 @@ async def uniform_rate_send( rate = 1 / (now - last_send) last_send = now - # print(f'{rate} Hz sending quotes') # \n{first_quote}') + # log.info(f'{rate} Hz sending quotes') # \n{first_quote}') # TODO: now if only we could sync this to the display # rate timing exactly lul try: - await stream.send({first_quote['symbol']: first_quote}) + await stream.send({sym: first_quote}) break except trio.ClosedResourceError: # if the feed consumer goes down then drop From 31f4dbef7de1ee9ec17fbb5040aafb48c01995aa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Sep 2021 10:08:01 -0400 Subject: [PATCH 02/16] More explicit error on shm push overruns --- piker/data/_sharedmem.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index f7f9f904..53c40423 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -31,7 +31,7 @@ import tractor import numpy as np from ..log import get_logger -from ._source import base_ohlc_dtype, base_iohlc_dtype +from ._source import base_iohlc_dtype log = get_logger(__name__) @@ -221,6 +221,11 @@ class ShmArray: if prepend: index = self._first.value - length + if index < 0: + raise ValueError( + f'Array size of {self._len} was overrun during prepend.\n' + 'You have passed {abs(index)} too many datums.' + ) else: index = self._last.value @@ -290,8 +295,10 @@ class ShmArray: # how much is probably dependent on lifestyle -_secs_in_day = int(60 * 60 * 12) -_default_size = 2 * _secs_in_day +_secs_in_day = int(60 * 60 * 24) +# we try for 3 times but only on a run-every-other-day kinda week. +_default_size = 3 * _secs_in_day + def open_shm_array( key: Optional[str] = None, From 33d1f5644099a601bba5efe6fbef5921a0bc5182 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 24 Sep 2021 17:47:43 -0400 Subject: [PATCH 03/16] Port fsp daemon to tractor's context api --- piker/fsp/__init__.py | 146 +++++++++++++++++++++++++----------------- 1 file changed, 87 insertions(+), 59 deletions(-) diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index f3f2d5de..ac6ac0d7 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -36,7 +36,7 @@ from ..data._sharedmem import ShmArray log = get_logger(__name__) -_fsps = { +_fsp_builtins = { 'rsi': _rsi, 'wma': _wma, 'vwap': _tina_vwap, @@ -65,16 +65,39 @@ async def latency( yield value +async def filter_quotes_by_sym( + + sym: str, + quote_stream, + +) -> AsyncIterator[dict]: + '''Filter quote stream by target symbol. + + ''' + # TODO: make this the actualy first quote from feed + # XXX: this allows for a single iteration to run for history + # processing without waiting on the real-time feed for a new quote + yield {} + + # task cancellation won't kill the channel + # since we shielded at the `open_feed()` call + async for quotes in quote_stream: + for symbol, quote in quotes.items(): + if symbol == sym: + yield quote + + async def fsp_compute( - ctx: tractor.Context, + + stream: tractor.MsgStream, symbol: str, feed: Feed, - stream: trio.abc.ReceiveChannel, + quote_stream: trio.abc.ReceiveChannel, src: ShmArray, dst: ShmArray, - fsp_func_name: str, + func_name: str, func: Callable, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, @@ -83,25 +106,13 @@ async def fsp_compute( # TODO: load appropriate fsp with input args - async def filter_by_sym( - sym: str, - stream, - ): - - # TODO: make this the actualy first quote from feed - # XXX: this allows for a single iteration to run for history - # processing without waiting on the real-time feed for a new quote - yield {} - - # task cancellation won't kill the channel - # since we shielded at the `open_feed()` call - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes - out_stream = func( - filter_by_sym(symbol, stream), + + # TODO: do we even need this if we do the feed api right? + # shouldn't a local stream do this before we get a handle + # to the async iterable? it's that or we do some kinda + # async itertools style? + filter_quotes_by_sym(symbol, quote_stream), feed.shm, ) @@ -129,11 +140,11 @@ async def fsp_compute( np.arange(len(history_output)), dtype=dst.array.dtype ) - history[fsp_func_name] = history_output + history[func_name] = history_output # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) - if diff >= 0: + if diff > 0: log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}") for _ in range(diff): dst.push(history[:1]) @@ -141,36 +152,44 @@ async def fsp_compute( # compare with source signal and time align index = dst.push(history) - await ctx.send_yield(index) - # setup a respawn handle with trio.CancelScope() as cs: - task_status.started(cs) + task_status.started((cs, index)) + + import time + last = time.time() # rt stream async for processed in out_stream: - # period = time.time() - last - # hz = 1/period if period else float('nan') - # if hz > 60: - # log.info(f'FSP quote too fast: {hz}') + period = time.time() - last + hz = 1/period if period else float('nan') + if hz > 60: + log.info(f'FSP quote too fast: {hz}') - log.debug(f"{fsp_func_name}: {processed}") + log.debug(f"{func_name}: {processed}") index = src.index - dst.array[-1][fsp_func_name] = processed + dst.array[-1][func_name] = processed - # stream latest shm array index entry - await ctx.send_yield(index) + # stream latest array index entry which basically just acts + # as trigger msg to tell the consumer to read from shm + await stream.send(index) + + last = time.time() -@tractor.stream +@tractor.context async def cascade( + ctx: tractor.Context, brokername: str, + src_shm_token: dict, dst_shm_token: Tuple[str, np.dtype], + symbol: str, - fsp_func_name: str, + func_name: str, + loglevel: Optional[str] = None, ) -> None: @@ -184,39 +203,50 @@ async def cascade( src = attach_shm_array(token=src_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token) - func: Callable = _fsps[fsp_func_name] + func: Callable = _fsp_builtins.get(func_name) + if not func: + # TODO: assume it's a func target path + raise ValueError('Unknown fsp target: {func_name}') # open a data feed stream with requested broker async with data.feed.maybe_open_feed( brokername, [symbol], - # TODO: + # TODO throttle tick outputs from *this* daemon since + # it'll emit tons of ticks due to the throttle only + # limits quote arrival periods, so the consumer of *this* + # needs to get throttled the ticks we generate. # tick_throttle=60, - ) as (feed, stream): + ) as (feed, quote_stream): assert src.token == feed.shm.token - last_len = new_len = len(src.array) - fsp_target = partial( - fsp_compute, - ctx=ctx, - symbol=symbol, - feed=feed, - stream=stream, + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): - src=src, - dst=dst, + fsp_target = partial( - fsp_func_name=fsp_func_name, - func=func - ) + fsp_compute, + stream=stream, + symbol=symbol, + feed=feed, + quote_stream=quote_stream, - async with trio.open_nursery() as n: + # shm + src=src, + dst=dst, - cs = await n.start(fsp_target) + func_name=func_name, + func=func + ) + + cs, index = await n.start(fsp_target) + await ctx.started(index) # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. @@ -229,17 +259,15 @@ async def cascade( if new_len > last_len + 1: # respawn the signal compute task if the source # signal has been updated + log.warning(f'Re-spawning fsp {func_name}') cs.cancel() - cs = await n.start(fsp_target) + cs, index = await n.start(fsp_target) # TODO: adopt an incremental update engine/approach # where possible here eventually! - # read out last shm row + # read out last shm row, copy and write new row array = dst.array last = array[-1:].copy() - - # write new row to the shm buffer dst.push(last) - last_len = new_len From d4b00d74f804c66562b7626a4bcb64754423daa6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 25 Sep 2021 10:06:37 -0400 Subject: [PATCH 04/16] Move top level fsp pkg code into an `_engine` module --- piker/fsp/__init__.py | 240 ++-------------------------------------- piker/fsp/_engine.py | 251 ++++++++++++++++++++++++++++++++++++++++++ piker/ui/_display.py | 2 +- 3 files changed, 260 insertions(+), 233 deletions(-) create mode 100644 piker/fsp/_engine.py diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index ac6ac0d7..5e88ed69 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship of piker0) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -14,33 +14,17 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -Financial signal processing for the peeps. -""" -from functools import partial -from typing import AsyncIterator, Callable, Tuple, Optional +''' +Fin-sig-proc for the peeps! + +''' +from typing import AsyncIterator -import trio -from trio_typing import TaskStatus -import tractor import numpy as np -from ..log import get_logger, get_console_log -from .. import data -from ._momo import _rsi, _wma -from ._volume import _tina_vwap -from ..data import attach_shm_array -from ..data.feed import Feed -from ..data._sharedmem import ShmArray +from ._engine import cascade -log = get_logger(__name__) - - -_fsp_builtins = { - 'rsi': _rsi, - 'wma': _wma, - 'vwap': _tina_vwap, -} +__all__ = ['cascade'] async def latency( @@ -63,211 +47,3 @@ async def latency( # stack tracing. value = quote['brokerd_ts'] - quote['broker_ts'] yield value - - -async def filter_quotes_by_sym( - - sym: str, - quote_stream, - -) -> AsyncIterator[dict]: - '''Filter quote stream by target symbol. - - ''' - # TODO: make this the actualy first quote from feed - # XXX: this allows for a single iteration to run for history - # processing without waiting on the real-time feed for a new quote - yield {} - - # task cancellation won't kill the channel - # since we shielded at the `open_feed()` call - async for quotes in quote_stream: - for symbol, quote in quotes.items(): - if symbol == sym: - yield quote - - -async def fsp_compute( - - stream: tractor.MsgStream, - symbol: str, - feed: Feed, - quote_stream: trio.abc.ReceiveChannel, - - src: ShmArray, - dst: ShmArray, - - func_name: str, - func: Callable, - - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, - -) -> None: - - # TODO: load appropriate fsp with input args - - out_stream = func( - - # TODO: do we even need this if we do the feed api right? - # shouldn't a local stream do this before we get a handle - # to the async iterable? it's that or we do some kinda - # async itertools style? - filter_quotes_by_sym(symbol, quote_stream), - feed.shm, - ) - - # TODO: XXX: - # THERE'S A BIG BUG HERE WITH THE `index` field since we're - # prepending a copy of the first value a few times to make - # sub-curves align with the parent bar chart. - # This likely needs to be fixed either by, - # - manually assigning the index and historical data - # seperately to the shm array (i.e. not using .push()) - # - developing some system on top of the shared mem array that - # is `index` aware such that historical data can be indexed - # relative to the true first datum? Not sure if this is sane - # for incremental compuations. - dst._first.value = src._first.value - dst._last.value = src._first.value - - # Conduct a single iteration of fsp with historical bars input - # and get historical output - history_output = await out_stream.__anext__() - - # build a struct array which includes an 'index' field to push - # as history - history = np.array( - np.arange(len(history_output)), - dtype=dst.array.dtype - ) - history[func_name] = history_output - - # check for data length mis-allignment and fill missing values - diff = len(src.array) - len(history) - if diff > 0: - log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}") - for _ in range(diff): - dst.push(history[:1]) - - # compare with source signal and time align - index = dst.push(history) - - # setup a respawn handle - with trio.CancelScope() as cs: - task_status.started((cs, index)) - - import time - last = time.time() - - # rt stream - async for processed in out_stream: - - period = time.time() - last - hz = 1/period if period else float('nan') - if hz > 60: - log.info(f'FSP quote too fast: {hz}') - - log.debug(f"{func_name}: {processed}") - index = src.index - dst.array[-1][func_name] = processed - - # stream latest array index entry which basically just acts - # as trigger msg to tell the consumer to read from shm - await stream.send(index) - - last = time.time() - - -@tractor.context -async def cascade( - - ctx: tractor.Context, - brokername: str, - - src_shm_token: dict, - dst_shm_token: Tuple[str, np.dtype], - - symbol: str, - func_name: str, - - loglevel: Optional[str] = None, - -) -> None: - '''Chain streaming signal processors and deliver output to - destination mem buf. - - ''' - if loglevel: - get_console_log(loglevel) - - src = attach_shm_array(token=src_shm_token) - dst = attach_shm_array(readonly=False, token=dst_shm_token) - - func: Callable = _fsp_builtins.get(func_name) - if not func: - # TODO: assume it's a func target path - raise ValueError('Unknown fsp target: {func_name}') - - # open a data feed stream with requested broker - async with data.feed.maybe_open_feed( - brokername, - [symbol], - - # TODO throttle tick outputs from *this* daemon since - # it'll emit tons of ticks due to the throttle only - # limits quote arrival periods, so the consumer of *this* - # needs to get throttled the ticks we generate. - # tick_throttle=60, - - ) as (feed, quote_stream): - - assert src.token == feed.shm.token - last_len = new_len = len(src.array) - - async with ( - ctx.open_stream() as stream, - trio.open_nursery() as n, - ): - - fsp_target = partial( - - fsp_compute, - stream=stream, - symbol=symbol, - feed=feed, - quote_stream=quote_stream, - - # shm - src=src, - dst=dst, - - func_name=func_name, - func=func - ) - - cs, index = await n.start(fsp_target) - await ctx.started(index) - - # Increment the underlying shared memory buffer on every - # "increment" msg received from the underlying data feed. - - async with feed.index_stream() as stream: - async for msg in stream: - - new_len = len(src.array) - - if new_len > last_len + 1: - # respawn the signal compute task if the source - # signal has been updated - log.warning(f'Re-spawning fsp {func_name}') - cs.cancel() - cs, index = await n.start(fsp_target) - - # TODO: adopt an incremental update engine/approach - # where possible here eventually! - - # read out last shm row, copy and write new row - array = dst.array - last = array[-1:].copy() - dst.push(last) - last_len = new_len diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py new file mode 100644 index 00000000..9e9f1370 --- /dev/null +++ b/piker/fsp/_engine.py @@ -0,0 +1,251 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship of piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +core task logic for processing chains + +''' +from functools import partial +from typing import AsyncIterator, Callable, Optional + +import trio +from trio_typing import TaskStatus +import tractor +import numpy as np + +from ..log import get_logger, get_console_log +from .. import data +from ..data import attach_shm_array +from ..data.feed import Feed +from ..data._sharedmem import ShmArray +from ._momo import _rsi, _wma +from ._volume import _tina_vwap + +log = get_logger(__name__) + +_fsp_builtins = { + 'rsi': _rsi, + 'wma': _wma, + 'vwap': _tina_vwap, +} + + +async def filter_quotes_by_sym( + + sym: str, + quote_stream, + +) -> AsyncIterator[dict]: + '''Filter quote stream by target symbol. + + ''' + # TODO: make this the actual first quote from feed + # XXX: this allows for a single iteration to run for history + # processing without waiting on the real-time feed for a new quote + yield {} + + async for quotes in quote_stream: + quote = quotes.get(sym) + if quote: + yield quote + # for symbol, quote in quotes.items(): + # if symbol == sym: + + +async def fsp_compute( + + stream: tractor.MsgStream, + symbol: str, + feed: Feed, + quote_stream: trio.abc.ReceiveChannel, + + src: ShmArray, + dst: ShmArray, + + func_name: str, + func: Callable, + + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + +) -> None: + + # TODO: load appropriate fsp with input args + + out_stream = func( + + # TODO: do we even need this if we do the feed api right? + # shouldn't a local stream do this before we get a handle + # to the async iterable? it's that or we do some kinda + # async itertools style? + filter_quotes_by_sym(symbol, quote_stream), + feed.shm, + ) + + # TODO: XXX: + # THERE'S A BIG BUG HERE WITH THE `index` field since we're + # prepending a copy of the first value a few times to make + # sub-curves align with the parent bar chart. + # This likely needs to be fixed either by, + # - manually assigning the index and historical data + # seperately to the shm array (i.e. not using .push()) + # - developing some system on top of the shared mem array that + # is `index` aware such that historical data can be indexed + # relative to the true first datum? Not sure if this is sane + # for incremental compuations. + dst._first.value = src._first.value + dst._last.value = src._first.value + + # Conduct a single iteration of fsp with historical bars input + # and get historical output + history_output = await out_stream.__anext__() + + # build a struct array which includes an 'index' field to push + # as history + history = np.array( + np.arange(len(history_output)), + dtype=dst.array.dtype + ) + history[func_name] = history_output + + # check for data length mis-allignment and fill missing values + diff = len(src.array) - len(history) + if diff > 0: + log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}") + for _ in range(diff): + dst.push(history[:1]) + + # compare with source signal and time align + index = dst.push(history) + + # setup a respawn handle + with trio.CancelScope() as cs: + task_status.started((cs, index)) + + import time + last = time.time() + + # rt stream + async for processed in out_stream: + + period = time.time() - last + hz = 1/period if period else float('nan') + if hz > 60: + log.info(f'FSP quote too fast: {hz}') + + log.debug(f"{func_name}: {processed}") + index = src.index + dst.array[-1][func_name] = processed + + # stream latest array index entry which basically just acts + # as trigger msg to tell the consumer to read from shm + await stream.send(index) + + last = time.time() + + +@tractor.context +async def cascade( + + ctx: tractor.Context, + brokername: str, + + src_shm_token: dict, + dst_shm_token: tuple[str, np.dtype], + + symbol: str, + func_name: str, + + loglevel: Optional[str] = None, + +) -> None: + '''Chain streaming signal processors and deliver output to + destination mem buf. + + ''' + if loglevel: + get_console_log(loglevel) + + src = attach_shm_array(token=src_shm_token) + dst = attach_shm_array(readonly=False, token=dst_shm_token) + + func: Callable = _fsp_builtins.get(func_name) + if not func: + # TODO: assume it's a func target path + raise ValueError('Unknown fsp target: {func_name}') + + # open a data feed stream with requested broker + async with data.feed.maybe_open_feed( + brokername, + [symbol], + + # TODO throttle tick outputs from *this* daemon since + # it'll emit tons of ticks due to the throttle only + # limits quote arrival periods, so the consumer of *this* + # needs to get throttled the ticks we generate. + # tick_throttle=60, + + ) as (feed, quote_stream): + + assert src.token == feed.shm.token + last_len = new_len = len(src.array) + + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): + + fsp_target = partial( + + fsp_compute, + stream=stream, + symbol=symbol, + feed=feed, + quote_stream=quote_stream, + + # shm + src=src, + dst=dst, + + func_name=func_name, + func=func + ) + + cs, index = await n.start(fsp_target) + await ctx.started(index) + + # Increment the underlying shared memory buffer on every + # "increment" msg received from the underlying data feed. + + async with feed.index_stream() as stream: + async for msg in stream: + + new_len = len(src.array) + + if new_len > last_len + 1: + # respawn the signal compute task if the source + # signal has been updated + log.warning(f'Re-spawning fsp {func_name}') + cs.cancel() + cs, index = await n.start(fsp_target) + + # TODO: adopt an incremental update engine/approach + # where possible here eventually! + + # read out last shm row, copy and write new row + array = dst.array + last = array[-1:].copy() + dst.push(last) + last_len = new_len diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 483db8a8..ab85e761 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -323,7 +323,7 @@ async def fan_out_spawn_fsp_daemons( conf['shm'] = shm portal = await n.start_actor( - enable_modules=['piker.fsp'], + enable_modules=['piker.fsp._engine'], name='fsp.' + display_name, ) From 2cd594ed3529a1d010ee4616d2846f15a406a568 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 30 Sep 2021 07:41:09 -0400 Subject: [PATCH 05/16] Add profiling to fsp engine Litter the engine code with `pyqtgraph` profiling to see if we can improve startup times - likely it'll mean pre-allocating a small fsp daemon cluster at startup. --- piker/fsp/_engine.py | 76 +++++++++++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 26 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 9e9f1370..e246e0b7 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -21,10 +21,11 @@ core task logic for processing chains from functools import partial from typing import AsyncIterator, Callable, Optional +import numpy as np +import pyqtgraph as pg import trio from trio_typing import TaskStatus import tractor -import numpy as np from ..log import get_logger, get_console_log from .. import data @@ -61,8 +62,6 @@ async def filter_quotes_by_sym( quote = quotes.get(sym) if quote: yield quote - # for symbol, quote in quotes.items(): - # if symbol == sym: async def fsp_compute( @@ -78,11 +77,15 @@ async def fsp_compute( func_name: str, func: Callable, + attach_stream: bool = False, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: - # TODO: load appropriate fsp with input args + profiler = pg.debug.Profiler( + delayed=False, + disabled=True + ) out_stream = func( @@ -94,6 +97,21 @@ async def fsp_compute( feed.shm, ) + # Conduct a single iteration of fsp with historical bars input + # and get historical output + history_output = await out_stream.__anext__() + + # await tractor.breakpoint() + profiler(f'{func_name} generated history') + + # build a struct array which includes an 'index' field to push + # as history + history = np.array( + np.arange(len(history_output)), + dtype=dst.array.dtype + ) + history[func_name] = history_output + # TODO: XXX: # THERE'S A BIG BUG HERE WITH THE `index` field since we're # prepending a copy of the first value a few times to make @@ -108,31 +126,25 @@ async def fsp_compute( dst._first.value = src._first.value dst._last.value = src._first.value - # Conduct a single iteration of fsp with historical bars input - # and get historical output - history_output = await out_stream.__anext__() - - # build a struct array which includes an 'index' field to push - # as history - history = np.array( - np.arange(len(history_output)), - dtype=dst.array.dtype - ) - history[func_name] = history_output - + # compare with source signal and time align # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) if diff > 0: - log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}") + log.warning(f"WTF DIFF fsp to ohlc history {diff}") for _ in range(diff): dst.push(history[:1]) - # compare with source signal and time align - index = dst.push(history) + # TODO: can we use this `start` flag instead of the manual + # setting above? + index = dst.push(history) #, start=src._first.value) + + profiler(f'{func_name} pushed history') + profiler.finish() # setup a respawn handle with trio.CancelScope() as cs: task_status.started((cs, index)) + profiler(f'{func_name} yield last index') import time last = time.time() @@ -140,20 +152,21 @@ async def fsp_compute( # rt stream async for processed in out_stream: - period = time.time() - last - hz = 1/period if period else float('nan') - if hz > 60: - log.info(f'FSP quote too fast: {hz}') - log.debug(f"{func_name}: {processed}") index = src.index dst.array[-1][func_name] = processed + # NOTE: for now we aren't streaming this to the consumer # stream latest array index entry which basically just acts # as trigger msg to tell the consumer to read from shm - await stream.send(index) + if attach_stream: + await stream.send(index) - last = time.time() + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + # last = time.time() @tractor.context @@ -175,6 +188,8 @@ async def cascade( destination mem buf. ''' + profiler = pg.debug.Profiler(delayed=False, disabled=False) + if loglevel: get_console_log(loglevel) @@ -199,6 +214,8 @@ async def cascade( ) as (feed, quote_stream): + profiler(f'{func_name}: feed up') + assert src.token == feed.shm.token last_len = new_len = len(src.array) @@ -225,11 +242,16 @@ async def cascade( cs, index = await n.start(fsp_target) await ctx.started(index) + profiler(f'{func_name}: fsp up') # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. async with feed.index_stream() as stream: + + profiler(f'{func_name}: sample stream up') + profiler.finish() + async for msg in stream: new_len = len(src.array) @@ -246,6 +268,8 @@ async def cascade( # read out last shm row, copy and write new row array = dst.array + # TODO: some signals, like vlm should be reset to + # zero every step. last = array[-1:].copy() dst.push(last) last_len = new_len From 2b9fb952a91921f728131f6f3968f92b12bc433a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 30 Sep 2021 07:33:43 -0400 Subject: [PATCH 06/16] Fix shm index update race There was a lingering issue where the fsp daemon would sync its shm array with the source data and we'd set the start/end indices to the same value. Under some races a reader would then read an empty `.array` which it wasn't expecting. This fixes that as well as tidies up the `ShmArray.push()` logic and adds a temporary check in `.array` for zero length if the array hasn't been written yet. We can now start removing read array length checks in consumer code and hopefully no more races will show up. --- piker/data/_sharedmem.py | 46 ++++++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 53c40423..8afa3214 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -168,6 +168,7 @@ class ShmArray: self._len = len(shmarr) self._shm = shm + self._post_init: bool = False # pushing data does not write the index (aka primary key) self._write_fields = list(shmarr.dtype.fields.keys())[1:] @@ -196,19 +197,37 @@ class ShmArray: @property def array(self) -> np.ndarray: - return self._array[self._first.value:self._last.value] + '''Return an up-to-date ``np.ndarray`` view of the + so-far-written data to the underlying shm buffer. + + ''' + a = self._array[self._first.value:self._last.value] + + # first, last = self._first.value, self._last.value + # a = self._array[first:last] + + # TODO: eventually comment this once we've not seen it in the + # wild in a long time.. + # XXX: race where first/last indexes cause a reader + # to load an empty array.. + if len(a) == 0 and self._post_init: + raise RuntimeError('Empty array race condition hit!?') + # breakpoint() + + return a def last( self, length: int = 1, ) -> np.ndarray: - return self.array[-length:] + return self.array[-length] def push( self, data: np.ndarray, prepend: bool = False, + start: Optional[int] = None, ) -> int: '''Ring buffer like "push" to append data @@ -217,17 +236,18 @@ class ShmArray: NB: no actual ring logic yet to give a "loop around" on overflow condition, lel. ''' + self._post_init = True length = len(data) + index = start or self._last.value if prepend: index = self._first.value - length + if index < 0: raise ValueError( f'Array size of {self._len} was overrun during prepend.\n' 'You have passed {abs(index)} too many datums.' ) - else: - index = self._last.value end = index + length @@ -235,11 +255,22 @@ class ShmArray: try: self._array[fields][index:end] = data[fields][:] + + # NOTE: there was a race here between updating + # the first and last indices and when the next reader + # tries to access ``.array`` (which due to the index + # overlap will be empty). Pretty sure we've fixed it now + # but leaving this here as a reminder. if prepend: + assert index < self._first.value + + if index < self._first.value: self._first.value = index else: self._last.value = end + return end + except ValueError as err: # shoudl raise if diff detected self.diff_err_fields(data) @@ -301,16 +332,19 @@ _default_size = 3 * _secs_in_day def open_shm_array( + key: Optional[str] = None, size: int = _default_size, dtype: Optional[np.dtype] = None, readonly: bool = False, + ) -> ShmArray: - """Open a memory shared ``numpy`` using the standard library. + '''Open a memory shared ``numpy`` using the standard library. This call unlinks (aka permanently destroys) the buffer on teardown and thus should be used from the parent-most accessor (process). - """ + + ''' # create new shared mem segment for which we # have write permission a = np.zeros(size, dtype=dtype) From 5b1be8a8daff90befdf65a7cb1a82cf548fa41f2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 30 Sep 2021 10:46:44 -0400 Subject: [PATCH 07/16] Do fsp sync-to-source in sample step task --- piker/fsp/_engine.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index e246e0b7..0494ef3c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -123,20 +123,11 @@ async def fsp_compute( # is `index` aware such that historical data can be indexed # relative to the true first datum? Not sure if this is sane # for incremental compuations. - dst._first.value = src._first.value - dst._last.value = src._first.value - - # compare with source signal and time align - # check for data length mis-allignment and fill missing values - diff = len(src.array) - len(history) - if diff > 0: - log.warning(f"WTF DIFF fsp to ohlc history {diff}") - for _ in range(diff): - dst.push(history[:1]) + first = dst._first.value = src._first.value # TODO: can we use this `start` flag instead of the manual # setting above? - index = dst.push(history) #, start=src._first.value) + index = dst.push(history, start=first) profiler(f'{func_name} pushed history') profiler.finish() @@ -146,8 +137,8 @@ async def fsp_compute( task_status.started((cs, index)) profiler(f'{func_name} yield last index') - import time - last = time.time() + # import time + # last = time.time() # rt stream async for processed in out_stream: @@ -254,12 +245,12 @@ async def cascade( async for msg in stream: + # respawn the compute task if the source + # array has been updated such that we compute + # new history from the (prepended) source. new_len = len(src.array) - if new_len > last_len + 1: - # respawn the signal compute task if the source - # signal has been updated - log.warning(f'Re-spawning fsp {func_name}') + log.warning(f're-syncing fsp {func_name} to source') cs.cancel() cs, index = await n.start(fsp_target) @@ -268,8 +259,22 @@ async def cascade( # read out last shm row, copy and write new row array = dst.array + # TODO: some signals, like vlm should be reset to # zero every step. last = array[-1:].copy() dst.push(last) last_len = new_len + + # compare again with source and make sure + # histories are index aligned. + diff = src.index - dst.index + if diff: + if abs(diff) < 10: + log.warning( + f'syncing fsp to source by offset: {diff}') + history = dst.array + dst.push(history[:-1], start=src._first.value) + else: + log.warning( + f'large offset {diff} re-spawn ongoing?..') From 6f83e358fe74c52dbacf674355d4cfa5f0304260 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 30 Sep 2021 11:19:50 -0400 Subject: [PATCH 08/16] Add zero on increment support --- piker/fsp/_engine.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 0494ef3c..21688b8b 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -171,6 +171,7 @@ async def cascade( symbol: str, func_name: str, + zero_on_step: bool = False, loglevel: Optional[str] = None, @@ -232,6 +233,11 @@ async def cascade( ) cs, index = await n.start(fsp_target) + + if zero_on_step: + last = dst.array[-1:] + zeroed = np.zeros(last.shape, dtype=last.dtype) + await ctx.started(index) profiler(f'{func_name}: fsp up') @@ -263,6 +269,9 @@ async def cascade( # TODO: some signals, like vlm should be reset to # zero every step. last = array[-1:].copy() + if zero_on_step: + last = zeroed + dst.push(last) last_len = new_len From 1981b113b759038a11ae41c0aec536b40db5a808 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Oct 2021 17:46:20 -0400 Subject: [PATCH 09/16] Drunkfix: finally solve the fsp alignment race? --- piker/fsp/_engine.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 21688b8b..7bc4510c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -254,8 +254,15 @@ async def cascade( # respawn the compute task if the source # array has been updated such that we compute # new history from the (prepended) source. + diff = src.index - dst.index new_len = len(src.array) - if new_len > last_len + 1: + + # XXX: ok no idea why this works but "drunk fix" + # says it don't matter. + if ( + new_len > last_len + 1 or + abs(diff) > 1 + ): log.warning(f're-syncing fsp {func_name} to source') cs.cancel() cs, index = await n.start(fsp_target) @@ -266,24 +273,12 @@ async def cascade( # read out last shm row, copy and write new row array = dst.array - # TODO: some signals, like vlm should be reset to - # zero every step. - last = array[-1:].copy() + # some metrics, like vlm should be reset + # to zero every step. if zero_on_step: last = zeroed + else: + last = array[-1:].copy() dst.push(last) last_len = new_len - - # compare again with source and make sure - # histories are index aligned. - diff = src.index - dst.index - if diff: - if abs(diff) < 10: - log.warning( - f'syncing fsp to source by offset: {diff}') - history = dst.array - dst.push(history[:-1], start=src._first.value) - else: - log.warning( - f'large offset {diff} re-spawn ongoing?..') From f68671b6142128e1b8470816deb0ffce573ece70 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Oct 2021 17:47:02 -0400 Subject: [PATCH 10/16] Revert to old shm "last" meaning last row --- piker/data/_sharedmem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 8afa3214..2169e262 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -220,7 +220,7 @@ class ShmArray: self, length: int = 1, ) -> np.ndarray: - return self.array[-length] + return self.array[-length:] def push( self, From 086aaf1d16e2cc25321fb7e7fb3bfcc5d2614725 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 12:58:29 -0400 Subject: [PATCH 11/16] Sync history recalcs to diff checks via a "task tracker" --- piker/fsp/_engine.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 7bc4510c..2df3c126 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -18,6 +18,7 @@ core task logic for processing chains ''' +from dataclasses import dataclass from functools import partial from typing import AsyncIterator, Callable, Optional @@ -44,6 +45,12 @@ _fsp_builtins = { } +@dataclass +class TaskTracker: + complete: trio.Event + cs: trio.CancelScope + + async def filter_quotes_by_sym( sym: str, @@ -134,7 +141,8 @@ async def fsp_compute( # setup a respawn handle with trio.CancelScope() as cs: - task_status.started((cs, index)) + tracker = TaskTracker(trio.Event(), cs) + task_status.started((tracker, index)) profiler(f'{func_name} yield last index') # import time @@ -232,7 +240,7 @@ async def cascade( func=func ) - cs, index = await n.start(fsp_target) + tracker, index = await n.start(fsp_target) if zero_on_step: last = dst.array[-1:] @@ -263,17 +271,20 @@ async def cascade( new_len > last_len + 1 or abs(diff) > 1 ): - log.warning(f're-syncing fsp {func_name} to source') - cs.cancel() - cs, index = await n.start(fsp_target) - # TODO: adopt an incremental update engine/approach # where possible here eventually! + log.warning(f're-syncing fsp {func_name} to source') + tracker.cs.cancel() + await tracker.complete.wait() + tracker, index = await n.start(fsp_target) + + # skip adding a new bar since we should be fully aligned. + continue # read out last shm row, copy and write new row array = dst.array - # some metrics, like vlm should be reset + # some metrics like vlm should be reset # to zero every step. if zero_on_step: last = zeroed From 3dd82c8d31665233845ffe3b13756090e45159db Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 16:34:54 -0400 Subject: [PATCH 12/16] Fix the drunk fix This should finally be correct fsp src-to-dst array syncing now.. There's a few edge cases but mostly we need to be sure we sync both back-filled history diffs and avoid current step lag/leads. Use a polling routine and the more stringent task re-spawn system to get this right. --- piker/fsp/_engine.py | 94 +++++++++++++++++++++++++++++--------------- 1 file changed, 62 insertions(+), 32 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 2df3c126..70e86481 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -148,24 +148,27 @@ async def fsp_compute( # import time # last = time.time() - # rt stream - async for processed in out_stream: + try: + # rt stream + async for processed in out_stream: - log.debug(f"{func_name}: {processed}") - index = src.index - dst.array[-1][func_name] = processed + log.debug(f"{func_name}: {processed}") + index = src.index + dst.array[-1][func_name] = processed - # NOTE: for now we aren't streaming this to the consumer - # stream latest array index entry which basically just acts - # as trigger msg to tell the consumer to read from shm - if attach_stream: - await stream.send(index) + # NOTE: for now we aren't streaming this to the consumer + # stream latest array index entry which basically just acts + # as trigger msg to tell the consumer to read from shm + if attach_stream: + await stream.send(index) - # period = time.time() - last - # hz = 1/period if period else float('nan') - # if hz > 60: - # log.info(f'FSP quote too fast: {hz}') - # last = time.time() + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + # last = time.time() + finally: + tracker.complete.set() @tractor.context @@ -217,7 +220,7 @@ async def cascade( profiler(f'{func_name}: feed up') assert src.token == feed.shm.token - last_len = new_len = len(src.array) + # last_len = new_len = len(src.array) async with ( ctx.open_stream() as stream, @@ -249,9 +252,16 @@ async def cascade( await ctx.started(index) profiler(f'{func_name}: fsp up') + async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: + # TODO: adopt an incremental update engine/approach + # where possible here eventually! + log.warning(f're-syncing fsp {func_name} to source') + tracker.cs.cancel() + await tracker.complete.wait() + return await n.start(fsp_target) + # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. - async with feed.index_stream() as stream: profiler(f'{func_name}: sample stream up') @@ -263,23 +273,44 @@ async def cascade( # array has been updated such that we compute # new history from the (prepended) source. diff = src.index - dst.index - new_len = len(src.array) - # XXX: ok no idea why this works but "drunk fix" - # says it don't matter. + # new_len = len(src.array) + + async def poll_and_sync_to_step(tracker): + diff = src.index - dst.index + while True: + if diff in (0, 1): + break + + tracker, index = await resync(tracker) + diff = src.index - dst.index + # log.info( + # '\n'.join(( + # f'history index after sync: {index}', + # f'diff after sync: {diff}', + # )) + # ) + + return tracker, diff + + # log.debug(f'diff {diff}') + if ( - new_len > last_len + 1 or - abs(diff) > 1 - ): - # TODO: adopt an incremental update engine/approach - # where possible here eventually! - log.warning(f're-syncing fsp {func_name} to source') - tracker.cs.cancel() - await tracker.complete.wait() - tracker, index = await n.start(fsp_target) + # the source is likely backfilling and we must + # sync history calculations + abs(len(src.array) - len(dst.array)) > 0 or - # skip adding a new bar since we should be fully aligned. - continue + # we aren't step synced to the source and may be + # leading/lagging by a step + diff > 1 or + diff < 0 + ): + tracker, diff = await poll_and_sync_to_step(tracker) + + # skip adding a last bar since we should be + # source alinged + if diff == 0: + continue # read out last shm row, copy and write new row array = dst.array @@ -292,4 +323,3 @@ async def cascade( last = array[-1:].copy() dst.push(last) - last_len = new_len From 53dedbd6458e80be9acd2d0f4c6bbd20e14101f7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 08:27:03 -0400 Subject: [PATCH 13/16] Move "desynced" logic into a predicate --- piker/fsp/_engine.py | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 70e86481..afd6aa2c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -269,21 +269,23 @@ async def cascade( async for msg in stream: - # respawn the compute task if the source - # array has been updated such that we compute - # new history from the (prepended) source. - diff = src.index - dst.index + def desynced(src: ShmArray, dst: ShmArray) -> bool: + diff = src.index - dst.index + len_diff = abs(len(src.array) - len(dst.array)) + return ( + # the source is likely backfilling and we must + # sync history calculations + len_diff > 2 or - # new_len = len(src.array) + # we aren't step synced to the source and may be + # leading/lagging by a step + diff > 1 or + diff < 0 + ) async def poll_and_sync_to_step(tracker): - diff = src.index - dst.index - while True: - if diff in (0, 1): - break - + while desynced(src, dst): tracker, index = await resync(tracker) - diff = src.index - dst.index # log.info( # '\n'.join(( # f'history index after sync: {index}', @@ -293,18 +295,10 @@ async def cascade( return tracker, diff - # log.debug(f'diff {diff}') - - if ( - # the source is likely backfilling and we must - # sync history calculations - abs(len(src.array) - len(dst.array)) > 0 or - - # we aren't step synced to the source and may be - # leading/lagging by a step - diff > 1 or - diff < 0 - ): + # respawn the compute task if the source + # array has been updated such that we compute + # new history from the (prepended) source. + if desynced(src, dst): tracker, diff = await poll_and_sync_to_step(tracker) # skip adding a last bar since we should be From dd9f6e8a7c5d127f310877a66409a556603accbf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 12:27:53 -0400 Subject: [PATCH 14/16] Move sync diffing helpers out of index loop --- piker/fsp/_engine.py | 96 ++++++++++++++++++++++++++++---------------- 1 file changed, 61 insertions(+), 35 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index afd6aa2c..cb51a86f 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -44,6 +44,11 @@ _fsp_builtins = { 'vwap': _tina_vwap, } +# TODO: things to figure the heck out: +# - how to handle non-plottable values (pyqtgraph has facility for this +# now in `arrayToQPath()`) +# - composition of fsps / implicit chaining syntax (we need an issue) + @dataclass class TaskTracker: @@ -54,10 +59,11 @@ class TaskTracker: async def filter_quotes_by_sym( sym: str, - quote_stream, + quote_stream: tractor.MsgStream, ) -> AsyncIterator[dict]: - '''Filter quote stream by target symbol. + ''' + Filter quote stream by target symbol. ''' # TODO: make this the actual first quote from feed @@ -187,8 +193,9 @@ async def cascade( loglevel: Optional[str] = None, ) -> None: - '''Chain streaming signal processors and deliver output to - destination mem buf. + ''' + Chain streaming signal processors and deliver output to + destination shm array buffer. ''' profiler = pg.debug.Profiler(delayed=False, disabled=False) @@ -260,6 +267,46 @@ async def cascade( await tracker.complete.wait() return await n.start(fsp_target) + def is_synced( + src: ShmArray, + dst: ShmArray + ) -> tuple[bool, int, int]: + '''Predicate to dertmine if a destination FSP + output array is aligned to its source array. + + ''' + step_diff = src.index - dst.index + len_diff = abs(len(src.array) - len(dst.array)) + return not ( + # the source is likely backfilling and we must + # sync history calculations + len_diff > 2 or + + # we aren't step synced to the source and may be + # leading/lagging by a step + step_diff > 1 or + step_diff < 0 + ), step_diff, len_diff + + async def poll_and_sync_to_step( + + tracker: TaskTracker, + src: ShmArray, + dst: ShmArray, + + ) -> tuple[TaskTracker, int]: + + synced, step_diff, _ = is_synced(src, dst) + while not synced: + tracker, index = await resync(tracker) + synced, step_diff, _ = is_synced(src, dst) + + return tracker, step_diff + + s, step, ld = is_synced(src, dst) + if step or ld: + await tractor.breakpoint() + # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. async with feed.index_stream() as stream: @@ -269,41 +316,20 @@ async def cascade( async for msg in stream: - def desynced(src: ShmArray, dst: ShmArray) -> bool: - diff = src.index - dst.index - len_diff = abs(len(src.array) - len(dst.array)) - return ( - # the source is likely backfilling and we must - # sync history calculations - len_diff > 2 or - - # we aren't step synced to the source and may be - # leading/lagging by a step - diff > 1 or - diff < 0 - ) - - async def poll_and_sync_to_step(tracker): - while desynced(src, dst): - tracker, index = await resync(tracker) - # log.info( - # '\n'.join(( - # f'history index after sync: {index}', - # f'diff after sync: {diff}', - # )) - # ) - - return tracker, diff - # respawn the compute task if the source # array has been updated such that we compute # new history from the (prepended) source. - if desynced(src, dst): - tracker, diff = await poll_and_sync_to_step(tracker) + synced, step_diff, _ = is_synced(src, dst) + if not synced: + tracker, step_diff = await poll_and_sync_to_step( + tracker, + src, + dst, + ) - # skip adding a last bar since we should be - # source alinged - if diff == 0: + # skip adding a last bar since we should already + # be step alinged + if step_diff == 0: continue # read out last shm row, copy and write new row From c9136e049440f4936308dae83d6bd5f0d1ea8be7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 12:28:27 -0400 Subject: [PATCH 15/16] Fix rsi history off-by-one due to `np.diff()` --- piker/fsp/_momo.py | 60 +++++++++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index f8811afa..78461d8a 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -16,6 +16,7 @@ """ Momentum bby. + """ from typing import AsyncIterator, Optional @@ -23,12 +24,9 @@ import numpy as np from numba import jit, float64, optional, int64 from ..data._normalize import iterticks +from ..data._sharedmem import ShmArray -# TODO: things to figure the fuck out: -# - how to handle non-plottable values -# - composition of fsps / implicit chaining - @jit( float64[:]( float64[:], @@ -39,11 +37,14 @@ from ..data._normalize import iterticks nogil=True ) def ema( + y: 'np.ndarray[float64]', alpha: optional(float64) = None, ylast: optional(float64) = None, + ) -> 'np.ndarray[float64]': - r"""Exponential weighted moving average owka 'Exponential smoothing'. + r''' + Exponential weighted moving average owka 'Exponential smoothing'. - https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average - https://en.wikipedia.org/wiki/Exponential_smoothing @@ -68,7 +69,8 @@ def ema( More discussion here: https://stackoverflow.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm - """ + + ''' n = y.shape[0] if alpha is None: @@ -105,14 +107,21 @@ def ema( # nogil=True # ) def rsi( + + # TODO: use https://github.com/ramonhagenaars/nptyping signal: 'np.ndarray[float64]', period: int64 = 14, up_ema_last: float64 = None, down_ema_last: float64 = None, + ) -> 'np.ndarray[float64]': + ''' + relative strengggth. + + ''' alpha = 1/period - df = np.diff(signal) + df = np.diff(signal, prepend=0) up = np.where(df > 0, df, 0) up_ema = ema(up, alpha, up_ema_last) @@ -120,11 +129,12 @@ def rsi( down = np.where(df < 0, -df, 0) down_ema = ema(down, alpha, down_ema_last) - # avoid dbz errors + # avoid dbz errors, this leaves the first + # index == 0 right? rs = np.divide( up_ema, down_ema, - out=np.zeros_like(up_ema), + out=np.zeros_like(signal), where=down_ema != 0 ) @@ -137,10 +147,18 @@ def rsi( def wma( + signal: np.ndarray, length: int, weights: Optional[np.ndarray] = None, + ) -> np.ndarray: + ''' + Compute a windowed moving average of ``signal`` with window + ``length`` and optional ``weights`` (must be same size as + ``signal``). + + ''' if weights is None: # default is a standard arithmetic mean seq = np.full((length,), 1) @@ -151,18 +169,22 @@ def wma( return np.convolve(signal, weights, 'valid') -# @piker.fsp.signal( +# @piker.fsp.emit( # timeframes=['1s', '5s', '15s', '1m', '5m', '1H'], # ) async def _rsi( + source: 'QuoteStream[Dict[str, Any]]', # noqa - ohlcv: "ShmArray[T<'close'>]", + ohlcv: ShmArray, period: int = 14, + ) -> AsyncIterator[np.ndarray]: - """Multi-timeframe streaming RSI. + ''' + Multi-timeframe streaming RSI. https://en.wikipedia.org/wiki/Relative_strength_index - """ + + ''' sig = ohlcv.array['close'] # wilder says to seed the RSI EMAs with the SMA for the "period" @@ -170,7 +192,8 @@ async def _rsi( # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. - rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed) + rsi_h, last_up_ema_close, last_down_ema_close = rsi( + sig, period, seed, seed) up_ema_last = last_up_ema_close down_ema_last = last_down_ema_close @@ -178,7 +201,6 @@ async def _rsi( yield rsi_h index = ohlcv.index - async for quote in source: # tick based updates for tick in iterticks(quote): @@ -206,16 +228,20 @@ async def _rsi( async def _wma( + source, #: AsyncStream[np.ndarray], length: int, ohlcv: np.ndarray, # price time-frame "aware" + ) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? - """Streaming weighted moving average. + ''' + Streaming weighted moving average. ``weights`` is a sequence of already scaled values. As an example for the WMA often found in "techincal analysis": ``weights = np.arange(1, N) * N*(N-1)/2``. - """ + + ''' # deliver historical output as "first yield" yield wma(ohlcv.array['close'], length) From cbec7df225e9ca26c58c9bc765f9658e578f51c8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Oct 2021 12:08:51 -0400 Subject: [PATCH 16/16] Drop old bps from fsp engine --- piker/fsp/_engine.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index cb51a86f..883f5853 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -114,7 +114,6 @@ async def fsp_compute( # and get historical output history_output = await out_stream.__anext__() - # await tractor.breakpoint() profiler(f'{func_name} generated history') # build a struct array which includes an 'index' field to push @@ -304,8 +303,6 @@ async def cascade( return tracker, step_diff s, step, ld = is_synced(src, dst) - if step or ld: - await tractor.breakpoint() # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed.