Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 7592ae7be7 Pass labels to form builder, toy with broadcast consumer task 2021-08-10 17:04:19 -04:00
Tyler Goodlet 112615e374 Add (lack of proper) ring buffer note 2021-08-10 17:02:52 -04:00
Tyler Goodlet ef27a4f4e2 Position tracker is passed at init 2021-08-10 17:02:17 -04:00
Tyler Goodlet 27ba57217a Lol, initial size calcs on order line update 2021-08-10 17:01:46 -04:00
Tyler Goodlet d7cc234a78 Basic allocator state updates from pp sidepane 2021-08-10 17:00:52 -04:00
Tyler Goodlet 7a8e612228 Validate allocator assignments with pydantic 2021-08-10 16:59:44 -04:00
Tyler Goodlet ebfb700cd2 Add reference gist for Qt guest mode stuff 2021-08-10 16:58:41 -04:00
Tyler Goodlet 61c6bbb592 Add disclaimer to old data mod 2021-08-10 16:58:10 -04:00
Tyler Goodlet cc40048ab2 Unpack keyboard events into an explicit msg model 2021-08-10 16:57:19 -04:00
Tyler Goodlet 3d4898c4d5 Use `maybe_open_feed()` in ems and fsp daemons 2021-08-10 16:50:40 -04:00
15 changed files with 199 additions and 96 deletions

View File

