Merge pull request #230 from pikers/super_basic_brokerd_status

Super basic brokerd status
fsp_feeds
goodboy 2021-10-28 13:04:22 -04:00 committed by GitHub
commit 91c005b3c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 361 additions and 187 deletions

View File

@ -20,10 +20,17 @@ Handy financial calculations.
import math
import itertools
from bidict import bidict
_mag2suffix = bidict({3: 'k', 6: 'M', 9: 'B'})
def humanize(
number: float,
digits: int = 1
) -> str:
'''Convert large numbers to something with at most ``digits`` and
a letter suffix (eg. k: thousand, M: million, B: billion).
@ -36,19 +43,38 @@ def humanize(
if not number or number <= 0:
return round(number, ndigits=digits)
mag2suffix = {3: 'k', 6: 'M', 9: 'B'}
mag = math.floor(math.log(number, 10))
if mag < 3:
return round(number, ndigits=digits)
maxmag = max(itertools.takewhile(lambda key: mag >= key, mag2suffix))
maxmag = max(itertools.takewhile(lambda key: mag >= key, _mag2suffix))
return "{value}{suffix}".format(
value=round(number/10**maxmag, ndigits=digits),
suffix=mag2suffix[maxmag],
suffix=_mag2suffix[maxmag],
)
def puterize(
text: str,
digits: int = 1,
) -> float:
'''Inverse of ``humanize()`` above.
'''
try:
suffix = str(text)[-1]
mult = _mag2suffix.inverse[suffix]
value = text.rstrip(suffix)
return round(float(value) * 10**mult, ndigits=digits)
except KeyError:
# no matching suffix try just the value
return float(text)
def pnl(
init: float,

View File

@ -373,7 +373,9 @@ async def open_brokerd_trades_dialogue(
broker = feed.mod.name
# TODO: make a `tractor` bug/test for this!
# portal = feed._brokerd_portal
# if only i could member what the problem was..
# probably some GC of the portal thing?
# portal = feed.portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed

View File

@ -393,18 +393,23 @@ class Feed:
shm: ShmArray
mod: ModuleType
first_quotes: dict # symbol names to first quote dicts
stream: trio.abc.ReceiveChannel[dict[str, Any]]
_brokerd_portal: tractor._portal.Portal
_portal: tractor.Portal
stream: trio.abc.ReceiveChannel[dict[str, Any]]
throttle_rate: Optional[int] = None
_trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
_max_sample_rate: int = 0
search: Callable[..., Awaitable] = None
# cache of symbol info messages received as first message when
# a stream startsc.
symbols: dict[str, Symbol] = field(default_factory=dict)
@property
def portal(self) -> tractor.Portal:
return self._portal
async def receive(self) -> dict:
return await self.stream.receive()
@ -418,7 +423,7 @@ class Feed:
delay_s = delay_s or self._max_sample_rate
async with open_sample_step_stream(
self._brokerd_portal,
self.portal,
delay_s,
) as istream:
yield istream
@ -526,7 +531,8 @@ async def open_feed(
mod=mod,
first_quotes=first_quotes,
stream=stream,
_brokerd_portal=portal,
_portal=portal,
throttle_rate=tick_throttle,
)
ohlc_sample_rates = []

View File

@ -18,14 +18,14 @@
Financial signal processing for the peeps.
"""
from functools import partial
from typing import AsyncIterator, Callable, Tuple
from typing import AsyncIterator, Callable, Tuple, Optional
import trio
from trio_typing import TaskStatus
import tractor
import numpy as np
from ..log import get_logger
from ..log import get_logger, get_console_log
from .. import data
from ._momo import _rsi, _wma
from ._volume import _tina_vwap
@ -134,7 +134,7 @@ async def fsp_compute(
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
@ -149,6 +149,12 @@ async def fsp_compute(
# rt stream
async for processed in out_stream:
# period = time.time() - last
# hz = 1/period if period else float('nan')
# if hz > 60:
# log.info(f'FSP quote too fast: {hz}')
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
@ -165,12 +171,16 @@ async def cascade(
dst_shm_token: Tuple[str, np.dtype],
symbol: str,
fsp_func_name: str,
loglevel: Optional[str] = None,
) -> None:
"""Chain streaming signal processors and deliver output to
'''Chain streaming signal processors and deliver output to
destination mem buf.
"""
'''
if loglevel:
get_console_log(loglevel)
src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token)
@ -180,6 +190,10 @@ async def cascade(
async with data.feed.maybe_open_feed(
brokername,
[symbol],
# TODO:
# tick_throttle=60,
) as (feed, stream):
assert src.token == feed.shm.token

View File

@ -276,6 +276,8 @@ class ChartnPane(QFrame):
hbox.setContentsMargins(0, 0, 0, 0)
hbox.setSpacing(3)
# self.setMaximumWidth()
class LinkedSplits(QWidget):
'''
@ -339,7 +341,8 @@ class LinkedSplits(QWidget):
def set_split_sizes(
self,
prop: float = 0.375 # proportion allocated to consumer subcharts
# prop: float = 0.375, # proportion allocated to consumer subcharts
prop: float = 5/8,
) -> None:
'''Set the proportion of space allocated for linked subcharts.
@ -450,7 +453,6 @@ class LinkedSplits(QWidget):
self.xaxis = xaxis
qframe = ChartnPane(sidepane=sidepane, parent=self.splitter)
cpw = ChartPlotWidget(
# this name will be used to register the primary
@ -522,10 +524,10 @@ class LinkedSplits(QWidget):
# track by name
self.subplots[name] = cpw
if sidepane:
# TODO: use a "panes" collection to manage this?
sidepane.setMinimumWidth(self.chart.sidepane.width())
sidepane.setMaximumWidth(self.chart.sidepane.width())
# if sidepane:
# # TODO: use a "panes" collection to manage this?
# qframe.setMaximumWidth(self.chart.sidepane.width())
# qframe.setMinimumWidth(self.chart.sidepane.width())
self.splitter.addWidget(qframe)
@ -537,6 +539,16 @@ class LinkedSplits(QWidget):
return cpw
def resize_sidepanes(
self,
) -> None:
'''Size all sidepanes based on the OHLC "main" plot.
'''
for name, cpw in self.subplots.items():
cpw.sidepane.setMinimumWidth(self.chart.sidepane.width())
cpw.sidepane.setMaximumWidth(self.chart.sidepane.width())
class ChartPlotWidget(pg.PlotWidget):
'''
@ -681,9 +693,9 @@ class ChartPlotWidget(pg.PlotWidget):
"""Return a range tuple for the bars present in view.
"""
l, r = self.view_range()
a = self._arrays['ohlc']
lbar = max(l, a[0]['index'])
rbar = min(r, a[-1]['index'])
array = self._arrays['ohlc']
lbar = max(l, array[0]['index'])
rbar = min(r, array[-1]['index'])
return l, lbar, rbar, r
def default_view(
@ -991,22 +1003,19 @@ class ChartPlotWidget(pg.PlotWidget):
a = self._arrays['ohlc']
ifirst = a[0]['index']
bars = a[lbar - ifirst:rbar - ifirst + 1]
if not len(bars):
# likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}")
return
# TODO: should probably just have some kinda attr mark
# that determines this behavior based on array type
try:
if self.data_key != self.linked.symbol.key:
bars = a[self.data_key]
ylow = np.nanmin(bars)
yhigh = np.nanmax((bars))
else:
# just the std ohlc bars
ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high'])
except (IndexError, ValueError):
# likely non-ohlc array?
bars = bars[self.name]
ylow = np.nanmin(bars)
yhigh = np.nanmax(bars)
if set_range:
# view margins: stay within a % of the "true range"

View File

@ -18,6 +18,7 @@
Real-time display tasks for charting / graphics.
'''
from contextlib import asynccontextmanager
import time
from typing import Any
from types import ModuleType
@ -264,7 +265,7 @@ async def chart_from_quotes(
last_mx, last_mn = mx, mn
async def spawn_fsps(
async def fan_out_spawn_fsp_daemons(
linkedsplits: LinkedSplits,
fsps: dict[str, str],
@ -275,22 +276,21 @@ async def spawn_fsps(
loglevel: str,
) -> None:
"""Start financial signal processing in subactor.
'''Create financial signal processing sub-actors (under flat tree)
for each entry in config and attach to local graphics update tasks.
Pass target entrypoint and historical data.
"""
'''
linkedsplits.focus()
uid = tractor.current_actor().uid
# spawns sub-processes which execute cpu bound FSP code
async with tractor.open_nursery(loglevel=loglevel) as n:
# spawns local task that consume and chart data streams from
# sub-procs
async with trio.open_nursery() as ln:
async with (
tractor.open_nursery() as n,
trio.open_nursery() as ln,
):
# Currently we spawn an actor per fsp chain but
# likely we'll want to pool them eventually to
@ -339,45 +339,30 @@ async def spawn_fsps(
display_name,
conf,
group_status_key,
loglevel,
)
# blocks here until all fsp actors complete
async def run_fsp(
portal: tractor._portal.Portal,
linkedsplits: LinkedSplits,
brokermod: ModuleType,
sym: str,
src_shm: ShmArray,
fsp_func_name: str,
display_name: str,
conf: dict[str, Any],
group_status_key: str,
) -> None:
"""FSP stream chart update loop.
This is called once for each entry in the fsp
config map.
"""
done = linkedsplits.window().status_bar.open_status(
f'loading fsp, {display_name}..',
group_key=group_status_key,
)
# make sidepane config widget
class FspConfig(BaseModel):
class Config:
validate_assignment = True
name: str
period: int
@asynccontextmanager
async def open_sidepane(
linked: LinkedSplits,
display_name: str,
) -> FspConfig:
sidepane: FieldsForm = mk_form(
parent=linkedsplits.godwidget,
parent=linked.godwidget,
fields_schema={
'name': {
'label': '**fsp**:',
@ -386,6 +371,8 @@ async def run_fsp(
f'{display_name}'
],
},
# TODO: generate this from input map
'period': {
'label': '**period**:',
'type': 'edit',
@ -403,10 +390,46 @@ async def run_fsp(
print(f'{key}: {value}')
return True
# TODO:
async with (
open_form_input_handling(
sidepane,
focus_next=linked.godwidget,
on_value_change=settings_change,
)
):
yield sidepane
async def run_fsp(
portal: tractor._portal.Portal,
linkedsplits: LinkedSplits,
brokermod: ModuleType,
sym: str,
src_shm: ShmArray,
fsp_func_name: str,
display_name: str,
conf: dict[str, Any],
group_status_key: str,
loglevel: str,
) -> None:
'''FSP stream chart update loop.
This is called once for each entry in the fsp
config map.
'''
done = linkedsplits.window().status_bar.open_status(
f'loading fsp, {display_name}..',
group_key=group_status_key,
)
async with (
portal.open_stream_from(
# subactor entrypoint
# chaining entrypoint
fsp.cascade,
# name as title of sub-chart
@ -415,15 +438,14 @@ async def run_fsp(
dst_shm_token=conf['shm'].token,
symbol=sym,
fsp_func_name=fsp_func_name,
loglevel=loglevel,
) as stream,
# TODO:
open_form_input_handling(
sidepane,
focus_next=linkedsplits.godwidget,
on_value_change=settings_change,
),
open_sidepane(
linkedsplits,
display_name,
) as sidepane,
):
# receive last index for processed historical
@ -472,7 +494,7 @@ async def run_fsp(
# read from last calculated value
array = shm.array
# XXX: fsp func names are unique meaning we don't have
# XXX: fsp func names must be unique meaning we don't have
# duplicates of the underlying data even if multiple
# sub-charts reference it under different 'named charts'.
value = array[fsp_func_name][-1]
@ -489,6 +511,8 @@ async def run_fsp(
array_key=fsp_func_name
)
chart.linked.resize_sidepanes()
# TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions.
# ``pg.FillBetweenItems`` seems to be one technique using
@ -622,6 +646,73 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits):
price_chart.increment_view()
def has_vlm(ohlcv: ShmArray) -> bool:
# make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and
# derivatives)
volm = ohlcv.array['volume']
return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm)))
@asynccontextmanager
async def maybe_open_vlm_display(
linked: LinkedSplits,
ohlcv: ShmArray,
) -> ChartPlotWidget:
# make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and
# derivatives)
# volm = ohlcv.array['volume']
# if (
# np.all(np.isin(volm, -1)) or
# np.all(np.isnan(volm))
# ):
if not has_vlm(ohlcv):
log.warning(f"{linked.symbol.key} does not seem to have volume info")
else:
async with open_sidepane(linked, 'volume') as sidepane:
# built-in $vlm
shm = ohlcv
chart = linked.add_plot(
name='vlm',
array=shm.array,
array_key='volume',
sidepane=sidepane,
# curve by default
ohlc=False,
# vertical bars
# stepMode=True,
# static_yrange=(0, 100),
)
# XXX: ONLY for sub-chart fsps, overlays have their
# data looked up from the chart's internal array set.
# TODO: we must get a data view api going STAT!!
chart._shm = shm
# should **not** be the same sub-chart widget
assert chart.name != linked.chart.name
# sticky only on sub-charts atm
last_val_sticky = chart._ysticks[chart.name]
# read from last calculated value
value = shm.array['volume'][-1]
last_val_sticky.update_from_data(-1, value)
# size view to data once at outset
chart._set_yrange()
yield chart
async def display_symbol_data(
godwidget: GodWidget,
@ -686,6 +777,7 @@ async def display_symbol_data(
# add as next-to-y-axis singleton pane
godwidget.pp_pane = pp_pane
# create main OHLC chart
chart = linkedsplits.plot_ohlc_main(
symbol,
bars,
@ -722,7 +814,7 @@ async def display_symbol_data(
'static_yrange': (0, 100),
},
},
# test for duplicate fsps on same chart
# # test for duplicate fsps on same chart
# 'rsi2': {
# 'fsp_func_name': 'rsi',
# 'period': 14,
@ -733,18 +825,8 @@ async def display_symbol_data(
}
# make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and
# derivatives)
volm = ohlcv.array['volume']
if (
np.all(np.isin(volm, -1)) or
np.all(np.isnan(volm))
):
log.warning(
f"{sym} does not seem to have volume info,"
" dropping volume signals")
else:
if has_vlm(ohlcv):
# add VWAP to fsp config for downstream loading
fsp_conf.update({
'vwap': {
'fsp_func_name': 'vwap',
@ -756,11 +838,10 @@ async def display_symbol_data(
async with (
trio.open_nursery() as ln,
):
# load initial fsp chain (otherwise known as "indicators")
ln.start_soon(
spawn_fsps,
fan_out_spawn_fsp_daemons,
linkedsplits,
fsp_conf,
sym,
@ -787,6 +868,7 @@ async def display_symbol_data(
)
async with (
maybe_open_vlm_display(linkedsplits, ohlcv),
open_order_mode(
feed,

View File

@ -21,7 +21,6 @@ Text entry "forms" widgets (mostly for configuration and UI user input).
from __future__ import annotations
from contextlib import asynccontextmanager
from functools import partial
from textwrap import dedent
from typing import (
Optional, Any, Callable, Awaitable
)
@ -320,14 +319,14 @@ class FieldsForm(QWidget):
self.vbox = QVBoxLayout(self)
# self.vbox.setAlignment(Qt.AlignVCenter)
self.vbox.setAlignment(Qt.AlignBottom)
self.vbox.setContentsMargins(0, 4, 3, 6)
self.vbox.setContentsMargins(3, 6, 3, 6)
self.vbox.setSpacing(0)
# split layout for the (<label>: |<widget>|) parameters entry
self.form = QFormLayout()
self.form.setAlignment(Qt.AlignTop | Qt.AlignLeft)
self.form.setContentsMargins(0, 0, 0, 0)
self.form.setSpacing(3)
self.form.setContentsMargins(0, 0, 3, 0)
self.form.setSpacing(0)
self.form.setHorizontalSpacing(0)
self.vbox.addLayout(self.form, stretch=1/3)
@ -645,9 +644,7 @@ def mk_fill_status_bar(
# PnL on lhs
bar_labels_lhs = QVBoxLayout()
left_label = form.add_field_label(
dedent("""
{pnl:>+.2%} pnl
"""),
'{pnl:>+.2%} pnl',
font_size=bar_label_font_size,
font_color='gunmetal',
)
@ -674,18 +671,13 @@ def mk_fill_status_bar(
# https://docs.python.org/3/library/string.html#grammar-token-precision
top_label = form.add_field_label(
# {limit:.1f} limit
dedent("""
{limit}
"""),
'{limit}',
font_size=bar_label_font_size,
font_color='gunmetal',
)
bottom_label = form.add_field_label(
dedent("""
x: {step_size}\n
"""),
'x: {step_size}',
font_size=bar_label_font_size,
font_color='gunmetal',
)
@ -788,29 +780,10 @@ def mk_order_pane_layout(
# add pp fill bar + spacing
vbox.addLayout(hbox, stretch=1/3)
# TODO: status labels for brokerd real-time info
# feed_label = form.add_field_label(
# dedent("""
# brokerd.ib\n
# |_@{host}:{port}\n
# |_consumers: {cons}\n
# |_streams: {streams}\n
# |_shms: {streams}\n
# """),
# font_size=_font_small.px_size,
# )
# # add feed info label
# vbox.addWidget(
# feed_label,
# alignment=Qt.AlignBottom,
# # stretch=1/3,
# )
# TODO: handle resize events and appropriately scale this
# to the sidepane height?
# https://doc.qt.io/qt-5/layout.html#adding-widgets-to-a-layout
vbox.setSpacing(_font.px_size * 1.375)
# vbox.setSpacing(_font.px_size * 1.375)
form.show()
return form

View File

@ -23,7 +23,7 @@ from typing import Callable, Optional, Any
import pyqtgraph as pg
from PyQt5 import QtGui, QtWidgets
from PyQt5.QtWidgets import QLabel
from PyQt5.QtWidgets import QLabel, QSizePolicy
from PyQt5.QtCore import QPointF, QRectF, Qt
from ._style import (
@ -269,8 +269,11 @@ class FormatLabel(QLabel):
self.setTextFormat(Qt.MarkdownText) # markdown
self.setMargin(0)
self.setAlignment(
Qt.AlignVCenter
self.setSizePolicy(
QSizePolicy.Expanding,
QSizePolicy.Expanding,
)
self.setAlignment(Qt.AlignVCenter
| Qt.AlignLeft
)
self.setText(self.fmt_str)

View File

@ -36,7 +36,7 @@ from ._anchors import (
pp_tight_and_right, # wanna keep it straight in the long run
gpath_pin,
)
from ..calc import humanize, pnl
from ..calc import humanize, pnl, puterize
from ..clearing._allocate import Allocator, Position
from ..data._normalize import iterticks
from ..data.feed import Feed
@ -208,11 +208,13 @@ class SettingsPane:
size_unit = alloc.size_unit
# WRITE any settings to current pp's allocator
try:
value = puterize(value)
if key == 'limit':
if size_unit == 'currency':
alloc.currency_limit = float(value)
alloc.currency_limit = value
else:
alloc.units_limit = float(value)
alloc.units_limit = value
elif key == 'slots':
alloc.slots = int(value)
@ -222,12 +224,15 @@ class SettingsPane:
# the current settings in the new units
alloc.size_unit = value
elif key != 'account':
else:
raise ValueError(f'Unknown setting {key}')
# READ out settings and update UI
log.info(f'settings change: {key}: {value}')
except ValueError:
log.error(f'Invalid value for `{key}`: {value}')
# READ out settings and update UI
suffix = {'currency': ' $', 'units': ' u'}[size_unit]
limit = alloc.limit()

View File

@ -45,6 +45,7 @@ from ._position import (
PositionTracker,
SettingsPane,
)
from ._label import FormatLabel
from ._window import MultiStatus
from ..clearing._messages import Order
from ._forms import open_form_input_handling
@ -623,6 +624,59 @@ async def open_order_mode(
# setup order mode sidepane widgets
form = chart.sidepane
vbox = form.vbox
from textwrap import dedent
from PyQt5.QtCore import Qt
from ._style import _font, _font_small
from ..calc import humanize
feed_label = FormatLabel(
fmt_str=dedent("""
actor: **{actor_name}**\n
|_ @**{host}:{port}**\n
|_ throttle_hz: **{throttle_rate}**\n
|_ streams: **{symbols}**\n
|_ shm: **{shm}**\n
"""),
font=_font.font,
font_size=_font_small.px_size,
font_color='default_lightest',
)
form.feed_label = feed_label
# add feed info label to top
vbox.insertWidget(
0,
feed_label,
alignment=Qt.AlignBottom,
)
# vbox.setAlignment(feed_label, Qt.AlignBottom)
# vbox.setAlignment(Qt.AlignBottom)
blank_h = chart.height() - (
form.height() +
form.fill_bar.height()
# feed_label.height()
)
vbox.setSpacing((1 + 5/8)*_font.px_size)
# fill in brokerd feed info
host, port = feed.portal.channel.raddr
if host == '127.0.0.1':
host = 'localhost'
mpshm = feed.shm._shm
shmstr = f'{humanize(mpshm.size)}'
form.feed_label.format(
actor_name=feed.portal.channel.uid[0],
host=host,
port=port,
symbols=len(feed.symbols),
shm=shmstr,
throttle_rate=feed.throttle_rate,
)
order_pane = SettingsPane(
form=form,