Compare commits


No commits in common. "ae1773d6e5a89315375c4453765df67b19ba09ab" and "6100bd19c738976728181da25aad8db74ec7e0d9" have entirely different histories.

3 changed files with 48 additions and 113 deletions

View File

@@ -1037,11 +1037,12 @@ async def allocate_persistent_feed(
     flume = Flume(
         symbol=symbol,
-        first_quote=first_quote,
-        _rt_shm_token=rt_shm.token,
         _hist_shm_token=hist_shm.token,
+        _rt_shm_token=rt_shm.token,
+        first_quote=first_quote,
         izero_hist=izero_hist,
         izero_rt=izero_rt,
+        # throttle_rate=tick_throttle,
     )
     # for ambiguous names we simply apply the retreived

View File

@@ -36,7 +36,6 @@ from PyQt5.QtCore import QLineF
 from ..data._sharedmem import (
     ShmArray,
 )
-from ..data.feed import Flume
 from .._profile import (
     pg_profile_enabled,
     # ms_slower_then,
@@ -209,16 +208,13 @@ class Flow(msgspec.Struct): # , frozen=True):
     '''
     name: str
     plot: pg.PlotItem
-    _shm: ShmArray
-    flume: Flume
     graphics: Curve | BarItems
+    _shm: ShmArray
+    # for tracking y-mn/mx for y-axis auto-ranging
     yrange: tuple[float, float] = None
     # in some cases a flow may want to change its
-    # graphical "type" or, "form" when downsampling, to
-    # start this is only ever an interpolation line.
+    # graphical "type" or, "form" when downsampling,
+    # normally this is just a plain line.
     ds_graphics: Optional[Curve] = None
     is_ohlc: bool = False
@@ -253,9 +249,9 @@ class Flow(msgspec.Struct): # , frozen=True):
     # TODO: remove this and only allow setting through
     # private ``._shm`` attr?
-    # @shm.setter
-    # def shm(self, shm: ShmArray) -> ShmArray:
-    #     self._shm = shm
+    @shm.setter
+    def shm(self, shm: ShmArray) -> ShmArray:
+        self._shm = shm
     def maxmin(
         self,
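Note on the hunk above: the restored @shm.setter only composes with a property getter of the same name, which sits outside this hunk's context window. A minimal sketch of the assumed pairing (the getter and the stand-in ShmArray class here are illustrative assumptions, not lines from either commit):

import msgspec

class ShmArray:
    # stand-in for piker's ShmArray, for illustration only
    ...

class Flow(msgspec.Struct):
    _shm: ShmArray

    @property
    def shm(self) -> ShmArray:
        # assumed getter exposing the private shm handle
        return self._shm

    @shm.setter
    def shm(self, shm: ShmArray) -> ShmArray:
        # the setter re-enabled on the right-hand side of the hunk above
        self._shm = shm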
@@ -322,15 +318,9 @@ class Flow(msgspec.Struct): # , frozen=True):
         '''
         vr = self.plot.viewRect()
-        return (
-            vr.left(),
-            vr.right(),
-        )
+        return int(vr.left()), int(vr.right())
-    def datums_range(
-        self,
-        index_field: str = 'index',
-    ) -> tuple[
+    def datums_range(self) -> tuple[
         int, int, int, int, int, int
     ]:
         '''
@@ -338,8 +328,6 @@ class Flow(msgspec.Struct): # , frozen=True):
         '''
         l, r = self.view_range()
-        l = round(l)
-        r = round(r)
         # TODO: avoid this and have shm passed
         # in earlier.
@@ -360,23 +348,15 @@ class Flow(msgspec.Struct): # , frozen=True):
     def read(
         self,
         array_field: Optional[str] = None,
-        index_field: str = 'index',
     ) -> tuple[
         int, int, np.ndarray,
         int, int, np.ndarray,
     ]:
-        '''
-        Read the underlying shm array buffer and
-        return the data plus indexes for the first
-        and last
-        which has been written to.
-        '''
-        # readable data
+        # read call
         array = self.shm.array
-        indexes = array[index_field]
+        indexes = array['index']
         ifirst = indexes[0]
         ilast = indexes[-1]

View File

@@ -42,8 +42,6 @@ from ..data._sharedmem import (
     _Token,
     try_read,
 )
-from ..data.feed import Flume
-from ..data._source import Symbol
 from ._chart import (
     ChartPlotWidget,
     LinkedSplits,
@@ -113,7 +111,7 @@ def update_fsp_chart(
     # read from last calculated value and update any label
     last_val_sticky = chart.plotItem.getAxis(
-        'right')._stickies.get(graphics_name)
+        'right')._stickies.get(chart.name)
     if last_val_sticky:
         last = last_row[array_key]
         last_val_sticky.update_from_data(-1, last)
@@ -214,7 +212,7 @@ async def open_fsp_actor_cluster(
 async def run_fsp_ui(
     linkedsplits: LinkedSplits,
-    flume: Flume,
+    shm: ShmArray,
     started: trio.Event,
     target: Fsp,
     conf: dict[str, dict],
@@ -251,11 +249,9 @@ async def run_fsp_ui(
         else:
             chart = linkedsplits.subplots[overlay_with]
-        shm = flume.rt_shm
         chart.draw_curve(
-            name,
-            shm,
-            flume,
+            name=name,
+            shm=shm,
             overlay=True,
             color='default_light',
             array_key=name,
@@ -265,9 +261,8 @@ async def run_fsp_ui(
     else:
         # create a new sub-chart widget for this fsp
         chart = linkedsplits.add_plot(
-            name,
-            shm,
-            flume,
+            name=name,
+            shm=shm,
             array_key=name,
             sidepane=sidepane,
@@ -357,9 +352,6 @@ async def run_fsp_ui(
         # last = time.time()
-# TODO: maybe this should be our ``Flow`` type since it maps
-# one flume to the next? The machinery for task/actor mgmt should
-# be part of the instantiation API?
 class FspAdmin:
     '''
     Client API for orchestrating FSP actors and displaying
@@ -371,7 +363,7 @@ class FspAdmin:
         tn: trio.Nursery,
         cluster: dict[str, tractor.Portal],
         linked: LinkedSplits,
-        flume: Flume,
+        src_shm: ShmArray,
     ) -> None:
         self.tn = tn
@@ -383,11 +375,7 @@ class FspAdmin:
             tuple[tractor.MsgStream, ShmArray]
         ] = {}
         self._flow_registry: dict[_Token, str] = {}
-        # TODO: make this a `.src_flume` and add
-        # a `dst_flume`?
-        # (=> but then wouldn't this be the most basic `Flow`?)
-        self.flume = flume
+        self.src_shm = src_shm
     def rr_next_portal(self) -> tractor.Portal:
         name, portal = next(self._rr_next_actor)
@@ -400,7 +388,7 @@ class FspAdmin:
         complete: trio.Event,
         started: trio.Event,
         fqsn: str,
-        dst_fsp_flume: Flume,
+        dst_shm: ShmArray,
         conf: dict,
         target: Fsp,
         loglevel: str,
@@ -421,10 +409,9 @@ class FspAdmin:
             # data feed key
             fqsn=fqsn,
-            # TODO: pass `Flume.to_msg()`s here?
             # mems
-            src_shm_token=self.flume.rt_shm.token,
-            dst_shm_token=dst_fsp_flume.rt_shm.token,
+            src_shm_token=self.src_shm.token,
+            dst_shm_token=dst_shm.token,
             # target
             ns_path=ns_path,
@@ -441,14 +428,12 @@ class FspAdmin:
             ctx.open_stream() as stream,
         ):
-            dst_fsp_flume.stream: tractor.MsgStream = stream
             # register output data
             self._registry[
                 (fqsn, ns_path)
             ] = (
                 stream,
-                dst_fsp_flume.rt_shm,
+                dst_shm,
                 complete
             )
@@ -483,9 +468,9 @@ class FspAdmin:
         worker_name: Optional[str] = None,
         loglevel: str = 'info',
-    ) -> (Flume, trio.Event):
+    ) -> (ShmArray, trio.Event):
-        fqsn = self.flume.symbol.fqsn
+        fqsn = self.linked.symbol.front_fqsn()
         # allocate an output shm array
         key, dst_shm, opened = maybe_mk_fsp_shm(
@@ -493,28 +478,8 @@ class FspAdmin:
             target=target,
             readonly=True,
         )
-        portal = self.cluster.get(worker_name) or self.rr_next_portal()
-        provider_tag = portal.channel.uid
-        symbol = Symbol(
-            key=key,
-            broker_info={
-                provider_tag: {'asset_type': 'fsp'},
-            },
-        )
-        dst_fsp_flume = Flume(
-            symbol=symbol,
-            _rt_shm_token=dst_shm.token,
-            first_quote={},
-            # set to 0 presuming for now that we can't load
-            # FSP history (though we should eventually).
-            izero_hist=0,
-            izero_rt=0,
-        )
         self._flow_registry[(
-            self.flume.rt_shm._token,
+            self.src_shm._token,
             target.name
         )] = dst_shm._token
@@ -523,6 +488,7 @@ class FspAdmin:
         # f'Already started FSP `{fqsn}:{func_name}`'
         # )
+        portal = self.cluster.get(worker_name) or self.rr_next_portal()
         complete = trio.Event()
         started = trio.Event()
         self.tn.start_soon(
@@ -531,13 +497,13 @@ class FspAdmin:
             complete,
             started,
             fqsn,
-            dst_fsp_flume,
+            dst_shm,
             conf,
             target,
             loglevel,
         )
-        return dst_fsp_flume, started
+        return dst_shm, started
     async def open_fsp_chart(
         self,
@@ -549,7 +515,7 @@ class FspAdmin:
     ) -> (trio.Event, ChartPlotWidget):
-        flume, started = await self.start_engine_task(
+        shm, started = await self.start_engine_task(
             target,
             conf,
             loglevel,
@@ -561,7 +527,7 @@ class FspAdmin:
             run_fsp_ui,
             self.linked,
-            flume,
+            shm,
             started,
             target,
@@ -575,7 +541,7 @@ class FspAdmin:
 @acm
 async def open_fsp_admin(
     linked: LinkedSplits,
-    flume: Flume,
+    src_shm: ShmArray,
     **kwargs,
 ) -> AsyncGenerator[dict, dict[str, tractor.Portal]]:
@@ -596,7 +562,7 @@ async def open_fsp_admin(
             tn,
             cluster_map,
             linked,
-            flume,
+            src_shm,
         )
         try:
             yield admin
@@ -610,7 +576,7 @@ async def open_fsp_admin(
 async def open_vlm_displays(
     linked: LinkedSplits,
-    flume: Flume,
+    ohlcv: ShmArray,
     dvlm: bool = True,
     task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED,
@@ -632,8 +598,6 @@ async def open_vlm_displays(
     sig = inspect.signature(flow_rates.func)
     params = sig.parameters
-    ohlcv: ShmArray = flume.rt_shm
     async with (
         open_fsp_sidepane(
             linked, {
@@ -653,7 +617,7 @@ async def open_vlm_displays(
                 }
             },
         ) as sidepane,
-        open_fsp_admin(linked, flume) as admin,
+        open_fsp_admin(linked, ohlcv) as admin,
     ):
         # TODO: support updates
         # period_field = sidepane.fields['period']
@@ -669,7 +633,6 @@ async def open_vlm_displays(
         chart = linked.add_plot(
             name='volume',
             shm=shm,
-            flume=flume,
             array_key='volume',
             sidepane=sidepane,
@@ -682,8 +645,6 @@ async def open_vlm_displays(
             # the curve item internals are pretty convoluted.
             style='step',
         )
-        # back-link the volume chart to trigger y-autoranging
-        # in the ohlc (parent) chart.
         ohlc_chart.view.enable_auto_yrange(
             src_vb=chart.view,
         )
@@ -749,7 +710,7 @@ async def open_vlm_displays(
         tasks_ready = []
         # spawn and overlay $ vlm on the same subchart
-        dvlm_flume, started = await admin.start_engine_task(
+        dvlm_shm, started = await admin.start_engine_task(
            dolla_vlm,
            { # fsp engine conf
@@ -846,13 +807,9 @@ async def open_vlm_displays(
                 else:
                     color = 'bracket'
-                assert isinstance(shm, ShmArray)
-                assert isinstance(flume, Flume)
-                flow = chart.draw_curve(
-                    name,
-                    shm,
-                    flume,
+                curve, _ = chart.draw_curve(
+                    name=name,
+                    shm=shm,
                     array_key=name,
                     overlay=pi,
                     color=color,
@@ -865,20 +822,20 @@ async def open_vlm_displays(
                 # specially store ref to shm for lookup in display loop
                 # since only a placeholder of `None` is entered in
                 # ``.draw_curve()``.
-                # flow = chart._flows[name]
+                flow = chart._flows[name]
                 assert flow.plot is pi
         chart_curves(
             fields,
             dvlm_pi,
-            dvlm_flume.rt_shm,
+            dvlm_shm,
             step_mode=True,
         )
         # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
         # up since this one depends on it.
-        fr_flume, started = await admin.start_engine_task(
+        fr_shm, started = await admin.start_engine_task(
             flow_rates,
             { # fsp engine conf
                 'func_name': 'flow_rates',
@@ -891,7 +848,7 @@ async def open_vlm_displays(
         # chart_curves(
         # dvlm_rate_fields,
         # dvlm_pi,
-        # fr_flume.rt_shm,
+        # fr_shm,
         # )
         # TODO: is there a way to "sync" the dual axes such that only
@@ -939,7 +896,7 @@ async def open_vlm_displays(
         chart_curves(
             trade_rate_fields,
             tr_pi,
-            fr_flume.rt_shm,
+            fr_shm,
             # step_mode=True,
             # dashed line to represent "individual trades" being
@@ -973,7 +930,7 @@ async def open_vlm_displays(
 async def start_fsp_displays(
     linked: LinkedSplits,
-    flume: Flume,
+    ohlcv: ShmArray,
     group_status_key: str,
     loglevel: str,
@@ -1016,10 +973,7 @@ async def start_fsp_displays(
     async with (
         # NOTE: this admin internally opens an actor cluster
-        open_fsp_admin(
-            linked,
-            flume,
-        ) as admin,
+        open_fsp_admin(linked, ohlcv) as admin,
     ):
         statuses = []
         for target, conf in fsp_conf.items():
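Taken together, the right-hand side of this compare passes the raw ohlcv ShmArray through the fsp admin layer where the left-hand side threaded a Flume. A rough sketch of the call shape implied by the new signatures above — the wrapper coroutine, the conf dict contents and the await on started are illustrative assumptions, not code from either commit:

async def spawn_dollar_vlm(linked, ohlcv, loglevel: str = 'info'):
    # `open_fsp_admin(linked, src_shm)` per the new signature shown above
    async with open_fsp_admin(linked, ohlcv) as admin:
        # returns the fsp's output buffer plus a started event, per the
        # new `-> (ShmArray, trio.Event)` annotation on start_engine_task()
        dvlm_shm, started = await admin.start_engine_task(
            dolla_vlm,
            {'func_name': 'dolla_vlm'},  # fsp engine conf
            loglevel,
        )
        await started.wait()
        return dvlm_shm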