diff --git a/piker/data/flows.py b/piker/data/flows.py
index 38345cea..f857d2f0 100644
--- a/piker/data/flows.py
+++ b/piker/data/flows.py
@@ -42,26 +42,6 @@ if TYPE_CHECKING:
     from .feed import Feed


-# TODO: ideas for further abstractions as per
-# https://github.com/pikers/piker/issues/216 and
-# https://github.com/pikers/piker/issues/270:
-# - a ``Cascade`` would be the minimal "connection" of 2 ``Flumes``
-#   as per circuit parlance:
-#   https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
-#   - could cover the combination of our `FspAdmin` and the
-#     backend `.fsp._engine` related machinery to "connect" one flume
-#     to another?
-# - a (financial signal) ``Flow`` would be the a "collection" of such
-#   minmial cascades. Some engineering based jargon concepts:
-#   - https://en.wikipedia.org/wiki/Signal_chain
-#   - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
-#   - https://en.wikipedia.org/wiki/Audio_signal_flow
-#   - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
-#   - https://en.wikipedia.org/wiki/Dataflow_programming
-#   - https://en.wikipedia.org/wiki/Signal_programming
-#   - https://en.wikipedia.org/wiki/Incremental_computing
-
-
 class Flume(Struct):
     '''
     Composite reference type which points to all the addressing handles
diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py
index b4cccdae..29b93631 100644
--- a/piker/fsp/_engine.py
+++ b/piker/fsp/_engine.py
@@ -18,7 +18,8 @@
 core task logic for processing chains

 '''
-from dataclasses import dataclass
+from __future__ import annotations
+from contextlib import asynccontextmanager as acm
 from functools import partial
 from typing import (
     AsyncIterator,
@@ -33,6 +34,7 @@ from trio_typing import TaskStatus
 import tractor
 from tractor.msg import NamespacePath

+from piker.types import Struct
 from ..log import get_logger, get_console_log
 from .. import data
 from ..data import attach_shm_array
@@ -56,12 +58,6 @@ from ..toolz import Profiler
 log = get_logger(__name__)


-@dataclass
-class TaskTracker:
-    complete: trio.Event
-    cs: trio.CancelScope
-
-
 async def filter_quotes_by_sym(

     sym: str,
@@ -82,9 +78,133 @@ async def filter_quotes_by_sym(
             if quote:
                 yield quote

+
+# TODO: unifying the abstractions in this FSP subsys/layer:
+# -[ ] move the `.data.flows.Flume` type into this
+#    module/subsys/pkg?
+# -[ ] ideas for further abstractions as per
+#    - https://github.com/pikers/piker/issues/216,
+#    - https://github.com/pikers/piker/issues/270:
+#    - a (financial signal) ``Flow`` would be a "collection" of such
+#      minimal cascades. Some engineering-based jargon concepts:
+#      - https://en.wikipedia.org/wiki/Signal_chain
+#      - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
+#      - https://en.wikipedia.org/wiki/Audio_signal_flow
+#      - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
+#      - https://en.wikipedia.org/wiki/Dataflow_programming
+#      - https://en.wikipedia.org/wiki/Signal_programming
+#      - https://en.wikipedia.org/wiki/Incremental_computing
+#      - https://en.wikipedia.org/wiki/Signal-flow_graph
+#      - https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components

-async def fsp_compute(
+# -[ ] we probably want to eval THE BELOW design and unify with the
+#    proto `TaskManager` in the `tractor` dev branch as well as with
+#    our below idea for `Cascade`:
+#    - https://github.com/goodboy/tractor/pull/363
+class Cascade(Struct):
+    '''
+    As per sig-proc engineering parlance, this is a chaining of
+    `Flume`s, which are themselves collections of "Streams"
+    implemented currently via `ShmArray`s.
+
+    A `Cascade` is the minimal "connection" of 2 `Flume`s
+    as per circuit parlance:
+    https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
+
+    TODO:
+    -[ ] could cover the combination of our `FspAdmin` and the
+      backend `.fsp._engine` related machinery to "connect" one flume
+      to another?
+
+    '''
+    # TODO: make these `Flume`s
+    src: ShmArray
+    dst: ShmArray
+    tn: trio.Nursery
+    fsp: Fsp  # UI-side middleware ctl API
+
+    # filled in during the `.open_edge()`/`connect_streams()` init phases
+    bind_func: Callable | None = None
+    complete: trio.Event | None = None
+    cs: trio.CancelScope | None = None
+    client_stream: tractor.MsgStream | None = None
+
+    async def resync(self) -> int:
+        # TODO: adopt an incremental update engine/approach
+        # where possible here eventually!
+        log.info(f're-syncing fsp {self.fsp.name} to source')
+        self.cs.cancel()
+        await self.complete.wait()
+        index: int = await self.tn.start(self.bind_func)
+
+        # always trigger UI refresh after history update,
+        # see ``piker.ui._fsp.FspAdmin.open_chain()`` and
+        # ``piker.ui._display.trigger_update()``.
+        await self.client_stream.send({
+            'fsp_update': {
+                'key': self.dst.token,
+                'first': self.dst._first.value,
+                'last': self.dst._last.value,
+            }
+        })
+        return index
+
+    def is_synced(self) -> tuple[bool, int, int]:
+        '''
+        Predicate to determine if a destination FSP
+        output array is aligned to its source array.
+
+        '''
+        src: ShmArray = self.src
+        dst: ShmArray = self.dst
+        step_diff = src.index - dst.index
+        len_diff = abs(len(src.array) - len(dst.array))
+        synced: bool = not (
+            # the source is likely backfilling and we must
+            # sync history calculations
+            len_diff > 2
+
+            # we aren't step synced to the source and may be
+            # leading/lagging by a step
+            or step_diff > 1
+            or step_diff < 0
+        )
+        if not synced:
+            fsp: Fsp = self.fsp
+            log.warning(
+                '***DESYNCED FSP***\n'
+                f'{fsp.ns_path}@{src.token}\n'
+                f'step_diff: {step_diff}\n'
+                f'len_diff: {len_diff}\n'
+            )
+        return (
+            synced,
+            step_diff,
+            len_diff,
+        )
+
+    async def poll_and_sync_to_step(self) -> int:
+        synced, step_diff, _ = self.is_synced()
+        while not synced:
+            await self.resync()
+            synced, step_diff, _ = self.is_synced()
+
+        return step_diff
+
+    @acm
+    async def open_edge(
+        self,
+        bind_func: Callable,
+    ) -> AsyncIterator[int]:
+        self.bind_func = bind_func
+        index = await self.tn.start(bind_func)
+        yield index
+        # TODO: what do we want on teardown/error?
+        # -[ ] dynamic reconnection after update?
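Review note: before the diff continues, here is a minimal, hypothetical sketch of the intended `Cascade` task lifecycle, i.e. how `open_edge()` spawns the bound compute task into the owning nursery. The `DummyShm` stub, `noop_bind` func and the `None` fsp field are invented purely for illustration and are NOT part of this patch; it assumes the `Cascade` type defined above is importable.

```python
# hypothetical `Cascade` lifecycle sketch; stubs are illustrative only.
import trio

class DummyShm:
    # stands in for a real `ShmArray` src/dst buffer
    index: int = 0

async def noop_bind(
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # a real bind func (`connect_streams`) back-fills history,
    # reports the last-written index via `.started()`, then
    # loops writing per-sample output.
    task_status.started(0)
    await trio.sleep_forever()

async def main() -> None:
    async with trio.open_nursery() as tn:
        # real callers pass `ShmArray`s and an `Fsp`; stubbed here.
        casc = Cascade(DummyShm(), DummyShm(), tn, None)
        async with casc.open_edge(bind_func=noop_bind) as index:
            # the compute task is now running inside `tn`; a source
            # history update would be handled via `casc.resync()`.
            assert index == 0
        tn.cancel_scope.cancel()

trio.run(main)
```

Note that `open_edge()` only seeds the first start; every subsequent (re)spawn via `resync()` relies on the bind func re-setting the `complete`/`cs` handles, which is exactly what the `connect_streams` changes below do.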
+
+
+async def connect_streams(
+
+    casc: Cascade,
     mkt: MktPair,
     flume: Flume,
     quote_stream: trio.abc.ReceiveChannel,
@@ -98,13 +218,27 @@ async def fsp_compute(
     task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,

 ) -> None:
+    '''
+    Stream-process the cascade of 2 `Flume`s/streams, computing and
+    writing output per-sample via some operating `func`:
+    https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
+
+    Not literally, but something like:
+
+        func(Flume_in) -> Flume_out
+
+    '''
     profiler = Profiler(
         delayed=False,
         disabled=True
     )

-    fqme = mkt.fqme
+    fqme: str = mkt.fqme
+
+    # TODO: dynamic introspection of what the underlying (vertex)
+    # function actually requires from input node (flumes) then
+    # deliver those inputs as part of a graph "compilation" step?
     out_stream = func(

         # TODO: do we even need this if we do the feed api right?
         # async itertools style?
         filter_quotes_by_sym(fqme, quote_stream),

-        # XXX: currently the ``ohlcv`` arg
+        # XXX: currently the ``ohlcv`` arg, but we should allow
+        # (dynamic) requests for src flume (node) streams?
         flume.rt_shm,
     )
@@ -216,12 +351,9 @@ async def fsp_compute(

     # setup a respawn handle
     with trio.CancelScope() as cs:
-
-        # TODO: might be better to just make a "restart" method where
-        # the target task is spawned implicitly and then the event is
-        # set via some higher level api? At that poing we might as well
-        # be writing a one-cancels-one nursery though right?
-        tracker = TaskTracker(trio.Event(), cs)
-        task_status.started((tracker, index))
+        casc.cs = cs
+        casc.complete = trio.Event()
+        task_status.started(index)

         profiler(f'{func_name} yield last index')
@@ -262,7 +394,7 @@
             # log.info(f'FSP quote too fast: {hz}')
             # last = time.time()
     finally:
-        tracker.complete.set()
+        casc.complete.set()


 @tractor.context
@@ -273,6 +405,7 @@ async def cascade(

     # data feed key
     fqme: str,

+    # TODO: expect and attach from `Flume.to_msg()`s!
     src_shm_token: dict,
     dst_shm_token: tuple[str, np.dtype],
@@ -297,8 +430,8 @@ async def cascade(
     if loglevel:
         get_console_log(loglevel)

-    src = attach_shm_array(token=src_shm_token)
-    dst = attach_shm_array(readonly=False, token=dst_shm_token)
+    src: ShmArray = attach_shm_array(token=src_shm_token)
+    dst: ShmArray = attach_shm_array(readonly=False, token=dst_shm_token)

     reg = _load_builtins()
     lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg])
@@ -320,7 +453,7 @@
     fsp: Fsp = reg.get(
         NamespacePath(ns_path)
     )
-    func = fsp.func
+    func: Callable = fsp.func

     if not func:
         # TODO: assume it's a func target path
@@ -341,17 +474,32 @@

         flume = feed.flumes[fqme]
         mkt = flume.mkt
+
+        # TODO: make an equivalent `Flume` around the Fsp output
+        # streams and chain them using a `Cascade` Bo
         assert src.token == flume.rt_shm.token
         profiler(f'{func}: feed up')

-        func_name = func.__name__
+        func_name: str = func.__name__
         async with (
-            trio.open_nursery() as n,
+            trio.open_nursery() as tn,
         ):
+            # TODO: might be better to just make a "restart" method where
+            # the target task is spawned implicitly and then the event is
+            # set via some higher level api? At that point we might as well
+            # be writing a one-cancels-one nursery though right?
+            casc = Cascade(
+                src,
+                dst,
+                tn,
+                fsp,
+            )
+
+            # TODO: this seems like it should be wrapped somewhere?
             fsp_target = partial(

-                fsp_compute,
+                connect_streams,
+                casc=casc,
                 mkt=mkt,
                 flume=flume,
                 quote_stream=flume.stream,
@@ -360,156 +508,95 @@ async def cascade(
                 src=src,
                 dst=dst,

-                # target
+                # chain function which takes src flume input(s)
+                # and renders dst flume output(s)
                 func=func
             )
+            async with casc.open_edge(
+                bind_func=fsp_target,
+            ) as index:

-            tracker, index = await n.start(fsp_target)
+                if zero_on_step:
+                    last = dst.array[-1:]
+                    zeroed = np.zeros(last.shape, dtype=last.dtype)

-            if zero_on_step:
-                last = dst.array[-1:]
-                zeroed = np.zeros(last.shape, dtype=last.dtype)
+                profiler(f'{func_name}: fsp up')

-            profiler(f'{func_name}: fsp up')
+                # sync client
+                await ctx.started(index)

-            # sync client
-            await ctx.started(index)
+                # XXX: rt stream with client which we MUST
+                # open here (and keep it open) in order to make
+                # incremental "updates" as history prepends take
+                # place.
+                async with ctx.open_stream() as client_stream:
+                    casc.client_stream = client_stream

-            # XXX: rt stream with client which we MUST
-            # open here (and keep it open) in order to make
-            # incremental "updates" as history prepends take
-            # place.
-            async with ctx.open_stream() as client_stream:
+                    s, step, ld = casc.is_synced()

-                # TODO: these likely should all become
-                # methods of this ``TaskLifetime`` or wtv
-                # abstraction..
-                async def resync(
-                    tracker: TaskTracker,
+                    # detect sample period step for subscription to increment
+                    # signal
+                    times = src.array['time']
+                    if len(times) > 1:
+                        last_ts = times[-1]
+                        delay_s = float(last_ts - times[times != last_ts][-1])
+                    else:
+                        # our default "HFT" sample rate.
+                        delay_s = _default_delay_s

-                ) -> tuple[TaskTracker, int]:
+                    # sub and increment the underlying shared memory buffer
+                    # on every step msg received from the global `samplerd`
+                    # service.
+                    async with open_sample_stream(float(delay_s)) as istream:

-                    # TODO: adopt an incremental update engine/approach
-                    # where possible here eventually!
-                    log.info(f're-syncing fsp {func_name} to source')
-                    tracker.cs.cancel()
-                    await tracker.complete.wait()
-                    tracker, index = await n.start(fsp_target)
+                        profiler(f'{func_name}: sample stream up')
+                        profiler.finish()

-                    # always trigger UI refresh after history update,
-                    # see ``piker.ui._fsp.FspAdmin.open_chain()`` and
-                    # ``piker.ui._display.trigger_update()``.
-                    await client_stream.send({
-                        'fsp_update': {
-                            'key': dst_shm_token,
-                            'first': dst._first.value,
-                            'last': dst._last.value,
-                        }
-                    })
-                    return tracker, index
+                        async for i in istream:
+                            # print(f'FSP incrementing {i}')

-                def is_synced(
-                    src: ShmArray,
-                    dst: ShmArray
-                ) -> tuple[bool, int, int]:
-                    '''
-                    Predicate to dertmine if a destination FSP
-                    output array is aligned to its source array.
+                            # respawn the compute task if the source
+                            # array has been updated such that we compute
+                            # new history from the (prepended) source.
+                            synced, step_diff, _ = casc.is_synced()
+                            if not synced:
+                                step_diff: int = await casc.poll_and_sync_to_step()

-                    '''
-                    step_diff = src.index - dst.index
-                    len_diff = abs(len(src.array) - len(dst.array))
-                    return not (
-                        # the source is likely backfilling and we must
-                        # sync history calculations
-                        len_diff > 2
+                            # skip adding a last bar since we should already
+                            # be step aligned
+                            if step_diff == 0:
+                                continue

-                        # we aren't step synced to the source and may be
-                        # leading/lagging by a step
-                        or step_diff > 1
-                        or step_diff < 0
-                    ), step_diff, len_diff
+                            # read out last shm row, copy and write new row
+                            array = dst.array

-                async def poll_and_sync_to_step(
-                    tracker: TaskTracker,
-                    src: ShmArray,
-                    dst: ShmArray,
+                            # some metrics like vlm should be reset
+                            # to zero every step.
+                            if zero_on_step:
+                                last = zeroed
+                            else:
+                                last = array[-1:].copy()

-                ) -> tuple[TaskTracker, int]:
+                            dst.push(last)

-                    synced, step_diff, _ = is_synced(src, dst)
-                    while not synced:
-                        tracker, index = await resync(tracker)
-                        synced, step_diff, _ = is_synced(src, dst)
+                            # sync with source buffer's time step
+                            src_l2 = src.array[-2:]
+                            src_li, src_lt = src_l2[-1][['index', 'time']]
+                            src_2li, src_2lt = src_l2[-2][['index', 'time']]
+                            dst._array['time'][src_li] = src_lt
+                            dst._array['time'][src_2li] = src_2lt

-                    return tracker, step_diff
-
-                s, step, ld = is_synced(src, dst)
-
-                # detect sample period step for subscription to increment
-                # signal
-                times = src.array['time']
-                if len(times) > 1:
-                    last_ts = times[-1]
-                    delay_s = float(last_ts - times[times != last_ts][-1])
-                else:
-                    # our default "HFT" sample rate.
-                    delay_s = _default_delay_s
-
-                # sub and increment the underlying shared memory buffer
-                # on every step msg received from the global `samplerd`
-                # service.
-                async with open_sample_stream(float(delay_s)) as istream:
-
-                    profiler(f'{func_name}: sample stream up')
-                    profiler.finish()
-
-                    async for i in istream:
-                        # print(f'FSP incrementing {i}')
-
-                        # respawn the compute task if the source
-                        # array has been updated such that we compute
-                        # new history from the (prepended) source.
-                        synced, step_diff, _ = is_synced(src, dst)
-                        if not synced:
-                            tracker, step_diff = await poll_and_sync_to_step(
-                                tracker,
-                                src,
-                                dst,
-                            )
-
-                        # skip adding a last bar since we should already
-                        # be step alinged
-                        if step_diff == 0:
-                            continue
-
-                        # read out last shm row, copy and write new row
-                        array = dst.array
-
-                        # some metrics like vlm should be reset
-                        # to zero every step.
-                        if zero_on_step:
-                            last = zeroed
-                        else:
-                            last = array[-1:].copy()
-
-                        dst.push(last)
-
-                        # sync with source buffer's time step
-                        src_l2 = src.array[-2:]
-                        src_li, src_lt = src_l2[-1][['index', 'time']]
-                        src_2li, src_2lt = src_l2[-2][['index', 'time']]
-                        dst._array['time'][src_li] = src_lt
-                        dst._array['time'][src_2li] = src_2lt
-
-                        # last2 = dst.array[-2:]
-                        # if (
-                        #     last2[-1]['index'] != src_li
-                        #     or last2[-2]['index'] != src_2li
-                        # ):
-                        #     dstl2 = list(last2)
-                        #     srcl2 = list(src_l2)
-                        #     print(
-                        #         # f'{dst.token}\n'
-                        #         f'src: {srcl2}\n'
-                        #         f'dst: {dstl2}\n'
-                        #     )
+                            # last2 = dst.array[-2:]
+                            # if (
+                            #     last2[-1]['index'] != src_li
+                            #     or last2[-2]['index'] != src_2li
+                            # ):
+                            #     dstl2 = list(last2)
+                            #     srcl2 = list(src_l2)
+                            #     print(
+                            #         # f'{dst.token}\n'
+                            #         f'src: {srcl2}\n'
+                            #         f'dst: {dstl2}\n'
+                            #     )
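Review note: the desync predicate moved into `Cascade.is_synced()` is easy to sanity-check in isolation. Below is a self-contained rendering of the same arithmetic over plain integers; the function name, parameters and sample values are invented for illustration (real callers read `src.index`/`dst.index` and the shm array lengths).

```python
# standalone rendering of the desync predicate used by
# `Cascade.is_synced()`; names/values here are illustrative only.

def is_synced(
    src_index: int,
    dst_index: int,
    src_len: int,
    dst_len: int,
) -> tuple[bool, int, int]:
    step_diff = src_index - dst_index
    len_diff = abs(src_len - dst_len)
    synced = not (
        # source is (likely) backfilling prepended history
        len_diff > 2
        # dst lags by more than one step, or somehow leads src
        or step_diff > 1
        or step_diff < 0
    )
    return synced, step_diff, len_diff

# in-step and equal length => synced
assert is_synced(100, 100, 1000, 1000)[0]
# src leads by exactly one step (a fresh sample) => still synced
assert is_synced(101, 100, 1000, 1000)[0]
# src grew a large history prepend => desynced, triggers `resync()`
assert not is_synced(101, 100, 5000, 1000)[0]
# dst leads src => desynced
assert not is_synced(99, 100, 1000, 1000)[0]
```

The `step_diff == 0` fast-path in the loop above then skips pushing a duplicate last bar, since a zero step drift means the destination buffer is already aligned to the source's latest sample.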