diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index f3f2d5de..ac6ac0d7 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -36,7 +36,7 @@ from ..data._sharedmem import ShmArray log = get_logger(__name__) -_fsps = { +_fsp_builtins = { 'rsi': _rsi, 'wma': _wma, 'vwap': _tina_vwap, @@ -65,16 +65,39 @@ async def latency( yield value +async def filter_quotes_by_sym( + + sym: str, + quote_stream, + +) -> AsyncIterator[dict]: + '''Filter quote stream by target symbol. + + ''' + # TODO: make this the actualy first quote from feed + # XXX: this allows for a single iteration to run for history + # processing without waiting on the real-time feed for a new quote + yield {} + + # task cancellation won't kill the channel + # since we shielded at the `open_feed()` call + async for quotes in quote_stream: + for symbol, quote in quotes.items(): + if symbol == sym: + yield quote + + async def fsp_compute( - ctx: tractor.Context, + + stream: tractor.MsgStream, symbol: str, feed: Feed, - stream: trio.abc.ReceiveChannel, + quote_stream: trio.abc.ReceiveChannel, src: ShmArray, dst: ShmArray, - fsp_func_name: str, + func_name: str, func: Callable, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, @@ -83,25 +106,13 @@ async def fsp_compute( # TODO: load appropriate fsp with input args - async def filter_by_sym( - sym: str, - stream, - ): - - # TODO: make this the actualy first quote from feed - # XXX: this allows for a single iteration to run for history - # processing without waiting on the real-time feed for a new quote - yield {} - - # task cancellation won't kill the channel - # since we shielded at the `open_feed()` call - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes - out_stream = func( - filter_by_sym(symbol, stream), + + # TODO: do we even need this if we do the feed api right? + # shouldn't a local stream do this before we get a handle + # to the async iterable? it's that or we do some kinda + # async itertools style? + filter_quotes_by_sym(symbol, quote_stream), feed.shm, ) @@ -129,11 +140,11 @@ async def fsp_compute( np.arange(len(history_output)), dtype=dst.array.dtype ) - history[fsp_func_name] = history_output + history[func_name] = history_output # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) - if diff >= 0: + if diff > 0: log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}") for _ in range(diff): dst.push(history[:1]) @@ -141,36 +152,44 @@ async def fsp_compute( # compare with source signal and time align index = dst.push(history) - await ctx.send_yield(index) - # setup a respawn handle with trio.CancelScope() as cs: - task_status.started(cs) + task_status.started((cs, index)) + + import time + last = time.time() # rt stream async for processed in out_stream: - # period = time.time() - last - # hz = 1/period if period else float('nan') - # if hz > 60: - # log.info(f'FSP quote too fast: {hz}') + period = time.time() - last + hz = 1/period if period else float('nan') + if hz > 60: + log.info(f'FSP quote too fast: {hz}') - log.debug(f"{fsp_func_name}: {processed}") + log.debug(f"{func_name}: {processed}") index = src.index - dst.array[-1][fsp_func_name] = processed + dst.array[-1][func_name] = processed - # stream latest shm array index entry - await ctx.send_yield(index) + # stream latest array index entry which basically just acts + # as trigger msg to tell the consumer to read from shm + await stream.send(index) + + last = time.time() -@tractor.stream +@tractor.context async def cascade( + ctx: tractor.Context, brokername: str, + src_shm_token: dict, dst_shm_token: Tuple[str, np.dtype], + symbol: str, - fsp_func_name: str, + func_name: str, + loglevel: Optional[str] = None, ) -> None: @@ -184,39 +203,50 @@ async def cascade( src = attach_shm_array(token=src_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token) - func: Callable = _fsps[fsp_func_name] + func: Callable = _fsp_builtins.get(func_name) + if not func: + # TODO: assume it's a func target path + raise ValueError('Unknown fsp target: {func_name}') # open a data feed stream with requested broker async with data.feed.maybe_open_feed( brokername, [symbol], - # TODO: + # TODO throttle tick outputs from *this* daemon since + # it'll emit tons of ticks due to the throttle only + # limits quote arrival periods, so the consumer of *this* + # needs to get throttled the ticks we generate. # tick_throttle=60, - ) as (feed, stream): + ) as (feed, quote_stream): assert src.token == feed.shm.token - last_len = new_len = len(src.array) - fsp_target = partial( - fsp_compute, - ctx=ctx, - symbol=symbol, - feed=feed, - stream=stream, + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): - src=src, - dst=dst, + fsp_target = partial( - fsp_func_name=fsp_func_name, - func=func - ) + fsp_compute, + stream=stream, + symbol=symbol, + feed=feed, + quote_stream=quote_stream, - async with trio.open_nursery() as n: + # shm + src=src, + dst=dst, - cs = await n.start(fsp_target) + func_name=func_name, + func=func + ) + + cs, index = await n.start(fsp_target) + await ctx.started(index) # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. @@ -229,17 +259,15 @@ async def cascade( if new_len > last_len + 1: # respawn the signal compute task if the source # signal has been updated + log.warning(f'Re-spawning fsp {func_name}') cs.cancel() - cs = await n.start(fsp_target) + cs, index = await n.start(fsp_target) # TODO: adopt an incremental update engine/approach # where possible here eventually! - # read out last shm row + # read out last shm row, copy and write new row array = dst.array last = array[-1:].copy() - - # write new row to the shm buffer dst.push(last) - last_len = new_len