Merge pull request #304 from pikers/offline_history_loading

Offline history loading
no_orderid_in_error
goodboy 2022-04-16 15:57:14 -04:00 committed by GitHub
commit 67cec4bc54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 449 additions and 309 deletions

View File

@ -37,7 +37,6 @@ import asyncio
from pprint import pformat
import inspect
import logging
import platform
from random import randint
import time
@ -1583,7 +1582,7 @@ async def backfill_bars(
# on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys
# memory.
count: int = 100,
count: int = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -1603,11 +1602,6 @@ async def backfill_bars(
# async with open_history_client(fqsn) as proxy:
async with open_client_proxy() as proxy:
if platform.system() == 'Windows':
log.warning(
'Decreasing history query count to 4 since, windows...')
count = 4
out, fails = await get_bars(proxy, fqsn)
if out is None:

View File

@ -76,7 +76,6 @@ async def filter_quotes_by_sym(
async def fsp_compute(
ctx: tractor.Context,
symbol: Symbol,
feed: Feed,
quote_stream: trio.abc.ReceiveChannel,
@ -86,7 +85,7 @@ async def fsp_compute(
func: Callable,
attach_stream: bool = False,
# attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -127,8 +126,8 @@ async def fsp_compute(
# each respective field.
fields = getattr(dst.array.dtype, 'fields', None).copy()
fields.pop('index')
# TODO: nptyping here!
history: Optional[np.ndarray] = None
history: Optional[np.ndarray] = None # TODO: nptyping here!
if fields and len(fields) > 1 and fields:
if not isinstance(history_output, dict):
raise ValueError(
@ -193,22 +192,23 @@ async def fsp_compute(
profiler(f'{func_name} pushed history')
profiler.finish()
# TODO: UGH, what is the right way to do something like this?
if not ctx._started_called:
await ctx.started(index)
# setup a respawn handle
with trio.CancelScope() as cs:
# TODO: might be better to just make a "restart" method where
# the target task is spawned implicitly and then the event is
# set via some higher level api? At that poing we might as well
# be writing a one-cancels-one nursery though right?
tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index))
profiler(f'{func_name} yield last index')
# import time
# last = time.time()
try:
# rt stream
async with ctx.open_stream() as stream:
async for processed in out_stream:
log.debug(f"{func_name}: {processed}")
@ -219,8 +219,14 @@ async def fsp_compute(
# NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts
# as trigger msg to tell the consumer to read from shm
if attach_stream:
await stream.send(index)
# TODO: further this should likely be implemented much
# like our `Feed` api where there is one background
# "service" task which computes output and then sends to
# N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem
# chans for the fan out?
# if attach_stream:
# await client_stream.send(index)
# period = time.time() - last
# hz = 1/period if period else float('nan')
@ -314,7 +320,6 @@ async def cascade(
fsp_target = partial(
fsp_compute,
ctx=ctx,
symbol=symbol,
feed=feed,
quote_stream=quote_stream,
@ -323,7 +328,7 @@ async def cascade(
src=src,
dst=dst,
# func_name=func_name,
# target
func=func
)
@ -335,13 +340,34 @@ async def cascade(
profiler(f'{func_name}: fsp up')
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
# sync client
await ctx.started(index)
# XXX: rt stream with client which we MUST
# open here (and keep it open) in order to make
# incremental "updates" as history prepends take
# place.
async with ctx.open_stream() as client_stream:
# TODO: these likely should all become
# methods of this ``TaskLifetime`` or wtv
# abstraction..
async def resync(
tracker: TaskTracker,
) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
return await n.start(fsp_target)
tracker, index = await n.start(fsp_target)
# always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await client_stream.send('update')
return tracker, index
def is_synced(
src: ShmArray,
@ -388,7 +414,9 @@ async def cascade(
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with feed.index_stream(int(delay_s)) as istream:
async with feed.index_stream(
int(delay_s)
) as istream:
profiler(f'{func_name}: sample stream up')
profiler.finish()

View File

@ -19,7 +19,7 @@ High level chart-widget apis.
'''
from __future__ import annotations
from typing import Optional
from typing import Optional, TYPE_CHECKING
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import Qt
@ -63,6 +63,8 @@ from ._interaction import ChartView
from ._forms import FieldsForm
from ._overlay import PlotItemOverlay
if TYPE_CHECKING:
from ._display import DisplayState
log = get_logger(__name__)
@ -230,6 +232,7 @@ class GodWidget(QWidget):
# chart is already in memory so just focus it
linkedsplits.show()
linkedsplits.focus()
linkedsplits.graphics_cycle()
await trio.sleep(0)
# resume feeds *after* rendering chart view asap
@ -346,8 +349,19 @@ class LinkedSplits(QWidget):
self.layout.setContentsMargins(0, 0, 0, 0)
self.layout.addWidget(self.splitter)
# chart-local graphics state that can be passed to
# a ``graphic_update_cycle()`` call by any task wishing to
# update the UI for a given "chart instance".
self.display_state: Optional[DisplayState] = None
self._symbol: Symbol = None
def graphics_cycle(self, **kwargs) -> None:
from . import _display
ds = self.display_state
if ds:
return _display.graphics_update_cycle(ds, **kwargs)
@property
def symbol(self) -> Symbol:
return self._symbol

View File

@ -21,9 +21,10 @@ this module ties together quote and computational (fsp) streams with
graphics update methods via our custom ``pyqtgraph`` charting api.
'''
from dataclasses import dataclass
from functools import partial
import time
from typing import Optional
from typing import Optional, Any, Callable
import numpy as np
import tractor
@ -31,6 +32,7 @@ import trio
from .. import brokers
from ..data.feed import open_feed
from ._axes import YAxisLabel
from ._chart import (
ChartPlotWidget,
LinkedSplits,
@ -109,6 +111,33 @@ def chart_maxmin(
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
@dataclass
class DisplayState:
'''
Chart-local real-time graphics state container.
'''
quotes: dict[str, Any]
maxmin: Callable
ohlcv: ShmArray
# high level chart handles
linked: LinkedSplits
chart: ChartPlotWidget
vlm_chart: ChartPlotWidget
# axis labels
l1: L1Labels
last_price_sticky: YAxisLabel
vlm_sticky: YAxisLabel
# misc state tracking
vars: dict[str, Any]
wap_in_history: bool = False
async def graphics_update_loop(
linked: LinkedSplits,
@ -147,7 +176,6 @@ async def graphics_update_loop(
if vlm_chart:
vlm_sticky = vlm_chart._ysticks['volume']
vlm_view = vlm_chart.view
maxmin = partial(chart_maxmin, chart, vlm_chart)
chart.default_view()
@ -183,7 +211,7 @@ async def graphics_update_loop(
tick_margin = 3 * tick_size
chart.show()
view = chart.view
# view = chart.view
last_quote = time.time()
i_last = ohlcv.index
@ -210,7 +238,29 @@ async def graphics_update_loop(
# async for quotes in iter_drain_quotes():
ds = linked.display_state = DisplayState(**{
'quotes': {},
'linked': linked,
'maxmin': maxmin,
'ohlcv': ohlcv,
'chart': chart,
'last_price_sticky': last_price_sticky,
'vlm_chart': vlm_chart,
'vlm_sticky': vlm_sticky,
'l1': l1,
'vars': {
'tick_margin': tick_margin,
'i_last': i_last,
'last_mx_vlm': last_mx_vlm,
'last_mx': last_mx,
'last_mn': last_mn,
}
})
# main loop
async for quotes in stream:
ds.quotes = quotes
quote_period = time.time() - last_quote
quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf')
@ -231,23 +281,36 @@ async def graphics_update_loop(
chart.pause_all_feeds()
continue
for sym, quote in quotes.items():
# sync call to update all graphics/UX components.
graphics_update_cycle(ds)
(
brange,
mx_in_view,
mn_in_view,
mx_vlm_in_view,
) = maxmin()
l, lbar, rbar, r = brange
mx = mx_in_view + tick_margin
mn = mn_in_view - tick_margin
def graphics_update_cycle(
ds: DisplayState,
wap_in_history: bool = False,
trigger_all: bool = False, # flag used by prepend history updates
) -> None:
# TODO: eventually optimize this whole graphics stack with ``numba``
# hopefully XD
# unpack multi-referenced components
chart = ds.chart
vlm_chart = ds.vlm_chart
l1 = ds.l1
ohlcv = ds.ohlcv
array = ohlcv.array
vars = ds.vars
tick_margin = vars['tick_margin']
for sym, quote in ds.quotes.items():
# NOTE: vlm may be written by the ``brokerd`` backend
# event though a tick sample is not emitted.
# TODO: show dark trades differently
# https://github.com/pikers/piker/issues/116
array = ohlcv.array
# NOTE: this used to be implemented in a dedicated
# "increment tas": ``check_for_new_bars()`` but it doesn't
@ -258,26 +321,52 @@ async def graphics_update_loop(
# increment the view position by the sample offset.
i_step = ohlcv.index
i_diff = i_step - i_last
i_diff = i_step - vars['i_last']
if i_diff > 0:
chart.increment_view(
steps=i_diff,
)
i_last = i_step
vars['i_last'] = i_step
(
brange,
mx_in_view,
mn_in_view,
mx_vlm_in_view,
) = ds.maxmin()
l, lbar, rbar, r = brange
mx = mx_in_view + tick_margin
mn = mn_in_view - tick_margin
liv = r > i_step # the last datum is in view
# don't real-time "shift" the curve to the
# left under the following conditions:
if (
(
i_diff > 0 # no new sample step
and liv
)
or trigger_all
):
# TODO: we should track and compute whether the last
# pixel in a curve should show new data based on uppx
# and then iff update curves and shift?
chart.increment_view(steps=i_diff)
if vlm_chart:
vlm_chart.update_curve_from_array('volume', array)
vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
ds.vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
if (
mx_vlm_in_view != last_mx_vlm or
mx_vlm_in_view > last_mx_vlm
mx_vlm_in_view > vars['last_mx_vlm']
or trigger_all
):
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vlm_view._set_yrange(
vlm_chart.view._set_yrange(
yrange=(0, mx_vlm_in_view * 1.375)
)
last_mx_vlm = mx_vlm_in_view
vars['last_mx_vlm'] = mx_vlm_in_view
for curve_name, flow in vlm_chart._flows.items():
update_fsp_chart(
@ -336,6 +425,12 @@ async def graphics_update_loop(
# last_clear_updated: bool = False
# for typ, tick in reversed(lasts.items()):
# update ohlc sampled price bars
chart.update_ohlc_from_array(
chart.name,
array,
)
# iterate in FIFO order per frame
for typ, tick in lasts.items():
@ -364,19 +459,16 @@ async def graphics_update_loop(
# update price sticky(s)
end = array[-1]
last_price_sticky.update_from_data(
ds.last_price_sticky.update_from_data(
*end[['index', 'close']]
)
# update ohlc sampled price bars
chart.update_ohlc_from_array(
chart.name,
array,
)
if wap_in_history:
# update vwap overlay line
chart.update_curve_from_array('bar_wap', ohlcv.array)
chart.update_curve_from_array(
'bar_wap',
array,
)
# L1 book label-line updates
# XXX: is this correct for ib?
@ -390,7 +482,9 @@ async def graphics_update_loop(
}.get(price)
if label is not None:
label.update_fields({'level': price, 'size': size})
label.update_fields(
{'level': price, 'size': size}
)
# TODO: on trades should we be knocking down
# the relevant L1 queue?
@ -406,11 +500,11 @@ async def graphics_update_loop(
# check for y-range re-size
if (
(mx > last_mx) or (mn < last_mn)
(mx > vars['last_mx']) or (mn < vars['last_mn'])
and not chart._static_yrange == 'axis'
):
# print(f'new y range: {(mn, mx)}')
view._set_yrange(
chart.view._set_yrange(
yrange=(mn, mx),
# TODO: we should probably scale
# the view margin based on the size
@ -420,10 +514,10 @@ async def graphics_update_loop(
# range_margin=0.1,
)
last_mx, last_mn = mx, mn
vars['last_mx'], vars['last_mn'] = mx, mn
# run synchronous update on all derived fsp subplots
for name, subchart in linked.subplots.items():
for name, subchart in ds.linked.subplots.items():
update_fsp_chart(
subchart,
subchart._shm,
@ -444,9 +538,6 @@ async def graphics_update_loop(
curve_name,
array_key=curve_name,
)
# chart.view._set_yrange()
# loop end
async def display_symbol_data(
@ -479,8 +570,10 @@ async def display_symbol_data(
# clear_on_next=True,
# group_key=loading_sym_key,
# )
fqsn = '.'.join((sym, provider))
async with open_feed(
['.'.join((sym, provider))],
[fqsn],
loglevel=loglevel,
# limit to at least display's FPS

View File

@ -438,6 +438,17 @@ class FspAdmin:
started.set()
# wait for graceful shutdown signal
async with stream.subscribe() as stream:
async for msg in stream:
if msg == 'update':
# if the chart isn't hidden try to update
# the data on screen.
if not self.linked.isHidden():
log.info(f'Re-syncing graphics for fsp: {ns_path}')
self.linked.graphics_cycle(trigger_all=True)
else:
log.info(f'recved unexpected fsp engine msg: {msg}')
await complete.wait()
async def start_engine_task(