@ -14,9 +14,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Real-time data feed machinery
"""
'''
NB: this is the old original implementation that was used way way back
when the project started with ``kivy``.
This code is left for reference but will likely be merged in
appropriately and removed.
'''
import time
from functools import partial
from dataclasses import dataclass, field

View File

@ -38,7 +38,7 @@ log = get_logger(__name__)
@dataclass
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
'''EMS-client-side order book ctl and tracking.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
@ -48,7 +48,7 @@ class OrderBook:
Currently, this is mostly for keeping local state to match the EMS
and use received events to trigger graphics updates.
"""
'''
# mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel

View File

@ -32,9 +32,8 @@ import tractor
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed, open_feed
from ..data.feed import Feed, maybe_open_feed
from .._daemon import maybe_spawn_brokerd
from .._cacheables import maybe_open_ctx
from . import _paper_engine as paper
from ._messages import (
Status, Order,
@ -959,15 +958,11 @@ async def _emsd_main(
# spawn one task per broker feed
async with (
maybe_open_ctx(
key=(broker, symbol),
mngr=open_feed(
broker,
[symbol],
loglevel=loglevel,
),
maybe_open_feed(
broker,
[symbol],
loglevel=loglevel,
) as feed,
) as (feed, stream),
):
# XXX: this should be initial price quote from target provider
@ -1011,7 +1006,7 @@ async def _emsd_main(
brokerd_stream,
ems_client_order_stream,
feed.stream,
stream,
broker,
symbol,
book

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
NumPy compatible shared memory buffers for real-time FSP.
NumPy compatible shared memory buffers for real-time IPC streaming.
"""
from dataclasses import dataclass, asdict
@ -207,11 +207,16 @@ class ShmArray:
def push(
self,
data: np.ndarray,
prepend: bool = False,
) -> int:
"""Ring buffer like "push" to append data
'''Ring buffer like "push" to append data
into the buffer and return updated "last" index.
"""
NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel.
'''
length = len(data)
if prepend:

View File

@ -441,7 +441,7 @@ async def open_feed(
tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False,
) -> ReceiveChannel[dict[str, Any]]:
) -> Feed:
'''
Open a "data feed" which provides streamed real-time quotes.
@ -522,7 +522,7 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed, bstream
yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()
@ -538,7 +538,7 @@ async def maybe_open_feed(
tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False,
) -> ReceiveChannel[dict[str, Any]]:
) -> (Feed, ReceiveChannel[dict[str, Any]]):
'''Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
@ -553,12 +553,12 @@ async def maybe_open_feed(
[sym],
loglevel=loglevel,
),
) as (cache_hit, (feed, stream)):
) as (cache_hit, feed):
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with stream.subscribe() as bstream:
async with feed.stream.subscribe() as bstream:
yield feed, bstream
else:
yield feed, stream

View File

@ -69,6 +69,7 @@ async def fsp_compute(
ctx: tractor.Context,
symbol: str,
feed: Feed,
stream: trio.abc.ReceiveChannel,
src: ShmArray,
dst: ShmArray,
@ -93,14 +94,14 @@ async def fsp_compute(
yield {}
# task cancellation won't kill the channel
with stream.shield():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
# since we shielded at the `open_feed()` call
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, feed.stream),
filter_by_sym(symbol, stream),
feed.shm,
)
@ -164,7 +165,8 @@ async def cascade(
dst_shm_token: Tuple[str, np.dtype],
symbol: str,
fsp_func_name: str,
) -> AsyncIterator[dict]:
) -> None:
"""Chain streaming signal processors and deliver output to
destination mem buf.
@ -175,7 +177,11 @@ async def cascade(
func: Callable = _fsps[fsp_func_name]
# open a data feed stream with requested broker
async with data.open_feed(brokername, [symbol]) as feed:
async with data.feed.maybe_open_feed(
brokername,
[symbol],
shielded_stream=True,
) as (feed, stream):
assert src.token == feed.shm.token
@ -186,6 +192,7 @@ async def cascade(
ctx=ctx,
symbol=symbol,
feed=feed,
stream=stream,
src=src,
dst=dst,

View File

@ -344,9 +344,7 @@ class LinkedSplits(QWidget):
def focus(self) -> None:
if self.chart is not None:
print("FOCUSSING CHART")
self.chart.focus()
# self.chart.parent().show()
def unfocus(self) -> None:
if self.chart is not None:
@ -390,8 +388,8 @@ class LinkedSplits(QWidget):
# style?
self.chart.setFrameStyle(
QtWidgets.QFrame.StyledPanel |
QtWidgets.QFrame.Plain
QFrame.StyledPanel |
QFrame.Plain
)
return self.chart
@ -1064,7 +1062,7 @@ class ChartPlotWidget(pg.PlotWidget):
self.scene().leaveEvent(ev)
_clear_throttle_rate: int = 35 # Hz
_clear_throttle_rate: int = 60 # Hz
_book_throttle_rate: int = 16 # Hz
@ -1394,14 +1392,14 @@ async def run_fsp(
parent=linkedsplits.godwidget,
fields_schema={
'name': {
'key': '**fsp**:',
'label': '**fsp**:',
'type': 'select',
'default_value': [
f'{display_name}'
],
},
'period': {
'key': '**period**:',
'label': '**period**:',
'type': 'edit',
'default_value': 14,
},
@ -1637,8 +1635,7 @@ async def display_symbol_data(
# )
async with(
data.open_feed(
data.feed.open_feed(
provider,
[sym],
loglevel=loglevel,
@ -1647,8 +1644,21 @@ async def display_symbol_data(
tick_throttle=_clear_throttle_rate,
) as feed,
trio.open_nursery() as n,
):
async def print_quotes():
async with feed.stream.subscribe() as bstream:
last_tick = time.time()
async for quotes in bstream:
now = time.time()
period = now - last_tick
for sym, quote in quotes.items():
ticks = quote.get('ticks', ())
if ticks:
print(f'{1/period} Hz')
last_tick = time.time()
n.start_soon(print_quotes)
ohlcv: ShmArray = feed.shm
bars = ohlcv.array
@ -1829,6 +1839,8 @@ async def _async_main(
sbar = godwidget.window.status_bar
starting_done = sbar.open_status('starting ze sexy chartz')
# generate order mode side-pane UI
async with (
trio.open_nursery() as root_n,
@ -1838,7 +1850,6 @@ async def _async_main(
parent=godwidget,
fields_schema={
'account': {
'key': '**account**:',
'type': 'select',
'default_value': [
'paper',
@ -1846,8 +1857,8 @@ async def _async_main(
# 'ib.paper',
],
},
'allocator': {
'key': '**allocate**:',
'size_unit': {
'label': '**allocate**:',
'type': 'select',
'default_value': [
'$ size',
@ -1855,18 +1866,17 @@ async def _async_main(
'# shares'
],
},
'disti_policy': {
'key': '**weight**:',
'disti_weight': {
'label': '**weight**:',
'type': 'select',
'default_value': ['uniform'],
},
'dollar_size': {
'key': '**$size**:',
'size': {
'label': '**size**:',
'type': 'edit',
'default_value': '5k',
'default_value': 5000,
},
'slots': {
'key': '**slots**:',
'type': 'edit',
'default_value': 4,
},

View File

@ -25,6 +25,35 @@ from PyQt5 import QtCore
from PyQt5.QtCore import QEvent
from PyQt5.QtWidgets import QWidget
import trio
from pydantic import BaseModel
# TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
class KeyboardMsg(BaseModel):
'''Unpacked Qt keyboard event data.
'''
event: QEvent
etype: int
key: int
mods: int
txt: str
class Config:
arbitrary_types_allowed = True
def to_tuple(self) -> tuple:
return tuple(self.dict().values())
# TODO: maybe add some methods to detect key combos? Or is that gonna be
# better with pattern matching?
# # ctl + alt as combo
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
class EventRelay(QtCore.QObject):
@ -67,22 +96,26 @@ class EventRelay(QtCore.QObject):
if etype in {QEvent.KeyPress, QEvent.KeyRelease}:
msg = KeyboardMsg(
event=ev,
etype=ev.type(),
key=ev.key(),
mods=ev.modifiers(),
txt=ev.text(),
)
# TODO: is there a global setting for this?
if ev.isAutoRepeat() and self._filter_auto_repeats:
ev.ignore()
return True
key = ev.key()
mods = ev.modifiers()
txt = ev.text()
# NOTE: the event object instance coming out
# the other side is mutated since Qt resumes event
# processing **before** running a ``trio`` guest mode
# tick, thus special handling or copying must be done.
# send elements to async handler
self._send_chan.send_nowait((ev, etype, key, mods, txt))
# send keyboard msg to async handler
self._send_chan.send_nowait(msg)
else:
# send event to async handler

View File

@ -99,6 +99,9 @@ def run_qtractor(
# "This is substantially faster than using a signal... for some
# reason Qt signal dispatch is really slow (and relies on events
# underneath anyway, so this is strictly less work)."
# source gist and credit to njs:
# https://gist.github.com/njsmith/d996e80b700a339e0623f97f48bcf0cb
REENTER_EVENT = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
class ReenterEvent(QtCore.QEvent):

View File

@ -18,6 +18,7 @@
Text entry "forms" widgets (mostly for configuration and UI user input).
'''
from __future__ import annotations
from contextlib import asynccontextmanager
from functools import partial
from textwrap import dedent
@ -153,6 +154,13 @@ class FontScaledDelegate(QStyledItemDelegate):
return super().sizeHint(option, index)
# slew of resources which helped get this where it is:
# https://stackoverflow.com/questions/20648210/qcombobox-adjusttocontents-changing-height
# https://stackoverflow.com/questions/3151798/how-do-i-set-the-qcombobox-width-to-fit-the-largest-item
# https://stackoverflow.com/questions/6337589/qlistwidget-adjust-size-to-content#6370892
# https://stackoverflow.com/questions/25304267/qt-resize-of-qlistview
# https://stackoverflow.com/questions/28227406/how-to-set-qlistview-rows-height-permanently
class FieldsForm(QWidget):
godwidget: 'GodWidget' # noqa
@ -245,8 +253,6 @@ class FieldsForm(QWidget):
name: str,
value: str,
widget: Optional[QWidget] = None,
) -> FontAndChartAwareLineEdit:
# TODO: maybe a distint layout per "field" item?
@ -281,6 +287,7 @@ class FieldsForm(QWidget):
label = self.add_field_label(name)
select = QComboBox(self)
select._key = name
for i, value in enumerate(values):
select.insertItem(i, str(value))
@ -330,28 +337,38 @@ async def handle_field_input(
# last_widget: QWidget, # had focus prior
recv_chan: trio.abc.ReceiveChannel,
fields: FieldsForm,
allocator: Allocator, # noqa
) -> None:
async for event, etype, key, mods, txt in recv_chan:
print(f'key: {key}, mods: {mods}, txt: {txt}')
async for kbmsg in recv_chan:
# default controls set
ctl = False
if mods == Qt.ControlModifier:
ctl = True
if kbmsg.etype in {QEvent.KeyPress, QEvent.KeyRelease}:
event, etype, key, mods, txt = kbmsg.to_tuple()
print(f'key: {kbmsg.key}, mods: {kbmsg.mods}, txt: {kbmsg.txt}')
if ctl and key in { # cancel and refocus
# default controls set
ctl = False
if kbmsg.mods == Qt.ControlModifier:
ctl = True
Qt.Key_C,
Qt.Key_Space, # i feel like this is the "native" one
Qt.Key_Alt,
}:
if ctl and key in { # cancel and refocus
widget.clearFocus()
fields.godwidget.focus()
Qt.Key_C,
Qt.Key_Space, # i feel like this is the "native" one
Qt.Key_Alt,
}:
continue
widget.clearFocus()
fields.godwidget.focus()
continue
# process field input
if key in (Qt.Key_Enter, Qt.Key_Return):
value = widget.text()
key = widget._key
setattr(allocator, key, value)
print(allocator.dict())
@asynccontextmanager
@ -360,33 +377,54 @@ async def open_form(
godwidget: QWidget,
parent: QWidget,
fields_schema: dict,
# alloc: Allocator,
# orientation: str = 'horizontal',
) -> FieldsForm:
fields = FieldsForm(godwidget, parent=parent)
from ._position import mk_pp_alloc
alloc = mk_pp_alloc()
fields.model = alloc
for name, config in fields_schema.items():
wtype = config['type']
key = str(config['key'])
label = str(config.get('label', name))
# plain (line) edit field
if wtype == 'edit':
fields.add_edit_field(key, config['default_value'])
w = fields.add_edit_field(
label,
config['default_value']
)
# drop-down selection
elif wtype == 'select':
values = list(config['default_value'])
fields.add_select_field(key, values)
w = fields.add_select_field(
label,
values
)
def write_model(text: str):
print(f'{text}')
setattr(alloc, name, text)
w.currentTextChanged.connect(write_model)
w._key = name
async with open_handlers(
list(fields.fields.values()),
event_types={QEvent.KeyPress},
event_types={
QEvent.KeyPress,
},
async_handler=partial(
handle_field_input,
fields=fields,
allocator=alloc,
),
# block key repeats?
@ -395,7 +433,7 @@ async def open_form(
yield fields
def mk_health_bar(
def mk_fill_status_bar(
fields: FieldsForm,
pane_vbox: QVBoxLayout,
@ -553,7 +591,7 @@ def mk_order_pane_layout(
# _, h = fields.width(), fields.height()
# print(f'w, h: {w, h}')
hbox, bar = mk_health_bar(fields, pane_vbox=vbox)
hbox, bar = mk_fill_status_bar(fields, pane_vbox=vbox)
# add pp fill bar + spacing
vbox.addLayout(hbox, stretch=1/3)

View File

@ -64,7 +64,8 @@ async def handle_viewmode_inputs(
'cc': mode.cancel_all_orders,
}
async for event, etype, key, mods, text in recv_chan:
async for kbmsg in recv_chan:
event, etype, key, mods, text = kbmsg.to_tuple()
log.debug(f'key: {key}, mods: {mods}, text: {text}')
now = time.time()
period = now - last

View File

@ -204,15 +204,24 @@ class LevelLine(pg.InfiniteLine):
def on_tracked_source(
self,
x: int,
y: float
) -> None:
# XXX: this is called by our ``Cursor`` type once this
# line is set to track the cursor: for every movement
# this callback is invoked to reposition the line
'''Chart coordinates cursor tracking callback.
this is called by our ``Cursor`` type once this line is set to
track the cursor: for every movement this callback is invoked to
reposition the line
'''
self.movable = True
self.set_level(y) # implictly calls reposition handler
self._chart.linked.godwidget.pp_config.model.get_order_info(
price=y
)
def mouseDragEvent(self, ev):
"""Override the ``InfiniteLine`` handler since we need more
detailed control and start end signalling.

View File

@ -45,8 +45,7 @@ from ._style import _font
class Position(BaseModel):
'''Basic pp (personal position) data representation with attached
fills history.
'''Basic pp (personal position) model with attached fills history.
This type should be IPC wire ready?
@ -91,6 +90,9 @@ def mk_pp_alloc(
class Allocator(BaseModel):
class Config:
validate_assignment = True
account: Account = None
_accounts: dict[str, Optional[str]] = accounts
@ -154,11 +156,9 @@ class PositionTracker:
avg_price=0,
)
self.pp_label = None
view = chart.getViewBox()
# literally 'pp' label that's always in view
# literally the 'pp' (pee pee) label that's always in view
self.pp_label = pp_label = Label(
view=view,
fmt_str='pp',

View File

@ -815,7 +815,8 @@ async def handle_keyboard_input(
)
)
async for event, etype, key, mods, txt in recv_chan:
async for kbmsg in recv_chan:
event, etype, key, mods, txt = kbmsg.to_tuple()
log.debug(f'key: {key}, mods: {mods}, txt: {txt}')
@ -823,11 +824,6 @@ async def handle_keyboard_input(
if mods == Qt.ControlModifier:
ctl = True
# # ctl + alt as combo
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
if key in (Qt.Key_Enter, Qt.Key_Return):
search.chart_current_item(clear_to_cache=True)

View File

@ -413,7 +413,7 @@ async def run_order_mode(
),
):
view = chart._vb
view = chart.view
lines = LineEditor(chart=chart)
arrows = ArrowEditor(chart, {})
@ -431,8 +431,9 @@ async def run_order_mode(
pp,
)
# TODO: create a mode "manager" of sorts?
# -> probably just call it "UxModes" err sumthin?
# so that view handlers can access it
mode.pp = pp
view.mode = mode
asset_type = symbol.type_key