Merge pull request #449 from pikers/multi_symbol_input

Multi symbol input (support)
kraken_deposits_fixes
goodboy 2023-02-09 16:20:34 -05:00 committed by GitHub
commit a7d02ecec8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1083 additions and 759 deletions

View File

@ -1037,12 +1037,11 @@ async def allocate_persistent_feed(
flume = Flume( flume = Flume(
symbol=symbol, symbol=symbol,
_hist_shm_token=hist_shm.token,
_rt_shm_token=rt_shm.token,
first_quote=first_quote, first_quote=first_quote,
_rt_shm_token=rt_shm.token,
_hist_shm_token=hist_shm.token,
izero_hist=izero_hist, izero_hist=izero_hist,
izero_rt=izero_rt, izero_rt=izero_rt,
# throttle_rate=tick_throttle,
) )
# for ambiguous names we simply apply the retreived # for ambiguous names we simply apply the retreived

View File

@ -38,7 +38,6 @@ from PyQt5.QtWidgets import (
QVBoxLayout, QVBoxLayout,
QSplitter, QSplitter,
) )
import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
import trio import trio
@ -63,7 +62,10 @@ from ._style import (
_xaxis_at, _xaxis_at,
_min_points_to_show, _min_points_to_show,
) )
from ..data.feed import Feed from ..data.feed import (
Feed,
Flume,
)
from ..data._source import Symbol from ..data._source import Symbol
from ..log import get_logger from ..log import get_logger
from ._interaction import ChartView from ._interaction import ChartView
@ -537,6 +539,7 @@ class LinkedSplits(QWidget):
symbol: Symbol, symbol: Symbol,
shm: ShmArray, shm: ShmArray,
flume: Flume,
sidepane: FieldsForm, sidepane: FieldsForm,
style: str = 'ohlc_bar', style: str = 'ohlc_bar',
@ -561,6 +564,7 @@ class LinkedSplits(QWidget):
self.chart = self.add_plot( self.chart = self.add_plot(
name=symbol.fqsn, name=symbol.fqsn,
shm=shm, shm=shm,
flume=flume,
style=style, style=style,
_is_main=True, _is_main=True,
sidepane=sidepane, sidepane=sidepane,
@ -581,6 +585,7 @@ class LinkedSplits(QWidget):
name: str, name: str,
shm: ShmArray, shm: ShmArray,
flume: Flume,
array_key: Optional[str] = None, array_key: Optional[str] = None,
style: str = 'line', style: str = 'line',
@ -704,9 +709,11 @@ class LinkedSplits(QWidget):
# draw curve graphics # draw curve graphics
if style == 'ohlc_bar': if style == 'ohlc_bar':
graphics, data_key = cpw.draw_ohlc( # graphics, data_key = cpw.draw_ohlc(
flow = cpw.draw_ohlc(
name, name,
shm, shm,
flume=flume,
array_key=array_key array_key=array_key
) )
self.cursor.contents_labels.add_label( self.cursor.contents_labels.add_label(
@ -718,18 +725,22 @@ class LinkedSplits(QWidget):
elif style == 'line': elif style == 'line':
add_label = True add_label = True
graphics, data_key = cpw.draw_curve( # graphics, data_key = cpw.draw_curve(
flow = cpw.draw_curve(
name, name,
shm, shm,
flume,
array_key=array_key, array_key=array_key,
color='default_light', color='default_light',
) )
elif style == 'step': elif style == 'step':
add_label = True add_label = True
graphics, data_key = cpw.draw_curve( # graphics, data_key = cpw.draw_curve(
flow = cpw.draw_curve(
name, name,
shm, shm,
flume,
array_key=array_key, array_key=array_key,
step_mode=True, step_mode=True,
color='davies', color='davies',
@ -739,6 +750,9 @@ class LinkedSplits(QWidget):
else: else:
raise ValueError(f"Chart style {style} is currently unsupported") raise ValueError(f"Chart style {style} is currently unsupported")
graphics = flow.graphics
data_key = flow.name
if _is_main: if _is_main:
assert style == 'ohlc_bar', 'main chart must be OHLC' assert style == 'ohlc_bar', 'main chart must be OHLC'
else: else:
@ -895,7 +909,7 @@ class ChartPlotWidget(pg.PlotWidget):
# registry of overlay curve names # registry of overlay curve names
self._flows: dict[str, Flow] = {} self._flows: dict[str, Flow] = {}
self._feeds: dict[Symbol, Feed] = {} self.feed: Feed | None = None
self._labels = {} # registry of underlying graphics self._labels = {} # registry of underlying graphics
self._ysticks = {} # registry of underlying graphics self._ysticks = {} # registry of underlying graphics
@ -916,20 +930,18 @@ class ChartPlotWidget(pg.PlotWidget):
self._on_screen: bool = False self._on_screen: bool = False
def resume_all_feeds(self): def resume_all_feeds(self):
... feed = self.feed
# try: if feed:
# for feed in self._feeds.values(): try:
# for flume in feed.flumes.values(): self.linked.godwidget._root_n.start_soon(feed.resume)
# self.linked.godwidget._root_n.start_soon(flume.resume) except RuntimeError:
# except RuntimeError: # TODO: cancel the qtractor runtime here?
# # TODO: cancel the qtractor runtime here? raise
# raise
def pause_all_feeds(self): def pause_all_feeds(self):
... feed = self.feed
# for feed in self._feeds.values(): if feed:
# for flume in feed.flumes.values(): self.linked.godwidget._root_n.start_soon(feed.pause)
# self.linked.godwidget._root_n.start_soon(flume.pause)
@property @property
def view(self) -> ChartView: def view(self) -> ChartView:
@ -938,12 +950,6 @@ class ChartPlotWidget(pg.PlotWidget):
def focus(self) -> None: def focus(self) -> None:
self.view.setFocus() self.view.setFocus()
def last_bar_in_view(self) -> int:
self._arrays[self.name][-1]['index']
def is_valid_index(self, index: int) -> bool:
return index >= 0 and index < self._arrays[self.name][-1]['index']
def _set_xlimits( def _set_xlimits(
self, self,
xfirst: int, xfirst: int,
@ -1036,9 +1042,14 @@ class ChartPlotWidget(pg.PlotWidget):
log.warning(f'`Flow` for {self.name} not loaded yet?') log.warning(f'`Flow` for {self.name} not loaded yet?')
return return
index = flow.shm.array['index'] arr = flow.shm.array
index = arr['index']
# times = arr['time']
# these will be epoch time floats
xfirst, xlast = index[0], index[-1] xfirst, xlast = index[0], index[-1]
l, lbar, rbar, r = self.bars_range() l, lbar, rbar, r = self.bars_range()
view = self.view view = self.view
if ( if (
@ -1195,6 +1206,7 @@ class ChartPlotWidget(pg.PlotWidget):
name: str, name: str,
shm: ShmArray, shm: ShmArray,
flume: Flume,
array_key: Optional[str] = None, array_key: Optional[str] = None,
overlay: bool = False, overlay: bool = False,
@ -1207,10 +1219,7 @@ class ChartPlotWidget(pg.PlotWidget):
**graphics_kwargs, **graphics_kwargs,
) -> tuple[ ) -> Flow:
pg.GraphicsObject,
str,
]:
''' '''
Draw a "curve" (line plot graphics) for the provided data in Draw a "curve" (line plot graphics) for the provided data in
the input shm array ``shm``. the input shm array ``shm``.
@ -1225,7 +1234,6 @@ class ChartPlotWidget(pg.PlotWidget):
graphics = BarItems( graphics = BarItems(
linked=self.linked, linked=self.linked,
plotitem=pi, plotitem=pi,
# pen_color=self.pen_color,
color=color, color=color,
name=name, name=name,
**graphics_kwargs, **graphics_kwargs,
@ -1245,14 +1253,17 @@ class ChartPlotWidget(pg.PlotWidget):
**graphics_kwargs, **graphics_kwargs,
) )
self._flows[data_key] = Flow( flow = self._flows[data_key] = Flow(
name=name, data_key,
plot=pi, pi,
_shm=shm, shm,
flume,
is_ohlc=is_ohlc, is_ohlc=is_ohlc,
# register curve graphics with this flow # register curve graphics with this flow
graphics=graphics, graphics=graphics,
) )
assert isinstance(flow.shm, ShmArray)
# TODO: this probably needs its own method? # TODO: this probably needs its own method?
if overlay: if overlay:
@ -1309,24 +1320,26 @@ class ChartPlotWidget(pg.PlotWidget):
# understand. # understand.
pi.addItem(graphics) pi.addItem(graphics)
return graphics, data_key return flow
def draw_ohlc( def draw_ohlc(
self, self,
name: str, name: str,
shm: ShmArray, shm: ShmArray,
flume: Flume,
array_key: Optional[str] = None, array_key: Optional[str] = None,
**draw_curve_kwargs, **draw_curve_kwargs,
) -> (pg.GraphicsObject, str): ) -> Flow:
''' '''
Draw OHLC datums to chart. Draw OHLC datums to chart.
''' '''
return self.draw_curve( return self.draw_curve(
name=name, name,
shm=shm, shm,
flume,
array_key=array_key, array_key=array_key,
is_ohlc=True, is_ohlc=True,
**draw_curve_kwargs, **draw_curve_kwargs,
@ -1391,37 +1404,6 @@ class ChartPlotWidget(pg.PlotWidget):
self.sig_mouse_leave.emit(self) self.sig_mouse_leave.emit(self)
self.scene().leaveEvent(ev) self.scene().leaveEvent(ev)
def get_index(self, time: float) -> int:
# TODO: this should go onto some sort of
# data-view thinger..right?
ohlc = self._flows[self.name].shm.array
# XXX: not sure why the time is so off here
# looks like we're gonna have to do some fixing..
indexes = ohlc['time'] >= time
if any(indexes):
return ohlc['index'][indexes][-1]
else:
return ohlc['index'][-1]
def in_view(
self,
array: np.ndarray,
) -> np.ndarray:
'''
Slice an input struct array providing only datums
"in view" of this chart.
'''
l, lbar, rbar, r = self.bars_range()
ifirst = array[0]['index']
# slice data by offset from the first index
# available in the passed datum set.
return array[lbar - ifirst:(rbar - ifirst) + 1]
def maxmin( def maxmin(
self, self,
name: Optional[str] = None, name: Optional[str] = None,

File diff suppressed because it is too large Load Diff

View File

@ -36,6 +36,7 @@ from PyQt5.QtCore import QLineF
from ..data._sharedmem import ( from ..data._sharedmem import (
ShmArray, ShmArray,
) )
from ..data.feed import Flume
from .._profile import ( from .._profile import (
pg_profile_enabled, pg_profile_enabled,
# ms_slower_then, # ms_slower_then,
@ -208,13 +209,16 @@ class Flow(msgspec.Struct): # , frozen=True):
''' '''
name: str name: str
plot: pg.PlotItem plot: pg.PlotItem
graphics: Curve | BarItems
_shm: ShmArray _shm: ShmArray
flume: Flume
graphics: Curve | BarItems
# for tracking y-mn/mx for y-axis auto-ranging
yrange: tuple[float, float] = None yrange: tuple[float, float] = None
# in some cases a flow may want to change its # in some cases a flow may want to change its
# graphical "type" or, "form" when downsampling, # graphical "type" or, "form" when downsampling, to
# normally this is just a plain line. # start this is only ever an interpolation line.
ds_graphics: Optional[Curve] = None ds_graphics: Optional[Curve] = None
is_ohlc: bool = False is_ohlc: bool = False
@ -249,9 +253,9 @@ class Flow(msgspec.Struct): # , frozen=True):
# TODO: remove this and only allow setting through # TODO: remove this and only allow setting through
# private ``._shm`` attr? # private ``._shm`` attr?
@shm.setter # @shm.setter
def shm(self, shm: ShmArray) -> ShmArray: # def shm(self, shm: ShmArray) -> ShmArray:
self._shm = shm # self._shm = shm
def maxmin( def maxmin(
self, self,
@ -318,9 +322,15 @@ class Flow(msgspec.Struct): # , frozen=True):
''' '''
vr = self.plot.viewRect() vr = self.plot.viewRect()
return int(vr.left()), int(vr.right()) return (
vr.left(),
vr.right(),
)
def datums_range(self) -> tuple[ def datums_range(
self,
index_field: str = 'index',
) -> tuple[
int, int, int, int, int, int int, int, int, int, int, int
]: ]:
''' '''
@ -328,6 +338,8 @@ class Flow(msgspec.Struct): # , frozen=True):
''' '''
l, r = self.view_range() l, r = self.view_range()
l = round(l)
r = round(r)
# TODO: avoid this and have shm passed # TODO: avoid this and have shm passed
# in earlier. # in earlier.
@ -348,15 +360,23 @@ class Flow(msgspec.Struct): # , frozen=True):
def read( def read(
self, self,
array_field: Optional[str] = None, array_field: Optional[str] = None,
index_field: str = 'index',
) -> tuple[ ) -> tuple[
int, int, np.ndarray, int, int, np.ndarray,
int, int, np.ndarray, int, int, np.ndarray,
]: ]:
# read call '''
Read the underlying shm array buffer and
return the data plus indexes for the first
and last
which has been written to.
'''
# readable data
array = self.shm.array array = self.shm.array
indexes = array['index'] indexes = array[index_field]
ifirst = indexes[0] ifirst = indexes[0]
ilast = indexes[-1] ilast = indexes[-1]

View File

@ -42,6 +42,8 @@ from ..data._sharedmem import (
_Token, _Token,
try_read, try_read,
) )
from ..data.feed import Flume
from ..data._source import Symbol
from ._chart import ( from ._chart import (
ChartPlotWidget, ChartPlotWidget,
LinkedSplits, LinkedSplits,
@ -111,7 +113,7 @@ def update_fsp_chart(
# read from last calculated value and update any label # read from last calculated value and update any label
last_val_sticky = chart.plotItem.getAxis( last_val_sticky = chart.plotItem.getAxis(
'right')._stickies.get(chart.name) 'right')._stickies.get(graphics_name)
if last_val_sticky: if last_val_sticky:
last = last_row[array_key] last = last_row[array_key]
last_val_sticky.update_from_data(-1, last) last_val_sticky.update_from_data(-1, last)
@ -212,7 +214,7 @@ async def open_fsp_actor_cluster(
async def run_fsp_ui( async def run_fsp_ui(
linkedsplits: LinkedSplits, linkedsplits: LinkedSplits,
shm: ShmArray, flume: Flume,
started: trio.Event, started: trio.Event,
target: Fsp, target: Fsp,
conf: dict[str, dict], conf: dict[str, dict],
@ -249,9 +251,11 @@ async def run_fsp_ui(
else: else:
chart = linkedsplits.subplots[overlay_with] chart = linkedsplits.subplots[overlay_with]
shm = flume.rt_shm
chart.draw_curve( chart.draw_curve(
name=name, name,
shm=shm, shm,
flume,
overlay=True, overlay=True,
color='default_light', color='default_light',
array_key=name, array_key=name,
@ -261,8 +265,9 @@ async def run_fsp_ui(
else: else:
# create a new sub-chart widget for this fsp # create a new sub-chart widget for this fsp
chart = linkedsplits.add_plot( chart = linkedsplits.add_plot(
name=name, name,
shm=shm, shm,
flume,
array_key=name, array_key=name,
sidepane=sidepane, sidepane=sidepane,
@ -352,6 +357,9 @@ async def run_fsp_ui(
# last = time.time() # last = time.time()
# TODO: maybe this should be our ``Flow`` type since it maps
# one flume to the next? The machinery for task/actor mgmt should
# be part of the instantiation API?
class FspAdmin: class FspAdmin:
''' '''
Client API for orchestrating FSP actors and displaying Client API for orchestrating FSP actors and displaying
@ -363,7 +371,7 @@ class FspAdmin:
tn: trio.Nursery, tn: trio.Nursery,
cluster: dict[str, tractor.Portal], cluster: dict[str, tractor.Portal],
linked: LinkedSplits, linked: LinkedSplits,
src_shm: ShmArray, flume: Flume,
) -> None: ) -> None:
self.tn = tn self.tn = tn
@ -375,7 +383,11 @@ class FspAdmin:
tuple[tractor.MsgStream, ShmArray] tuple[tractor.MsgStream, ShmArray]
] = {} ] = {}
self._flow_registry: dict[_Token, str] = {} self._flow_registry: dict[_Token, str] = {}
self.src_shm = src_shm
# TODO: make this a `.src_flume` and add
# a `dst_flume`?
# (=> but then wouldn't this be the most basic `Flow`?)
self.flume = flume
def rr_next_portal(self) -> tractor.Portal: def rr_next_portal(self) -> tractor.Portal:
name, portal = next(self._rr_next_actor) name, portal = next(self._rr_next_actor)
@ -388,7 +400,7 @@ class FspAdmin:
complete: trio.Event, complete: trio.Event,
started: trio.Event, started: trio.Event,
fqsn: str, fqsn: str,
dst_shm: ShmArray, dst_fsp_flume: Flume,
conf: dict, conf: dict,
target: Fsp, target: Fsp,
loglevel: str, loglevel: str,
@ -409,9 +421,10 @@ class FspAdmin:
# data feed key # data feed key
fqsn=fqsn, fqsn=fqsn,
# TODO: pass `Flume.to_msg()`s here?
# mems # mems
src_shm_token=self.src_shm.token, src_shm_token=self.flume.rt_shm.token,
dst_shm_token=dst_shm.token, dst_shm_token=dst_fsp_flume.rt_shm.token,
# target # target
ns_path=ns_path, ns_path=ns_path,
@ -428,12 +441,14 @@ class FspAdmin:
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
dst_fsp_flume.stream: tractor.MsgStream = stream
# register output data # register output data
self._registry[ self._registry[
(fqsn, ns_path) (fqsn, ns_path)
] = ( ] = (
stream, stream,
dst_shm, dst_fsp_flume.rt_shm,
complete complete
) )
@ -468,9 +483,9 @@ class FspAdmin:
worker_name: Optional[str] = None, worker_name: Optional[str] = None,
loglevel: str = 'info', loglevel: str = 'info',
) -> (ShmArray, trio.Event): ) -> (Flume, trio.Event):
fqsn = self.linked.symbol.front_fqsn() fqsn = self.flume.symbol.fqsn
# allocate an output shm array # allocate an output shm array
key, dst_shm, opened = maybe_mk_fsp_shm( key, dst_shm, opened = maybe_mk_fsp_shm(
@ -478,8 +493,28 @@ class FspAdmin:
target=target, target=target,
readonly=True, readonly=True,
) )
portal = self.cluster.get(worker_name) or self.rr_next_portal()
provider_tag = portal.channel.uid
symbol = Symbol(
key=key,
broker_info={
provider_tag: {'asset_type': 'fsp'},
},
)
dst_fsp_flume = Flume(
symbol=symbol,
_rt_shm_token=dst_shm.token,
first_quote={},
# set to 0 presuming for now that we can't load
# FSP history (though we should eventually).
izero_hist=0,
izero_rt=0,
)
self._flow_registry[( self._flow_registry[(
self.src_shm._token, self.flume.rt_shm._token,
target.name target.name
)] = dst_shm._token )] = dst_shm._token
@ -488,7 +523,6 @@ class FspAdmin:
# f'Already started FSP `{fqsn}:{func_name}`' # f'Already started FSP `{fqsn}:{func_name}`'
# ) # )
portal = self.cluster.get(worker_name) or self.rr_next_portal()
complete = trio.Event() complete = trio.Event()
started = trio.Event() started = trio.Event()
self.tn.start_soon( self.tn.start_soon(
@ -497,13 +531,13 @@ class FspAdmin:
complete, complete,
started, started,
fqsn, fqsn,
dst_shm, dst_fsp_flume,
conf, conf,
target, target,
loglevel, loglevel,
) )
return dst_shm, started return dst_fsp_flume, started
async def open_fsp_chart( async def open_fsp_chart(
self, self,
@ -515,7 +549,7 @@ class FspAdmin:
) -> (trio.Event, ChartPlotWidget): ) -> (trio.Event, ChartPlotWidget):
shm, started = await self.start_engine_task( flume, started = await self.start_engine_task(
target, target,
conf, conf,
loglevel, loglevel,
@ -527,7 +561,7 @@ class FspAdmin:
run_fsp_ui, run_fsp_ui,
self.linked, self.linked,
shm, flume,
started, started,
target, target,
@ -541,7 +575,7 @@ class FspAdmin:
@acm @acm
async def open_fsp_admin( async def open_fsp_admin(
linked: LinkedSplits, linked: LinkedSplits,
src_shm: ShmArray, flume: Flume,
**kwargs, **kwargs,
) -> AsyncGenerator[dict, dict[str, tractor.Portal]]: ) -> AsyncGenerator[dict, dict[str, tractor.Portal]]:
@ -562,7 +596,7 @@ async def open_fsp_admin(
tn, tn,
cluster_map, cluster_map,
linked, linked,
src_shm, flume,
) )
try: try:
yield admin yield admin
@ -576,7 +610,7 @@ async def open_fsp_admin(
async def open_vlm_displays( async def open_vlm_displays(
linked: LinkedSplits, linked: LinkedSplits,
ohlcv: ShmArray, flume: Flume,
dvlm: bool = True, dvlm: bool = True,
task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED,
@ -598,6 +632,8 @@ async def open_vlm_displays(
sig = inspect.signature(flow_rates.func) sig = inspect.signature(flow_rates.func)
params = sig.parameters params = sig.parameters
ohlcv: ShmArray = flume.rt_shm
async with ( async with (
open_fsp_sidepane( open_fsp_sidepane(
linked, { linked, {
@ -617,7 +653,7 @@ async def open_vlm_displays(
} }
}, },
) as sidepane, ) as sidepane,
open_fsp_admin(linked, ohlcv) as admin, open_fsp_admin(linked, flume) as admin,
): ):
# TODO: support updates # TODO: support updates
# period_field = sidepane.fields['period'] # period_field = sidepane.fields['period']
@ -633,6 +669,7 @@ async def open_vlm_displays(
chart = linked.add_plot( chart = linked.add_plot(
name='volume', name='volume',
shm=shm, shm=shm,
flume=flume,
array_key='volume', array_key='volume',
sidepane=sidepane, sidepane=sidepane,
@ -645,6 +682,8 @@ async def open_vlm_displays(
# the curve item internals are pretty convoluted. # the curve item internals are pretty convoluted.
style='step', style='step',
) )
# back-link the volume chart to trigger y-autoranging
# in the ohlc (parent) chart.
ohlc_chart.view.enable_auto_yrange( ohlc_chart.view.enable_auto_yrange(
src_vb=chart.view, src_vb=chart.view,
) )
@ -710,7 +749,7 @@ async def open_vlm_displays(
tasks_ready = [] tasks_ready = []
# spawn and overlay $ vlm on the same subchart # spawn and overlay $ vlm on the same subchart
dvlm_shm, started = await admin.start_engine_task( dvlm_flume, started = await admin.start_engine_task(
dolla_vlm, dolla_vlm,
{ # fsp engine conf { # fsp engine conf
@ -807,9 +846,13 @@ async def open_vlm_displays(
else: else:
color = 'bracket' color = 'bracket'
curve, _ = chart.draw_curve( assert isinstance(shm, ShmArray)
name=name, assert isinstance(flume, Flume)
shm=shm,
flow = chart.draw_curve(
name,
shm,
flume,
array_key=name, array_key=name,
overlay=pi, overlay=pi,
color=color, color=color,
@ -822,20 +865,20 @@ async def open_vlm_displays(
# specially store ref to shm for lookup in display loop # specially store ref to shm for lookup in display loop
# since only a placeholder of `None` is entered in # since only a placeholder of `None` is entered in
# ``.draw_curve()``. # ``.draw_curve()``.
flow = chart._flows[name] # flow = chart._flows[name]
assert flow.plot is pi assert flow.plot is pi
chart_curves( chart_curves(
fields, fields,
dvlm_pi, dvlm_pi,
dvlm_shm, dvlm_flume.rt_shm,
step_mode=True, step_mode=True,
) )
# spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
# up since this one depends on it. # up since this one depends on it.
fr_shm, started = await admin.start_engine_task( fr_flume, started = await admin.start_engine_task(
flow_rates, flow_rates,
{ # fsp engine conf { # fsp engine conf
'func_name': 'flow_rates', 'func_name': 'flow_rates',
@ -848,7 +891,7 @@ async def open_vlm_displays(
# chart_curves( # chart_curves(
# dvlm_rate_fields, # dvlm_rate_fields,
# dvlm_pi, # dvlm_pi,
# fr_shm, # fr_flume.rt_shm,
# ) # )
# TODO: is there a way to "sync" the dual axes such that only # TODO: is there a way to "sync" the dual axes such that only
@ -896,7 +939,7 @@ async def open_vlm_displays(
chart_curves( chart_curves(
trade_rate_fields, trade_rate_fields,
tr_pi, tr_pi,
fr_shm, fr_flume.rt_shm,
# step_mode=True, # step_mode=True,
# dashed line to represent "individual trades" being # dashed line to represent "individual trades" being
@ -930,7 +973,7 @@ async def open_vlm_displays(
async def start_fsp_displays( async def start_fsp_displays(
linked: LinkedSplits, linked: LinkedSplits,
ohlcv: ShmArray, flume: Flume,
group_status_key: str, group_status_key: str,
loglevel: str, loglevel: str,
@ -973,7 +1016,10 @@ async def start_fsp_displays(
async with ( async with (
# NOTE: this admin internally opens an actor cluster # NOTE: this admin internally opens an actor cluster
open_fsp_admin(linked, ohlcv) as admin, open_fsp_admin(
linked,
flume,
) as admin,
): ):
statuses = [] statuses = []
for target, conf in fsp_conf.items(): for target, conf in fsp_conf.items():

View File

@ -99,7 +99,7 @@ class BarItems(pg.GraphicsObject):
linked: LinkedSplits, linked: LinkedSplits,
plotitem: 'pg.PlotItem', # noqa plotitem: 'pg.PlotItem', # noqa
color: str = 'bracket', color: str = 'bracket',
last_bar_color: str = 'bracket', last_bar_color: str = 'original',
name: Optional[str] = None, name: Optional[str] = None,

View File

@ -41,7 +41,11 @@ from ._anchors import (
pp_tight_and_right, # wanna keep it straight in the long run pp_tight_and_right, # wanna keep it straight in the long run
gpath_pin, gpath_pin,
) )
from ..calc import humanize, pnl, puterize from ..calc import (
humanize,
pnl,
puterize,
)
from ..clearing._allocate import Allocator from ..clearing._allocate import Allocator
from ..pp import Position from ..pp import Position
from ..data._normalize import iterticks from ..data._normalize import iterticks
@ -80,9 +84,9 @@ async def update_pnl_from_feed(
''' '''
global _pnl_tasks global _pnl_tasks
pp = order_mode.current_pp pp: PositionTracker = order_mode.current_pp
live = pp.live_pp live: Position = pp.live_pp
key = live.symbol.front_fqsn() key: str = live.symbol.front_fqsn()
log.info(f'Starting pnl display for {pp.alloc.account}') log.info(f'Starting pnl display for {pp.alloc.account}')
@ -101,11 +105,22 @@ async def update_pnl_from_feed(
async with flume.stream.subscribe() as bstream: async with flume.stream.subscribe() as bstream:
# last_tick = time.time() # last_tick = time.time()
async for quotes in bstream: async for quotes in bstream:
# now = time.time() # now = time.time()
# period = now - last_tick # period = now - last_tick
for sym, quote in quotes.items(): for sym, quote in quotes.items():
# print(f'{key} PnL: sym:{sym}')
# TODO: uggggh we probably want a better state
# management then this sincce we want to enable
# updating whatever the current symbol is in
# real-time right?
if sym != key:
continue
# watch out for wrong quote msg-data if you muck
# with backend feed subs code..
# assert sym == quote['fqsn']
for tick in iterticks(quote, types): for tick in iterticks(quote, types):
# print(f'{1/period} Hz') # print(f'{1/period} Hz')
@ -119,13 +134,17 @@ async def update_pnl_from_feed(
else: else:
# compute and display pnl status # compute and display pnl status
order_mode.pane.pnl_label.format( pnl_val = (
pnl=copysign(1, size) * pnl( copysign(1, size)
# live.ppu, *
order_mode.current_pp.live_pp.ppu, pnl(
tick['price'], # live.ppu,
), order_mode.current_pp.live_pp.ppu,
) tick['price'],
)
)
# print(f'formatting PNL {sym} => {pnl_val}')
order_mode.pane.pnl_label.format(pnl=pnl_val)
# last_tick = time.time() # last_tick = time.time()
finally: finally:

View File

@ -240,12 +240,12 @@ def hcolor(name: str) -> str:
'gunmetal': '#91A3B0', 'gunmetal': '#91A3B0',
'battleship': '#848482', 'battleship': '#848482',
# default ohlc-bars/curve gray
'bracket': '#666666', # like the logo
# bluish # bluish
'charcoal': '#36454F', 'charcoal': '#36454F',
# default bars
'bracket': '#666666', # like the logo
# work well for filled polygons which want a 'bracket' feel # work well for filled polygons which want a 'bracket' feel
# going light to dark # going light to dark
'davies': '#555555', 'davies': '#555555',

View File

@ -88,7 +88,7 @@ class Dialog(Struct):
# TODO: use ``pydantic.UUID4`` field # TODO: use ``pydantic.UUID4`` field
uuid: str uuid: str
order: Order order: Order
symbol: Symbol symbol: str
lines: list[LevelLine] lines: list[LevelLine]
last_status_close: Callable = lambda: None last_status_close: Callable = lambda: None
msgs: dict[str, dict] = {} msgs: dict[str, dict] = {}
@ -379,7 +379,7 @@ class OrderMode:
dialog = Dialog( dialog = Dialog(
uuid=order.oid, uuid=order.oid,
order=order, order=order,
symbol=order.symbol, symbol=order.symbol, # XXX: always a str?
lines=lines, lines=lines,
last_status_close=self.multistatus.open_status( last_status_close=self.multistatus.open_status(
f'submitting {order.exec_mode}-{order.action}', f'submitting {order.exec_mode}-{order.action}',
@ -930,7 +930,6 @@ async def process_trade_msg(
) -> tuple[Dialog, Status]: ) -> tuple[Dialog, Status]:
get_index = mode.chart.get_index
fmsg = pformat(msg) fmsg = pformat(msg)
log.debug(f'Received order msg:\n{fmsg}') log.debug(f'Received order msg:\n{fmsg}')
name = msg['name'] name = msg['name']
@ -965,6 +964,10 @@ async def process_trade_msg(
oid = msg.oid oid = msg.oid
dialog: Dialog = mode.dialogs.get(oid) dialog: Dialog = mode.dialogs.get(oid)
if dialog:
fqsn = dialog.symbol
flume = mode.feed.flumes[fqsn]
match msg: match msg:
case Status( case Status(
resp='dark_open' | 'open', resp='dark_open' | 'open',
@ -1034,10 +1037,11 @@ async def process_trade_msg(
# should only be one "fill" for an alert # should only be one "fill" for an alert
# add a triangle and remove the level line # add a triangle and remove the level line
req = Order(**req) req = Order(**req)
index = flume.get_index(time.time())
mode.on_fill( mode.on_fill(
oid, oid,
price=req.price, price=req.price,
arrow_index=get_index(time.time()), arrow_index=index,
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
msg.req = req msg.req = req
@ -1065,26 +1069,27 @@ async def process_trade_msg(
action = order.action action = order.action
details = msg.brokerd_msg details = msg.brokerd_msg
# TODO: put the actual exchange timestamp?
# NOTE: currently the ``kraken`` openOrders sub
# doesn't deliver their engine timestamp as part of
# it's schema, so this value is **not** from them
# (see our backend code). We should probably either
# include all provider-engine timestamps in the
# summary 'closed' status msg and/or figure out
# a way to indicate what is a `brokerd` stamp versus
# a true backend one? This will require finagling
# with how each backend tracks/summarizes time
# stamps for the downstream API.
index = flume.get_index(
details['broker_time']
)
# TODO: some kinda progress system # TODO: some kinda progress system
mode.on_fill( mode.on_fill(
oid, oid,
price=details['price'], price=details['price'],
arrow_index=index,
pointing='up' if action == 'buy' else 'down', pointing='up' if action == 'buy' else 'down',
# TODO: put the actual exchange timestamp
arrow_index=get_index(
# TODO: note currently the ``kraken`` openOrders sub
# doesn't deliver their engine timestamp as part of
# it's schema, so this value is **not** from them
# (see our backend code). We should probably either
# include all provider-engine timestamps in the
# summary 'closed' status msg and/or figure out
# a way to indicate what is a `brokerd` stamp versus
# a true backend one? This will require finagling
# with how each backend tracks/summarizes time
# stamps for the downstream API.
details['broker_time']
),
) )
# TODO: append these fill events to the position's clear # TODO: append these fill events to the position's clear