Merge pull request #205 from pikers/ordermode_pps

Ordermode pps for gamified chart trading .
fsp_feeds
goodboy 2021-09-06 16:35:26 -04:00 committed by GitHub
commit 73b555a677
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 3946 additions and 1228 deletions

View File

@ -98,12 +98,38 @@ if you want your broker supported and they have an API let us know.
check out our charts
********************
bet you weren't expecting this from the foss bby::
bet you weren't expecting this from the foss::
piker -l info -b kraken -b binance chart btcusdt.binance --pdb
this runs the main chart in in debug mode.
this runs the main chart (currently with 1m sampled OHLC) in in debug
mode and you can practice paper trading using the following
micro-manual:
``order_mode`` (
edge triggered activation by any of the following keys,
``mouse-click`` on y-level to submit at that price
):
- ``f``/ ``ctl-f`` to stage buy
- ``d``/ ``ctl-d`` to stage sell
- ``a`` to stage alert
``search_mode`` (
``ctl-l`` or ``ctl-space`` to open,
``ctl-c`` or ``ctl-space`` to close
) :
- begin typing to have symbol search automatically lookup
symbols from all loaded backend (broker) providers
- arrow keys and mouse click to navigate selection
- vi-like ``ctl-[hjkl]`` for navigation
you can also configure your position allocation limits from the
sidepane.
run in distributed mode
@ -119,10 +145,10 @@ connect your chart::
piker -l info -b kraken -b binance chart xmrusdt.binance --pdb
enjoy persistent real-time data feeds tied to daemon lifetime.
key-bindings and mouse interaction is currently only documented in the
doce base. help us write some docs dawg.
enjoy persistent real-time data feeds tied to daemon lifetime. the next
time you spawn a chart it will load much faster since the data feed has
been cached and is now always running live in the background until you
kill ``pikerd``.
if anyone asks you what this project is about
@ -138,3 +164,5 @@ enter the matrix.
how come there ain't that many docs
***********************************
suck it up, learn the code; no one is trying to sell you on anything.
also, we need lotsa help so if you want to start somewhere and can't
necessarily write serious code, this might be the place for you!

View File

