piker/piker/ui/_display.py

895 lines
26 KiB
Python

# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Real-time display tasks for charting / graphics.
'''
from contextlib import asynccontextmanager
import time
from types import ModuleType
import numpy as np
from pydantic import create_model
import tractor
import trio
from .. import brokers
from ..data.feed import (
open_feed,
# Feed,
)
from ._chart import (
ChartPlotWidget,
LinkedSplits,
GodWidget,
)
from .. import fsp
from ._l1 import L1Labels
from ..data._sharedmem import ShmArray, maybe_open_shm_array
from ._forms import (
FieldsForm,
mk_form,
mk_order_pane_layout,
open_form_input_handling,
)
from .order_mode import open_order_mode
from ..log import get_logger
log = get_logger(__name__)
_clear_throttle_rate: int = 58 # Hz
_book_throttle_rate: int = 16 # Hz
async def chart_from_quotes(
chart: ChartPlotWidget,
stream: tractor.MsgStream,
ohlcv: np.ndarray,
wap_in_history: bool = False,
) -> None:
'''The 'main' (price) chart real-time update loop.
Receive from the quote stream and update the OHLC chart.
'''
# TODO: bunch of stuff:
# - I'm starting to think all this logic should be
# done in one place and "graphics update routines"
# should not be doing any length checking and array diffing.
# - handle odd lot orders
# - update last open price correctly instead
# of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does?
# update last price sticky
last_price_sticky = chart._ysticks[chart.name]
last_price_sticky.update_from_data(
*ohlcv.array[-1][['index', 'close']]
)
def maxmin():
# TODO: implement this
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
array = chart._arrays['ohlc']
ifirst = array[0]['index']
last_bars_range = chart.bars_range()
l, lbar, rbar, r = last_bars_range
in_view = array[lbar - ifirst:rbar - ifirst]
assert in_view.size
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
# TODO: when we start using line charts, probably want to make
# this an overloaded call on our `DataView
# sym = chart.name
# mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
return last_bars_range, mx, max(mn, 0)
chart.default_view()
last_bars_range, last_mx, last_mn = maxmin()
last, volume = ohlcv.array[-1][['close', 'volume']]
symbol = chart.linked.symbol
l1 = L1Labels(
chart,
# determine precision/decimal lengths
digits=symbol.tick_size_digits,
size_digits=symbol.lot_size_digits,
)
chart._l1_labels = l1
# TODO:
# - in theory we should be able to read buffer data faster
# then msgs arrive.. needs some tinkering and testing
# - if trade volume jumps above / below prior L1 price
# levels this might be dark volume we need to
# present differently?
tick_size = chart.linked.symbol.tick_size
tick_margin = 2 * tick_size
last_ask = last_bid = last_clear = time.time()
chart.show()
async for quotes in stream:
# chart isn't actively shown so just skip render cycle
if chart.linked.isHidden():
await chart.pause_all_feeds()
continue
for sym, quote in quotes.items():
now = time.time()
for tick in quote.get('ticks', ()):
# print(f"CHART: {quote['symbol']}: {tick}")
ticktype = tick.get('type')
price = tick.get('price')
size = tick.get('size')
if ticktype == 'n/a' or price == -1:
# okkk..
continue
# clearing price event
if ticktype in ('trade', 'utrade', 'last'):
# throttle clearing price updates to ~ max 60 FPS
period = now - last_clear
if period <= 1/_clear_throttle_rate:
# faster then display refresh rate
continue
# print(f'passthrough {tick}\n{1/(now-last_clear)}')
# set time of last graphics update
last_clear = now
array = ohlcv.array
# update price sticky(s)
end = array[-1]
last_price_sticky.update_from_data(
*end[['index', 'close']]
)
# plot bars
# update price bar
chart.update_ohlc_from_array(
chart.name,
array,
)
if wap_in_history:
# update vwap overlay line
chart.update_curve_from_array('bar_wap', ohlcv.array)
# l1 book events
# throttle the book graphics updates at a lower rate
# since they aren't as critical for a manual user
# viewing the chart
elif ticktype in ('ask', 'asize'):
if (now - last_ask) <= 1/_book_throttle_rate:
# print(f'skipping\n{tick}')
continue
# print(f'passthrough {tick}\n{1/(now-last_ask)}')
last_ask = now
elif ticktype in ('bid', 'bsize'):
if (now - last_bid) <= 1/_book_throttle_rate:
continue
# print(f'passthrough {tick}\n{1/(now-last_bid)}')
last_bid = now
# compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see
# def above.
brange, mx_in_view, mn_in_view = maxmin()
l, lbar, rbar, r = brange
mx = mx_in_view + tick_margin
mn = mn_in_view - tick_margin
# XXX: prettty sure this is correct?
# if ticktype in ('trade', 'last'):
if ticktype in ('last',): # 'size'):
label = {
l1.ask_label.fields['level']: l1.ask_label,
l1.bid_label.fields['level']: l1.bid_label,
}.get(price)
if label is not None:
label.update_fields({'level': price, 'size': size})
# on trades should we be knocking down
# the relevant L1 queue?
# label.size -= size
elif ticktype in ('ask', 'asize'):
l1.ask_label.update_fields({'level': price, 'size': size})
elif ticktype in ('bid', 'bsize'):
l1.bid_label.update_fields({'level': price, 'size': size})
# update min price in view to keep bid on screen
mn = min(price - tick_margin, mn)
# update max price in view to keep ask on screen
mx = max(price + tick_margin, mx)
if (mx > last_mx) or (
mn < last_mn
):
# print(f'new y range: {(mn, mx)}')
chart._set_yrange(
yrange=(mn, mx),
# TODO: we should probably scale
# the view margin based on the size
# of the true range? This way you can
# slap in orders outside the current
# L1 (only) book range.
# range_margin=0.1,
)
last_mx, last_mn = mx, mn
async def fan_out_spawn_fsp_daemons(
linkedsplits: LinkedSplits,
fsps: dict[str, str],
sym: str,
src_shm: list,
brokermod: ModuleType,
group_status_key: str,
loglevel: str,
) -> None:
'''Create financial signal processing sub-actors (under flat tree)
for each entry in config and attach to local graphics update tasks.
Pass target entrypoint and historical data.
'''
linkedsplits.focus()
uid = tractor.current_actor().uid
# spawns sub-processes which execute cpu bound FSP code
async with (
tractor.open_nursery() as n,
trio.open_nursery() as ln,
):
# Currently we spawn an actor per fsp chain but
# likely we'll want to pool them eventually to
# scale horizonatlly once cores are used up.
for display_name, conf in fsps.items():
fsp_func_name = conf['fsp_func_name']
# TODO: load function here and introspect
# return stream type(s)
# TODO: should `index` be a required internal field?
fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
# this is all sync currently
shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
)
# XXX: fsp may have been opened by a duplicate chart.
# Error for now until we figure out how to wrap fsps as
# "feeds". assert opened, f"A chart for {key} likely
# already exists?"
conf['shm'] = shm
portal = await n.start_actor(
enable_modules=['piker.fsp._engine'],
name='fsp.' + display_name,
)
# init async
ln.start_soon(
run_fsp,
portal,
linkedsplits,
brokermod,
sym,
src_shm,
fsp_func_name,
display_name,
conf,
group_status_key,
loglevel,
)
# blocks here until all fsp actors complete
@asynccontextmanager
async def open_sidepane(
linked: LinkedSplits,
conf: dict[str, dict[str, str]],
) -> FieldsForm:
schema = {}
assert len(conf) == 1 # for now
# add (single) selection widget
for display_name, config in conf.items():
schema[display_name] = {
'label': '**fsp**:',
'type': 'select',
'default_value': [display_name],
}
# add parameters for selection "options"
defaults = config.get('params', {})
for name, default in defaults.items():
# add to ORM schema
schema.update({
name: {
'label': f'**{name}**:',
'type': 'edit',
'default_value': default,
},
})
sidepane: FieldsForm = mk_form(
parent=linked.godwidget,
fields_schema=schema,
)
# https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation
FspConfig = create_model(
'FspConfig',
name=display_name,
**defaults,
)
sidepane.model = FspConfig()
# just a logger for now until we get fsp configs up and running.
async def settings_change(key: str, value: str) -> bool:
print(f'{key}: {value}')
return True
# TODO:
async with (
open_form_input_handling(
sidepane,
focus_next=linked.godwidget,
on_value_change=settings_change,
)
):
yield sidepane
async def run_fsp(
portal: tractor._portal.Portal,
linkedsplits: LinkedSplits,
brokermod: ModuleType,
sym: str,
src_shm: ShmArray,
fsp_func_name: str,
display_name: str,
conf: dict[str, dict],
group_status_key: str,
loglevel: str,
) -> None:
'''FSP stream chart update loop.
This is called once for each entry in the fsp
config map.
'''
done = linkedsplits.window().status_bar.open_status(
f'loading fsp, {display_name}..',
group_key=group_status_key,
)
async with (
portal.open_context(
# chaining entrypoint
fsp.cascade,
# name as title of sub-chart
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token,
symbol=sym,
func_name=fsp_func_name,
loglevel=loglevel,
) as (ctx, last_index),
ctx.open_stream() as stream,
open_sidepane(
linkedsplits,
{display_name: conf},
) as sidepane,
):
shm = conf['shm']
if conf.get('overlay'):
chart = linkedsplits.chart
chart.draw_curve(
name=display_name,
data=shm.array,
overlay=True,
color='default_light',
)
last_val_sticky = None
else:
chart = linkedsplits.add_plot(
name=display_name,
array=shm.array,
array_key=conf['fsp_func_name'],
sidepane=sidepane,
# curve by default
ohlc=False,
# settings passed down to ``ChartPlotWidget``
**conf.get('chart_kwargs', {})
# static_yrange=(0, 100),
)
# XXX: ONLY for sub-chart fsps, overlays have their
# data looked up from the chart's internal array set.
# TODO: we must get a data view api going STAT!!
chart._shm = shm
# should **not** be the same sub-chart widget
assert chart.name != linkedsplits.chart.name
# sticky only on sub-charts atm
last_val_sticky = chart._ysticks[chart.name]
# read from last calculated value
array = shm.array
# XXX: fsp func names must be unique meaning we don't have
# duplicates of the underlying data even if multiple
# sub-charts reference it under different 'named charts'.
value = array[fsp_func_name][-1]
last_val_sticky.update_from_data(-1, value)
chart.linked.focus()
# works also for overlays in which case data is looked up from
# internal chart array set....
chart.update_curve_from_array(
display_name,
shm.array,
array_key=fsp_func_name
)
chart.linked.resize_sidepanes()
# TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions.
# ``pg.FillBetweenItems`` seems to be one technique using
# generic fills between curve types while ``PlotCurveItem`` has
# logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
# might be the best solution?
# graphics = chart.update_from_array(chart.name, array[fsp_func_name])
# graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50)
if fsp_func_name == 'rsi':
from ._lines import level_line
# add moveable over-[sold/bought] lines
# and labels only for the 70/30 lines
level_line(chart, 20)
level_line(chart, 30, orient_v='top')
level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top')
chart._set_yrange()
last = time.time()
done()
# i = 0
# update chart graphics
async for value in stream:
# chart isn't actively shown so just skip render cycle
if chart.linked.isHidden():
# print(f'{i} unseen fsp cyclce')
# i += 1
continue
now = time.time()
period = now - last
# if period <= 1/30:
if period <= 1/_clear_throttle_rate:
# faster then display refresh rate
# print(f'fsp too fast: {1/period}')
continue
# TODO: provide a read sync mechanism to avoid this polling.
# the underlying issue is that a backfill and subsequent shm
# array first/last index update could result in an empty array
# read here since the stream is never torn down on the
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][fsp_func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky:
last_val_sticky.update_from_data(-1, value)
# update graphics
chart.update_curve_from_array(
display_name,
array,
array_key=fsp_func_name,
)
# set time of last graphics update
last = time.time()
async def check_for_new_bars(feed, ohlcv, linkedsplits):
"""Task which updates from new bars in the shared ohlcv buffer every
``delay_s`` seconds.
"""
# TODO: right now we'll spin printing bars if the last time
# stamp is before a large period of no market activity.
# Likely the best way to solve this is to make this task
# aware of the instrument's tradable hours?
price_chart = linkedsplits.chart
price_chart.default_view()
async with feed.index_stream() as stream:
async for index in stream:
# update chart historical bars graphics by incrementing
# a time step and drawing the history and new bar
# When appending a new bar, in the time between the insert
# from the writing process and the Qt render call, here,
# the index of the shm buffer may be incremented and the
# (render) call here might read the new flat bar appended
# to the buffer (since -1 index read). In that case H==L and the
# body will be set as None (not drawn) on what this render call
# *thinks* is the curent bar (even though it's reading data from
# the newly inserted flat bar.
#
# HACK: We need to therefore write only the history (not the
# current bar) and then either write the current bar manually
# or place a cursor for visual cue of the current time step.
# XXX: this puts a flat bar on the current time step
# TODO: if we eventually have an x-axis time-step "cursor"
# we can get rid of this since it is extra overhead.
price_chart.update_ohlc_from_array(
price_chart.name,
ohlcv.array,
just_history=False,
)
for name in price_chart._overlays:
price_chart.update_curve_from_array(
name,
price_chart._arrays[name]
)
for name, chart in linkedsplits.subplots.items():
chart.update_curve_from_array(
chart.name,
chart._shm.array,
array_key=chart.data_key
)
# shift the view if in follow mode
price_chart.increment_view()
def has_vlm(ohlcv: ShmArray) -> bool:
# make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and
# derivatives)
volm = ohlcv.array['volume']
return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm)))
@asynccontextmanager
async def maybe_open_vlm_display(
linked: LinkedSplits,
ohlcv: ShmArray,
) -> ChartPlotWidget:
if not has_vlm(ohlcv):
log.warning(f"{linked.symbol.key} does not seem to have volume info")
else:
async with open_sidepane(
linked, {
'volume': {
'params': {
'price_func': 'ohl3'
}
}
},
) as sidepane:
# built-in $vlm
shm = ohlcv
chart = linked.add_plot(
name='volume',
array=shm.array,
array_key='volume',
sidepane=sidepane,
# curve by default
ohlc=False,
style='step',
# vertical bars, we do this internally ourselves
# stepMode=True,
)
# XXX: ONLY for sub-chart fsps, overlays have their
# data looked up from the chart's internal array set.
# TODO: we must get a data view api going STAT!!
chart._shm = shm
# should **not** be the same sub-chart widget
assert chart.name != linked.chart.name
# sticky only on sub-charts atm
last_val_sticky = chart._ysticks[chart.name]
# read from last calculated value
value = shm.array['volume'][-1]
last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(
'volume',
shm.array,
)
# size view to data once at outset
chart._set_yrange()
# size pain to parent chart
# TODO: this appears to nearly fix a bug where the vlm sidepane
# could be sized correctly nearly immediately (since the
# order pane is already sized), right now it doesn't seem to
# fully align until the VWAP fsp-actor comes up...
await trio.sleep(0)
chart.linked.resize_sidepanes()
await trio.sleep(0)
yield chart
async def display_symbol_data(
godwidget: GodWidget,
provider: str,
sym: str,
loglevel: str,
order_mode_started: trio.Event,
) -> None:
'''Spawn a real-time updated chart for ``symbol``.
Spawned ``LinkedSplits`` chart widgets can remain up but hidden so
that multiple symbols can be viewed and switched between extremely
fast from a cached watch-list.
'''
sbar = godwidget.window.status_bar
loading_sym_key = sbar.open_status(
f'loading {sym}.{provider} ->',
group_key=True
)
# historical data fetch
brokermod = brokers.get_brokermod(provider)
# ohlc_status_done = sbar.open_status(
# 'retreiving OHLC history.. ',
# clear_on_next=True,
# group_key=loading_sym_key,
# )
async with(
open_feed(
provider,
[sym],
loglevel=loglevel,
# 60 FPS to limit context switches
tick_throttle=_clear_throttle_rate,
) as feed,
):
ohlcv: ShmArray = feed.shm
bars = ohlcv.array
symbol = feed.symbols[sym]
# load in symbol's ohlc data
godwidget.window.setWindowTitle(
f'{symbol.key}@{symbol.brokers} '
f'tick:{symbol.tick_size}'
)
linkedsplits = godwidget.linkedsplits
linkedsplits._symbol = symbol
# generate order mode side-pane UI
# A ``FieldsForm`` form to configure order entry
pp_pane: FieldsForm = mk_order_pane_layout(godwidget)
# add as next-to-y-axis singleton pane
godwidget.pp_pane = pp_pane
# create main OHLC chart
chart = linkedsplits.plot_ohlc_main(
symbol,
bars,
sidepane=pp_pane,
)
chart._feeds[symbol.key] = feed
chart.setFocus()
# plot historical vwap if available
wap_in_history = False
if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve(
name='bar_wap',
data=bars,
add_label=False,
)
# size view to data once at outset
chart._set_yrange()
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
# TODO: eventually we'll support some kind of n-compose syntax
fsp_conf = {
'rsi': {
'fsp_func_name': 'rsi',
'params': {'period': 14},
'chart_kwargs': {
'static_yrange': (0, 100),
},
},
}
if has_vlm(ohlcv):
# add VWAP to fsp config for downstream loading
fsp_conf.update({
'vwap': {
'fsp_func_name': 'vwap',
'overlay': True,
'anchor': 'session',
},
})
# NOTE: we must immediately tell Qt to show the OHLC chart
# to avoid a race where the subplots get added/shown to
# the linked set *before* the main price chart!
linkedsplits.show()
linkedsplits.focus()
await trio.sleep(0)
async with (
trio.open_nursery() as ln,
):
# load initial fsp chain (otherwise known as "indicators")
ln.start_soon(
fan_out_spawn_fsp_daemons,
linkedsplits,
fsp_conf,
sym,
ohlcv,
brokermod,
loading_sym_key,
loglevel,
)
# start graphics update loop(s)after receiving first live quote
ln.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
wap_in_history,
)
ln.start_soon(
check_for_new_bars,
feed,
ohlcv,
linkedsplits
)
async with (
maybe_open_vlm_display(linkedsplits, ohlcv),
open_order_mode(
feed,
chart,
symbol,
provider,
order_mode_started
)
):
await trio.sleep_forever()