fsp: intro a `Cascade` type that connects `Flume`s of streams

branch: distribute_dis
Tyler Goodlet 2023-12-05 16:59:07 -05:00
parent b8065a413b
commit 656e2c6a88
2 changed files with 245 additions and 178 deletions
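At its core the change swaps the old `TaskTracker` dataclass plus a nest of closures (`resync()`, `is_synced()`, `poll_and_sync_to_step()`) for a `Cascade` struct that owns the compute task's nursery, cancel scope, completion event and bound spawn target. The cancel/respawn lifecycle it encodes can be modeled standalone; the sketch below is a stripped-down approximation using only `trio`, with hypothetical stand-in names (`compute`, the dummy index `0`) and all of piker's shm/UI plumbing omitted:

    from contextlib import asynccontextmanager
    from functools import partial
    from typing import Callable

    import trio


    class Cascade:
        '''
        Minimal model: owns the respawn state the old
        `TaskTracker` carried, plus the nursery and bound
        target needed to (re)spawn the compute task.

        '''
        def __init__(self, tn: trio.Nursery):
            self.tn = tn
            self.bind_func: Callable | None = None
            self.complete: trio.Event | None = None
            self.cs: trio.CancelScope | None = None

        async def resync(self) -> int:
            # cancel the live compute task, wait for its
            # teardown, then respawn from the bound target.
            self.cs.cancel()
            await self.complete.wait()
            return await self.tn.start(self.bind_func)

        @asynccontextmanager
        async def open_edge(self, bind_func: Callable):
            self.bind_func = bind_func
            yield await self.tn.start(bind_func)


    async def compute(
        casc: Cascade,
        task_status=trio.TASK_STATUS_IGNORED,
    ):
        # stand-in for `connect_streams()`: publish scope and
        # event on the cascade, report a start "index", then
        # run until cancelled.
        with trio.CancelScope() as cs:
            casc.cs = cs
            casc.complete = trio.Event()
            task_status.started(0)
            try:
                await trio.sleep_forever()
            finally:
                casc.complete.set()


    async def main():
        async with trio.open_nursery() as tn:
            casc = Cascade(tn)
            async with casc.open_edge(partial(compute, casc)):
                # as triggered on a src-buffer history update
                await casc.resync()
            tn.cancel_scope.cancel()

    trio.run(main)

In the real diff the same cancel-wait-respawn sequence lives in `Cascade.resync()` and the scope/event registration happens inside `connect_streams()`.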

piker/data/flows.py

@@ -42,26 +42,6 @@ if TYPE_CHECKING:
     from .feed import Feed

-# TODO: ideas for further abstractions as per
-# https://github.com/pikers/piker/issues/216 and
-# https://github.com/pikers/piker/issues/270:
-# - a ``Cascade`` would be the minimal "connection" of 2 ``Flume``s
-#   as per circuit parlance:
-#   https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
-# - could cover the combination of our `FspAdmin` and the
-#   backend `.fsp._engine` related machinery to "connect" one flume
-#   to another?
-# - a (financial signal) ``Flow`` would be a "collection" of such
-#   minimal cascades. Some engineering based jargon concepts:
-#   - https://en.wikipedia.org/wiki/Signal_chain
-#   - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
-#   - https://en.wikipedia.org/wiki/Audio_signal_flow
-#   - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
-#   - https://en.wikipedia.org/wiki/Dataflow_programming
-#   - https://en.wikipedia.org/wiki/Signal_programming
-#   - https://en.wikipedia.org/wiki/Incremental_computing

 class Flume(Struct):
     '''
     Composite reference type which points to all the addressing handles
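The comment block removed here is re-homed (and extended) in `piker/fsp/_engine.py` below. Its two-port framing means a `Flow` reduces to daisy-chained edges: each edge computes a dst array from a src array, and chaining feeds one edge's output into the next edge's input. A purely illustrative, standalone sketch of that composition (hypothetical `returns`/`ema` operators, plain numpy arrays in place of `Flume`s):

    import numpy as np

    # hypothetical per-edge operators: each maps src -> dst
    def returns(src: np.ndarray) -> np.ndarray:
        # log-returns of a price series
        return np.diff(np.log(src))

    def ema(src: np.ndarray, span: int = 9) -> np.ndarray:
        # exponential moving average, recursive form
        alpha = 2 / (span + 1)
        out = np.empty_like(src)
        out[0] = src[0]
        for i in range(1, len(src)):
            out[i] = alpha * src[i] + (1 - alpha) * out[i - 1]
        return out

    def flow(src: np.ndarray, *edges) -> np.ndarray:
        # a "flow": cascades daisy-chained so the dst of one
        # edge becomes the src of the next
        for func in edges:
            src = func(src)
        return src

    closes = np.array([100., 101., 100.5, 102., 103.])
    print(flow(closes, returns, ema))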

piker/fsp/_engine.py

@@ -18,7 +18,8 @@
 core task logic for processing chains

 '''
-from dataclasses import dataclass
+from __future__ import annotations
+from contextlib import asynccontextmanager as acm
 from functools import partial
 from typing import (
     AsyncIterator,
@@ -33,6 +34,7 @@ from trio_typing import TaskStatus
 import tractor
 from tractor.msg import NamespacePath

+from piker.types import Struct
 from ..log import get_logger, get_console_log
 from .. import data
 from ..data import attach_shm_array
@@ -56,12 +58,6 @@ from ..toolz import Profiler
 log = get_logger(__name__)


-@dataclass
-class TaskTracker:
-    complete: trio.Event
-    cs: trio.CancelScope


 async def filter_quotes_by_sym(
     sym: str,
@@ -82,9 +78,133 @@ async def filter_quotes_by_sym(
             if quote:
                 yield quote

+
+# TODO: unifying the abstractions in this FSP subsys/layer:
+# -[ ] move the `.data.flows.Flume` type into this
+#   module/subsys/pkg?
+# -[ ] ideas for further abstractions as per
+#   - https://github.com/pikers/piker/issues/216,
+#   - https://github.com/pikers/piker/issues/270:
+#   - a (financial signal) ``Flow`` would be a "collection" of such
+#     minimal cascades. Some engineering based jargon concepts:
+#     - https://en.wikipedia.org/wiki/Signal_chain
+#     - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
+#     - https://en.wikipedia.org/wiki/Audio_signal_flow
+#     - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
+#     - https://en.wikipedia.org/wiki/Dataflow_programming
+#     - https://en.wikipedia.org/wiki/Signal_programming
+#     - https://en.wikipedia.org/wiki/Incremental_computing
+#     - https://en.wikipedia.org/wiki/Signal-flow_graph
+#     - https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
+# -[ ] we probably want to eval THE BELOW design and unify with the
+#   proto `TaskManager` in the `tractor` dev branch as well as with
+#   our below idea for `Cascade`:
+#   - https://github.com/goodboy/tractor/pull/363
+class Cascade(Struct):
+    '''
+    As per sig-proc engineering parlance, this is a chaining of
+    `Flume`s, which are themselves collections of "Streams"
+    implemented currently via `ShmArray`s.
+
+    A `Cascade` is the minimal "connection" of 2 `Flume`s
+    as per circuit parlance:
+    https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
+
+    TODO:
+    -[ ] could cover the combination of our `FspAdmin` and the
+      backend `.fsp._engine` related machinery to "connect" one flume
+      to another?
+
+    '''
+    # TODO: make these `Flume`s
+    src: ShmArray
+    dst: ShmArray
+    tn: trio.Nursery
+    fsp: Fsp  # UI-side middleware ctl API
+
+    # filled during cascade/.bind_func() (fsp_compute) init phases
+    bind_func: Callable | None = None
+    complete: trio.Event | None = None
+    cs: trio.CancelScope | None = None
+    client_stream: tractor.MsgStream | None = None
+
+    async def resync(self) -> int:
+        # TODO: adopt an incremental update engine/approach
+        # where possible here eventually!
+        log.info(f're-syncing fsp {self.fsp.name} to source')
+        self.cs.cancel()
+        await self.complete.wait()
+        index: int = await self.tn.start(self.bind_func)
+
+        # always trigger UI refresh after history update,
+        # see ``piker.ui._fsp.FspAdmin.open_chain()`` and
+        # ``piker.ui._display.trigger_update()``.
+        await self.client_stream.send({
+            'fsp_update': {
+                'key': self.dst.token,
+                'first': self.dst._first.value,
+                'last': self.dst._last.value,
+            }
+        })
+        return index
+
+    def is_synced(self) -> tuple[bool, int, int]:
+        '''
+        Predicate to determine if a destination FSP
+        output array is aligned to its source array.
+
+        '''
+        src: ShmArray = self.src
+        dst: ShmArray = self.dst
+        step_diff = src.index - dst.index
+        len_diff = abs(len(src.array) - len(dst.array))
+        synced: bool = not (
+            # the source is likely backfilling and we must
+            # sync history calculations
+            len_diff > 2
+
+            # we aren't step synced to the source and may be
+            # leading/lagging by a step
+            or step_diff > 1
+            or step_diff < 0
+        )
+        if not synced:
+            fsp: Fsp = self.fsp
+            log.warning(
+                '***DESYNCED FSP***\n'
+                f'{fsp.ns_path}@{src.token}\n'
+                f'step_diff: {step_diff}\n'
+                f'len_diff: {len_diff}\n'
+            )
+        return (
+            synced,
+            step_diff,
+            len_diff,
+        )
+
+    async def poll_and_sync_to_step(self) -> int:
+        synced, step_diff, _ = self.is_synced()
+        while not synced:
+            await self.resync()
+            synced, step_diff, _ = self.is_synced()
+
+        return step_diff
+
+    @acm
+    async def open_edge(
+        self,
+        bind_func: Callable,
+    ) -> int:
+        self.bind_func = bind_func
+        index = await self.tn.start(bind_func)
+        yield index
+        # TODO: what do we want on teardown/error?
+        # -[ ] dynamic reconnection after update?
+
+
-async def fsp_compute(
+async def connect_streams(
+    casc: Cascade,
     mkt: MktPair,
     flume: Flume,
     quote_stream: trio.abc.ReceiveChannel,
@@ -98,13 +218,27 @@ async def fsp_compute(
     task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,

 ) -> None:
+    '''
+    Stream and per-sample compute and write the cascade of
+    2 `Flume`s/streams given some operating `func`.
+
+    https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
+
+    Not literally, but something like:
+
+        func(Flume_in) -> Flume_out
+
+    '''
     profiler = Profiler(
         delayed=False,
         disabled=True
     )

-    fqme = mkt.fqme
+    fqme: str = mkt.fqme
+
+    # TODO: dynamic introspection of what the underlying (vertex)
+    # function actually requires from input node (flumes) then
+    # deliver those inputs as part of a graph "compilation" step?
     out_stream = func(

         # TODO: do we even need this if we do the feed api right?
@@ -113,7 +247,8 @@ async def fsp_compute(
         # async itertools style?
         filter_quotes_by_sym(fqme, quote_stream),

-        # XXX: currently the ``ohlcv`` arg
+        # XXX: currently the ``ohlcv`` arg, but we should allow
+        # (dynamic) requests for src flume (node) streams?
         flume.rt_shm,
     )
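Worth noting the shape the engine expects of `func`: an async generator taking the filtered quote stream (plus the src shm buffer) and yielding computed values. A toy, standalone stand-in showing just that call pattern (hypothetical `cum_mean` fsp, plain dicts for quotes, the shm arg dropped):

    from typing import AsyncIterator

    import trio


    async def quotes() -> AsyncIterator[dict]:
        # stand-in for `filter_quotes_by_sym(fqme, quote_stream)`
        for px in (100.0, 100.5, 101.0):
            yield {'last': px}
            await trio.sleep(0)


    async def cum_mean(
        source: AsyncIterator[dict],
    ) -> AsyncIterator[float]:
        # toy "fsp": incremental mean of the last price
        n, total = 0, 0.0
        async for quote in source:
            n += 1
            total += quote['last']
            yield total / n


    async def main():
        out_stream = cum_mean(quotes())
        async for val in out_stream:
            print(val)

    trio.run(main)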
@@ -216,12 +351,9 @@ async def fsp_compute(
     # setup a respawn handle
     with trio.CancelScope() as cs:

-        # TODO: might be better to just make a "restart" method where
-        # the target task is spawned implicitly and then the event is
-        # set via some higher level api? At that point we might as well
-        # be writing a one-cancels-one nursery though right?
-        tracker = TaskTracker(trio.Event(), cs)
-        task_status.started((tracker, index))
+        casc.cs = cs
+        casc.complete = trio.Event()
+        task_status.started(index)

         profiler(f'{func_name} yield last index')
@@ -262,7 +394,7 @@ async def fsp_compute(
             # log.info(f'FSP quote too fast: {hz}')
             # last = time.time()
     finally:
-        tracker.complete.set()
+        casc.complete.set()


 @tractor.context
@@ -273,6 +405,7 @@ async def cascade(

     # data feed key
     fqme: str,

+    # TODO: expect and attach from `Flume.to_msg()`s!
     src_shm_token: dict,
     dst_shm_token: tuple[str, np.dtype],
@@ -297,8 +430,8 @@ async def cascade(
     if loglevel:
         get_console_log(loglevel)

-    src = attach_shm_array(token=src_shm_token)
-    dst = attach_shm_array(readonly=False, token=dst_shm_token)
+    src: ShmArray = attach_shm_array(token=src_shm_token)
+    dst: ShmArray = attach_shm_array(readonly=False, token=dst_shm_token)

     reg = _load_builtins()
     lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg])
@@ -320,7 +453,7 @@ async def cascade(
     fsp: Fsp = reg.get(
         NamespacePath(ns_path)
     )
-    func = fsp.func
+    func: Callable = fsp.func

     if not func:
         # TODO: assume it's a func target path
@@ -341,17 +474,32 @@ async def cascade(
     flume = feed.flumes[fqme]
     mkt = flume.mkt

+    # TODO: make an equivalent `Flume` around the Fsp output
+    # streams and chain them using a `Cascade` Bo
     assert src.token == flume.rt_shm.token
     profiler(f'{func}: feed up')

-    func_name = func.__name__
+    func_name: str = func.__name__
     async with (
-        trio.open_nursery() as n,
+        trio.open_nursery() as tn,
     ):
+        # TODO: might be better to just make a "restart" method where
+        # the target task is spawned implicitly and then the event is
+        # set via some higher level api? At that point we might as well
+        # be writing a one-cancels-one nursery though right?
+        casc = Cascade(
+            src,
+            dst,
+            tn,
+            fsp,
+        )
+
+        # TODO: this seems like it should be wrapped somewhere?
         fsp_target = partial(
-            fsp_compute,
+            connect_streams,
+            casc=casc,
             mkt=mkt,
             flume=flume,
             quote_stream=flume.stream,
@@ -360,156 +508,95 @@ async def cascade(
             src=src,
             dst=dst,

-            # target
+            # chain function which takes src flume input(s)
+            # and renders dst flume output(s)
             func=func
         )

-        tracker, index = await n.start(fsp_target)
-
-        if zero_on_step:
-            last = dst.array[-1:]
-            zeroed = np.zeros(last.shape, dtype=last.dtype)
-
-        profiler(f'{func_name}: fsp up')
-
-        # sync client
-        await ctx.started(index)
-
-        # XXX: rt stream with client which we MUST
-        # open here (and keep it open) in order to make
-        # incremental "updates" as history prepends take
-        # place.
-        async with ctx.open_stream() as client_stream:
-
-            # TODO: these likely should all become
-            # methods of this ``TaskLifetime`` or wtv
-            # abstraction..
-            async def resync(
-                tracker: TaskTracker,
-
-            ) -> tuple[TaskTracker, int]:
-                # TODO: adopt an incremental update engine/approach
-                # where possible here eventually!
-                log.info(f're-syncing fsp {func_name} to source')
-                tracker.cs.cancel()
-                await tracker.complete.wait()
-                tracker, index = await n.start(fsp_target)
-
-                # always trigger UI refresh after history update,
-                # see ``piker.ui._fsp.FspAdmin.open_chain()`` and
-                # ``piker.ui._display.trigger_update()``.
-                await client_stream.send({
-                    'fsp_update': {
-                        'key': dst_shm_token,
-                        'first': dst._first.value,
-                        'last': dst._last.value,
-                    }
-                })
-                return tracker, index
-
-            def is_synced(
-                src: ShmArray,
-                dst: ShmArray
-            ) -> tuple[bool, int, int]:
-                '''
-                Predicate to determine if a destination FSP
-                output array is aligned to its source array.
-
-                '''
-                step_diff = src.index - dst.index
-                len_diff = abs(len(src.array) - len(dst.array))
-                return not (
-                    # the source is likely backfilling and we must
-                    # sync history calculations
-                    len_diff > 2
-
-                    # we aren't step synced to the source and may be
-                    # leading/lagging by a step
-                    or step_diff > 1
-                    or step_diff < 0
-                ), step_diff, len_diff
-
-            async def poll_and_sync_to_step(
-                tracker: TaskTracker,
-                src: ShmArray,
-                dst: ShmArray,
-
-            ) -> tuple[TaskTracker, int]:
-
-                synced, step_diff, _ = is_synced(src, dst)
-                while not synced:
-                    tracker, index = await resync(tracker)
-                    synced, step_diff, _ = is_synced(src, dst)
-
-                return tracker, step_diff
-
-            s, step, ld = is_synced(src, dst)
-
-            # detect sample period step for subscription to increment
-            # signal
-            times = src.array['time']
-            if len(times) > 1:
-                last_ts = times[-1]
-                delay_s = float(last_ts - times[times != last_ts][-1])
-            else:
-                # our default "HFT" sample rate.
-                delay_s = _default_delay_s
-
-            # sub and increment the underlying shared memory buffer
-            # on every step msg received from the global `samplerd`
-            # service.
-            async with open_sample_stream(float(delay_s)) as istream:
-
-                profiler(f'{func_name}: sample stream up')
-                profiler.finish()
-
-                async for i in istream:
-                    # print(f'FSP incrementing {i}')
-
-                    # respawn the compute task if the source
-                    # array has been updated such that we compute
-                    # new history from the (prepended) source.
-                    synced, step_diff, _ = is_synced(src, dst)
-                    if not synced:
-                        tracker, step_diff = await poll_and_sync_to_step(
-                            tracker,
-                            src,
-                            dst,
-                        )
-
-                        # skip adding a last bar since we should already
-                        # be step aligned
-                        if step_diff == 0:
-                            continue
-
-                    # read out last shm row, copy and write new row
-                    array = dst.array
-
-                    # some metrics like vlm should be reset
-                    # to zero every step.
-                    if zero_on_step:
-                        last = zeroed
-                    else:
-                        last = array[-1:].copy()
-
-                    dst.push(last)
-
-                    # sync with source buffer's time step
-                    src_l2 = src.array[-2:]
-                    src_li, src_lt = src_l2[-1][['index', 'time']]
-                    src_2li, src_2lt = src_l2[-2][['index', 'time']]
-                    dst._array['time'][src_li] = src_lt
-                    dst._array['time'][src_2li] = src_2lt
-
-                    # last2 = dst.array[-2:]
-                    # if (
-                    #     last2[-1]['index'] != src_li
-                    #     or last2[-2]['index'] != src_2li
-                    # ):
-                    #     dstl2 = list(last2)
-                    #     srcl2 = list(src_l2)
-                    #     print(
-                    #         # f'{dst.token}\n'
-                    #         f'src: {srcl2}\n'
-                    #         f'dst: {dstl2}\n'
-                    #     )
+        async with casc.open_edge(
+            bind_func=fsp_target,
+        ) as index:
+            # casc.bind_func = fsp_target
+            # index = await tn.start(fsp_target)
+
+            if zero_on_step:
+                last = dst.array[-1:]
+                zeroed = np.zeros(last.shape, dtype=last.dtype)
+
+            profiler(f'{func_name}: fsp up')
+
+            # sync client
+            await ctx.started(index)
+
+            # XXX: rt stream with client which we MUST
+            # open here (and keep it open) in order to make
+            # incremental "updates" as history prepends take
+            # place.
+            async with ctx.open_stream() as client_stream:
+                casc.client_stream = client_stream
+
+                s, step, ld = casc.is_synced()
+
+                # detect sample period step for subscription to increment
+                # signal
+                times = src.array['time']
+                if len(times) > 1:
+                    last_ts = times[-1]
+                    delay_s = float(last_ts - times[times != last_ts][-1])
+                else:
+                    # our default "HFT" sample rate.
+                    delay_s = _default_delay_s
+
+                # sub and increment the underlying shared memory buffer
+                # on every step msg received from the global `samplerd`
+                # service.
+                async with open_sample_stream(float(delay_s)) as istream:
+
+                    profiler(f'{func_name}: sample stream up')
+                    profiler.finish()
+
+                    async for i in istream:
+                        # print(f'FSP incrementing {i}')
+
+                        # respawn the compute task if the source
+                        # array has been updated such that we compute
+                        # new history from the (prepended) source.
+                        synced, step_diff, _ = casc.is_synced()
+                        if not synced:
+                            step_diff: int = await casc.poll_and_sync_to_step()
+
+                            # skip adding a last bar since we should already
+                            # be step aligned
+                            if step_diff == 0:
+                                continue
+
+                        # read out last shm row, copy and write new row
+                        array = dst.array
+
+                        # some metrics like vlm should be reset
+                        # to zero every step.
+                        if zero_on_step:
+                            last = zeroed
+                        else:
+                            last = array[-1:].copy()
+
+                        dst.push(last)
+
+                        # sync with source buffer's time step
+                        src_l2 = src.array[-2:]
+                        src_li, src_lt = src_l2[-1][['index', 'time']]
+                        src_2li, src_2lt = src_l2[-2][['index', 'time']]
+                        dst._array['time'][src_li] = src_lt
+                        dst._array['time'][src_2li] = src_2lt
+
+                        # last2 = dst.array[-2:]
+                        # if (
+                        #     last2[-1]['index'] != src_li
+                        #     or last2[-2]['index'] != src_2li
+                        # ):
+                        #     dstl2 = list(last2)
+                        #     srcl2 = list(src_l2)
+                        #     print(
+                        #         # f'{dst.token}\n'
+                        #         f'src: {srcl2}\n'
+                        #         f'dst: {dstl2}\n'
+                        #     )
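For reference, the desync predicate driving the respawn branch above reduces to two integer comparisons on buffer state; a standalone rendering with concrete numbers (plain ints standing in for `ShmArray.index` and `len(.array)`):

    def is_synced(
        src_index: int,
        dst_index: int,
        src_len: int,
        dst_len: int,
    ) -> tuple[bool, int, int]:
        # mirrors `Cascade.is_synced()`: a length skew > 2 means
        # src history was (likely) backfilled; a step skew outside
        # [0, 1] means dst is lagging/leading the sampler increment.
        step_diff = src_index - dst_index
        len_diff = abs(src_len - dst_len)
        synced = not (
            len_diff > 2
            or step_diff > 1
            or step_diff < 0
        )
        return synced, step_diff, len_diff

    # dst one step behind src: still synced, no respawn
    assert is_synced(1001, 1000, 500, 499) == (True, 1, 1)

    # src history prepended by a backfill: desynced, so the
    # engine cancels + respawns via `poll_and_sync_to_step()`
    assert is_synced(1001, 1000, 800, 500) == (False, 1, 300)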