Compare commits

..

No commits in common. "7592ae7be70c1e061aa05aa837a3465951611aeb" and "6f30ae448a44c422160fc29f17b0e57fa5bb203b" have entirely different histories.

15 changed files with 96 additions and 199 deletions

View File

@ -14,14 +14,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
NB: this is the old original implementation that was used way way back
when the project started with ``kivy``.
This code is left for reference but will likely be merged in
appropriately and removed.
'''
"""
Real-time data feed machinery
"""
import time
from functools import partial
from dataclasses import dataclass, field

View File

@ -38,7 +38,7 @@ log = get_logger(__name__)
@dataclass
class OrderBook:
'''EMS-client-side order book ctl and tracking.
"""Buy-side (client-side ?) order book ctl and tracking.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
@ -48,7 +48,7 @@ class OrderBook:
Currently, this is mostly for keeping local state to match the EMS
and use received events to trigger graphics updates.
'''
"""
# mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel

View File

@ -32,8 +32,9 @@ import tractor
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed
from ..data.feed import Feed, open_feed
from .._daemon import maybe_spawn_brokerd
from .._cacheables import maybe_open_ctx
from . import _paper_engine as paper
from ._messages import (
Status, Order,
@ -958,11 +959,15 @@ async def _emsd_main(
# spawn one task per broker feed
async with (
maybe_open_feed(
maybe_open_ctx(
key=(broker, symbol),
mngr=open_feed(
broker,
[symbol],
loglevel=loglevel,
) as (feed, stream),
),
loglevel=loglevel,
) as feed,
):
# XXX: this should be initial price quote from target provider
@ -1006,7 +1011,7 @@ async def _emsd_main(
brokerd_stream,
ems_client_order_stream,
stream,
feed.stream,
broker,
symbol,
book

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
NumPy compatible shared memory buffers for real-time IPC streaming.
NumPy compatible shared memory buffers for real-time FSP.
"""
from dataclasses import dataclass, asdict
@ -207,16 +207,11 @@ class ShmArray:
def push(
self,
data: np.ndarray,
prepend: bool = False,
) -> int:
'''Ring buffer like "push" to append data
"""Ring buffer like "push" to append data
into the buffer and return updated "last" index.
NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel.
'''
"""
length = len(data)
if prepend:

View File

@ -441,7 +441,7 @@ async def open_feed(
tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False,
) -> Feed:
) -> ReceiveChannel[dict[str, Any]]:
'''
Open a "data feed" which provides streamed real-time quotes.
@ -522,7 +522,7 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed
yield feed, bstream
finally:
# drop the infinite stream connection
await ctx.cancel()
@ -538,7 +538,7 @@ async def maybe_open_feed(
tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False,
) -> (Feed, ReceiveChannel[dict[str, Any]]):
) -> ReceiveChannel[dict[str, Any]]:
'''Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
@ -553,12 +553,12 @@ async def maybe_open_feed(
[sym],
loglevel=loglevel,
),
) as (cache_hit, feed):
) as (cache_hit, (feed, stream)):
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with feed.stream.subscribe() as bstream:
async with stream.subscribe() as bstream:
yield feed, bstream
else:
yield feed, stream

View File

@ -69,7 +69,6 @@ async def fsp_compute(
ctx: tractor.Context,
symbol: str,
feed: Feed,
stream: trio.abc.ReceiveChannel,
src: ShmArray,
dst: ShmArray,
@ -94,14 +93,14 @@ async def fsp_compute(
yield {}
# task cancellation won't kill the channel
# since we shielded at the `open_feed()` call
with stream.shield():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, stream),
filter_by_sym(symbol, feed.stream),
feed.shm,
)
@ -165,8 +164,7 @@ async def cascade(
dst_shm_token: Tuple[str, np.dtype],
symbol: str,
fsp_func_name: str,
) -> None:
) -> AsyncIterator[dict]:
"""Chain streaming signal processors and deliver output to
destination mem buf.
@ -177,11 +175,7 @@ async def cascade(
func: Callable = _fsps[fsp_func_name]
# open a data feed stream with requested broker
async with data.feed.maybe_open_feed(
brokername,
[symbol],
shielded_stream=True,
) as (feed, stream):
async with data.open_feed(brokername, [symbol]) as feed:
assert src.token == feed.shm.token
@ -192,7 +186,6 @@ async def cascade(
ctx=ctx,
symbol=symbol,
feed=feed,
stream=stream,
src=src,
dst=dst,

View File

@ -344,7 +344,9 @@ class LinkedSplits(QWidget):
def focus(self) -> None:
if self.chart is not None:
print("FOCUSSING CHART")
self.chart.focus()
# self.chart.parent().show()
def unfocus(self) -> None:
if self.chart is not None:
@ -388,8 +390,8 @@ class LinkedSplits(QWidget):
# style?
self.chart.setFrameStyle(
QFrame.StyledPanel |
QFrame.Plain
QtWidgets.QFrame.StyledPanel |
QtWidgets.QFrame.Plain
)
return self.chart
@ -1062,7 +1064,7 @@ class ChartPlotWidget(pg.PlotWidget):
self.scene().leaveEvent(ev)
_clear_throttle_rate: int = 60 # Hz
_clear_throttle_rate: int = 35 # Hz
_book_throttle_rate: int = 16 # Hz
@ -1392,14 +1394,14 @@ async def run_fsp(
parent=linkedsplits.godwidget,
fields_schema={
'name': {
'label': '**fsp**:',
'key': '**fsp**:',
'type': 'select',
'default_value': [
f'{display_name}'
],
},
'period': {
'label': '**period**:',
'key': '**period**:',
'type': 'edit',
'default_value': 14,
},
@ -1635,7 +1637,8 @@ async def display_symbol_data(
# )
async with(
data.feed.open_feed(
data.open_feed(
provider,
[sym],
loglevel=loglevel,
@ -1644,21 +1647,8 @@ async def display_symbol_data(
tick_throttle=_clear_throttle_rate,
) as feed,
trio.open_nursery() as n,
):
async def print_quotes():
async with feed.stream.subscribe() as bstream:
last_tick = time.time()
async for quotes in bstream:
now = time.time()
period = now - last_tick
for sym, quote in quotes.items():
ticks = quote.get('ticks', ())
if ticks:
print(f'{1/period} Hz')
last_tick = time.time()
n.start_soon(print_quotes)
):
ohlcv: ShmArray = feed.shm
bars = ohlcv.array
@ -1839,8 +1829,6 @@ async def _async_main(
sbar = godwidget.window.status_bar
starting_done = sbar.open_status('starting ze sexy chartz')
# generate order mode side-pane UI
async with (
trio.open_nursery() as root_n,
@ -1850,6 +1838,7 @@ async def _async_main(
parent=godwidget,
fields_schema={
'account': {
'key': '**account**:',
'type': 'select',
'default_value': [
'paper',
@ -1857,8 +1846,8 @@ async def _async_main(
# 'ib.paper',
],
},
'size_unit': {
'label': '**allocate**:',
'allocator': {
'key': '**allocate**:',
'type': 'select',
'default_value': [
'$ size',
@ -1866,17 +1855,18 @@ async def _async_main(
'# shares'
],
},
'disti_weight': {
'label': '**weight**:',
'disti_policy': {
'key': '**weight**:',
'type': 'select',
'default_value': ['uniform'],
},
'size': {
'label': '**size**:',
'dollar_size': {
'key': '**$size**:',
'type': 'edit',
'default_value': 5000,
'default_value': '5k',
},
'slots': {
'key': '**slots**:',
'type': 'edit',
'default_value': 4,
},

View File

@ -25,35 +25,6 @@ from PyQt5 import QtCore
from PyQt5.QtCore import QEvent
from PyQt5.QtWidgets import QWidget
import trio
from pydantic import BaseModel
# TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
class KeyboardMsg(BaseModel):
'''Unpacked Qt keyboard event data.
'''
event: QEvent
etype: int
key: int
mods: int
txt: str
class Config:
arbitrary_types_allowed = True
def to_tuple(self) -> tuple:
return tuple(self.dict().values())
# TODO: maybe add some methods to detect key combos? Or is that gonna be
# better with pattern matching?
# # ctl + alt as combo
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
class EventRelay(QtCore.QObject):
@ -96,26 +67,22 @@ class EventRelay(QtCore.QObject):
if etype in {QEvent.KeyPress, QEvent.KeyRelease}:
msg = KeyboardMsg(
event=ev,
etype=ev.type(),
key=ev.key(),
mods=ev.modifiers(),
txt=ev.text(),
)
# TODO: is there a global setting for this?
if ev.isAutoRepeat() and self._filter_auto_repeats:
ev.ignore()
return True
key = ev.key()
mods = ev.modifiers()
txt = ev.text()
# NOTE: the event object instance coming out
# the other side is mutated since Qt resumes event
# processing **before** running a ``trio`` guest mode
# tick, thus special handling or copying must be done.
# send keyboard msg to async handler
self._send_chan.send_nowait(msg)
# send elements to async handler
self._send_chan.send_nowait((ev, etype, key, mods, txt))
else:
# send event to async handler

View File

@ -99,9 +99,6 @@ def run_qtractor(
# "This is substantially faster than using a signal... for some
# reason Qt signal dispatch is really slow (and relies on events
# underneath anyway, so this is strictly less work)."
# source gist and credit to njs:
# https://gist.github.com/njsmith/d996e80b700a339e0623f97f48bcf0cb
REENTER_EVENT = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
class ReenterEvent(QtCore.QEvent):

View File

@ -18,7 +18,6 @@
Text entry "forms" widgets (mostly for configuration and UI user input).
'''
from __future__ import annotations
from contextlib import asynccontextmanager
from functools import partial
from textwrap import dedent
@ -154,13 +153,6 @@ class FontScaledDelegate(QStyledItemDelegate):
return super().sizeHint(option, index)
# slew of resources which helped get this where it is:
# https://stackoverflow.com/questions/20648210/qcombobox-adjusttocontents-changing-height
# https://stackoverflow.com/questions/3151798/how-do-i-set-the-qcombobox-width-to-fit-the-largest-item
# https://stackoverflow.com/questions/6337589/qlistwidget-adjust-size-to-content#6370892
# https://stackoverflow.com/questions/25304267/qt-resize-of-qlistview
# https://stackoverflow.com/questions/28227406/how-to-set-qlistview-rows-height-permanently
class FieldsForm(QWidget):
godwidget: 'GodWidget' # noqa
@ -253,6 +245,8 @@ class FieldsForm(QWidget):
name: str,
value: str,
widget: Optional[QWidget] = None,
) -> FontAndChartAwareLineEdit:
# TODO: maybe a distint layout per "field" item?
@ -287,7 +281,6 @@ class FieldsForm(QWidget):
label = self.add_field_label(name)
select = QComboBox(self)
select._key = name
for i, value in enumerate(values):
select.insertItem(i, str(value))
@ -337,19 +330,15 @@ async def handle_field_input(
# last_widget: QWidget, # had focus prior
recv_chan: trio.abc.ReceiveChannel,
fields: FieldsForm,
allocator: Allocator, # noqa
) -> None:
async for kbmsg in recv_chan:
if kbmsg.etype in {QEvent.KeyPress, QEvent.KeyRelease}:
event, etype, key, mods, txt = kbmsg.to_tuple()
print(f'key: {kbmsg.key}, mods: {kbmsg.mods}, txt: {kbmsg.txt}')
async for event, etype, key, mods, txt in recv_chan:
print(f'key: {key}, mods: {mods}, txt: {txt}')
# default controls set
ctl = False
if kbmsg.mods == Qt.ControlModifier:
if mods == Qt.ControlModifier:
ctl = True
if ctl and key in { # cancel and refocus
@ -361,14 +350,8 @@ async def handle_field_input(
widget.clearFocus()
fields.godwidget.focus()
continue
# process field input
if key in (Qt.Key_Enter, Qt.Key_Return):
value = widget.text()
key = widget._key
setattr(allocator, key, value)
print(allocator.dict())
continue
@asynccontextmanager
@ -377,54 +360,33 @@ async def open_form(
godwidget: QWidget,
parent: QWidget,
fields_schema: dict,
# alloc: Allocator,
# orientation: str = 'horizontal',
) -> FieldsForm:
fields = FieldsForm(godwidget, parent=parent)
from ._position import mk_pp_alloc
alloc = mk_pp_alloc()
fields.model = alloc
for name, config in fields_schema.items():
wtype = config['type']
label = str(config.get('label', name))
key = str(config['key'])
# plain (line) edit field
if wtype == 'edit':
w = fields.add_edit_field(
label,
config['default_value']
)
fields.add_edit_field(key, config['default_value'])
# drop-down selection
elif wtype == 'select':
values = list(config['default_value'])
w = fields.add_select_field(
label,
values
)
def write_model(text: str):
print(f'{text}')
setattr(alloc, name, text)
w.currentTextChanged.connect(write_model)
w._key = name
fields.add_select_field(key, values)
async with open_handlers(
list(fields.fields.values()),
event_types={
QEvent.KeyPress,
},
event_types={QEvent.KeyPress},
async_handler=partial(
handle_field_input,
fields=fields,
allocator=alloc,
),
# block key repeats?
@ -433,7 +395,7 @@ async def open_form(
yield fields
def mk_fill_status_bar(
def mk_health_bar(
fields: FieldsForm,
pane_vbox: QVBoxLayout,
@ -591,7 +553,7 @@ def mk_order_pane_layout(
# _, h = fields.width(), fields.height()
# print(f'w, h: {w, h}')
hbox, bar = mk_fill_status_bar(fields, pane_vbox=vbox)
hbox, bar = mk_health_bar(fields, pane_vbox=vbox)
# add pp fill bar + spacing
vbox.addLayout(hbox, stretch=1/3)

View File

@ -64,8 +64,7 @@ async def handle_viewmode_inputs(
'cc': mode.cancel_all_orders,
}
async for kbmsg in recv_chan:
event, etype, key, mods, text = kbmsg.to_tuple()
async for event, etype, key, mods, text in recv_chan:
log.debug(f'key: {key}, mods: {mods}, text: {text}')
now = time.time()
period = now - last

View File

@ -204,24 +204,15 @@ class LevelLine(pg.InfiniteLine):
def on_tracked_source(
self,
x: int,
y: float
) -> None:
'''Chart coordinates cursor tracking callback.
this is called by our ``Cursor`` type once this line is set to
track the cursor: for every movement this callback is invoked to
reposition the line
'''
# XXX: this is called by our ``Cursor`` type once this
# line is set to track the cursor: for every movement
# this callback is invoked to reposition the line
self.movable = True
self.set_level(y) # implictly calls reposition handler
self._chart.linked.godwidget.pp_config.model.get_order_info(
price=y
)
def mouseDragEvent(self, ev):
"""Override the ``InfiniteLine`` handler since we need more
detailed control and start end signalling.

View File

@ -45,7 +45,8 @@ from ._style import _font
class Position(BaseModel):
'''Basic pp (personal position) model with attached fills history.
'''Basic pp (personal position) data representation with attached
fills history.
This type should be IPC wire ready?
@ -90,9 +91,6 @@ def mk_pp_alloc(
class Allocator(BaseModel):
class Config:
validate_assignment = True
account: Account = None
_accounts: dict[str, Optional[str]] = accounts
@ -156,9 +154,11 @@ class PositionTracker:
avg_price=0,
)
self.pp_label = None
view = chart.getViewBox()
# literally the 'pp' (pee pee) label that's always in view
# literally 'pp' label that's always in view
self.pp_label = pp_label = Label(
view=view,
fmt_str='pp',

View File

@ -815,8 +815,7 @@ async def handle_keyboard_input(
)
)
async for kbmsg in recv_chan:
event, etype, key, mods, txt = kbmsg.to_tuple()
async for event, etype, key, mods, txt in recv_chan:
log.debug(f'key: {key}, mods: {mods}, txt: {txt}')
@ -824,6 +823,11 @@ async def handle_keyboard_input(
if mods == Qt.ControlModifier:
ctl = True
# # ctl + alt as combo
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
if key in (Qt.Key_Enter, Qt.Key_Return):
search.chart_current_item(clear_to_cache=True)

View File

@ -413,7 +413,7 @@ async def run_order_mode(
),
):
view = chart.view
view = chart._vb
lines = LineEditor(chart=chart)
arrows = ArrowEditor(chart, {})
@ -431,9 +431,8 @@ async def run_order_mode(
pp,
)
# TODO: create a mode "manager" of sorts?
# -> probably just call it "UxModes" err sumthin?
# so that view handlers can access it
mode.pp = pp
view.mode = mode
asset_type = symbol.type_key