Compare commits

..

No commits in common. "7592ae7be70c1e061aa05aa837a3465951611aeb" and "6f30ae448a44c422160fc29f17b0e57fa5bb203b" have entirely different histories.

15 changed files with 96 additions and 199 deletions

View File

@ -14,14 +14,9 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' """
NB: this is the old original implementation that was used way way back Real-time data feed machinery
when the project started with ``kivy``. """
This code is left for reference but will likely be merged in
appropriately and removed.
'''
import time import time
from functools import partial from functools import partial
from dataclasses import dataclass, field from dataclasses import dataclass, field

View File

@ -38,7 +38,7 @@ log = get_logger(__name__)
@dataclass @dataclass
class OrderBook: class OrderBook:
'''EMS-client-side order book ctl and tracking. """Buy-side (client-side ?) order book ctl and tracking.
A style similar to "model-view" is used here where this api is A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the provided as a supervised control for an EMS actor which does all the
@ -48,7 +48,7 @@ class OrderBook:
Currently, this is mostly for keeping local state to match the EMS Currently, this is mostly for keeping local state to match the EMS
and use received events to trigger graphics updates. and use received events to trigger graphics updates.
''' """
# mem channels used to relay order requests to the EMS daemon # mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel _to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel _from_order_book: trio.abc.ReceiveChannel

View File

@ -32,8 +32,9 @@ import tractor
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed from ..data.feed import Feed, open_feed
from .._daemon import maybe_spawn_brokerd from .._daemon import maybe_spawn_brokerd
from .._cacheables import maybe_open_ctx
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Status, Order, Status, Order,
@ -958,11 +959,15 @@ async def _emsd_main(
# spawn one task per broker feed # spawn one task per broker feed
async with ( async with (
maybe_open_feed( maybe_open_ctx(
key=(broker, symbol),
mngr=open_feed(
broker, broker,
[symbol], [symbol],
loglevel=loglevel, loglevel=loglevel,
) as (feed, stream), ),
loglevel=loglevel,
) as feed,
): ):
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
@ -1006,7 +1011,7 @@ async def _emsd_main(
brokerd_stream, brokerd_stream,
ems_client_order_stream, ems_client_order_stream,
stream, feed.stream,
broker, broker,
symbol, symbol,
book book

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
NumPy compatible shared memory buffers for real-time IPC streaming. NumPy compatible shared memory buffers for real-time FSP.
""" """
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
@ -207,16 +207,11 @@ class ShmArray:
def push( def push(
self, self,
data: np.ndarray, data: np.ndarray,
prepend: bool = False, prepend: bool = False,
) -> int: ) -> int:
'''Ring buffer like "push" to append data """Ring buffer like "push" to append data
into the buffer and return updated "last" index. into the buffer and return updated "last" index.
"""
NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel.
'''
length = len(data) length = len(data)
if prepend: if prepend:

View File

@ -441,7 +441,7 @@ async def open_feed(
tick_throttle: Optional[float] = None, # Hz tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False, shielded_stream: bool = False,
) -> Feed: ) -> ReceiveChannel[dict[str, Any]]:
''' '''
Open a "data feed" which provides streamed real-time quotes. Open a "data feed" which provides streamed real-time quotes.
@ -522,7 +522,7 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = max(ohlc_sample_rates)
try: try:
yield feed yield feed, bstream
finally: finally:
# drop the infinite stream connection # drop the infinite stream connection
await ctx.cancel() await ctx.cancel()
@ -538,7 +538,7 @@ async def maybe_open_feed(
tick_throttle: Optional[float] = None, # Hz tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False, shielded_stream: bool = False,
) -> (Feed, ReceiveChannel[dict[str, Any]]): ) -> ReceiveChannel[dict[str, Any]]:
'''Maybe open a data to a ``brokerd`` daemon only if there is no '''Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver. in a tractor broadcast receiver.
@ -553,12 +553,12 @@ async def maybe_open_feed(
[sym], [sym],
loglevel=loglevel, loglevel=loglevel,
), ),
) as (cache_hit, feed): ) as (cache_hit, (feed, stream)):
if cache_hit: if cache_hit:
# add a new broadcast subscription for the quote stream # add a new broadcast subscription for the quote stream
# if this feed is likely already in use # if this feed is likely already in use
async with feed.stream.subscribe() as bstream: async with stream.subscribe() as bstream:
yield feed, bstream yield feed, bstream
else: else:
yield feed, stream yield feed, stream

View File

@ -69,7 +69,6 @@ async def fsp_compute(
ctx: tractor.Context, ctx: tractor.Context,
symbol: str, symbol: str,
feed: Feed, feed: Feed,
stream: trio.abc.ReceiveChannel,
src: ShmArray, src: ShmArray,
dst: ShmArray, dst: ShmArray,
@ -94,14 +93,14 @@ async def fsp_compute(
yield {} yield {}
# task cancellation won't kill the channel # task cancellation won't kill the channel
# since we shielded at the `open_feed()` call with stream.shield():
async for quotes in stream: async for quotes in stream:
for symbol, quotes in quotes.items(): for symbol, quotes in quotes.items():
if symbol == sym: if symbol == sym:
yield quotes yield quotes
out_stream = func( out_stream = func(
filter_by_sym(symbol, stream), filter_by_sym(symbol, feed.stream),
feed.shm, feed.shm,
) )
@ -165,8 +164,7 @@ async def cascade(
dst_shm_token: Tuple[str, np.dtype], dst_shm_token: Tuple[str, np.dtype],
symbol: str, symbol: str,
fsp_func_name: str, fsp_func_name: str,
) -> AsyncIterator[dict]:
) -> None:
"""Chain streaming signal processors and deliver output to """Chain streaming signal processors and deliver output to
destination mem buf. destination mem buf.
@ -177,11 +175,7 @@ async def cascade(
func: Callable = _fsps[fsp_func_name] func: Callable = _fsps[fsp_func_name]
# open a data feed stream with requested broker # open a data feed stream with requested broker
async with data.feed.maybe_open_feed( async with data.open_feed(brokername, [symbol]) as feed:
brokername,
[symbol],
shielded_stream=True,
) as (feed, stream):
assert src.token == feed.shm.token assert src.token == feed.shm.token
@ -192,7 +186,6 @@ async def cascade(
ctx=ctx, ctx=ctx,
symbol=symbol, symbol=symbol,
feed=feed, feed=feed,
stream=stream,
src=src, src=src,
dst=dst, dst=dst,

View File

@ -344,7 +344,9 @@ class LinkedSplits(QWidget):
def focus(self) -> None: def focus(self) -> None:
if self.chart is not None: if self.chart is not None:
print("FOCUSSING CHART")
self.chart.focus() self.chart.focus()
# self.chart.parent().show()
def unfocus(self) -> None: def unfocus(self) -> None:
if self.chart is not None: if self.chart is not None:
@ -388,8 +390,8 @@ class LinkedSplits(QWidget):
# style? # style?
self.chart.setFrameStyle( self.chart.setFrameStyle(
QFrame.StyledPanel | QtWidgets.QFrame.StyledPanel |
QFrame.Plain QtWidgets.QFrame.Plain
) )
return self.chart return self.chart
@ -1062,7 +1064,7 @@ class ChartPlotWidget(pg.PlotWidget):
self.scene().leaveEvent(ev) self.scene().leaveEvent(ev)
_clear_throttle_rate: int = 60 # Hz _clear_throttle_rate: int = 35 # Hz
_book_throttle_rate: int = 16 # Hz _book_throttle_rate: int = 16 # Hz
@ -1392,14 +1394,14 @@ async def run_fsp(
parent=linkedsplits.godwidget, parent=linkedsplits.godwidget,
fields_schema={ fields_schema={
'name': { 'name': {
'label': '**fsp**:', 'key': '**fsp**:',
'type': 'select', 'type': 'select',
'default_value': [ 'default_value': [
f'{display_name}' f'{display_name}'
], ],
}, },
'period': { 'period': {
'label': '**period**:', 'key': '**period**:',
'type': 'edit', 'type': 'edit',
'default_value': 14, 'default_value': 14,
}, },
@ -1635,7 +1637,8 @@ async def display_symbol_data(
# ) # )
async with( async with(
data.feed.open_feed(
data.open_feed(
provider, provider,
[sym], [sym],
loglevel=loglevel, loglevel=loglevel,
@ -1644,21 +1647,8 @@ async def display_symbol_data(
tick_throttle=_clear_throttle_rate, tick_throttle=_clear_throttle_rate,
) as feed, ) as feed,
trio.open_nursery() as n,
):
async def print_quotes():
async with feed.stream.subscribe() as bstream:
last_tick = time.time()
async for quotes in bstream:
now = time.time()
period = now - last_tick
for sym, quote in quotes.items():
ticks = quote.get('ticks', ())
if ticks:
print(f'{1/period} Hz')
last_tick = time.time()
n.start_soon(print_quotes) ):
ohlcv: ShmArray = feed.shm ohlcv: ShmArray = feed.shm
bars = ohlcv.array bars = ohlcv.array
@ -1839,8 +1829,6 @@ async def _async_main(
sbar = godwidget.window.status_bar sbar = godwidget.window.status_bar
starting_done = sbar.open_status('starting ze sexy chartz') starting_done = sbar.open_status('starting ze sexy chartz')
# generate order mode side-pane UI
async with ( async with (
trio.open_nursery() as root_n, trio.open_nursery() as root_n,
@ -1850,6 +1838,7 @@ async def _async_main(
parent=godwidget, parent=godwidget,
fields_schema={ fields_schema={
'account': { 'account': {
'key': '**account**:',
'type': 'select', 'type': 'select',
'default_value': [ 'default_value': [
'paper', 'paper',
@ -1857,8 +1846,8 @@ async def _async_main(
# 'ib.paper', # 'ib.paper',
], ],
}, },
'size_unit': { 'allocator': {
'label': '**allocate**:', 'key': '**allocate**:',
'type': 'select', 'type': 'select',
'default_value': [ 'default_value': [
'$ size', '$ size',
@ -1866,17 +1855,18 @@ async def _async_main(
'# shares' '# shares'
], ],
}, },
'disti_weight': { 'disti_policy': {
'label': '**weight**:', 'key': '**weight**:',
'type': 'select', 'type': 'select',
'default_value': ['uniform'], 'default_value': ['uniform'],
}, },
'size': { 'dollar_size': {
'label': '**size**:', 'key': '**$size**:',
'type': 'edit', 'type': 'edit',
'default_value': 5000, 'default_value': '5k',
}, },
'slots': { 'slots': {
'key': '**slots**:',
'type': 'edit', 'type': 'edit',
'default_value': 4, 'default_value': 4,
}, },

View File

@ -25,35 +25,6 @@ from PyQt5 import QtCore
from PyQt5.QtCore import QEvent from PyQt5.QtCore import QEvent
from PyQt5.QtWidgets import QWidget from PyQt5.QtWidgets import QWidget
import trio import trio
from pydantic import BaseModel
# TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
class KeyboardMsg(BaseModel):
'''Unpacked Qt keyboard event data.
'''
event: QEvent
etype: int
key: int
mods: int
txt: str
class Config:
arbitrary_types_allowed = True
def to_tuple(self) -> tuple:
return tuple(self.dict().values())
# TODO: maybe add some methods to detect key combos? Or is that gonna be
# better with pattern matching?
# # ctl + alt as combo
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
class EventRelay(QtCore.QObject): class EventRelay(QtCore.QObject):
@ -96,26 +67,22 @@ class EventRelay(QtCore.QObject):
if etype in {QEvent.KeyPress, QEvent.KeyRelease}: if etype in {QEvent.KeyPress, QEvent.KeyRelease}:
msg = KeyboardMsg(
event=ev,
etype=ev.type(),
key=ev.key(),
mods=ev.modifiers(),
txt=ev.text(),
)
# TODO: is there a global setting for this? # TODO: is there a global setting for this?
if ev.isAutoRepeat() and self._filter_auto_repeats: if ev.isAutoRepeat() and self._filter_auto_repeats:
ev.ignore() ev.ignore()
return True return True
key = ev.key()
mods = ev.modifiers()
txt = ev.text()
# NOTE: the event object instance coming out # NOTE: the event object instance coming out
# the other side is mutated since Qt resumes event # the other side is mutated since Qt resumes event
# processing **before** running a ``trio`` guest mode # processing **before** running a ``trio`` guest mode
# tick, thus special handling or copying must be done. # tick, thus special handling or copying must be done.
# send keyboard msg to async handler # send elements to async handler
self._send_chan.send_nowait(msg) self._send_chan.send_nowait((ev, etype, key, mods, txt))
else: else:
# send event to async handler # send event to async handler

View File

@ -99,9 +99,6 @@ def run_qtractor(
# "This is substantially faster than using a signal... for some # "This is substantially faster than using a signal... for some
# reason Qt signal dispatch is really slow (and relies on events # reason Qt signal dispatch is really slow (and relies on events
# underneath anyway, so this is strictly less work)." # underneath anyway, so this is strictly less work)."
# source gist and credit to njs:
# https://gist.github.com/njsmith/d996e80b700a339e0623f97f48bcf0cb
REENTER_EVENT = QtCore.QEvent.Type(QtCore.QEvent.registerEventType()) REENTER_EVENT = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
class ReenterEvent(QtCore.QEvent): class ReenterEvent(QtCore.QEvent):

View File

@ -18,7 +18,6 @@
Text entry "forms" widgets (mostly for configuration and UI user input). Text entry "forms" widgets (mostly for configuration and UI user input).
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from functools import partial from functools import partial
from textwrap import dedent from textwrap import dedent
@ -154,13 +153,6 @@ class FontScaledDelegate(QStyledItemDelegate):
return super().sizeHint(option, index) return super().sizeHint(option, index)
# slew of resources which helped get this where it is:
# https://stackoverflow.com/questions/20648210/qcombobox-adjusttocontents-changing-height
# https://stackoverflow.com/questions/3151798/how-do-i-set-the-qcombobox-width-to-fit-the-largest-item
# https://stackoverflow.com/questions/6337589/qlistwidget-adjust-size-to-content#6370892
# https://stackoverflow.com/questions/25304267/qt-resize-of-qlistview
# https://stackoverflow.com/questions/28227406/how-to-set-qlistview-rows-height-permanently
class FieldsForm(QWidget): class FieldsForm(QWidget):
godwidget: 'GodWidget' # noqa godwidget: 'GodWidget' # noqa
@ -253,6 +245,8 @@ class FieldsForm(QWidget):
name: str, name: str,
value: str, value: str,
widget: Optional[QWidget] = None,
) -> FontAndChartAwareLineEdit: ) -> FontAndChartAwareLineEdit:
# TODO: maybe a distint layout per "field" item? # TODO: maybe a distint layout per "field" item?
@ -287,7 +281,6 @@ class FieldsForm(QWidget):
label = self.add_field_label(name) label = self.add_field_label(name)
select = QComboBox(self) select = QComboBox(self)
select._key = name
for i, value in enumerate(values): for i, value in enumerate(values):
select.insertItem(i, str(value)) select.insertItem(i, str(value))
@ -337,19 +330,15 @@ async def handle_field_input(
# last_widget: QWidget, # had focus prior # last_widget: QWidget, # had focus prior
recv_chan: trio.abc.ReceiveChannel, recv_chan: trio.abc.ReceiveChannel,
fields: FieldsForm, fields: FieldsForm,
allocator: Allocator, # noqa
) -> None: ) -> None:
async for kbmsg in recv_chan: async for event, etype, key, mods, txt in recv_chan:
print(f'key: {key}, mods: {mods}, txt: {txt}')
if kbmsg.etype in {QEvent.KeyPress, QEvent.KeyRelease}:
event, etype, key, mods, txt = kbmsg.to_tuple()
print(f'key: {kbmsg.key}, mods: {kbmsg.mods}, txt: {kbmsg.txt}')
# default controls set # default controls set
ctl = False ctl = False
if kbmsg.mods == Qt.ControlModifier: if mods == Qt.ControlModifier:
ctl = True ctl = True
if ctl and key in { # cancel and refocus if ctl and key in { # cancel and refocus
@ -361,14 +350,8 @@ async def handle_field_input(
widget.clearFocus() widget.clearFocus()
fields.godwidget.focus() fields.godwidget.focus()
continue
# process field input continue
if key in (Qt.Key_Enter, Qt.Key_Return):
value = widget.text()
key = widget._key
setattr(allocator, key, value)
print(allocator.dict())
@asynccontextmanager @asynccontextmanager
@ -377,54 +360,33 @@ async def open_form(
godwidget: QWidget, godwidget: QWidget,
parent: QWidget, parent: QWidget,
fields_schema: dict, fields_schema: dict,
# alloc: Allocator,
# orientation: str = 'horizontal', # orientation: str = 'horizontal',
) -> FieldsForm: ) -> FieldsForm:
fields = FieldsForm(godwidget, parent=parent) fields = FieldsForm(godwidget, parent=parent)
from ._position import mk_pp_alloc
alloc = mk_pp_alloc()
fields.model = alloc
for name, config in fields_schema.items(): for name, config in fields_schema.items():
wtype = config['type'] wtype = config['type']
label = str(config.get('label', name)) key = str(config['key'])
# plain (line) edit field # plain (line) edit field
if wtype == 'edit': if wtype == 'edit':
w = fields.add_edit_field( fields.add_edit_field(key, config['default_value'])
label,
config['default_value']
)
# drop-down selection # drop-down selection
elif wtype == 'select': elif wtype == 'select':
values = list(config['default_value']) values = list(config['default_value'])
w = fields.add_select_field( fields.add_select_field(key, values)
label,
values
)
def write_model(text: str):
print(f'{text}')
setattr(alloc, name, text)
w.currentTextChanged.connect(write_model)
w._key = name
async with open_handlers( async with open_handlers(
list(fields.fields.values()), list(fields.fields.values()),
event_types={ event_types={QEvent.KeyPress},
QEvent.KeyPress,
},
async_handler=partial( async_handler=partial(
handle_field_input, handle_field_input,
fields=fields, fields=fields,
allocator=alloc,
), ),
# block key repeats? # block key repeats?
@ -433,7 +395,7 @@ async def open_form(
yield fields yield fields
def mk_fill_status_bar( def mk_health_bar(
fields: FieldsForm, fields: FieldsForm,
pane_vbox: QVBoxLayout, pane_vbox: QVBoxLayout,
@ -591,7 +553,7 @@ def mk_order_pane_layout(
# _, h = fields.width(), fields.height() # _, h = fields.width(), fields.height()
# print(f'w, h: {w, h}') # print(f'w, h: {w, h}')
hbox, bar = mk_fill_status_bar(fields, pane_vbox=vbox) hbox, bar = mk_health_bar(fields, pane_vbox=vbox)
# add pp fill bar + spacing # add pp fill bar + spacing
vbox.addLayout(hbox, stretch=1/3) vbox.addLayout(hbox, stretch=1/3)

View File

@ -64,8 +64,7 @@ async def handle_viewmode_inputs(
'cc': mode.cancel_all_orders, 'cc': mode.cancel_all_orders,
} }
async for kbmsg in recv_chan: async for event, etype, key, mods, text in recv_chan:
event, etype, key, mods, text = kbmsg.to_tuple()
log.debug(f'key: {key}, mods: {mods}, text: {text}') log.debug(f'key: {key}, mods: {mods}, text: {text}')
now = time.time() now = time.time()
period = now - last period = now - last

View File

@ -204,24 +204,15 @@ class LevelLine(pg.InfiniteLine):
def on_tracked_source( def on_tracked_source(
self, self,
x: int, x: int,
y: float y: float
) -> None: ) -> None:
'''Chart coordinates cursor tracking callback. # XXX: this is called by our ``Cursor`` type once this
# line is set to track the cursor: for every movement
this is called by our ``Cursor`` type once this line is set to # this callback is invoked to reposition the line
track the cursor: for every movement this callback is invoked to
reposition the line
'''
self.movable = True self.movable = True
self.set_level(y) # implictly calls reposition handler self.set_level(y) # implictly calls reposition handler
self._chart.linked.godwidget.pp_config.model.get_order_info(
price=y
)
def mouseDragEvent(self, ev): def mouseDragEvent(self, ev):
"""Override the ``InfiniteLine`` handler since we need more """Override the ``InfiniteLine`` handler since we need more
detailed control and start end signalling. detailed control and start end signalling.

View File

@ -45,7 +45,8 @@ from ._style import _font
class Position(BaseModel): class Position(BaseModel):
'''Basic pp (personal position) model with attached fills history. '''Basic pp (personal position) data representation with attached
fills history.
This type should be IPC wire ready? This type should be IPC wire ready?
@ -90,9 +91,6 @@ def mk_pp_alloc(
class Allocator(BaseModel): class Allocator(BaseModel):
class Config:
validate_assignment = True
account: Account = None account: Account = None
_accounts: dict[str, Optional[str]] = accounts _accounts: dict[str, Optional[str]] = accounts
@ -156,9 +154,11 @@ class PositionTracker:
avg_price=0, avg_price=0,
) )
self.pp_label = None
view = chart.getViewBox() view = chart.getViewBox()
# literally the 'pp' (pee pee) label that's always in view # literally 'pp' label that's always in view
self.pp_label = pp_label = Label( self.pp_label = pp_label = Label(
view=view, view=view,
fmt_str='pp', fmt_str='pp',

View File

@ -815,8 +815,7 @@ async def handle_keyboard_input(
) )
) )
async for kbmsg in recv_chan: async for event, etype, key, mods, txt in recv_chan:
event, etype, key, mods, txt = kbmsg.to_tuple()
log.debug(f'key: {key}, mods: {mods}, txt: {txt}') log.debug(f'key: {key}, mods: {mods}, txt: {txt}')
@ -824,6 +823,11 @@ async def handle_keyboard_input(
if mods == Qt.ControlModifier: if mods == Qt.ControlModifier:
ctl = True ctl = True
# # ctl + alt as combo
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
if key in (Qt.Key_Enter, Qt.Key_Return): if key in (Qt.Key_Enter, Qt.Key_Return):
search.chart_current_item(clear_to_cache=True) search.chart_current_item(clear_to_cache=True)

View File

@ -413,7 +413,7 @@ async def run_order_mode(
), ),
): ):
view = chart.view view = chart._vb
lines = LineEditor(chart=chart) lines = LineEditor(chart=chart)
arrows = ArrowEditor(chart, {}) arrows = ArrowEditor(chart, {})
@ -431,9 +431,8 @@ async def run_order_mode(
pp, pp,
) )
# TODO: create a mode "manager" of sorts?
# -> probably just call it "UxModes" err sumthin?
# so that view handlers can access it # so that view handlers can access it
mode.pp = pp
view.mode = mode view.mode = mode
asset_type = symbol.type_key asset_type = symbol.type_key