# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''
core task logic for processing chains

'''
from dataclasses import dataclass
from functools import partial
from typing import (
    AsyncIterator,
    Callable,
    Optional,
    Union,
)

import numpy as np
import trio
from trio_typing import TaskStatus
import tractor
from tractor.msg import NamespacePath

from ..log import get_logger, get_console_log
from .. import data
from ..data import attach_shm_array
from ..data.feed import (
    Flume,
    Feed,
)
from ..data._sharedmem import ShmArray
from ..data._sampling import (
    _default_delay_s,
    open_sample_stream,
)
from ..accounting._mktinfo import Symbol
from ._api import (
    Fsp,
    _load_builtins,
    _Token,
)
from .._profile import Profiler

log = get_logger(__name__)


@dataclass
class TaskTracker:
    '''
    Respawn handle for a running ``fsp_compute()`` task.

    '''
    # set by the compute task (in its ``finally:``) once its
    # real-time loop has exited.
    complete: trio.Event

    # scope wrapping the compute task's real-time loop; cancelling it
    # stops that loop so the task can be started again.
    cs: trio.CancelScope


async def filter_quotes_by_sym(

    sym: str,
    quote_stream: tractor.MsgStream,

) -> AsyncIterator[dict]:
    '''
    Filter quote stream by target symbol.

    An empty ``dict`` is always yielded first; afterwards every truthy
    ``quotes.get(sym)`` entry received from ``quote_stream`` is
    yielded in arrival order.

    '''
    # TODO: make this the actual first quote from feed
    # XXX: this allows for a single iteration to run for history
    # processing without waiting on the real-time feed for a new quote
    yield {}

    async for quotes in quote_stream:
        quote = quotes.get(sym)
        if quote:
            yield quote


async def fsp_compute(

    symbol: Symbol,
    flume: Flume,
    quote_stream: trio.abc.ReceiveChannel,

    src: ShmArray,
    dst: ShmArray,

    func: Callable,

    # attach_stream: bool = False,
    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,

) -> None:
    '''
    Run one FSP ``func`` over ``src`` and write its output to ``dst``.

    The first value yielded by ``func`` is treated as the history
    output and pushed to ``dst`` in one shot; every later value is
    expected to be a ``(key, value)`` pair which is written into the
    last row of ``dst``.

    ``task_status.started()`` is called with a
    ``(TaskTracker, index)`` pair once history has been pushed, where
    ``index`` is whatever ``dst.push()`` returned.

    Raises ``ValueError`` when the history output does not have the
    shape implied by the ``dst`` dtype (``dict`` for multi-field,
    ``np.ndarray`` for single-field) or when a multi-field FSP yields
    none of the destination fields.

    '''
    profiler = Profiler(
        delayed=False,
        disabled=True
    )

    fqsn = symbol.fqme
    out_stream = func(

        # TODO: do we even need this if we do the feed api right?
        # shouldn't a local stream do this before we get a handle
        # to the async iterable? it's that or we do some kinda
        # async itertools style?
        filter_quotes_by_sym(fqsn, quote_stream),

        # XXX: currently the ``ohlcv`` arg
        flume.rt_shm,
    )

    # HISTORY COMPUTE PHASE
    # conduct a single iteration of fsp with historical bars input
    # and get historical output.
    history_output: Union[
        dict[str, np.ndarray],  # multi-output case
        np.ndarray,  # single output case
    ]
    history_output = await anext(out_stream)

    func_name = func.__name__
    profiler(f'{func_name} generated history')

    # build struct array with an 'index' field to push as history

    # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
    # if the output array is multi-field then push
    # each respective field.
    fields = getattr(dst.array.dtype, 'fields', None).copy()
    fields.pop('index')
    history_by_field: Optional[np.ndarray] = None
    src_time = src.array['time']

    if (
        fields
        and len(fields) > 1
    ):
        if not isinstance(history_output, dict):
            raise ValueError(
                f'`{func_name}` is a multi-output FSP and should yield a '
                '`dict[str, np.ndarray]` for history'
            )

        for key in fields.keys():
            if key not in history_output:
                continue

            output = history_output[key]

            if history_by_field is None:
                # using the first output, determine
                # the length of the struct-array that
                # will be pushed to shm.
                length = (
                    len(src.array) if output is None
                    else len(output)
                )
                history_by_field = np.zeros(
                    length,
                    dtype=dst.array.dtype
                )

            if output is None:
                continue

            history_by_field[key] = output

        # without this guard the 'time' assignment below would fail
        # with an opaque ``TypeError`` on ``None``.
        if history_by_field is None:
            raise ValueError(
                f'`{func_name}` yielded none of the destination fields '
                f'{list(fields)} for history'
            )

    # single-key output stream
    else:
        if not isinstance(history_output, np.ndarray):
            raise ValueError(
                f'`{func_name}` is a single output FSP and should yield an '
                '`np.ndarray` for history'
            )
        history_by_field = np.zeros(
            len(history_output),
            dtype=dst.array.dtype
        )
        history_by_field[func_name] = history_output

    # stamp history rows with the trailing source timestamps.
    # NOTE: ``history_output`` itself is intentionally NOT written to
    # here: in the single-output case it is a plain ``np.ndarray``
    # which can not be indexed by a field name, and in the dict case
    # nothing reads it again after this point.
    history_by_field['time'] = src_time[-len(history_by_field):]

    # TODO: XXX:
    # THERE'S A BIG BUG HERE WITH THE `index` field since we're
    # prepending a copy of the first value a few times to make
    # sub-curves align with the parent bar chart.
    # This likely needs to be fixed either by,
    # - manually assigning the index and historical data
    #   separately to the shm array (i.e. not using .push())
    # - developing some system on top of the shared mem array that
    #   is `index` aware such that historical data can be indexed
    #   relative to the true first datum? Not sure if this is sane
    #   for incremental computations.
    first = dst._first.value = src._first.value

    # TODO: can we use this `start` flag instead of the manual
    # setting above?
    index = dst.push(
        history_by_field,
        start=first,
    )

    profiler(f'{func_name} pushed history')
    profiler.finish()

    # setup a respawn handle
    with trio.CancelScope() as cs:

        # TODO: might be better to just make a "restart" method where
        # the target task is spawned implicitly and then the event is
        # set via some higher level api? At that point we might as well
        # be writing a one-cancels-one nursery though right?
        tracker = TaskTracker(trio.Event(), cs)
        task_status.started((tracker, index))

        profiler(f'{func_name} yield last index')

        # import time
        # last = time.time()

        try:

            async for processed in out_stream:

                log.debug(f"{func_name}: {processed}")
                key, output = processed
                # dst.array[-1][key] = output
                dst.array[[key, 'time']][-1] = (
                    output,
                    # TODO: what about pushing ``time.time_ns()``
                    # in which case we'll need to round at the graphics
                    # processing / sampling layer?
                    src.array[-1]['time']
                )

                # NOTE: for now we aren't streaming this to the consumer
                # stream latest array index entry which basically just acts
                # as trigger msg to tell the consumer to read from shm
                # TODO: further this should likely be implemented much
                # like our `Feed` api where there is one background
                # "service" task which computes output and then sends to
                # N-consumers who subscribe for the real-time output,
                # which we'll likely want to implement using local-mem
                # chans for the fan out?
                # index = src.index
                # if attach_stream:
                #     await client_stream.send(index)

                # period = time.time() - last
                # hz = 1/period if period else float('nan')
                # if hz > 60:
                #     log.info(f'FSP quote too fast: {hz}')
                # last = time.time()
        finally:
            # always signal completion so a waiting ``resync()`` in
            # ``cascade()`` can proceed after cancelling this task.
            tracker.complete.set()


@tractor.context
async def cascade(

    ctx: tractor.Context,

    # data feed key
    fqsn: str,

    src_shm_token: dict,
    dst_shm_token: tuple[str, np.dtype],

    ns_path: NamespacePath,

    shm_registry: dict[str, _Token],

    zero_on_step: bool = False,
    loglevel: Optional[str] = None,

) -> None:
    '''
    Chain streaming signal processors and deliver output to
    destination shm array buffer.

    Attaches to the source and destination shm arrays, looks up the
    FSP registered under ``ns_path``, starts ``fsp_compute()`` for it
    and then, on every sample-step message, appends a new row to the
    destination array. Whenever the destination falls out of sync with
    the source the compute task is cancelled and restarted and an
    ``'fsp_update'`` msg is sent to the client.

    Raises ``ValueError`` when no FSP function is registered for
    ``ns_path``.

    '''
    profiler = Profiler(
        delayed=False,
        disabled=False
    )

    if loglevel:
        get_console_log(loglevel)

    src = attach_shm_array(token=src_shm_token)
    dst = attach_shm_array(readonly=False, token=dst_shm_token)

    reg = _load_builtins()
    lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg])
    log.info(
        f'Registered FSP set:\n{lines}'
    )

    # update actorlocal flows table which registers
    # readonly "instances" of this fsp for symbol/source
    # so that consumer fsps can look it up by source + fsp.
    # TODO: ugh i hate this wind/unwind to list over the wire
    # but not sure how else to do it.
    # NOTE(review): iterated as ``(token, fsp_name, dst_token)``
    # triples, which does not match the ``dict[str, _Token]``
    # annotation on the parameter -- confirm against the caller.
    for (token, fsp_name, dst_token) in shm_registry:
        Fsp._flow_registry[(
            _Token.from_msg(token),
            fsp_name,
        )] = _Token.from_msg(dst_token), None

    fsp: Optional[Fsp] = reg.get(
        NamespacePath(ns_path)
    )
    # an unregistered path yields ``None`` from ``.get()``; treat it the
    # same as a registered entry with no function.
    func = fsp.func if fsp is not None else None

    if not func:
        # TODO: assume it's a func target path
        raise ValueError(f'Unknown fsp target: {ns_path}')

    # open a data feed stream with requested broker
    feed: Feed
    async with data.feed.maybe_open_feed(
        [fqsn],

        # TODO throttle tick outputs from *this* daemon since
        # it'll emit tons of ticks due to the throttle only
        # limits quote arrival periods, so the consumer of *this*
        # needs to get throttled the ticks we generate.
        # tick_throttle=60,

    ) as feed:

        flume = feed.flumes[fqsn]
        symbol = flume.symbol
        assert src.token == flume.rt_shm.token
        profiler(f'{func}: feed up')

        func_name = func.__name__
        async with (
            trio.open_nursery() as n,
        ):

            fsp_target = partial(

                fsp_compute,
                symbol=symbol,
                flume=flume,
                quote_stream=flume.stream,

                # shm
                src=src,
                dst=dst,

                # target
                func=func
            )

            tracker, index = await n.start(fsp_target)

            if zero_on_step:
                last = dst.array[-1:]
                zeroed = np.zeros(last.shape, dtype=last.dtype)

            profiler(f'{func_name}: fsp up')

            # sync client
            await ctx.started(index)

            # XXX:  rt stream with client which we MUST
            # open here (and keep it open) in order to make
            # incremental "updates" as history prepends take
            # place.
            async with ctx.open_stream() as client_stream:

                # TODO: these likely should all become
                # methods of this ``TaskLifetime`` or wtv
                # abstraction..
                async def resync(
                    tracker: TaskTracker,

                ) -> tuple[TaskTracker, int]:
                    '''
                    Cancel the current compute task, wait for it to
                    finish, start a fresh one and notify the client.

                    '''
                    # TODO: adopt an incremental update engine/approach
                    # where possible here eventually!
                    log.info(f're-syncing fsp {func_name} to source')
                    tracker.cs.cancel()
                    await tracker.complete.wait()
                    tracker, index = await n.start(fsp_target)

                    # always trigger UI refresh after history update,
                    # see ``piker.ui._fsp.FspAdmin.open_chain()`` and
                    # ``piker.ui._display.trigger_update()``.
                    await client_stream.send({
                        'fsp_update': {
                            'key': dst_shm_token,
                            'first': dst._first.value,
                            'last': dst._last.value,
                        }
                    })
                    return tracker, index

                def is_synced(
                    src: ShmArray,
                    dst: ShmArray

                ) -> tuple[bool, int, int]:
                    '''
                    Predicate to determine if a destination FSP
                    output array is aligned to its source array.

                    Returns ``(synced, step_diff, len_diff)``.

                    '''
                    step_diff = src.index - dst.index
                    len_diff = abs(len(src.array) - len(dst.array))
                    return not (
                        # the source is likely backfilling and we must
                        # sync history calculations
                        len_diff > 2

                        # we aren't step synced to the source and may be
                        # leading/lagging by a step
                        or step_diff > 1
                        or step_diff < 0
                    ), step_diff, len_diff

                async def poll_and_sync_to_step(
                    tracker: TaskTracker,
                    src: ShmArray,
                    dst: ShmArray,

                ) -> tuple[TaskTracker, int]:
                    '''
                    Keep restarting the compute task until ``dst`` is
                    aligned with ``src``; return the live tracker and
                    the final step difference.

                    '''
                    synced, step_diff, _ = is_synced(src, dst)
                    while not synced:
                        tracker, _ = await resync(tracker)
                        synced, step_diff, _ = is_synced(src, dst)

                    return tracker, step_diff

                # detect sample period step for subscription to increment
                # signal; fall back to our default "HFT" sample rate
                # when the source holds fewer than two distinct
                # timestamps.
                delay_s = _default_delay_s
                times = src.array['time']
                if len(times) > 1:
                    last_ts = times[-1]
                    prior = times[times != last_ts]
                    if prior.size:
                        delay_s = float(last_ts - prior[-1])

                # sub and increment the underlying shared memory buffer
                # on every step msg received from the global `samplerd`
                # service.
                async with open_sample_stream(float(delay_s)) as istream:

                    profiler(f'{func_name}: sample stream up')
                    profiler.finish()

                    async for i in istream:
                        # print(f'FSP incrementing {i}')

                        # respawn the compute task if the source
                        # array has been updated such that we compute
                        # new history from the (prepended) source.
                        synced, step_diff, _ = is_synced(src, dst)
                        if not synced:
                            tracker, step_diff = await poll_and_sync_to_step(
                                tracker,
                                src,
                                dst,
                            )

                            # skip adding a last bar since we should already
                            # be step aligned
                            if step_diff == 0:
                                continue

                        # read out last shm row, copy and write new row
                        array = dst.array

                        # some metrics like vlm should be reset
                        # to zero every step.
                        if zero_on_step:
                            last = zeroed
                        else:
                            last = array[-1:].copy()

                        dst.push(last)

                        # sync with source buffer's time step
                        src_l2 = src.array[-2:]
                        src_li, src_lt = src_l2[-1][['index', 'time']]
                        src_2li, src_2lt = src_l2[-2][['index', 'time']]
                        dst._array['time'][src_li] = src_lt
                        dst._array['time'][src_2li] = src_2lt

                        # last2 = dst.array[-2:]
                        # if (
                        #     last2[-1]['index'] != src_li
                        #     or last2[-2]['index'] != src_2li
                        # ):
                        #     dstl2 = list(last2)
                        #     srcl2 = list(src_l2)
                        #     print(
                        #         # f'{dst.token}\n'
                        #         f'src: {srcl2}\n'
                        #         f'dst: {dstl2}\n'
                        #     )