@ -140,7 +140,7 @@ async def maybe_open_ctx(
yield True, value
except KeyError:
log.info(f'Allocating new feed for {key}')
log.info(f'Allocating new resource for {key}')
# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler

View File

@ -418,6 +418,7 @@ async def stream_quotes(
# just directly pick out the info we need
si['price_tick_size'] = syminfo.filters[0]['tickSize']
si['lot_tick_size'] = syminfo.filters[2]['stepSize']
si['asset_type'] = 'crypto'
symbol = symbols[0]

View File

@ -20,6 +20,7 @@ Broker configuration mgmt.
import os
from os.path import dirname
import shutil
from typing import Optional
import toml
import click
@ -101,3 +102,21 @@ def write(
log.debug(f"Writing config file {path}")
with open(path, 'w') as cf:
return toml.dump(config, cf)
def load_accounts() -> dict[str, Optional[str]]:
# our default paper engine entry
accounts: dict[str, Optional[str]] = {'paper': None}
conf, path = load()
section = conf.get('accounts')
if section is None:
log.warning('No accounts config found?')
else:
for brokername, account_labels in section.items():
for name, value in account_labels.items():
accounts[f'{brokername}.{name}'] = value
return accounts

View File

@ -196,6 +196,8 @@ _adhoc_futes_set = {
'mgc.nymex',
'xagusd.cmdty', # silver spot
'ni.nymex', # silver futes
'qi.comex', # mini-silver futes
}
# exchanges we don't support at the moment due to not knowing
@ -1295,10 +1297,14 @@ def pack_position(pos: Position) -> dict[str, Any]:
else:
symbol = con.symbol
symkey = '.'.join([
symbol.lower(),
(con.primaryExchange or con.exchange).lower(),
])
return BrokerdPosition(
broker='ib',
account=pos.account,
symbol=symbol,
symbol=symkey,
currency=con.currency,
size=float(pos.position),
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),

View File

@ -439,6 +439,7 @@ async def stream_quotes(
syminfo = si.dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
syminfo['asset_type'] = 'crypto'
sym_infos[sym] = syminfo
ws_pairs[sym] = si.wsname

View File

@ -21,29 +21,52 @@ import math
import itertools
def humanize(number, digits=1):
"""Convert large numbers to something with at most 3 digits and
def humanize(
number: float,
digits: int = 1
) -> str:
'''Convert large numbers to something with at most ``digits`` and
a letter suffix (eg. k: thousand, M: million, B: billion).
"""
'''
try:
float(number)
except ValueError:
return 0
if not number or number <= 0:
return number
return round(number, ndigits=digits)
mag2suffix = {3: 'k', 6: 'M', 9: 'B'}
mag = math.floor(math.log(number, 10))
if mag < 3:
return number
return round(number, ndigits=digits)
maxmag = max(itertools.takewhile(lambda key: mag >= key, mag2suffix))
return "{:.{digits}f}{}".format(
number/10**maxmag, mag2suffix[maxmag], digits=digits)
return "{value}{suffix}".format(
value=round(number/10**maxmag, ndigits=digits),
suffix=mag2suffix[maxmag],
)
def percent_change(init, new):
"""Calcuate the percentage change of some ``new`` value
def pnl(
init: float,
new: float,
) -> float:
'''Calcuate the percentage change of some ``new`` value
from some initial value, ``init``.
"""
'''
if not (init and new):
return 0
return (new - init) / init * 100.
return (new - init) / init
def percent_change(
init: float,
new: float,
) -> float:
return pnl(init, new) * 100.

View File

@ -58,35 +58,20 @@ class OrderBook:
_ready_to_receive: trio.Event = trio.Event()
def send(
self,
uuid: str,
symbol: str,
brokers: list[str],
price: float,
size: float,
action: str,
exec_mode: str,
msg: Order,
) -> dict:
msg = Order(
action=action,
price=price,
size=size,
symbol=symbol,
brokers=brokers,
oid=uuid,
exec_mode=exec_mode, # dark or live
)
self._sent_orders[uuid] = msg
self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict())
return msg
def update(
self,
uuid: str,
**data: dict,
) -> dict:
cmd = self._sent_orders[uuid]
msg = cmd.dict()

View File

@ -576,7 +576,8 @@ async def translate_and_relay_brokerd_events(
# cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
if entry.action == 'cancel':
action = getattr(entry, 'action', None)
if action and action == 'cancel':
# assign newly providerd broker backend request id
entry.reqid = reqid
@ -796,11 +797,10 @@ async def process_client_order_cmds(
# sanity check on emsd id
assert live_entry.oid == oid
reqid = live_entry.reqid
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(
f"Modifying live {broker} order: {live_entry.reqid}")
log.info(f"Modifying live {broker} order: {reqid}")
msg = BrokerdOrder(
oid=oid, # no ib support for oids...
@ -966,10 +966,10 @@ async def _emsd_main(
):
# XXX: this should be initial price quote from target provider
first_quote = feed.first_quote
first_quote = feed.first_quotes[symbol]
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
book.lasts[(broker, symbol)] = first_quote['last']
# open a stream with the brokerd backend for order
# flow dialogue

View File

@ -24,6 +24,8 @@ from typing import Optional, Union
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol
# Client -> emsd
@ -42,7 +44,7 @@ class Order(BaseModel):
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: str
symbol: Union[str, Symbol]
price: float
size: float
@ -56,6 +58,13 @@ class Order(BaseModel):
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# Client <- emsd
# update msgs from ems which relay state change info
@ -81,8 +90,6 @@ class Status(BaseModel):
# 'alert_submitted',
# 'alert_triggered',
# 'position',
# }
resp: str # "response", see above

View File

@ -151,7 +151,12 @@ async def iter_ohlc_periods(
# stream and block until cancelled
await trio.sleep_forever()
finally:
subs.remove(ctx)
try:
subs.remove(ctx)
except ValueError:
log.error(
f'iOHLC step stream was already dropped for {ctx.chan.uid}?'
)
async def sample_and_broadcast(
@ -233,16 +238,23 @@ async def sample_and_broadcast(
# thus other consumers still attached.
subs = bus._subscribers[sym.lower()]
lags = 0
for (stream, tick_throttle) in subs:
try:
if tick_throttle:
# this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below.
await stream.send(quote)
with trio.move_on_after(0.2) as cs:
if tick_throttle:
# this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below.
await stream.send(quote)
else:
await stream.send({sym: quote})
else:
await stream.send({sym: quote})
if cs.cancelled_caught:
lags += 1
if lags > 10:
await tractor.breakpoint()
except (
trio.BrokenResourceError,

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# Copyright (C) 2018-present Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -22,7 +22,7 @@ import decimal
import numpy as np
import pandas as pd
from pydantic import BaseModel
from pydantic import BaseModel, validate_arguments
# from numba import from_dtype
@ -62,6 +62,9 @@ tf_in_1m = {
def float_digits(
value: float,
) -> int:
if value == 0:
return 0
return int(-decimal.Decimal(str(value)).as_tuple().exponent)
@ -82,28 +85,20 @@ class Symbol(BaseModel):
Yah, i guess dats what it izz.
"""
key: str
tick_size: float = 0.01
lot_tick_size: float = 0.01 # "volume" precision as min step value
type_key: str # {'stock', 'forex', 'future', ... etc.}
tick_size: float
lot_tick_size: float # "volume" precision as min step value
tick_size_digits: int
lot_size_digits: int
broker_info: Dict[str, Dict[str, Any]] = {}
# specifies a "class" of financial instrument
# ex. stock, futer, option, bond etc.
type_key: str
@property
def brokers(self) -> List[str]:
return list(self.broker_info.keys())
def digits(self) -> int:
"""Return the trailing number of digits specified by the min
tick size for the instrument.
"""
return float_digits(self.tick_size)
def lot_digits(self) -> int:
return float_digits(self.lot_tick_size)
def nearest_tick(self, value: float) -> float:
"""Return the nearest tick value based on mininum increment.
@ -111,6 +106,30 @@ class Symbol(BaseModel):
mult = 1 / self.tick_size
return round(value * mult) / mult
@validate_arguments
def mk_symbol(
key: str,
type_key: str,
tick_size: float = 0.01,
lot_tick_size: float = 0,
broker_info: dict[str, Any] = {},
) -> Symbol:
'''Create and return an instrument description for the
"symbol" named as ``key``.
'''
return Symbol(
key=key,
type_key=type_key,
tick_size=tick_size,
lot_tick_size=lot_tick_size,
tick_size_digits=float_digits(tick_size),
lot_size_digits=float_digits(lot_tick_size),
broker_info=broker_info,
)
def from_df(
df: pd.DataFrame,

View File

@ -49,7 +49,7 @@ from ._sharedmem import (
ShmArray,
)
from .ingest import get_ingestormod
from ._source import base_iohlc_dtype, Symbol
from ._source import base_iohlc_dtype, mk_symbol, Symbol
from ..ui import _search
from ._sampling import (
_shms,
@ -192,7 +192,7 @@ async def allocate_persistent_feed(
# establish broker backend quote stream
# ``stream_quotes()`` is a required backend func
init_msg, first_quote = await bus.nursery.start(
init_msg, first_quotes = await bus.nursery.start(
partial(
mod.stream_quotes,
send_chan=send,
@ -212,7 +212,7 @@ async def allocate_persistent_feed(
# XXX: the ``symbol`` here is put into our native piker format (i.e.
# lower case).
bus.feeds[symbol.lower()] = (cs, init_msg, first_quote)
bus.feeds[symbol.lower()] = (cs, init_msg, first_quotes)
if opened:
# start history backfill task ``backfill_bars()`` is
@ -227,7 +227,7 @@ async def allocate_persistent_feed(
init_msg[symbol]['sample_rate'] = int(delay_s)
# yield back control to starting nursery
task_status.started((init_msg, first_quote))
task_status.started((init_msg, first_quotes))
await feed_is_live.wait()
@ -277,7 +277,7 @@ async def attach_feed_bus(
# service nursery
async with bus.task_lock:
if entry is None:
init_msg, first_quote = await bus.nursery.start(
init_msg, first_quotes = await bus.nursery.start(
partial(
allocate_persistent_feed,
@ -294,13 +294,13 @@ async def attach_feed_bus(
)
assert isinstance(bus.feeds[symbol], tuple)
# XXX: ``first_quote`` may be outdated here if this is secondary
# XXX: ``first_quotes`` may be outdated here if this is secondary
# subscriber
cs, init_msg, first_quote = bus.feeds[symbol]
cs, init_msg, first_quotes = bus.feeds[symbol]
# send this even to subscribers to existing feed?
# deliver initial info message a first quote asap
await ctx.started((init_msg, first_quote))
await ctx.started((init_msg, first_quotes))
async with (
ctx.open_stream() as stream,
@ -392,7 +392,7 @@ class Feed:
name: str
shm: ShmArray
mod: ModuleType
first_quote: dict
first_quotes: dict # symbol names to first quote dicts
stream: trio.abc.ReceiveChannel[dict[str, Any]]
_brokerd_portal: tractor._portal.Portal
@ -509,7 +509,7 @@ async def open_feed(
tick_throttle=tick_throttle,
) as (ctx, (init_msg, first_quote)),
) as (ctx, (init_msg, first_quotes)),
ctx.open_stream() as stream,
@ -524,7 +524,7 @@ async def open_feed(
name=brokername,
shm=shm,
mod=mod,
first_quote=first_quote,
first_quotes=first_quotes,
stream=stream,
_brokerd_portal=portal,
)
@ -535,7 +535,7 @@ async def open_feed(
si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
symbol = Symbol(
symbol = mk_symbol(
key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),

View File

@ -0,0 +1,153 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Anchor funtions for UI placement of annotions.
'''
from typing import Callable
from PyQt5.QtCore import QPointF
from PyQt5.QtWidgets import QGraphicsPathItem
from ._label import Label
def marker_right_points(
chart: 'ChartPlotWidget', # noqa
marker_size: int = 20,
) -> (float, float, float):
'''Return x-dimension, y-axis-aware, level-line marker oriented scene values.
X values correspond to set the end of a level line, end of
a paried level line marker, and the right most side of the "right"
axis respectively.
'''
# TODO: compute some sensible maximum value here
# and use a humanized scheme to limit to that length.
l1_len = chart._max_l1_line_len
ryaxis = chart.getAxis('right')
r_axis_x = ryaxis.pos().x()
up_to_l1_sc = r_axis_x - l1_len - 10
marker_right = up_to_l1_sc - (1.375 * 2 * marker_size)
line_end = marker_right - (6/16 * marker_size)
return line_end, marker_right, r_axis_x
def vbr_left(
label: Label,
) -> Callable[..., float]:
"""Return a closure which gives the scene x-coordinate for the
leftmost point of the containing view box.
"""
return label.vbr().left
def right_axis(
chart: 'ChartPlotWidget', # noqa
label: Label,
side: str = 'left',
offset: float = 10,
avoid_book: bool = True,
# width: float = None,
) -> Callable[..., float]:
'''Return a position closure which gives the scene x-coordinate for
the x point on the right y-axis minus the width of the label given
it's contents.
'''
ryaxis = chart.getAxis('right')
if side == 'left':
if avoid_book:
def right_axis_offset_by_w() -> float:
# l1 spread graphics x-size
l1_len = chart._max_l1_line_len
# sum of all distances "from" the y-axis
right_offset = l1_len + label.w + offset
return ryaxis.pos().x() - right_offset
else:
def right_axis_offset_by_w() -> float:
return ryaxis.pos().x() - (label.w + offset)
return right_axis_offset_by_w
elif 'right':
# axis_offset = ryaxis.style['tickTextOffset'][0]
def on_axis() -> float:
return ryaxis.pos().x() # + axis_offset - 2
return on_axis
def gpath_pin(
gpath: QGraphicsPathItem,
label: Label, # noqa
location_description: str = 'right-of-path-centered',
use_right_of_pp_label: bool = False,
) -> QPointF:
# get actual arrow graphics path
path_br = gpath.mapToScene(gpath.path()).boundingRect()
# label.vb.locate(label.txt) #, children=True)
if location_description == 'right-of-path-centered':
return path_br.topRight() - QPointF(label.h/16, label.h / 3)
if location_description == 'left-of-path-centered':
return path_br.topLeft() - QPointF(label.w, label.h / 6)
elif location_description == 'below-path-left-aligned':
return path_br.bottomLeft() - QPointF(0, label.h / 6)
elif location_description == 'below-path-right-aligned':
return path_br.bottomRight() - QPointF(label.w, label.h / 6)
def pp_tight_and_right(
label: Label
) -> QPointF:
'''Place *just* right of the pp label.
'''
txt = label.txt
return label.txt.pos() + QPointF(label.w - label.h/3, 0)

View File

@ -18,18 +18,20 @@
Annotations for ur faces.
"""
import PyQt5
from PyQt5 import QtCore, QtGui
from typing import Callable, Optional
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QPointF, QRectF
from PyQt5.QtWidgets import QGraphicsPathItem
from pyqtgraph import Point, functions as fn, Color
import numpy as np
from ._anchors import marker_right_points
def mk_marker(
style,
size: float = 20.0,
use_qgpath: bool = True,
def mk_marker_path(
style: str,
) -> QGraphicsPathItem:
"""Add a marker to be displayed on the line wrapped in a ``QGraphicsPathItem``
@ -39,7 +41,7 @@ def mk_marker(
style String indicating the style of marker to add:
``'<|'``, ``'|>'``, ``'>|'``, ``'|<'``, ``'<|>'``,
``'>|<'``, ``'^'``, ``'v'``, ``'o'``
size Size of the marker in pixels. Default is 10.0.
size Size of the marker in pixels.
"""
path = QtGui.QPainterPath()
@ -83,13 +85,148 @@ def mk_marker(
# self._maxMarkerSize = max([m[2] / 2. for m in self.markers])
if use_qgpath:
path = QGraphicsPathItem(path)
path.scale(size, size)
return path
class LevelMarker(QGraphicsPathItem):
'''An arrow marker path graphich which redraws itself
to the specified view coordinate level on each paint cycle.
'''
def __init__(
self,
chart: 'ChartPlotWidget', # noqa
style: str,
get_level: Callable[..., float],
size: float = 20,
keep_in_view: bool = True,
on_paint: Optional[Callable] = None,
) -> None:
# get polygon and scale
super().__init__()
self.scale(size, size)
# interally generates path
self._style = None
self.style = style
self.chart = chart
self.get_level = get_level
self._on_paint = on_paint
self.scene_x = lambda: marker_right_points(chart)[1]
self.level: float = 0
self.keep_in_view = keep_in_view
@property
def style(self) -> str:
return self._style
@style.setter
def style(self, value: str) -> None:
if self._style != value:
polygon = mk_marker_path(value)
self.setPath(polygon)
self._style = value
def path_br(self) -> QRectF:
'''Return the bounding rect for the opaque path part
of this item.
'''
return self.mapToScene(
self.path()
).boundingRect()
def delete(self) -> None:
self.scene().removeItem(self)
@property
def h(self) -> float:
return self.path_br().height()
@property
def w(self) -> float:
return self.path_br().width()
def position_in_view(
self,
# level: float,
) -> None:
'''Show a pp off-screen indicator for a level label.
This is like in fps games where you have a gps "nav" indicator
but your teammate is outside the range of view, except in 2D, on
the y-dimension.
'''
level = self.get_level()
view = self.chart.getViewBox()
vr = view.state['viewRange']
ymn, ymx = vr[1]
# _, marker_right, _ = marker_right_points(line._chart)
x = self.scene_x()
if self.style == '>|': # short style, points "down-to" line
top_offset = self.h
bottom_offset = 0
else:
top_offset = 0
bottom_offset = self.h
if level > ymx: # pin to top of view
self.setPos(
QPointF(
x,
top_offset + self.h/3,
)
)
elif level < ymn: # pin to bottom of view
self.setPos(
QPointF(
x,
view.height() - (bottom_offset + self.h/3),
)
)
else:
# pp line is viewable so show marker normally
self.setPos(
x,
self.chart.view.mapFromView(
QPointF(0, self.get_level())
).y()
)
def paint(
self,
p: QtGui.QPainter,
opt: QtWidgets.QStyleOptionGraphicsItem,
w: QtWidgets.QWidget
) -> None:
'''Core paint which we override to always update
our marker position in scene coordinates from a
view cooridnate "level".
'''
if self.keep_in_view:
self.position_in_view()
super().paint(p, opt, w)
if self._on_paint:
self._on_paint(self)
def qgo_draw_markers(
markers: list,

File diff suppressed because it is too large Load Diff

View File

@ -31,6 +31,7 @@ from ._style import (
_xaxis_at,
hcolor,
_font_small,
_font,
)
from ._axes import YAxisLabel, XAxisLabel
from ..log import get_logger
@ -41,8 +42,9 @@ log = get_logger(__name__)
# XXX: these settings seem to result in really decent mouse scroll
# latency (in terms of perceived lag in cross hair) so really be sure
# there's an improvement if you want to change it!
_mouse_rate_limit = 60 # TODO; should we calc current screen refresh rate?
_debounce_delay = 1 / 2e3
_mouse_rate_limit = 120 # TODO; should we calc current screen refresh rate?
_debounce_delay = 1 / 40
_ch_label_opac = 1
@ -52,13 +54,18 @@ class LineDot(pg.CurvePoint):
def __init__(
self,
curve: pg.PlotCurveItem,
index: int,
plot: 'ChartPlotWidget', # type: ingore # noqa
pos=None,
size: int = 6, # in pxs
color: str = 'default_light',
) -> None:
# scale from dpi aware font size
size = int(_font.px_size * 0.375)
pg.CurvePoint.__init__(
self,
curve,
@ -88,7 +95,9 @@ class LineDot(pg.CurvePoint):
def event(
self,
ev: QtCore.QEvent,
) -> None:
if not isinstance(
ev, QtCore.QDynamicPropertyChangeEvent
@ -132,8 +141,8 @@ class ContentsLabel(pg.LabelItem):
}
def __init__(
self,
# chart: 'ChartPlotWidget', # noqa
view: pg.ViewBox,
@ -167,8 +176,8 @@ class ContentsLabel(pg.LabelItem):
self.anchor(itemPos=index, parentPos=index, offset=margins)
def update_from_ohlc(
self,
name: str,
index: int,
array: np.ndarray,
@ -194,8 +203,8 @@ class ContentsLabel(pg.LabelItem):
)
def update_from_value(
self,
name: str,
index: int,
array: np.ndarray,
@ -239,6 +248,7 @@ class ContentsLabels:
if not (index >= 0 and index < chart._arrays['ohlc'][-1]['index']):
# out of range
print('out of range?')
continue
array = chart._arrays[name]
@ -272,13 +282,15 @@ class ContentsLabels:
self._labels.append(
(chart, name, label, partial(update_func, label, name))
)
# label.hide()
label.hide()
return label
class Cursor(pg.GraphicsObject):
'''Multi-plot cursor for use on a ``LinkedSplits`` chart (set).
'''
def __init__(
self,

View File

@ -28,7 +28,7 @@ from PyQt5.QtCore import QPointF
import numpy as np
from ._style import hcolor, _font
from ._lines import order_line, LevelLine
from ._lines import LevelLine
from ..log import get_logger
@ -97,69 +97,21 @@ class LineEditor:
def stage_line(
self,
action: str,
line: LevelLine,
color: str = 'alert_yellow',
hl_on_hover: bool = False,
dotted: bool = False,
# fields settings
size: Optional[int] = None,
) -> LevelLine:
"""Stage a line at the current chart's cursor position
and return it.
"""
# chart.setCursor(QtCore.Qt.PointingHandCursor)
cursor = self.chart.linked.cursor
if not cursor:
return None
chart = cursor.active_plot
y = cursor._datum_xy[1]
symbol = chart._lc.symbol
# add a "staged" cursor-tracking line to view
# and cash it in a a var
if self._active_staged_line:
self.unstage_line()
line = order_line(
chart,
level=y,
level_digits=symbol.digits(),
size=size,
size_digits=symbol.lot_digits(),
# just for the stage line to avoid
# flickering while moving the cursor
# around where it might trigger highlight
# then non-highlight depending on sensitivity
always_show_labels=True,
# kwargs
color=color,
# don't highlight the "staging" line
hl_on_hover=hl_on_hover,
dotted=dotted,
exec_type='dark' if dotted else 'live',
action=action,
show_markers=True,
# prevent flickering of marker while moving/tracking cursor
only_show_markers_on_hover=False,
)
self._active_staged_line = line
# hide crosshair y-line and label
cursor.hide_xhair()
# add line to cursor trackers
cursor._trackers.add(line)
return line
def unstage_line(self) -> LevelLine:
@ -181,41 +133,17 @@ class LineEditor:
# show the crosshair y line and label
cursor.show_xhair()
def create_order_line(
def submit_line(
self,
line: LevelLine,
uuid: str,
level: float,
chart: 'ChartPlotWidget', # noqa
size: float,
action: str,
) -> LevelLine:
line = self._active_staged_line
if not line:
staged_line = self._active_staged_line
if not staged_line:
raise RuntimeError("No line is currently staged!?")
sym = chart._lc.symbol
line = order_line(
chart,
# label fields default values
level=level,
level_digits=sym.digits(),
size=size,
size_digits=sym.lot_digits(),
# LevelLine kwargs
color=line.color,
dotted=line._dotted,
show_markers=True,
only_show_markers_on_hover=True,
action=action,
)
# for now, until submission reponse arrives
line.hide_labels()

View File

@ -18,13 +18,64 @@
Qt event proxying and processing using ``trio`` mem chans.
"""
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, AsyncExitStack
from typing import Callable
from PyQt5 import QtCore
from PyQt5.QtCore import QEvent
from PyQt5.QtWidgets import QWidget
from pydantic import BaseModel
import trio
from PyQt5 import QtCore
from PyQt5.QtCore import QEvent, pyqtBoundSignal
from PyQt5.QtWidgets import QWidget
from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse
MOUSE_EVENTS = {
gs_mouse.GraphicsSceneMousePress,
gs_mouse.GraphicsSceneMouseRelease,
QEvent.MouseButtonPress,
QEvent.MouseButtonRelease,
# QtGui.QMouseEvent,
}
# TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
class KeyboardMsg(BaseModel):
'''Unpacked Qt keyboard event data.
'''
class Config:
arbitrary_types_allowed = True
event: QEvent
etype: int
key: int
mods: int
txt: str
def to_tuple(self) -> tuple:
return tuple(self.dict().values())
class MouseMsg(BaseModel):
'''Unpacked Qt keyboard event data.
'''
class Config:
arbitrary_types_allowed = True
event: QEvent
etype: int
button: int
# TODO: maybe add some methods to detect key combos? Or is that gonna be
# better with pattern matching?
# # ctl + alt as combo
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
class EventRelay(QtCore.QObject):
@ -38,8 +89,10 @@ class EventRelay(QtCore.QObject):
def eventFilter(
self,
source: QWidget,
ev: QEvent,
) -> None:
'''
Qt global event filter: return `False` to pass through and `True`
@ -50,48 +103,57 @@ class EventRelay(QtCore.QObject):
'''
etype = ev.type()
# print(f'etype: {etype}')
# TODO: turn this on and see what we can filter by default (such
# as mouseWheelEvent).
# print(f'ev: {ev}')
if etype in self._event_types:
# ev.accept()
# TODO: what's the right way to allow this?
# if ev.isAutoRepeat():
# ev.ignore()
# XXX: we unpack here because apparently doing it
# after pop from the mem chan isn't showing the same
# event object? no clue wtf is going on there, likely
# something to do with Qt internals and calling the
# parent handler?
if etype in {QEvent.KeyPress, QEvent.KeyRelease}:
# TODO: is there a global setting for this?
if ev.isAutoRepeat() and self._filter_auto_repeats:
ev.ignore()
return True
key = ev.key()
mods = ev.modifiers()
txt = ev.text()
# NOTE: the event object instance coming out
# the other side is mutated since Qt resumes event
# processing **before** running a ``trio`` guest mode
# tick, thus special handling or copying must be done.
# send elements to async handler
self._send_chan.send_nowait((ev, etype, key, mods, txt))
else:
# send event to async handler
self._send_chan.send_nowait(ev)
# **do not** filter out this event
# and instead forward to the source widget
if etype not in self._event_types:
return False
# XXX: we unpack here because apparently doing it
# after pop from the mem chan isn't showing the same
# event object? no clue wtf is going on there, likely
# something to do with Qt internals and calling the
# parent handler?
if etype in {QEvent.KeyPress, QEvent.KeyRelease}:
msg = KeyboardMsg(
event=ev,
etype=etype,
key=ev.key(),
mods=ev.modifiers(),
txt=ev.text(),
)
# TODO: is there a global setting for this?
if ev.isAutoRepeat() and self._filter_auto_repeats:
ev.ignore()
return True
# NOTE: the event object instance coming out
# the other side is mutated since Qt resumes event
# processing **before** running a ``trio`` guest mode
# tick, thus special handling or copying must be done.
elif etype in MOUSE_EVENTS:
# print('f mouse event: {ev}')
msg = MouseMsg(
event=ev,
etype=etype,
button=ev.button(),
)
else:
msg = ev
# send event-msg to async handler
self._send_chan.send_nowait(msg)
# **do not** filter out this event
# and instead forward to the source widget
return False
# filter out this event
# https://doc.qt.io/qt-5/qobject.html#installEventFilter
return False
@ -124,9 +186,34 @@ async def open_event_stream(
@asynccontextmanager
async def open_handler(
async def open_signal_handler(
source_widget: QWidget,
signal: pyqtBoundSignal,
async_handler: Callable,
) -> trio.abc.ReceiveChannel:
send, recv = trio.open_memory_channel(0)
def proxy_args_to_chan(*args):
send.send_nowait(args)
signal.connect(proxy_args_to_chan)
async def proxy_to_handler():
async for args in recv:
await async_handler(*args)
async with trio.open_nursery() as n:
n.start_soon(proxy_to_handler)
async with send:
yield
@asynccontextmanager
async def open_handlers(
source_widgets: list[QWidget],
event_types: set[QEvent],
async_handler: Callable[[QWidget, trio.abc.ReceiveChannel], None],
**kwargs,
@ -135,7 +222,13 @@ async def open_handler(
async with (
trio.open_nursery() as n,
open_event_stream(source_widget, event_types, **kwargs) as event_recv_stream,
AsyncExitStack() as stack,
):
n.start_soon(async_handler, source_widget, event_recv_stream)
for widget in source_widgets:
event_recv_stream = await stack.enter_async_context(
open_event_stream(widget, event_types, **kwargs)
)
n.start_soon(async_handler, widget, event_recv_stream)
yield

View File

@ -99,6 +99,9 @@ def run_qtractor(
# "This is substantially faster than using a signal... for some
# reason Qt signal dispatch is really slow (and relies on events
# underneath anyway, so this is strictly less work)."
# source gist and credit to njs:
# https://gist.github.com/njsmith/d996e80b700a339e0623f97f48bcf0cb
REENTER_EVENT = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
class ReenterEvent(QtCore.QEvent):

727
piker/ui/_forms.py 100644
View File

@ -0,0 +1,727 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Text entry "forms" widgets (mostly for configuration and UI user input).
'''
from __future__ import annotations
from contextlib import asynccontextmanager
from functools import partial
from textwrap import dedent
from typing import (
Optional, Any, Callable, Awaitable
)
import trio
from PyQt5 import QtGui
from PyQt5.QtCore import QSize, QModelIndex, Qt, QEvent
from PyQt5.QtWidgets import (
QWidget,
QLabel,
QComboBox,
QLineEdit,
QHBoxLayout,
QVBoxLayout,
QFormLayout,
QProgressBar,
QSizePolicy,
QStyledItemDelegate,
QStyleOptionViewItem,
)
# import pydantic
from ._event import open_handlers
from ._style import hcolor, _font, _font_small, DpiAwareFont
from ._label import FormatLabel
from .. import brokers
class FontAndChartAwareLineEdit(QLineEdit):
def __init__(
self,
parent: QWidget,
# parent_chart: QWidget, # noqa
font: DpiAwareFont = _font,
width_in_chars: int = None,
) -> None:
# self.setContextMenuPolicy(Qt.CustomContextMenu)
# self.customContextMenuRequested.connect(self.show_menu)
# self.setStyleSheet(f"font: 18px")
self.dpi_font = font
# self.godwidget = parent_chart
if width_in_chars:
self._chars = int(width_in_chars)
else:
# chart count which will be used to calculate
# width of input field.
self._chars: int = 9
super().__init__(parent)
# size it as we specify
# https://doc.qt.io/qt-5/qsizepolicy.html#Policy-enum
self.setSizePolicy(
QSizePolicy.Expanding,
QSizePolicy.Fixed,
)
self.setFont(font.font)
# witty bit of margin
self.setTextMargins(2, 2, 2, 2)
def sizeHint(self) -> QSize:
"""
Scale edit box to size of dpi aware font.
"""
psh = super().sizeHint()
dpi_font = self.dpi_font
psh.setHeight(dpi_font.px_size)
# space for ``._chars: int``
char_w_pxs = dpi_font.boundingRect(self.text()).width()
chars_w = char_w_pxs + 6 # * dpi_font.scale() * self._chars
psh.setWidth(chars_w)
return psh
def set_width_in_chars(
self,
chars: int,
) -> None:
self._chars = chars
self.sizeHint()
self.update()
def focus(self) -> None:
self.selectAll()
self.show()
self.setFocus()
class FontScaledDelegate(QStyledItemDelegate):
'''
Super simple view delegate to render text in the same
font size as the search widget.
'''
def __init__(
self,
parent=None,
font: DpiAwareFont = _font,
) -> None:
super().__init__(parent)
self.dpi_font = font
def sizeHint(
self,
option: QStyleOptionViewItem,
index: QModelIndex,
) -> QSize:
# value = index.data()
# br = self.dpi_font.boundingRect(value)
# w, h = br.width(), br.height()
parent = self.parent()
if getattr(parent, '_max_item_size', None):
return QSize(*self.parent()._max_item_size)
else:
return super().sizeHint(option, index)
# slew of resources which helped get this where it is:
# https://stackoverflow.com/questions/20648210/qcombobox-adjusttocontents-changing-height
# https://stackoverflow.com/questions/3151798/how-do-i-set-the-qcombobox-width-to-fit-the-largest-item
# https://stackoverflow.com/questions/6337589/qlistwidget-adjust-size-to-content#6370892
# https://stackoverflow.com/questions/25304267/qt-resize-of-qlistview
# https://stackoverflow.com/questions/28227406/how-to-set-qlistview-rows-height-permanently
class FieldsForm(QWidget):
vbox: QVBoxLayout
form: QFormLayout
def __init__(
self,
parent=None,
) -> None:
super().__init__(parent)
# size it as we specify
self.setSizePolicy(
QSizePolicy.Expanding,
QSizePolicy.Expanding,
)
# XXX: not sure why we have to create this here exactly
# (instead of in the pane creation routine) but it's
# here and is managed by downstream layout routines.
# best guess is that you have to create layouts in order
# of hierarchy in order for things to display correctly?
# TODO: we may want to hand this *down* from some "pane manager"
# thing eventually?
self.vbox = QVBoxLayout(self)
# self.vbox.setAlignment(Qt.AlignVCenter)
self.vbox.setAlignment(Qt.AlignBottom)
self.vbox.setContentsMargins(0, 4, 3, 6)
self.vbox.setSpacing(0)
# split layout for the (<label>: |<widget>|) parameters entry
self.form = QFormLayout()
self.form.setAlignment(Qt.AlignTop | Qt.AlignLeft)
self.form.setContentsMargins(0, 0, 0, 0)
self.form.setSpacing(3)
self.form.setHorizontalSpacing(0)
self.vbox.addLayout(self.form, stretch=1/3)
self.labels: dict[str, QLabel] = {}
self.fields: dict[str, QWidget] = {}
self._font_size = _font_small.px_size - 2
self._max_item_width: (float, float) = 0, 0
def add_field_label(
self,
name: str,
font_size: Optional[int] = None,
font_color: str = 'default_lightest',
) -> QtGui.QLabel:
# add label to left of search bar
# self.label = label = QtGui.QLabel()
font_size = font_size or self._font_size - 1
self.label = label = FormatLabel(
fmt_str=name,
font=_font.font,
font_size=font_size,
font_color=font_color,
)
# for later lookup
self.labels[name] = label
return label
def add_edit_field(
self,
key: str,
label_name: str,
value: str,
) -> FontAndChartAwareLineEdit:
# TODO: maybe a distint layout per "field" item?
label = self.add_field_label(label_name)
edit = FontAndChartAwareLineEdit(
parent=self,
)
edit.setStyleSheet(
f"""QLineEdit {{
color : {hcolor('gunmetal')};
font-size : {self._font_size}px;
}}
"""
)
edit.setText(str(value))
self.form.addRow(label, edit)
self.fields[key] = edit
return edit
def add_select_field(
self,
key: str,
label_name: str,
values: list[str],
) -> QComboBox:
# TODO: maybe a distint layout per "field" item?
label = self.add_field_label(label_name)
select = QComboBox(self)
select._key = key
for i, value in enumerate(values):
select.insertItem(i, str(value))
select.setStyleSheet(
f"""QComboBox {{
color : {hcolor('gunmetal')};
font-size : {self._font_size}px;
}}
"""
)
select.setSizeAdjustPolicy(QComboBox.AdjustToContents)
select.setIconSize(QSize(0, 0))
self.setSizePolicy(
QSizePolicy.Fixed,
QSizePolicy.Fixed,
)
view = select.view()
view.setUniformItemSizes(True)
view.setItemDelegate(FontScaledDelegate(view))
# compute maximum item size so that the weird
# "style item delegate" thing can then specify
# that size on each item...
values.sort()
br = _font.boundingRect(str(values[-1]))
_, h = br.width(), br.height()
# TODO: something better then this monkey patch
# view._max_item_size = w, h
# limit to 6 items?
view.setMaximumHeight(6*h)
# one entry in view
select.setMinimumHeight(h)
select.show()
self.form.addRow(label, select)
self.fields[key] = select
return select
async def handle_field_input(
widget: QWidget,
recv_chan: trio.abc.ReceiveChannel,
form: FieldsForm,
on_value_change: Callable[[str, Any], Awaitable[bool]],
focus_next: QWidget,
) -> None:
async for kbmsg in recv_chan:
if kbmsg.etype in {QEvent.KeyPress, QEvent.KeyRelease}:
event, etype, key, mods, txt = kbmsg.to_tuple()
print(f'key: {kbmsg.key}, mods: {kbmsg.mods}, txt: {kbmsg.txt}')
# default controls set
ctl = False
if kbmsg.mods == Qt.ControlModifier:
ctl = True
if ctl and key in { # cancel and refocus
Qt.Key_C,
Qt.Key_Space, # i feel like this is the "native" one
Qt.Key_Alt,
}:
widget.clearFocus()
# normally the godwidget
focus_next.focus()
continue
# process field input
if key in (Qt.Key_Enter, Qt.Key_Return):
key = widget._key
value = widget.text()
on_value_change(key, value)
def mk_form(
parent: QWidget,
fields_schema: dict,
font_size: Optional[int] = None,
) -> FieldsForm:
# TODO: generate components from model
# instead of schema dict (aka use an ORM)
form = FieldsForm(parent=parent)
form._font_size = font_size or _font_small.px_size
# generate sub-components from schema dict
for key, config in fields_schema.items():
wtype = config['type']
label = str(config.get('label', key))
# plain (line) edit field
if wtype == 'edit':
w = form.add_edit_field(
key,
label,
config['default_value']
)
# drop-down selection
elif wtype == 'select':
values = list(config['default_value'])
w = form.add_select_field(
key,
label,
values
)
w._key = key
return form
@asynccontextmanager
async def open_form_input_handling(
form: FieldsForm,
focus_next: QWidget,
on_value_change: Callable[[str, Any], Awaitable[bool]],
) -> FieldsForm:
# assert form.model, f'{form} must define a `.model`'
async with open_handlers(
list(form.fields.values()),
event_types={
QEvent.KeyPress,
},
async_handler=partial(
handle_field_input,
form=form,
focus_next=focus_next,
on_value_change=on_value_change,
),
# block key repeats?
filter_auto_repeats=True,
):
yield form
class FillStatusBar(QProgressBar):
'''A status bar for fills up to a position limit.
'''
border_px: int = 2
slot_margin_px: int = 2
def __init__(
self,
approx_height_px: float,
width_px: float,
font_size: int,
parent=None
) -> None:
super().__init__(parent=parent)
self.approx_h = approx_height_px
self.font_size = font_size
self.setFormat('') # label format
self.setMinimumWidth(width_px)
self.setMaximumWidth(width_px)
def set_slots(
self,
slots: int,
value: float,
) -> None:
approx_h = self.approx_h
# TODO: compute "used height" thus far and mostly fill the rest
tot_slot_h, r = divmod(
approx_h,
slots,
)
clipped = slots * tot_slot_h + 2*self.border_px
self.setMaximumHeight(clipped)
slot_height_px = tot_slot_h - 2*self.slot_margin_px
self.setOrientation(Qt.Vertical)
self.setStyleSheet(
f"""
QProgressBar {{
text-align: center;
font-size : {self.font_size - 2}px;
background-color: {hcolor('papas_special')};
color : {hcolor('papas_special')};
border: {self.border_px}px solid {hcolor('default_light')};
border-radius: 2px;
}}
QProgressBar::chunk {{
background-color: {hcolor('default_spotlight')};
color: {hcolor('bracket')};
border-radius: 2px;
margin: {self.slot_margin_px}px;
height: {slot_height_px}px;
}}
"""
)
# margin-bottom: {slot_margin_px*2}px;
# margin-top: {slot_margin_px*2}px;
# color: #19232D;
# width: 10px;
self.setRange(0, slots)
self.setValue(value)
def mk_fill_status_bar(
parent_pane: QWidget,
form: FieldsForm,
pane_vbox: QVBoxLayout,
label_font_size: Optional[int] = None,
) -> (
# TODO: turn this into a composite?
QHBoxLayout,
QProgressBar,
QLabel,
QLabel,
QLabel,
):
# indent = 18
# bracket_val = 0.375 * 0.666 * w
# indent = bracket_val / (1 + 5/8)
# TODO: calc this height from the ``ChartnPane``
chart_h = round(parent_pane.height() * 5/8)
bar_h = chart_h * 0.375
# TODO: once things are sized to screen
bar_label_font_size = label_font_size or _font.px_size - 2
label_font = DpiAwareFont()
label_font._set_qfont_px_size(bar_label_font_size)
br = label_font.boundingRect(f'{3.32:.1f}% port')
_, h = br.width(), br.height()
# text_w = 8/3 * w
# PnL on lhs
bar_labels_lhs = QVBoxLayout()
left_label = form.add_field_label(
dedent("""
{pnl:>+.2%} pnl
"""),
font_size=bar_label_font_size,
font_color='gunmetal',
)
bar_labels_lhs.addSpacing(5/8 * bar_h)
bar_labels_lhs.addWidget(
left_label,
alignment=Qt.AlignLeft | Qt.AlignTop,
)
# this hbox is added as a layout by the paner maker/caller
hbox = QHBoxLayout()
hbox.addLayout(bar_labels_lhs)
# hbox.addSpacing(indent) # push to right a bit
# config
# hbox.setSpacing(indent * 0.375)
hbox.setSpacing(0)
hbox.setAlignment(Qt.AlignTop | Qt.AlignLeft)
hbox.setContentsMargins(0, 0, 0, 0)
# TODO: use percentage str formatter:
# https://docs.python.org/3/library/string.html#grammar-token-precision
top_label = form.add_field_label(
# {limit:.1f} limit
dedent("""
{limit}
"""),
font_size=bar_label_font_size,
font_color='gunmetal',
)
bottom_label = form.add_field_label(
dedent("""
x: {step_size}\n
"""),
font_size=bar_label_font_size,
font_color='gunmetal',
)
bar = FillStatusBar(
approx_height_px=bar_h,
width_px=h * (1 + 1/6),
font_size=form._font_size,
parent=form
)
hbox.addWidget(bar, alignment=Qt.AlignLeft | Qt.AlignTop)
bar_labels_rhs_vbox = QVBoxLayout()
bar_labels_rhs_vbox.addWidget(
top_label,
alignment=Qt.AlignLeft | Qt.AlignTop
)
bar_labels_rhs_vbox.addWidget(
bottom_label,
alignment=Qt.AlignLeft | Qt.AlignBottom
)
hbox.addLayout(bar_labels_rhs_vbox)
# compute "chunk" sizes for fill-status-bar based on some static height
slots = 4
bar.set_slots(slots, value=0)
return hbox, bar, left_label, top_label, bottom_label
def mk_order_pane_layout(
parent: QWidget,
# accounts: dict[str, Optional[str]],
) -> FieldsForm:
# font_size: int = _font_small.px_size - 2
font_size: int = _font.px_size - 2
accounts = brokers.config.load_accounts()
# TODO: maybe just allocate the whole fields form here
# and expect an async ctx entry?
form = mk_form(
parent=parent,
fields_schema={
'account': {
'label': '**account**:',
'type': 'select',
'default_value': accounts.keys(),
},
'size_unit': {
'label': '**allocate**:',
'type': 'select',
'default_value': [
'$ size',
'# units',
# '% of port',
],
},
# 'disti_weight': {
# 'label': '**weighting**:',
# 'type': 'select',
# 'default_value': ['uniform'],
# },
'limit': {
'label': '**limit**:',
'type': 'edit',
'default_value': 5000,
},
'slots': {
'label': '**slots**:',
'type': 'edit',
'default_value': 4,
},
},
)
# top level pane layout
# XXX: see ``FieldsForm.__init__()`` for why we can't do basic
# config of the vbox here
vbox = form.vbox
# _, h = form.width(), form.height()
# print(f'w, h: {w, h}')
hbox, fill_bar, left_label, top_label, bottom_label = mk_fill_status_bar(
parent,
form,
pane_vbox=vbox,
label_font_size=font_size,
)
# TODO: would be nice to have some better way of reffing these over
# monkey patching...
form.fill_bar = fill_bar
form.left_label = left_label
form.bottom_label = bottom_label
form.top_label = top_label
# add pp fill bar + spacing
vbox.addLayout(hbox, stretch=1/3)
# TODO: status labels for brokerd real-time info
# feed_label = form.add_field_label(
# dedent("""
# brokerd.ib\n
# |_@{host}:{port}\n
# |_consumers: {cons}\n
# |_streams: {streams}\n
# |_shms: {streams}\n
# """),
# font_size=_font_small.px_size,
# )
# # add feed info label
# vbox.addWidget(
# feed_label,
# alignment=Qt.AlignBottom,
# # stretch=1/3,
# )
# TODO: handle resize events and appropriately scale this
# to the sidepane height?
# https://doc.qt.io/qt-5/layout.html#adding-widgets-to-a-layout
vbox.setSpacing(_font.px_size * 1.375)
form.show()
return form

View File

@ -23,6 +23,8 @@ import time
from typing import Optional, Callable
import pyqtgraph as pg
# from pyqtgraph.GraphicsScene import mouseEvents
from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse
from PyQt5.QtCore import Qt, QEvent
from pyqtgraph import ViewBox, Point, QtCore
from pyqtgraph import functions as fn
@ -32,20 +34,39 @@ import trio
from ..log import get_logger
from ._style import _min_points_to_show
from ._editors import SelectRect
from ._window import main_window
from . import _event
log = get_logger(__name__)
NUMBER_LINE = {
Qt.Key_1,
Qt.Key_2,
Qt.Key_3,
Qt.Key_4,
Qt.Key_5,
Qt.Key_6,
Qt.Key_7,
Qt.Key_8,
Qt.Key_9,
Qt.Key_0,
}
async def handle_viewmode_inputs(
ORDER_MODE = {
Qt.Key_A,
Qt.Key_F,
Qt.Key_D,
}
async def handle_viewmode_kb_inputs(
view: 'ChartView',
recv_chan: trio.abc.ReceiveChannel,
) -> None:
mode = view.mode
order_mode = view.order_mode
# track edge triggered keys
# (https://en.wikipedia.org/wiki/Interrupt#Triggering_methods)
@ -55,6 +76,8 @@ async def handle_viewmode_inputs(
trigger_mode: str
action: str
on_next_release: Optional[Callable] = None
# for quick key sequence-combo pattern matching
# we have a min_tap period and these should not
# ever be auto-repeats since we filter those at the
@ -62,10 +85,11 @@ async def handle_viewmode_inputs(
min_tap = 1/6
fast_key_seq: list[str] = []
fast_taps: dict[str, Callable] = {
'cc': mode.cancel_all_orders,
'cc': order_mode.cancel_all_orders,
}
async for event, etype, key, mods, text in recv_chan:
async for kbmsg in recv_chan:
event, etype, key, mods, text = kbmsg.to_tuple()
log.debug(f'key: {key}, mods: {mods}, text: {text}')
now = time.time()
period = now - last
@ -115,7 +139,7 @@ async def handle_viewmode_inputs(
Qt.Key_Space,
}
):
view._chart._lc.godwidget.search.focus()
view._chart.linked.godwidget.search.focus()
# esc and ctrl-c
if key == Qt.Key_Escape or (ctrl and key == Qt.Key_C):
@ -126,7 +150,7 @@ async def handle_viewmode_inputs(
# cancel order or clear graphics
if key == Qt.Key_C or key == Qt.Key_Delete:
mode.cancel_orders_under_cursor()
order_mode.cancel_orders_under_cursor()
# View modes
if key == Qt.Key_R:
@ -144,10 +168,14 @@ async def handle_viewmode_inputs(
# release branch
elif etype in {QEvent.KeyRelease}:
if on_next_release:
on_next_release()
on_next_release = None
if key in pressed:
pressed.remove(key)
# QUERY MODE #
# QUERY/QUOTE MODE #
if {Qt.Key_Q}.intersection(pressed):
view.linkedsplits.cursor.in_query_mode = True
@ -155,7 +183,8 @@ async def handle_viewmode_inputs(
else:
view.linkedsplits.cursor.in_query_mode = False
# SELECTION MODE #
# SELECTION MODE
# --------------
if shift:
if view.state['mouseMode'] == ViewBox.PanMode:
@ -163,25 +192,41 @@ async def handle_viewmode_inputs(
else:
view.setMouseMode(ViewBox.PanMode)
# ORDER MODE #
# live vs. dark trigger + an action {buy, sell, alert}
# Toggle position config pane
if (
ctrl and key in {
Qt.Key_P,
}
):
pp_pane = order_mode.pp.pane
if pp_pane.isHidden():
pp_pane.show()
else:
pp_pane.hide()
order_keys_pressed = {
Qt.Key_A,
Qt.Key_F,
Qt.Key_D
}.intersection(pressed)
# ORDER MODE
# ----------
# live vs. dark trigger + an action {buy, sell, alert}
order_keys_pressed = ORDER_MODE.intersection(pressed)
if order_keys_pressed:
# show the pp size label
order_mode.pp.show()
# TODO: show pp config mini-params in status bar widget
# mode.pp_config.show()
if (
# 's' for "submit" to activate "live" order
Qt.Key_S in pressed or
ctrl
):
trigger_mode: str = 'live'
trigger_type: str = 'live'
else:
trigger_mode: str = 'dark'
trigger_type: str = 'dark'
# order mode trigger "actions"
if Qt.Key_D in pressed: # for "damp eet"
@ -192,32 +237,85 @@ async def handle_viewmode_inputs(
elif Qt.Key_A in pressed:
action = 'alert'
trigger_mode = 'live'
trigger_type = 'live'
view.order_mode = True
order_mode.active = True
# XXX: order matters here for line style!
view.mode._exec_mode = trigger_mode
view.mode.set_exec(action)
order_mode._trigger_type = trigger_type
order_mode.stage_order(
action,
trigger_type=trigger_type,
)
prefix = trigger_mode + '-' if action != 'alert' else ''
view._chart.window().mode_label.setText(
f'mode: {prefix}{action}')
prefix = trigger_type + '-' if action != 'alert' else ''
view._chart.window().set_mode_name(f'{prefix}{action}')
elif (
(
Qt.Key_S in pressed or
order_keys_pressed or
Qt.Key_O in pressed
) and
key in NUMBER_LINE
):
# hot key to set order slots size
num = int(text)
pp_pane = order_mode.pane
pp_pane.on_ui_settings_change('slots', num)
edit = pp_pane.form.fields['slots']
edit.selectAll()
on_next_release = edit.deselect
pp_pane.update_status_ui()
else: # none active
# hide pp label
order_mode.pp.hide_info()
# if none are pressed, remove "staged" level
# line under cursor position
view.mode.lines.unstage_line()
order_mode.lines.unstage_line()
if view.hasFocus():
# update mode label
view._chart.window().mode_label.setText('mode: view')
view._chart.window().set_mode_name('view')
view.order_mode = False
order_mode.active = False
last = time.time()
async def handle_viewmode_mouse(
view: 'ChartView',
recv_chan: trio.abc.ReceiveChannel,
) -> None:
async for msg in recv_chan:
button = msg.button
# XXX: ugggh ``pyqtgraph`` has its own mouse events..
# so we can't overried this easily.
# it's going to take probably some decent
# reworking of the mouseClickEvent() handler.
# if button == QtCore.Qt.RightButton and view.menuEnabled():
# event = mouseEvents.MouseClickEvent(msg.event)
# # event.accept()
# view.raiseContextMenu(event)
if (
view.order_mode.active and
button == QtCore.Qt.LeftButton
):
# when in order mode, submit execution
# msg.event.accept()
view.order_mode.submit_order()
class ChartView(ViewBox):
'''
Price chart view box with interaction behaviors you'd expect from
@ -229,12 +327,13 @@ class ChartView(ViewBox):
- zoom on right-click-n-drag to cursor position
'''
mode_name: str = 'mode: view'
mode_name: str = 'view'
def __init__(
self,
name: str,
parent: pg.PlotItem = None,
**kwargs,
@ -251,7 +350,6 @@ class ChartView(ViewBox):
self.select_box = SelectRect(self)
self.addItem(self.select_box, ignoreBounds=True)
self.name = name
self.mode = None
self.order_mode: bool = False
@ -260,13 +358,25 @@ class ChartView(ViewBox):
@asynccontextmanager
async def open_async_input_handler(
self,
) -> 'ChartView':
from . import _event
async with _event.open_handler(
self,
event_types={QEvent.KeyPress, QEvent.KeyRelease},
async_handler=handle_viewmode_inputs,
) -> 'ChartView':
async with (
_event.open_handlers(
[self],
event_types={
QEvent.KeyPress,
QEvent.KeyRelease,
},
async_handler=handle_viewmode_kb_inputs,
),
_event.open_handlers(
[self],
event_types={
gs_mouse.GraphicsSceneMousePress,
},
async_handler=handle_viewmode_mouse,
),
):
yield self
@ -458,22 +568,10 @@ class ChartView(ViewBox):
self.scaleBy(x=x, y=y, center=center)
self.sigRangeChangedManually.emit(self.state['mouseEnabled'])
def mouseClickEvent(self, ev):
"""Full-click callback.
"""
button = ev.button()
# pos = ev.pos()
if button == QtCore.Qt.RightButton and self.menuEnabled():
ev.accept()
self.raiseContextMenu(ev)
elif button == QtCore.Qt.LeftButton:
# when in order mode, submit execution
if self.order_mode:
ev.accept()
self.mode.submit_exec()
# def mouseClickEvent(self, event: QtCore.QEvent) -> None:
# '''This routine is rerouted to an async handler.
# '''
# pass
def keyReleaseEvent(self, event: QtCore.QEvent) -> None:
'''This routine is rerouted to an async handler.

View File

@ -19,74 +19,20 @@ Non-shitty labels that don't re-invent the wheel.
"""
from inspect import isfunction
from typing import Callable
from typing import Callable, Optional, Any
import pyqtgraph as pg
from PyQt5 import QtGui, QtWidgets
from PyQt5.QtCore import QPointF, QRectF
from PyQt5.QtWidgets import QLabel
from PyQt5.QtCore import QPointF, QRectF, Qt
from ._style import (
DpiAwareFont,
hcolor,
_font,
)
def vbr_left(label) -> Callable[..., float]:
"""Return a closure which gives the scene x-coordinate for the
leftmost point of the containing view box.
"""
return label.vbr().left
def right_axis(
chart: 'ChartPlotWidget', # noqa
label: 'Label', # noqa
side: str = 'left',
offset: float = 10,
avoid_book: bool = True,
width: float = None,
) -> Callable[..., float]:
"""Return a position closure which gives the scene x-coordinate for
the x point on the right y-axis minus the width of the label given
it's contents.
"""
ryaxis = chart.getAxis('right')
if side == 'left':
if avoid_book:
def right_axis_offset_by_w() -> float:
# l1 spread graphics x-size
l1_len = chart._max_l1_line_len
# sum of all distances "from" the y-axis
right_offset = l1_len + label.w + offset
return ryaxis.pos().x() - right_offset
else:
def right_axis_offset_by_w() -> float:
return ryaxis.pos().x() - (label.w + offset)
return right_axis_offset_by_w
elif 'right':
# axis_offset = ryaxis.style['tickTextOffset'][0]
def on_axis() -> float:
return ryaxis.pos().x() # + axis_offset - 2
return on_axis
class Label:
"""
A plain ol' "scene label" using an underlying ``QGraphicsTextItem``.
@ -110,13 +56,14 @@ class Label:
self,
view: pg.ViewBox,
fmt_str: str,
color: str = 'bracket',
color: str = 'default_light',
x_offset: float = 0,
font_size: str = 'small',
opacity: float = 0.666,
fields: dict = {}
opacity: float = 1,
fields: dict = {},
update_on_range_change: bool = True,
) -> None:
@ -124,9 +71,13 @@ class Label:
self._fmt_str = fmt_str
self._view_xy = QPointF(0, 0)
self.scene_anchor: Optional[Callable[..., QPointF]] = None
self._x_offset = x_offset
txt = self.txt = QtWidgets.QGraphicsTextItem()
txt.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
vb.scene().addItem(txt)
# configure font size based on DPI
@ -139,7 +90,8 @@ class Label:
txt.setOpacity(opacity)
# register viewbox callbacks
vb.sigRangeChanged.connect(self.on_sigrange_change)
if update_on_range_change:
vb.sigRangeChanged.connect(self.on_sigrange_change)
self._hcolor: str = ''
self.color = color
@ -165,13 +117,34 @@ class Label:
self.txt.setDefaultTextColor(pg.mkColor(hcolor(color)))
self._hcolor = color
def update(self) -> None:
'''Update this label either by invoking its
user defined anchoring function, or by positioning
to the last recorded data view coordinates.
'''
# move label in scene coords to desired position
anchor = self.scene_anchor
if anchor:
self.txt.setPos(anchor())
else:
# position based on last computed view coordinate
self.set_view_pos(self._view_xy.y())
def on_sigrange_change(self, vr, r) -> None:
self.set_view_y(self._view_xy.y())
return self.update()
@property
def w(self) -> float:
return self.txt.boundingRect().width()
def scene_br(self) -> QRectF:
txt = self.txt
return txt.mapToScene(
txt.boundingRect()
).boundingRect()
@property
def h(self) -> float:
return self.txt.boundingRect().height()
@ -186,18 +159,20 @@ class Label:
assert isinstance(func(), float)
self._anchor_func = func
def set_view_y(
def set_view_pos(
self,
y: float,
x: Optional[float] = None,
) -> None:
scene_x = self._anchor_func() or self.txt.pos().x()
if x is None:
scene_x = self._anchor_func() or self.txt.pos().x()
x = self.vb.mapToView(QPointF(scene_x, scene_x)).x()
# get new (inside the) view coordinates / position
self._view_xy = QPointF(
self.vb.mapToView(QPointF(scene_x, scene_x)).x(),
y,
)
self._view_xy = QPointF(x, y)
# map back to the outer UI-land "scene" coordinates
s_xy = self.vb.mapFromView(self._view_xy)
@ -210,9 +185,6 @@ class Label:
assert s_xy == self.txt.pos()
def orient_on(self, h: str, v: str) -> None:
pass
@property
def fmt_str(self) -> str:
return self._fmt_str
@ -221,7 +193,11 @@ class Label:
def fmt_str(self, fmt_str: str) -> None:
self._fmt_str = fmt_str
def format(self, **fields: dict) -> str:
def format(
self,
**fields: dict
) -> str:
out = {}
@ -229,8 +205,10 @@ class Label:
# calcs of field data from field data
# ex. to calculate a $value = price * size
for k, v in fields.items():
if isfunction(v):
out[k] = v(fields)
else:
out[k] = v
@ -252,3 +230,55 @@ class Label:
def delete(self) -> None:
self.vb.scene().removeItem(self.txt)
class FormatLabel(QLabel):
'''Kinda similar to above but using the widget apis.
'''
def __init__(
self,
fmt_str: str,
font: QtGui.QFont,
font_size: int,
font_color: str,
parent=None,
) -> None:
super().__init__(parent)
# by default set the format string verbatim and expect user to
# call ``.format()`` later (presumably they'll notice the
# unformatted content if ``fmt_str`` isn't meant to be
# unformatted).
self.fmt_str = fmt_str
self.setText(fmt_str)
self.setStyleSheet(
f"""QLabel {{
color : {hcolor(font_color)};
font-size : {font_size}px;
}}
"""
)
self.setFont(_font.font)
self.setTextFormat(Qt.MarkdownText) # markdown
self.setMargin(0)
self.setAlignment(
Qt.AlignVCenter
| Qt.AlignLeft
)
self.setText(self.fmt_str)
def format(
self,
**fields: dict[str, Any],
) -> str:
out = self.fmt_str.format(**fields)
self.setText(out)
return out

View File

@ -18,16 +18,25 @@
Lines for orders, alerts, L2.
"""
from functools import partial
from math import floor
from typing import Tuple, Optional, List
from typing import Tuple, Optional, List, Callable
import pyqtgraph as pg
from pyqtgraph import Point, functions as fn
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QPointF
from ._annotate import mk_marker, qgo_draw_markers
from ._label import Label, vbr_left, right_axis
from ._annotate import qgo_draw_markers, LevelMarker
from ._anchors import (
marker_right_points,
vbr_left,
right_axis,
# pp_tight_and_right, # wanna keep it straight in the long run
gpath_pin,
)
from ..calc import humanize
from ._label import Label
from ._style import hcolor, _font
@ -36,12 +45,6 @@ from ._style import hcolor, _font
# https://stackoverflow.com/questions/26156486/determine-bounding-rect-of-line-in-qt
class LevelLine(pg.InfiniteLine):
# TODO: fill in these slots for orders
# available parent signals
# sigDragged(self)
# sigPositionChangeFinished(self)
# sigPositionChanged(self)
def __init__(
self,
chart: 'ChartPlotWidget', # type: ignore # noqa
@ -50,19 +53,20 @@ class LevelLine(pg.InfiniteLine):
color: str = 'default',
highlight_color: str = 'default_light',
dotted: bool = False,
marker_size: int = 20,
# UX look and feel opts
always_show_labels: bool = False,
hl_on_hover: bool = True,
highlight_on_hover: bool = True,
hide_xhair_on_hover: bool = True,
only_show_markers_on_hover: bool = True,
use_marker_margin: bool = False,
movable: bool = True,
) -> None:
# TODO: at this point it's probably not worth the inheritance
# any more since we've reimplemented ``.pain()`` among other
# things..
super().__init__(
movable=movable,
angle=0,
@ -72,13 +76,16 @@ class LevelLine(pg.InfiniteLine):
)
self._chart = chart
self._hoh = hl_on_hover
self.highlight_on_hover = highlight_on_hover
self._dotted = dotted
self._hide_xhair_on_hover = hide_xhair_on_hover
# callback that can be assigned by user code
# to get updates from each level change
self._on_level_change: Callable[[float], None] = lambda y: None
self._marker = None
self._default_mkr_size = marker_size
self._moh = only_show_markers_on_hover
self.only_show_markers_on_hover = only_show_markers_on_hover
self.show_markers: bool = True # presuming the line is hovered at init
# should line go all the way to far end or leave a "margin"
@ -97,7 +104,7 @@ class LevelLine(pg.InfiniteLine):
# list of labels anchored at one of the 2 line endpoints
# inside the viewbox
self._labels: List[(int, Label)] = []
self._labels: List[Label] = []
self._markers: List[(int, Label)] = []
# whenever this line is moved trigger label updates
@ -109,14 +116,12 @@ class LevelLine(pg.InfiniteLine):
# TODO: for when we want to move groups of lines?
self._track_cursor: bool = False
self._always_show_labels = always_show_labels
self.always_show_labels = always_show_labels
self._on_drag_start = lambda l: None
self._on_drag_end = lambda l: None
self._y_incr_mult = 1 / chart._lc._symbol.tick_size
self._last_scene_y: float = 0
self._y_incr_mult = 1 / chart.linked.symbol.tick_size
self._right_end_sc: float = 0
def txt_offsets(self) -> Tuple[int, int]:
@ -143,52 +148,6 @@ class LevelLine(pg.InfiniteLine):
hoverpen.setWidth(2)
self.hoverPen = hoverpen
def add_label(
self,
# by default we only display the line's level value
# in the label
fmt_str: str = (
'{level:,.{level_digits}f}'
),
side: str = 'right',
side_of_axis: str = 'left',
x_offset: float = 0,
color: str = None,
bg_color: str = None,
avoid_book: bool = True,
**label_kwargs,
) -> Label:
"""Add a ``LevelLabel`` anchored at one of the line endpoints in view.
"""
label = Label(
view=self.getViewBox(),
fmt_str=fmt_str,
color=self.color,
)
# set anchor callback
if side == 'right':
label.set_x_anchor_func(
right_axis(
self._chart,
label,
side=side_of_axis,
offset=x_offset,
avoid_book=avoid_book,
)
)
elif side == 'left':
label.set_x_anchor_func(vbr_left(label))
self._labels.append((side, label))
return label
def on_pos_change(
self,
line: 'LevelLine', # noqa
@ -196,61 +155,75 @@ class LevelLine(pg.InfiniteLine):
"""Position changed handler.
"""
self.update_labels({'level': self.value()})
level = self.value()
self.update_labels({'level': level})
self.set_level(level, called_from_on_pos_change=True)
def update_labels(
self,
fields_data: dict,
) -> None:
for at, label in self._labels:
label.color = self.color
# print(f'color is {self.color}')
for label in self._labels:
label.color = self.color
label.fields.update(fields_data)
label.render()
level = fields_data.get('level')
if level:
label.set_view_y(level)
label.render()
label.set_view_pos(y=level)
self.update()
def hide_labels(self) -> None:
for at, label in self._labels:
for label in self._labels:
label.hide()
def show_labels(self) -> None:
for at, label in self._labels:
for label in self._labels:
label.show()
def set_level(
self,
level: float,
called_from_on_pos_change: bool = False,
) -> None:
last = self.value()
# if the position hasn't changed then ``.update_labels()``
# will not be called by a non-triggered `.on_pos_change()`,
# so we need to call it manually to avoid mismatching
# label-to-line color when the line is updated but not
# "moved".
if level == last:
self.update_labels({'level': level})
if not called_from_on_pos_change:
last = self.value()
# if the position hasn't changed then ``.update_labels()``
# will not be called by a non-triggered `.on_pos_change()`,
# so we need to call it manually to avoid mismatching
# label-to-line color when the line is updated but not
# from a "moved" event.
if level == last:
self.update_labels({'level': level})
self.setPos(level)
self.setPos(level)
self.level = self.value()
self.update()
# invoke any user code
self._on_level_change(level)
def on_tracked_source(
self,
x: int,
y: float
) -> None:
# XXX: this is called by our ``Cursor`` type once this
# line is set to track the cursor: for every movement
# this callback is invoked to reposition the line
'''Chart coordinates cursor tracking callback.
this is called by our ``Cursor`` type once this line is set to
track the cursor: for every movement this callback is invoked to
reposition the line with the current view coordinates.
'''
self.movable = True
self.set_level(y) # implictly calls reposition handler
@ -316,9 +289,10 @@ class LevelLine(pg.InfiniteLine):
"""
scene = self.scene()
if scene:
for at, label in self._labels:
for label in self._labels:
label.delete()
# gc managed labels?
self._labels.clear()
if self._marker:
@ -352,26 +326,13 @@ class LevelLine(pg.InfiniteLine):
return up_to_l1_sc
def marker_right_points(self) -> (float, float, float):
chart = self._chart
l1_len = chart._max_l1_line_len
ryaxis = chart.getAxis('right')
r_axis_x = ryaxis.pos().x()
up_to_l1_sc = r_axis_x - l1_len
size = self._default_mkr_size
marker_right = up_to_l1_sc - (1.375 * 2*size)
line_end = marker_right - (6/16 * size)
return line_end, marker_right, r_axis_x
def paint(
self,
p: QtGui.QPainter,
opt: QtWidgets.QStyleOptionGraphicsItem,
w: QtWidgets.QWidget
) -> None:
"""Core paint which we override (yet again)
from pg..
@ -383,7 +344,7 @@ class LevelLine(pg.InfiniteLine):
vb_left, vb_right = self._endPoints
vb = self.getViewBox()
line_end, marker_right, r_axis_x = self.marker_right_points()
line_end, marker_right, r_axis_x = marker_right_points(self._chart)
if self.show_markers and self.markers:
@ -403,9 +364,14 @@ class LevelLine(pg.InfiniteLine):
# order lines.. not sure wtf is up with that.
# for now we're just using it on the position line.
elif self._marker:
# TODO: make this label update part of a scene-aware-marker
# composed annotation
self._marker.setPos(
QPointF(marker_right, self.scene_y())
)
if hasattr(self._marker, 'label'):
self._marker.label.update()
elif not self.use_marker_margin:
# basically means **don't** shorten the line with normally
@ -427,19 +393,33 @@ class LevelLine(pg.InfiniteLine):
super().hide()
if self._marker:
self._marker.hide()
# needed for ``order_line()`` lines currently
self._marker.label.hide()
def scene_right_xy(self) -> QPointF:
return self.getViewBox().mapFromView(
QPointF(0, self.value())
)
def show(self) -> None:
super().show()
if self._marker:
self._marker.show()
# self._marker.label.show()
def scene_y(self) -> float:
return self.getViewBox().mapFromView(Point(0, self.value())).y()
return self.getViewBox().mapFromView(
Point(0, self.value())
).y()
def scene_endpoint(self) -> QPointF:
if not self._right_end_sc:
line_end, _, _ = marker_right_points(self._chart)
self._right_end_sc = line_end - 10
return QPointF(self._right_end_sc, self.scene_y())
def add_marker(
self,
path: QtWidgets.QGraphicsPathItem,
) -> None:
) -> QtWidgets.QGraphicsPathItem:
# add path to scene
self.getViewBox().scene().addItem(path)
@ -450,10 +430,9 @@ class LevelLine(pg.InfiniteLine):
self._marker.setPen(self.currentPen)
self._marker.setBrush(fn.mkBrush(self.currentPen.color()))
# y_in_sc = chart._vb.mapFromView(Point(0, self.value())).y()
path.setPos(QPointF(rsc, self.scene_y()))
# self.update()
return path
def hoverEvent(self, ev):
"""Mouse hover callback.
@ -468,11 +447,14 @@ class LevelLine(pg.InfiniteLine):
if self.mouseHovering is True:
return
if self._moh:
if self.only_show_markers_on_hover:
self.show_markers = True
if self._marker:
self._marker.show()
# highlight if so configured
if self._hoh:
if self.highlight_on_hover:
self.currentPen = self.hoverPen
@ -511,17 +493,18 @@ class LevelLine(pg.InfiniteLine):
cur._hovered.remove(self)
if self._moh:
if self.only_show_markers_on_hover:
self.show_markers = False
if self._marker:
self._marker.hide()
self._marker.label.hide()
if self not in cur._trackers:
cur.show_xhair(y_label_level=self.value())
if not self._always_show_labels:
for at, label in self._labels:
label.hide()
label.txt.update()
# label.unhighlight()
if not self.always_show_labels:
self.hide_labels()
self.mouseHovering = False
@ -529,33 +512,28 @@ class LevelLine(pg.InfiniteLine):
def level_line(
chart: 'ChartPlotWidget', # noqa
level: float,
color: str = 'default',
# whether or not the line placed in view should highlight
# when moused over (aka "hovered")
hl_on_hover: bool = True,
# line style
dotted: bool = False,
color: str = 'default',
# ux
highlight_on_hover: bool = True,
# label fields and options
digits: int = 1,
always_show_labels: bool = False,
add_label: bool = True,
orient_v: str = 'bottom',
**kwargs,
) -> LevelLine:
"""Convenience routine to add a styled horizontal line to a plot.
"""
hl_color = color + '_light' if hl_on_hover else color
hl_color = color + '_light' if highlight_on_hover else color
line = LevelLine(
chart,
@ -567,7 +545,7 @@ def level_line(
dotted=dotted,
# UX related options
hl_on_hover=hl_on_hover,
highlight_on_hover=highlight_on_hover,
# when set to True the label is always shown instead of just on
# highlight (which is a privacy thing for orders)
@ -580,17 +558,36 @@ def level_line(
if add_label:
label = line.add_label(
side='right',
opacity=1,
x_offset=0,
avoid_book=False,
)
label.orient_v = orient_v
label = Label(
view=line.getViewBox(),
# by default we only display the line's level value
# in the label
fmt_str=('{level:,.{level_digits}f}'),
color=color,
)
# anchor to right side (of view ) label
label.set_x_anchor_func(
right_axis(
chart,
label,
side='left', # side of axis
offset=0,
avoid_book=False,
)
)
# add to label set which will be updated on level changes
line._labels.append(label)
label.orient_v = orient_v
line.update_labels({'level': level, 'level_digits': 2})
label.render()
# keep pp label details private until
# the user edge triggers "order mode"
line.hide_labels()
# activate/draw label
@ -600,128 +597,181 @@ def level_line(
def order_line(
chart,
level: float,
level_digits: float,
action: str, # buy or sell
action: Optional[str] = 'buy', # buy or sell
marker_style: Optional[str] = None,
level_digits: Optional[float] = 3,
size: Optional[int] = 1,
size_digits: int = 0,
size_digits: int = 1,
show_markers: bool = False,
submit_price: float = None,
exec_type: str = 'dark',
order_type: str = 'limit',
orient_v: str = 'bottom',
**line_kwargs,
) -> LevelLine:
"""Convenience routine to add a line graphic representing an order
'''Convenience routine to add a line graphic representing an order
execution submitted to the EMS via the chart's "order mode".
"""
'''
line = level_line(
chart,
level,
add_label=False,
use_marker_margin=True,
# only_show_markers_on_hover=True,
**line_kwargs
)
if show_markers:
font_size = _font.font.pixelSize()
font_size = _font.font.pixelSize()
# scale marker size with dpi-aware font size
arrow_size = floor(1.375 * font_size)
alert_size = arrow_size * 0.666
# add arrow marker on end of line nearest y-axis
marker_style, marker_size = {
'buy': ('|<', arrow_size),
'sell': ('>|', arrow_size),
'alert': ('v', alert_size),
}[action]
# this fixes it the artifact issue! .. of course, bouding rect stuff
line._maxMarkerSize = marker_size
# use ``QPathGraphicsItem``s to draw markers in scene coords
# instead of the old way that was doing the same but by
# resetting the graphics item transform intermittently
# XXX: this is our new approach but seems slower?
# line.add_marker(mk_marker(marker_style, marker_size))
assert not line.markers
# the old way which is still somehow faster?
path = mk_marker(
marker_style,
# the "position" here is now ignored since we modified
# internals to pin markers to the right end of the line
marker_size,
use_qgpath=False,
)
# manually append for later ``InfiniteLine.paint()`` drawing
# XXX: this was manually tested as faster then using the
# QGraphicsItem around a painter path.. probably needs further
# testing to figure out why tf that's true.
line.markers.append((path, 0, marker_size))
# scale marker size with dpi-aware font size
marker_size = floor(1.375 * font_size)
orient_v = 'top' if action == 'sell' else 'bottom'
if action == 'alert':
# completely different labelling for alerts
fmt_str = 'alert => {level}'
label = Label(
view=line.getViewBox(),
color=line.color,
# completely different labelling for alerts
fmt_str='alert => {level}',
)
# for now, we're just duplicating the label contents i guess..
llabel = line.add_label(
side='left',
fmt_str=fmt_str,
)
llabel.fields = {
line._labels.append(label)
# anchor to left side of view / line
label.set_x_anchor_func(vbr_left(label))
label.fields = {
'level': level,
'level_digits': level_digits,
}
llabel.orient_v = orient_v
llabel.render()
llabel.show()
marker_size = marker_size * 0.666
else:
# # left side label
# llabel = line.add_label(
# side='left',
# fmt_str=' {exec_type}-{order_type}:\n ${$value}',
# )
# llabel.fields = {
# 'order_type': order_type,
# 'level': level,
# '$value': lambda f: f['level'] * f['size'],
# 'size': size,
# 'exec_type': exec_type,
# }
# llabel.orient_v = orient_v
# llabel.render()
# llabel.show()
view = line.getViewBox()
# right before L1 label
rlabel = line.add_label(
side='right',
side_of_axis='left',
x_offset=4*marker_size,
# far-side label
label = Label(
view=view,
# display the order pos size, which is some multiple
# of the user defined base unit size
fmt_str=(
'{size:.{size_digits}f} '
'{size:.{size_digits}f}u{fiat_text}'
),
color=line.color,
)
rlabel.fields = {
label.set_x_anchor_func(vbr_left(label))
line._labels.append(label)
def maybe_show_fiat_text(fields: dict) -> str:
fiat_size = fields.get('fiat_size')
if not fiat_size:
return ''
return f' -> ${humanize(fiat_size)}'
label.fields = {
'size': size,
'size_digits': size_digits,
'size_digits': 0,
'fiat_size': None,
'fiat_text': maybe_show_fiat_text,
}
rlabel.orient_v = orient_v
rlabel.render()
rlabel.show()
label.orient_v = orient_v
label.render()
label.show()
if show_markers:
# add arrow marker on end of line nearest y-axis
marker_style = marker_style or {
'buy': '|<',
'sell': '>|',
'alert': 'v',
}[action]
# the old way which is still somehow faster?
marker = LevelMarker(
chart=chart,
style=marker_style,
get_level=line.value,
size=marker_size,
keep_in_view=False,
)
# XXX: this is our new approach but seems slower?
marker = line.add_marker(marker)
# XXX: DON'T COMMENT THIS!
# this fixes it the artifact issue! .. of course, bounding rect stuff
line._maxMarkerSize = marker_size
assert line._marker is marker
assert not line.markers
# above we use ``QPathGraphicsItem``s directly to draw markers
# in scene coords instead of the way ``InfiniteLine`` does it
# internally: by resetting the graphics item transform
# intermittently inside ``.paint()`` which we've copied and
# seperated as ``qgo_draw_markers()`` if we ever want to go back
# to it; likely we can remove this.
# manually append for later ``InfiniteLine.paint()`` drawing
# XXX: this was manually tested as faster then using the
# QGraphicsItem around a painter path.. probably needs further
# testing to figure out why tf that's true.
# line.markers.append((marker, 0, marker_size))
if action != 'alert':
# add a partial position label if we also added a level marker
pp_size_label = Label(
view=view,
color=line.color,
# this is "static" label
# update_on_range_change=False,
fmt_str='\n'.join((
'{slots_used:.1f}x',
)),
fields={
'slots_used': 0,
},
)
pp_size_label.render()
pp_size_label.show()
line._labels.append(pp_size_label)
# TODO: pretty sure one of the reasons these "label
# updatess" are a bit "jittery" is because we aren't
# leveraging the "scene coordinates hierarchy" stuff:
# i.e. using some parent object as the coord "origin"
# which i presume would result in better pixel caching
# results? def something to dig into..
pp_size_label.scene_anchor = partial(
gpath_pin,
gpath=marker,
label=pp_size_label,
)
# XXX: without this the pp proportion label next the marker
# seems to lag? this is the same issue we had with position
# lines which we handle with ``.update_graphcis()``.
# marker._on_paint=lambda marker: pp_size_label.update()
marker._on_paint = lambda marker: pp_size_label.update()
marker.label = label
# sanity check
line.update_labels({'level': level})
@ -729,104 +779,27 @@ def order_line(
return line
def position_line(
chart,
size: float,
# TODO: should probably consider making this a more general
# purpose class method on the type?
def copy_from_order_line(
level: float,
orient_v: str = 'bottom',
chart: 'ChartPlotWidget', # noqa
line: LevelLine
) -> LevelLine:
"""Convenience routine to add a line graphic representing an order
execution submitted to the EMS via the chart's "order mode".
"""
line = level_line(
return order_line(
chart,
level,
color='default_light',
add_label=False,
hl_on_hover=False,
movable=False,
always_show_labels=False,
hide_xhair_on_hover=False,
use_marker_margin=True,
# label fields default values
level=line.value(),
marker_style=line._marker.style,
# LevelLine kwargs
color=line.color,
dotted=line._dotted,
show_markers=line.show_markers,
only_show_markers_on_hover=line.only_show_markers_on_hover,
)
# hide position marker when out of view (for now)
vb = line.getViewBox()
def update_pp_nav(chartview):
vr = vb.state['viewRange']
ymn, ymx = vr[1]
level = line.value()
path = line._marker
# provide "nav hub" like indicator for where
# the position is on the y-dimension
# print(path._height)
# print(vb.shape())
# print(vb.boundingRect())
# print(vb.height())
_, marker_right, _ = line.marker_right_points()
if level > ymx: # pin to top of view
path.setPos(
QPointF(
marker_right,
2 + path._height,
)
)
elif level < ymn: # pin to bottom of view
path.setPos(
QPointF(
marker_right,
vb.height() - 16 + path._height,
)
)
else:
# pp line is viewable so show marker
line._marker.show()
vb.sigYRangeChanged.connect(update_pp_nav)
rlabel = line.add_label(
side='right',
fmt_str='{direction}: {size} -> ${$:.2f}',
)
rlabel.fields = {
'direction': 'long' if size > 0 else 'short',
'$': size * level,
'size': size,
}
rlabel.orient_v = orient_v
rlabel.render()
rlabel.show()
# arrow marker
# scale marker size with dpi-aware font size
font_size = _font.font.pixelSize()
# scale marker size with dpi-aware font size
arrow_size = floor(1.375 * font_size)
if size > 0:
style = '|<'
elif size < 0:
style = '>|'
arrow_path = mk_marker(style, size=arrow_size)
# monkey-cache height for sizing on pp nav-hub
arrow_path._height = arrow_path.boundingRect().height()
# XXX: uses new marker drawing approach
line.add_marker(arrow_path)
line.set_level(level)
# sanity check
line.update_labels({'level': level})
return line

129
piker/ui/_orm.py 100644
View File

@ -0,0 +1,129 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
micro-ORM for coupling ``pydantic`` models with Qt input/output widgets.
"""
from __future__ import annotations
from typing import (
Optional, Generic,
TypeVar, Callable,
Literal,
)
import enum
import sys
from pydantic import BaseModel, validator
from pydantic.generics import GenericModel
from PyQt5.QtWidgets import (
QWidget,
QComboBox,
)
from ._forms import (
# FontScaledDelegate,
FontAndChartAwareLineEdit,
)
DataType = TypeVar('DataType')
class Field(GenericModel, Generic[DataType]):
widget_factory: Optional[
Callable[
[QWidget, 'Field'],
QWidget
]
]
value: Optional[DataType] = None
class Selection(Field[DataType], Generic[DataType]):
'''Model which maps to a finite set of drop down entries declared as
a ``dict[str, DataType]``.
'''
widget_factory = QComboBox
options: dict[str, DataType]
# value: DataType = None
@validator('value') # , always=True)
def set_value_first(
cls,
v: DataType,
values: dict[str, DataType],
) -> DataType:
'''If no initial value is set, use the first in
the ``options`` dict.
'''
# breakpoint()
options = values['options']
if v is None:
return next(options.values())
else:
assert v in options, f'{v} is not in {options}'
return v
# class SizeUnit(Enum):
# currency = '$ size'
# percent_of_port = '% of port'
# shares = '# shares'
# class Weighter(str, Enum):
# uniform = 'uniform'
class Edit(Field[DataType], Generic[DataType]):
'''An edit field which takes a number.
'''
widget_factory = FontAndChartAwareLineEdit
class AllocatorPane(BaseModel):
account = Selection[str](
options=dict.fromkeys(
['paper', 'ib.paper', 'ib.margin'],
'paper',
),
)
allocate = Selection[str](
# options=list(Size),
options={
'$ size': 'currency',
'% of port': 'percent_of_port',
'# shares': 'shares',
},
# TODO: save/load from config and/or last session
# value='currency'
)
weight = Selection[str](
options={
'uniform': 'uniform',
},
# value='uniform',
)
size = Edit[float](value=1000)
slots = Edit[int](value=4)

View File

@ -0,0 +1,785 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Position info and display
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from functools import partial
from math import floor
from typing import Optional
from bidict import bidict
from pyqtgraph import functions as fn
from pydantic import BaseModel, validator
from ._annotate import LevelMarker
from ._anchors import (
pp_tight_and_right, # wanna keep it straight in the long run
gpath_pin,
)
from ..calc import humanize
from ..clearing._messages import BrokerdPosition, Status
from ..data._source import Symbol
from ._label import Label
from ._lines import LevelLine, order_line
from ._style import _font
from ._forms import FieldsForm, FillStatusBar, QLabel
from ..log import get_logger
from ..clearing._messages import Order
log = get_logger(__name__)
class Position(BaseModel):
'''Basic pp (personal position) model with attached fills history.
This type should be IPC wire ready?
'''
symbol: Symbol
# last size and avg entry price
size: float
avg_price: float # TODO: contextual pricing
# ordered record of known constituent trade messages
fills: list[Status] = []
_size_units = bidict({
'currency': '$ size',
'units': '# units',
# TODO: but we'll need a `<brokermod>.get_accounts()` or something
# 'percent_of_port': '% of port',
})
SizeUnit = Enum(
'SizeUnit',
_size_units,
)
class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
# underscore_attrs_are_private = False
symbol: Symbol
account: Optional[str] = 'paper'
_accounts: bidict[str, Optional[str]]
@validator('account', pre=True)
def set_account(cls, v, values):
if v:
return values['_accounts'][v]
size_unit: SizeUnit = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@validator('size_unit')
def lookup_key(cls, v):
# apply the corresponding enum key for the text "description" value
return v.name
# TODO: if we ever want ot support non-uniform entry-slot-proportion
# "sizes"
# disti_weight: str = 'uniform'
units_limit: float
currency_limit: float
slots: int
def step_sizes(
self,
) -> (float, float):
'''Return the units size for each unit type as a tuple.
'''
slots = self.slots
return (
self.units_limit / slots,
self.currency_limit / slots,
)
def limit(self) -> float:
if self.size_unit == 'currency':
return self.currency_limit
else:
return self.units_limit
def next_order_info(
self,
startup_pp: Position,
live_pp: Position,
price: float,
action: str,
) -> dict:
'''Generate order request info for the "next" submittable order
depending on position / order entry config.
'''
sym = self.symbol
ld = sym.lot_size_digits
size_unit = self.size_unit
live_size = live_pp.size
abs_live_size = abs(live_size)
abs_startup_size = abs(startup_pp.size)
u_per_slot, currency_per_slot = self.step_sizes()
if size_unit == 'units':
slot_size = u_per_slot
l_sub_pp = self.units_limit - abs_live_size
elif size_unit == 'currency':
live_cost_basis = abs_live_size * live_pp.avg_price
slot_size = currency_per_slot / price
l_sub_pp = (self.currency_limit - live_cost_basis) / price
# an entry (adding-to or starting a pp)
if (
action == 'buy' and live_size > 0 or
action == 'sell' and live_size < 0 or
live_size == 0
):
order_size = min(slot_size, l_sub_pp)
# an exit (removing-from or going to net-zero pp)
else:
# when exiting a pp we always try to slot the position
# in the instrument's units, since doing so in a derived
# size measure (eg. currency value, percent of port) would
# result in a mis-mapping of slots sizes in unit terms
# (i.e. it would take *more* slots to exit at a profit and
# *less* slots to exit at a loss).
pp_size = max(abs_startup_size, abs_live_size)
slotted_pp = pp_size / self.slots
if size_unit == 'currency':
# compute the "projected" limit's worth of units at the
# current pp (weighted) price:
slot_size = currency_per_slot / live_pp.avg_price
else:
slot_size = u_per_slot
# if our position is greater then our limit setting
# we'll want to use slot sizes which are larger then what
# the limit would normally determine
order_size = max(slotted_pp, slot_size)
if (
abs_live_size < slot_size or
# NOTE: front/back "loading" heurstic:
# if the remaining pp is in between 0-1.5x a slot's
# worth, dump the whole position in this last exit
# therefore conducting so called "back loading" but
# **without** going past a net-zero pp. if the pp is
# > 1.5x a slot size, then front load: exit a slot's and
# expect net-zero to be acquired on the final exit.
slot_size < pp_size < round((1.5*slot_size), ndigits=ld)
):
order_size = abs_live_size
slots_used = 1.0 # the default uniform policy
if order_size < slot_size:
# compute a fractional slots size to display
slots_used = self.slots_used(
Position(symbol=sym, size=order_size, avg_price=price)
)
return {
'size': abs(round(order_size, ndigits=ld)),
'size_digits': ld,
# TODO: incorporate multipliers for relevant derivatives
'fiat_size': round(order_size * price, ndigits=2),
'slots_used': slots_used,
}
def slots_used(
self,
pp: Position,
) -> float:
'''Calc and return the number of slots used by this ``Position``.
'''
abs_pp_size = abs(pp.size)
if self.size_unit == 'currency':
# live_currency_size = size or (abs_pp_size * pp.avg_price)
live_currency_size = abs_pp_size * pp.avg_price
prop = live_currency_size / self.currency_limit
else:
# return (size or abs_pp_size) / alloc.units_limit
prop = abs_pp_size / self.units_limit
# TODO: REALLY need a way to show partial slots..
# for now we round at the midway point between slots
return round(prop * self.slots)
@dataclass
class SettingsPane:
'''Composite set of widgets plus an allocator model for configuring
order entry sizes and position limits per tradable instrument.
'''
# config for and underlying validation model
tracker: PositionTracker
alloc: Allocator
# input fields
form: FieldsForm
# output fill status and labels
fill_bar: FillStatusBar
step_label: QLabel
pnl_label: QLabel
limit_label: QLabel
def transform_to(self, size_unit: str) -> None:
if self.alloc.size_unit == size_unit:
return
def on_selection_change(
self,
text: str,
key: str,
) -> None:
'''Called on any order pane drop down selection change.
'''
print(f'selection input: {text}')
setattr(self.alloc, key, text)
self.on_ui_settings_change(key, text)
def on_ui_settings_change(
self,
key: str,
value: str,
) -> bool:
'''Called on any order pane edit field value change.
'''
print(f'settings change: {key}: {value}')
alloc = self.alloc
size_unit = alloc.size_unit
# write any passed settings to allocator
if key == 'limit':
if size_unit == 'currency':
alloc.currency_limit = float(value)
else:
alloc.units_limit = float(value)
elif key == 'slots':
alloc.slots = int(value)
elif key == 'size_unit':
# TODO: if there's a limit size unit change re-compute
# the current settings in the new units
pass
elif key == 'account':
print(f'TODO: change account -> {value}')
else:
raise ValueError(f'Unknown setting {key}')
# read out settings and update UI
suffix = {'currency': ' $', 'units': ' u'}[size_unit]
limit = alloc.limit()
# TODO: a reverse look up from the position to the equivalent
# account(s), if none then look to user config for default?
self.update_status_ui()
step_size, currency_per_slot = alloc.step_sizes()
if size_unit == 'currency':
step_size = currency_per_slot
self.step_label.format(
step_size=str(humanize(step_size)) + suffix
)
self.limit_label.format(
limit=str(humanize(limit)) + suffix
)
# update size unit in UI
self.form.fields['size_unit'].setCurrentText(
alloc._size_units[alloc.size_unit]
)
self.form.fields['slots'].setText(str(alloc.slots))
self.form.fields['limit'].setText(str(limit))
# TODO: maybe return a diff of settings so if we can an error we
# can have general input handling code to report it through the
# UI in some way?
return True
def init_status_ui(
self,
):
alloc = self.alloc
asset_type = alloc.symbol.type_key
# form = self.form
# TODO: pull from piker.toml
# default config
slots = 4
currency_limit = 5e3
startup_pp = self.tracker.startup_pp
alloc.slots = slots
alloc.currency_limit = currency_limit
# default entry sizing
if asset_type in ('stock', 'crypto', 'forex'):
alloc.size_unit = '$ size'
elif asset_type in ('future', 'option', 'futures_option'):
# since it's harder to know how currency "applies" in this case
# given leverage properties
alloc.size_unit = '# units'
# set units limit to slots size thus making make the next
# entry step 1.0
alloc.units_limit = slots
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':
startup_size = startup_pp.size * startup_pp.avg_price
if startup_size > alloc.currency_limit:
alloc.currency_limit = round(startup_size, ndigits=2)
limit_text = alloc.currency_limit
else:
startup_size = startup_pp.size
if startup_size > alloc.units_limit:
alloc.units_limit = startup_size
limit_text = alloc.units_limit
self.on_ui_settings_change('limit', limit_text)
self.update_status_ui(size=startup_size)
def update_status_ui(
self,
size: float = None,
) -> None:
alloc = self.alloc
slots = alloc.slots
used = alloc.slots_used(self.tracker.live_pp)
# calculate proportion of position size limit
# that exists and display in fill bar
# TODO: what should we do for fractional slot pps?
self.fill_bar.set_slots(
slots,
# TODO: how to show "partial" slots?
# min(round(prop * slots), slots)
min(used, slots)
)
def on_level_change_update_next_order_info(
self,
level: float,
line: LevelLine,
order: Order,
) -> None:
'''A callback applied for each level change to the line
which will recompute the order size based on allocator
settings. this is assigned inside
``OrderMode.line_from_order()``
'''
order_info = self.alloc.next_order_info(
startup_pp=self.tracker.startup_pp,
live_pp=self.tracker.live_pp,
price=level,
action=order.action,
)
line.update_labels(order_info)
# update bound-in staged order
order.price = level
order.size = order_info['size']
def position_line(
chart: 'ChartPlotWidget', # noqa
size: float,
level: float,
color: str,
orient_v: str = 'bottom',
marker: Optional[LevelMarker] = None,
) -> LevelLine:
'''Convenience routine to create a line graphic representing a "pp"
aka the acro for a,
"{piker, private, personal, puny, <place your p-word here>} position".
If ``marker`` is provided it will be configured appropriately for
the "direction" of the position.
'''
line = order_line(
chart,
level,
# TODO: could we maybe add a ``action=None`` which
# would be a mechanism to check a marker was passed in?
color=color,
highlight_on_hover=False,
movable=False,
hide_xhair_on_hover=False,
only_show_markers_on_hover=False,
always_show_labels=False,
# explicitly disable ``order_line()`` factory's creation
# of a level marker since we do it in this tracer thing.
show_markers=False,
)
if marker:
# configure marker to position data
if size > 0: # long
style = '|<' # point "up to" the line
elif size < 0: # short
style = '>|' # point "down to" the line
marker.style = style
# set marker color to same as line
marker.setPen(line.currentPen)
marker.setBrush(fn.mkBrush(line.currentPen.color()))
marker.level = level
marker.update()
marker.show()
# show position marker on view "edge" when out of view
vb = line.getViewBox()
vb.sigRangeChanged.connect(marker.position_in_view)
line.set_level(level)
return line
class PositionTracker:
'''Track and display a real-time position for a single symbol
on a chart.
Graphically composed of a level line and marker as well as labels
for indcating current position information. Updates are made to the
corresponding "settings pane" for the chart's "order mode" UX.
'''
# inputs
chart: 'ChartPlotWidget' # noqa
alloc: Allocator
# allocated
startup_pp: Position
live_pp: Position
pp_label: Label
size_label: Label
line: Optional[LevelLine] = None
_color: str = 'default_lightest'
def __init__(
self,
chart: 'ChartPlotWidget', # noqa
alloc: Allocator,
) -> None:
self.chart = chart
self.alloc = alloc
self.live_pp = Position(
symbol=chart.linked.symbol,
size=0,
avg_price=0,
)
self.startup_pp = self.live_pp.copy()
view = chart.getViewBox()
# literally the 'pp' (pee pee) label that's always in view
self.pp_label = pp_label = Label(
view=view,
fmt_str='pp',
color=self._color,
update_on_range_change=False,
)
# create placeholder 'up' level arrow
self._level_marker = None
self._level_marker = self.level_marker(size=1)
pp_label.scene_anchor = partial(
gpath_pin,
gpath=self._level_marker,
label=pp_label,
)
pp_label.render()
self.size_label = size_label = Label(
view=view,
color=self._color,
# this is "static" label
# update_on_range_change=False,
fmt_str='\n'.join((
':{slots_used:.1f}x',
)),
fields={
'slots_used': 0,
},
)
size_label.render()
size_label.scene_anchor = partial(
pp_tight_and_right,
label=self.pp_label,
)
@property
def pane(self) -> FieldsForm:
'''Return handle to pp side pane form.
'''
return self.chart.linked.godwidget.pp_pane
def update_graphics(
self,
marker: LevelMarker
) -> None:
'''Update all labels.
Meant to be called from the maker ``.paint()``
for immediate, lag free label draws.
'''
self.pp_label.update()
self.size_label.update()
def update_from_pp_msg(
self,
msg: BrokerdPosition,
position: Optional[Position] = None,
) -> None:
'''Update graphics and data from average price and size passed in our
EMS ``BrokerdPosition`` msg.
'''
# XXX: better place to do this?
symbol = self.chart.linked.symbol
lot_size_digits = symbol.lot_size_digits
avg_price, size = (
round(msg['avg_price'], ndigits=symbol.tick_size_digits),
round(msg['size'], ndigits=lot_size_digits),
)
# live pp updates
pp = position or self.live_pp
pp.avg_price = avg_price
pp.size = size
self.update_line(
avg_price,
size,
lot_size_digits,
)
# label updates
self.size_label.fields['slots_used'] = round(
self.alloc.slots_used(pp), ndigits=1)
self.size_label.render()
if size == 0:
self.hide()
else:
self._level_marker.level = avg_price
# these updates are critical to avoid lag on view/scene changes
self._level_marker.update() # trigger paint
self.pp_label.update()
self.size_label.update()
self.show()
# don't show side and status widgets unless
# order mode is "engaged" (which done via input controls)
self.hide_info()
def level(self) -> float:
if self.line:
return self.line.value()
else:
return 0
def show(self) -> None:
if self.live_pp.size:
self.line.show()
self.line.show_labels()
self._level_marker.show()
self.pp_label.show()
self.size_label.show()
def hide(self) -> None:
self.pp_label.hide()
self._level_marker.hide()
self.size_label.hide()
if self.line:
self.line.hide()
def hide_info(self) -> None:
'''Hide details (right now just size label?) of position.
'''
self.size_label.hide()
if self.line:
self.line.hide_labels()
# TODO: move into annoate module
def level_marker(
self,
size: float,
) -> LevelMarker:
if self._level_marker:
self._level_marker.delete()
# arrow marker
# scale marker size with dpi-aware font size
font_size = _font.font.pixelSize()
# scale marker size with dpi-aware font size
arrow_size = floor(1.375 * font_size)
if size > 0:
style = '|<'
elif size < 0:
style = '>|'
arrow = LevelMarker(
chart=self.chart,
style=style,
get_level=self.level,
size=arrow_size,
on_paint=self.update_graphics,
)
self.chart.getViewBox().scene().addItem(arrow)
arrow.show()
return arrow
# TODO: per account lines on a single (or very related) symbol
def update_line(
self,
price: float,
size: float,
size_digits: int,
) -> None:
'''Update personal position level line.
'''
# do line update
line = self.line
if size:
if line is None:
# create and show a pp line
line = self.line = position_line(
chart=self.chart,
level=price,
size=size,
color=self._color,
marker=self._level_marker,
)
else:
line.set_level(price)
self._level_marker.level = price
self._level_marker.update()
# update LHS sizing label
line.update_labels({
'size': size,
'size_digits': size_digits,
'fiat_size': round(price * size, ndigits=2)
})
line.show()
elif line: # remove pp line from view if it exists on a net-zero pp
line.delete()
self.line = None

View File

@ -35,9 +35,9 @@ from collections import defaultdict
from contextlib import asynccontextmanager
from functools import partial
from typing import (
List, Optional, Callable,
Awaitable, Sequence, Dict,
Any, AsyncIterator, Tuple,
Optional, Callable,
Awaitable, Sequence,
Any, AsyncIterator
)
import time
# from pprint import pformat
@ -45,7 +45,7 @@ import time
from fuzzywuzzy import process as fuzzy
import trio
from trio_typing import TaskStatus
from PyQt5 import QtCore, QtGui
from PyQt5 import QtCore
from PyQt5 import QtWidgets
from PyQt5.QtCore import (
Qt,
@ -63,40 +63,24 @@ from PyQt5.QtWidgets import (
QTreeView,
# QListWidgetItem,
# QAbstractScrollArea,
QStyledItemDelegate,
# QStyledItemDelegate,
)
from ..log import get_logger
from ._style import (
_font,
DpiAwareFont,
# hcolor,
hcolor,
)
from ._forms import FontAndChartAwareLineEdit, FontScaledDelegate
log = get_logger(__name__)
class SimpleDelegate(QStyledItemDelegate):
"""
Super simple view delegate to render text in the same
font size as the search widget.
"""
def __init__(
self,
parent=None,
font: DpiAwareFont = _font,
) -> None:
super().__init__(parent)
self.dpi_font = font
class CompleterView(QTreeView):
mode_name: str = 'mode: search-nav'
mode_name: str = 'search-nav'
# XXX: relevant docs links:
# - simple widget version of this:
@ -121,7 +105,7 @@ class CompleterView(QTreeView):
def __init__(
self,
parent=None,
labels: List[str] = [],
labels: list[str] = [],
) -> None:
super().__init__(parent)
@ -130,14 +114,12 @@ class CompleterView(QTreeView):
self.labels = labels
# a std "tabular" config
self.setItemDelegate(SimpleDelegate())
self.setItemDelegate(FontScaledDelegate(self))
self.setModel(model)
self.setAlternatingRowColors(True)
# TODO: size this based on DPI font
self.setIndentation(20)
self.pressed.connect(self.on_pressed)
# self.setUniformRowHeights(True)
# self.setColumnWidth(0, 3)
# self.setVerticalBarPolicy(Qt.ScrollBarAlwaysOff)
@ -154,12 +136,12 @@ class CompleterView(QTreeView):
self._font_size: int = 0 # pixels
def on_pressed(self, idx: QModelIndex) -> None:
async def on_pressed(self, idx: QModelIndex) -> None:
'''Mouse pressed on view handler.
'''
search = self.parent()
search.chart_current_item(clear_to_cache=False)
await search.chart_current_item(clear_to_cache=False)
search.focus()
def set_font_size(self, size: int = 18):
@ -425,59 +407,28 @@ class CompleterView(QTreeView):
self.resize()
class SearchBar(QtWidgets.QLineEdit):
class SearchBar(FontAndChartAwareLineEdit):
mode_name: str = 'mode: search'
mode_name: str = 'search'
def __init__(
self,
parent: QWidget,
parent_chart: QWidget, # noqa
godwidget: QWidget,
view: Optional[CompleterView] = None,
font: DpiAwareFont = _font,
**kwargs,
) -> None:
super().__init__(parent)
# self.setContextMenuPolicy(Qt.CustomContextMenu)
# self.customContextMenuRequested.connect(self.show_menu)
# self.setStyleSheet(f"font: 18px")
self.godwidget = godwidget
super().__init__(parent, **kwargs)
self.view: CompleterView = view
self.dpi_font = font
self.godwidget = parent_chart
# size it as we specify
# https://doc.qt.io/qt-5/qsizepolicy.html#Policy-enum
self.setSizePolicy(
QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Fixed,
)
self.setFont(font.font)
# witty bit of margin
self.setTextMargins(2, 2, 2, 2)
def focus(self) -> None:
self.selectAll()
self.show()
self.setFocus()
def show(self) -> None:
super().show()
self.view.show_matches()
def sizeHint(self) -> QtCore.QSize:
"""
Scale edit box to size of dpi aware font.
"""
psh = super().sizeHint()
psh.setHeight(self.dpi_font.px_size + 2)
return psh
def unfocus(self) -> None:
self.parent().hide()
self.clearFocus()
@ -492,12 +443,12 @@ class SearchWidget(QtWidgets.QWidget):
Includes helper methods for item management in the sub-widgets.
'''
mode_name: str = 'mode: search'
mode_name: str = 'search'
def __init__(
self,
godwidget: 'GodWidget', # type: ignore # noqa
columns: List[str] = ['src', 'symbol'],
columns: list[str] = ['src', 'symbol'],
parent=None,
) -> None:
@ -512,7 +463,7 @@ class SearchWidget(QtWidgets.QWidget):
self.godwidget = godwidget
self.vbox = QtWidgets.QVBoxLayout(self)
self.vbox.setContentsMargins(0, 0, 0, 0)
self.vbox.setContentsMargins(0, 4, 4, 0)
self.vbox.setSpacing(4)
# split layout for the (label:| search bar entry)
@ -522,10 +473,17 @@ class SearchWidget(QtWidgets.QWidget):
# add label to left of search bar
self.label = label = QtWidgets.QLabel(parent=self)
label.setStyleSheet(
f"""QLabel {{
color : {hcolor('default_lightest')};
font-size : {_font.px_size - 2}px;
}}
"""
)
label.setTextFormat(3) # markdown
label.setFont(_font.font)
label.setMargin(4)
label.setText("`search`:")
label.setText("search:")
label.show()
label.setAlignment(
QtCore.Qt.AlignVCenter
@ -540,8 +498,8 @@ class SearchWidget(QtWidgets.QWidget):
)
self.bar = SearchBar(
parent=self,
parent_chart=godwidget,
view=self.view,
godwidget=godwidget,
)
self.bar_hbox.addWidget(self.bar)
@ -564,7 +522,7 @@ class SearchWidget(QtWidgets.QWidget):
self.bar.focus()
self.show()
def get_current_item(self) -> Optional[Tuple[str, str]]:
def get_current_item(self) -> Optional[tuple[str, str]]:
'''Return the current completer tree selection as
a tuple ``(parent: str, child: str)`` if valid, else ``None``.
@ -596,14 +554,15 @@ class SearchWidget(QtWidgets.QWidget):
else:
return None
def chart_current_item(
async def chart_current_item(
self,
clear_to_cache: bool = True,
) -> Optional[str]:
'''Attempt to load and switch the current selected
completion result to the affiliated chart app.
Return any loaded symbol
Return any loaded symbol.
'''
value = self.get_current_item()
@ -615,7 +574,7 @@ class SearchWidget(QtWidgets.QWidget):
log.info(f'Requesting symbol: {symbol}.{provider}')
chart.load_symbol(
await chart.load_symbol(
provider,
symbol,
'info',
@ -653,10 +612,11 @@ async def pack_matches(
view: CompleterView,
has_results: dict[str, set[str]],
matches: dict[(str, str), List[str]],
matches: dict[(str, str), list[str]],
provider: str,
pattern: str,
search: Callable[..., Awaitable[dict]],
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -834,7 +794,7 @@ async def handle_keyboard_input(
# startup
bar = searchbar
search = searchbar.parent()
chart = search.godwidget
godwidget = search.godwidget
view = bar.view
view.set_font_size(bar.dpi_font.px_size)
@ -853,7 +813,8 @@ async def handle_keyboard_input(
)
)
async for event, etype, key, mods, txt in recv_chan:
async for kbmsg in recv_chan:
event, etype, key, mods, txt = kbmsg.to_tuple()
log.debug(f'key: {key}, mods: {mods}, txt: {txt}')
@ -861,14 +822,9 @@ async def handle_keyboard_input(
if mods == Qt.ControlModifier:
ctl = True
# # ctl + alt as combo
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
if key in (Qt.Key_Enter, Qt.Key_Return):
search.chart_current_item(clear_to_cache=True)
await search.chart_current_item(clear_to_cache=True)
_search_enabled = False
continue
@ -876,7 +832,7 @@ async def handle_keyboard_input(
# if nothing in search text show the cache
view.set_section_entries(
'cache',
list(reversed(chart._chart_cache)),
list(reversed(godwidget._chart_cache)),
clear_all=True,
)
continue
@ -890,8 +846,8 @@ async def handle_keyboard_input(
search.bar.unfocus()
# kill the search and focus back on main chart
if chart:
chart.linkedsplits.focus()
if godwidget:
godwidget.focus()
continue
@ -938,7 +894,7 @@ async def handle_keyboard_input(
if parent_item and parent_item.text() == 'cache':
# if it's a cache item, switch and show it immediately
search.chart_current_item(clear_to_cache=False)
await search.chart_current_item(clear_to_cache=False)
elif not ctl:
# relay to completer task
@ -950,7 +906,7 @@ async def handle_keyboard_input(
async def search_simple_dict(
text: str,
source: dict,
) -> Dict[str, Any]:
) -> dict[str, Any]:
# search routine can be specified as a function such
# as in the case of the current app's local symbol cache
@ -964,7 +920,7 @@ async def search_simple_dict(
# cache of provider names to async search routines
_searcher_cache: Dict[str, Callable[..., Awaitable]] = {}
_searcher_cache: dict[str, Callable[..., Awaitable]] = {}
@asynccontextmanager

View File

@ -56,7 +56,6 @@ class DpiAwareFont:
self._qfont = QtGui.QFont(name)
self._font_size: str = font_size
self._qfm = QtGui.QFontMetrics(self._qfont)
self._physical_dpi = None
self._font_inches: float = None
self._screen = None
@ -82,6 +81,10 @@ class DpiAwareFont:
def font(self):
return self._qfont
def scale(self) -> float:
screen = self.screen
return screen.logicalDotsPerInch() / screen.physicalDotsPerInch()
@property
def px_size(self) -> int:
return self._qfont.pixelSize()
@ -99,6 +102,12 @@ class DpiAwareFont:
# take the max since scaling can make things ugly in some cases
pdpi = screen.physicalDotsPerInch()
ldpi = screen.logicalDotsPerInch()
# XXX: this is needed on sway/wayland where you set
# ``QT_WAYLAND_FORCE_DPI=physical``
if ldpi == 0:
ldpi = pdpi
mx_dpi = max(pdpi, ldpi)
mn_dpi = min(pdpi, ldpi)
scale = round(ldpi/pdpi)
@ -114,14 +123,14 @@ class DpiAwareFont:
# dpi is likely somewhat scaled down so use slightly larger font size
if scale > 1 and self._font_size:
# TODO: this denominator should probably be determined from
# relative aspect rations or something?
# relative aspect ratios or something?
inches = inches * (1 / scale) * (1 + 6/16)
dpi = mx_dpi
self._font_inches = inches
font_size = math.floor(inches * dpi)
log.info(
log.debug(
f"\nscreen:{screen.name()} with pDPI: {pdpi}, lDPI: {ldpi}"
f"\nOur best guess font size is {font_size}\n"
)

View File

@ -124,8 +124,10 @@ class MultiStatus:
if not subs:
group_clear()
self._status_groups[group_key][0].add(msg)
ret = pop_from_group_and_maybe_clear_group
group = self._status_groups.get(group_key)
if group:
group[0].add(msg)
ret = pop_from_group_and_maybe_clear_group
self.render()
@ -146,12 +148,17 @@ class MultiStatus:
class MainWindow(QtGui.QMainWindow):
size = (800, 500)
# XXX: for tiling wms this should scale
# with the alloted window size.
# TODO: detect for tiling and if untrue set some size?
# size = (300, 500)
size = (0, 0)
title = 'piker chart (ur symbol is loading bby)'
def __init__(self, parent=None):
super().__init__(parent)
self.setMinimumSize(*self.size)
# self.setMinimumSize(*self.size)
self.setWindowTitle(self.title)
self._status_bar: QStatusBar = None
@ -165,7 +172,11 @@ class MainWindow(QtGui.QMainWindow):
self._status_label = label = QtGui.QLabel()
label.setStyleSheet(
f"QLabel {{ color : {hcolor('gunmetal')}; }}"
f"""QLabel {{
color : {hcolor('gunmetal')};
}}
"""
# font-size : {font_size}px;
)
label.setTextFormat(3) # markdown
label.setFont(_font_small.font)
@ -181,11 +192,13 @@ class MainWindow(QtGui.QMainWindow):
def closeEvent(
self,
event: QtGui.QCloseEvent,
) -> None:
"""Cancel the root actor asap.
"""
event: QtGui.QCloseEvent,
) -> None:
'''Cancel the root actor asap.
'''
# raising KBI seems to get intercepted by by Qt so just use the system.
os.kill(os.getpid(), signal.SIGINT)
@ -209,18 +222,28 @@ class MainWindow(QtGui.QMainWindow):
return self._status_bar
def on_focus_change(
def set_mode_name(
self,
old: QtGui.QWidget,
new: QtGui.QWidget,
name: str,
) -> None:
log.debug(f'widget focus changed from {old} -> {new}')
self.mode_label.setText(f'mode:{name}')
if new is not None:
def on_focus_change(
self,
last: QtGui.QWidget,
current: QtGui.QWidget,
) -> None:
log.info(f'widget focus changed from {last} -> {current}')
if current is not None:
# cursor left window?
name = getattr(new, 'mode_name', '')
self.mode_label.setText(name)
name = getattr(current, 'mode_name', '')
self.set_mode_name(name)
def current_screen(self) -> QtGui.QScreen:
"""Get a frickin screen (if we can, gawd).
@ -230,7 +253,7 @@ class MainWindow(QtGui.QMainWindow):
for _ in range(3):
screen = app.screenAt(self.pos())
print('trying to access QScreen...')
log.debug('trying to access QScreen...')
if screen is None:
time.sleep(0.5)
continue

File diff suppressed because it is too large Load Diff