Create an "order mode"

Our first major UI "mode" (yes kinda like the modes in emacs) that has
handles to a client side order book api, line and arrow editors, and
interacts with a spawned `emsd` (the EMS daemon actor).

Buncha cleaning and fixes in here for various thingers as well.
basic_alerts
Tyler Goodlet 2021-01-07 12:03:18 -05:00
parent 8d66a17daf
commit 282cc85ba0
4 changed files with 375 additions and 172 deletions

View File

@ -20,12 +20,11 @@ In suit parlance: "Execution management systems"
"""
from dataclasses import dataclass, field
from typing import (
AsyncIterator, List, Dict, Callable, Tuple,
AsyncIterator, Dict, Callable, Tuple,
Any,
)
# import uuid
import pyqtgraph as pg
import trio
from trio_typing import TaskStatus
import tractor
@ -33,32 +32,59 @@ import tractor
from . import data
from .log import get_logger
from .data._source import Symbol
from .ui._style import hcolor
log = get_logger(__name__)
_to_router: trio.abc.SendChannel = None
_from_ui: trio.abc.ReceiveChannel = None
_lines = {}
_local_book = {}
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
_to_ems, _from_order_book = trio.open_memory_channel(100)
@dataclass
class OrderBoi:
"""'Buy' (client ?) side order book ctl and tracking.
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
Mostly for keeping local state to match the EMS and use
events to trigger graphics updates.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
hard/fast work of talking to brokers/exchanges to conduct
executions.
Currently, mostly for keeping local state to match the EMS and use
received events to trigger graphics updates.
"""
orders: Dict[str, dict] = field(default_factory=dict)
_cmds_from_ui: trio.abc.ReceiveChannel = _from_ui
_sent_orders: Dict[str, dict] = field(default_factory=dict)
_confirmed_orders: Dict[str, dict] = field(default_factory=dict)
async def alert(self, price: float) -> str:
...
_to_ems: trio.abc.SendChannel = _to_ems
_from_order_book: trio.abc.ReceiveChannel = _from_order_book
def on_fill(self, uuid: str) -> None:
cmd = self._sent_orders[uuid]
log.info(f"Order executed: {cmd}")
self._confirmed_orders[uuid] = cmd
def alert(
self,
uuid: str,
symbol: 'Symbol',
price: float
) -> str:
# XXX: should make this an explicit attr
# it's assigned inside ``.add_plot()``
# lc = self.view.linked_charts
# uid = str(uuid.uuid4())
cmd = {
'msg': 'alert',
'price': price,
'symbol': symbol.key,
'brokers': symbol.brokers,
'oid': uuid,
}
self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd)
async def buy(self, price: float) -> str:
...
@ -66,21 +92,34 @@ class OrderBoi:
async def sell(self, price: float) -> str:
...
async def cancel(self, oid: str) -> bool:
"""Cancel an order (or alert) from the EMS.
"""
...
# higher level operations
async def transmit_to_broker(self, price: float) -> str:
...
async def modify(self, oid: str, price) -> bool:
...
async def cancel(self, oid: str) -> bool:
...
_orders: OrderBook = None
_orders: OrderBoi = None
def get_orders(emsd_uid: Tuple[str, str] = None) -> OrderBook:
if emsd_uid is not None:
# TODO: read in target emsd's active book on startup
pass
def get_orders() -> OrderBoi:
global _orders
if _orders is None:
_orders = OrderBoi
_orders = OrderBook()
return _orders
@ -91,48 +130,44 @@ async def send_order_cmds():
to downstream consumers.
This is run in the UI actor (usually the one running Qt).
The UI simply delivers order messages to the above ``_to_router``
The UI simply delivers order messages to the above ``_to_ems``
send channel (from sync code using ``.send_nowait()``), these values
are pulled from the channel here and send to any consumer(s).
This effectively makes order messages look like they're being
"pushed" from the parent to the EMS actor.
"""
global _from_ui
async for order in _from_ui:
global _from_order_book
# book = get_orders()
lc = order['chart']
symbol = lc.symbol
tp = order['type']
price = order['price']
oid = order['oid']
async for cmd in _from_order_book:
# send msg over IPC / wire
log.info(f'sending order cmd: {cmd}')
yield cmd
# lc = order['chart']
# symbol = order['symol']
# msg = order['msg']
# price = order['price']
# oid = order['oid']
print(f'oid: {oid}')
# TODO
# oid = str(uuid.uuid4())
cmd = {
'price': price,
'action': 'alert',
'symbol': symbol.key,
'brokers': symbol.brokers,
'type': tp,
'price': price,
'oid': oid,
}
# cmd = {
# 'price': price,
# 'action': 'alert',
# 'symbol': symbol.key,
# 'brokers': symbol.brokers,
# 'msg': msg,
# 'price': price,
# 'oid': oid,
# }
_local_book[oid] = cmd
yield cmd
# streaming tasks which check for conditions per symbol per broker
_scan_tasks: Dict[str, List] = {}
# levels which have an executable action (eg. alert, order, signal)
_levels: Dict[str, list] = {}
# up to date last values from target streams
_last_values: Dict[str, float] = {}
# book._sent_orders[oid] = cmd
# TODO: numba all of this
@ -146,26 +181,22 @@ def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]:
avoiding the case where the a predicate returns true immediately.
"""
# str compares:
# https://stackoverflow.com/questions/46708708/compare-strings-in-numba-compiled-function
if trigger_price >= known_last:
def check_gt(price: float) -> bool:
if price >= trigger_price:
return True
else:
return False
return price >= trigger_price
return check_gt, 'gt'
return check_gt, 'down'
elif trigger_price <= known_last:
def check_lt(price: float) -> bool:
if price <= trigger_price:
return True
else:
return False
return price <= trigger_price
return check_lt, 'lt'
return check_lt, 'up'
@dataclass
@ -173,9 +204,10 @@ class _ExecBook:
"""EMS-side execution book.
Contains conditions for executions (aka "orders").
A singleton instance is created per EMS actor.
A singleton instance is created per EMS actor (for now).
"""
# levels which have an executable action (eg. alert, order, signal)
orders: Dict[
Tuple[str, str],
Tuple[
@ -188,7 +220,7 @@ class _ExecBook:
]
] = field(default_factory=dict)
# most recent values
# tracks most recent values per symbol each from data feed
lasts: Dict[
Tuple[str, str],
float
@ -236,6 +268,11 @@ async def exec_orders(
with stream.shield():
async for quotes in stream:
##############################
# begin price actions sequence
# XXX: optimize this for speed
##############################
for sym, quote in quotes.items():
execs = book.orders.get((broker, sym))
@ -249,27 +286,36 @@ async def exec_orders(
# update to keep new cmds informed
book.lasts[(broker, symbol)] = price
# begin price actions sequence
if not execs:
continue
for oid, pred, action in tuple(execs):
for oid, pred, name, cmd in tuple(execs):
# push trigger msg back to parent as an "alert"
# (mocking for eg. a "fill")
if pred(price):
name = action(price)
await ctx.send_yield({
'type': 'alert',
'price': price,
# current shm array index
'index': feed.shm._last.value - 1,
'name': name,
'oid': oid,
})
execs.remove((oid, pred, action))
cmd['name'] = name
cmd['index'] = feed.shm._last.value - 1
# current shm array index
cmd['trigger_price'] = price
await ctx.send_yield(cmd)
# await ctx.send_yield({
# 'type': 'alert',
# 'price': price,
# # current shm array index
# 'index': feed.shm._last.value - 1,
# 'name': name,
# 'oid': oid,
# })
print(
f"GOT ALERT FOR {exec_price} @ \n{tick}\n")
print(f'removing pred for {oid}')
execs.remove((oid, pred, name, cmd))
print(f'execs are {execs}')
# feed teardown
@ -279,6 +325,10 @@ async def exec_orders(
async def stream_and_route(ctx, ui_name):
"""Order router (sub)actor entrypoint.
This is the daemon (child) side routine which starts an EMS
runtime per broker/feed and and begins streaming back alerts
from executions back to subscribers.
"""
actor = tractor.current_actor()
book = get_book()
@ -291,19 +341,18 @@ async def stream_and_route(ctx, ui_name):
async for cmd in await portal.run(send_order_cmds):
action = cmd.pop('action')
msg = cmd['msg']
if action == 'cancel':
if msg == 'cancel':
# TODO:
pass
tp = cmd.pop('type')
trigger_price = cmd['price']
sym = cmd['symbol']
brokers = cmd['brokers']
oid = cmd['oid']
if tp == 'alert':
if msg == 'alert':
log.info(f'Alert {cmd} received in {actor.uid}')
broker = brokers[0]
@ -333,14 +382,17 @@ async def stream_and_route(ctx, ui_name):
# create list of executions on first entry
book.orders.setdefault((broker, sym), []).append(
(oid, pred, lambda p: name)
(oid, pred, name, cmd)
)
# ack-respond that order is live
await ctx.send_yield({'msg': 'ack', 'oid': oid})
# continue and wait on next order cmd
async def spawn_router_stream_alerts(
chart,
order_mode,
symbol: Symbol,
# lines: 'LinesEditor',
task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
@ -349,13 +401,11 @@ async def spawn_router_stream_alerts(
alerts.
"""
# setup local ui event streaming channels
global _from_ui, _to_router, _lines
_to_router, _from_ui = trio.open_memory_channel(100)
actor = tractor.current_actor()
subactor_name = 'piker.ems'
subactor_name = 'emsd'
# TODO: add ``maybe_spawn_emsd()`` for this
async with tractor.open_nursery() as n:
portal = await n.start_actor(
@ -369,32 +419,40 @@ async def spawn_router_stream_alerts(
async with tractor.wait_for_actor(subactor_name):
# let parent task continue
task_status.started(_to_router)
task_status.started(_to_ems)
# begin the trigger-alert stream
# this is where we receive **back** messages
# about executions **from** the EMS actor
async for alert in stream:
yb = pg.mkBrush(hcolor('alert_yellow'))
angle = 90 if alert['name'] == 'lt' else -90
arrow = pg.ArrowItem(
angle=angle,
baseAngle=0,
headLen=5,
headWidth=2,
tailLen=None,
brush=yb,
)
arrow.setPos(alert['index'], alert['price'])
chart.plotItem.addItem(arrow)
# delete the line from view
oid = alert['oid']
print(f'_lines: {_lines}')
msg_type = alert['msg']
if msg_type == 'ack':
print(f"order accepted: {alert}")
# show line label once order is live
order_mode.lines.commit_line(oid)
continue
order_mode.arrows.add(
oid,
alert['index'],
alert['price'],
pointing='up' if alert['name'] == 'up' else 'down'
)
# print(f'_lines: {_lines}')
print(f'deleting line with oid: {oid}')
chart._vb._lines_editor
_lines.pop(oid).delete()
# delete level from view
order_mode.lines.remove_line(uuid=oid)
# chart._vb._lines_editor
# _lines.pop(oid).delete()
# TODO: this in another task?
# not sure if this will ever be a bottleneck,
@ -411,3 +469,6 @@ async def spawn_router_stream_alerts(
],
)
log.runtime(result)
# do we need this?
# await _from_ems.put(alert)

View File

@ -57,7 +57,7 @@ from .. import data
from ..data import maybe_open_shm_array
from ..log import get_logger
from ._exec import run_qtractor, current_screen
from ._interaction import ChartView
from ._interaction import ChartView, open_order_mode
from .. import fsp
from .._ems import spawn_router_stream_alerts
@ -301,6 +301,7 @@ class LinkedSplitCharts(QtGui.QWidget):
array=array,
parent=self.splitter,
linked_charts=self,
axisItems={
'bottom': xaxis,
'right': PriceAxis(linked_charts=self)
@ -310,9 +311,9 @@ class LinkedSplitCharts(QtGui.QWidget):
**cpw_kwargs,
)
# give viewbox a reference to primary chart
# allowing for kb controls and interactions
# (see our custom view in `._interactions.py`)
# give viewbox as reference to chart
# allowing for kb controls and interactions on **this** widget
# (see our custom view mode in `._interactions.py`)
cv.chart = cpw
cpw.plotItem.vb.linked_charts = self
@ -375,6 +376,7 @@ class ChartPlotWidget(pg.PlotWidget):
# the data view we generate graphics from
name: str,
array: np.ndarray,
linked_charts: LinkedSplitCharts,
static_yrange: Optional[Tuple[float, float]] = None,
cursor: Optional[Cursor] = None,
**kwargs,
@ -390,6 +392,7 @@ class ChartPlotWidget(pg.PlotWidget):
**kwargs
)
self.name = name
self._lc = linked_charts
# self.setViewportMargins(0, 0, 0, 0)
self._ohlc = array # readonly view of ohlc data
@ -934,17 +937,6 @@ async def _async_main(
wap_in_history,
)
# spawn EMS actor-service
router_send_chan = await n.start(
spawn_router_stream_alerts,
chart,
symbol,
)
# wait for router to come up before setting
# enabling send channel on chart
linked_charts._to_router = router_send_chan
# wait for a first quote before we start any update tasks
quote = await feed.receive()
@ -958,8 +950,26 @@ async def _async_main(
linked_charts
)
# probably where we'll eventually start the user input loop
await trio.sleep_forever()
async with open_order_mode(
chart,
) as order_mode:
# TODO: this should probably be implicitly spawned
# inside the above mngr?
# spawn EMS actor-service
to_ems_chan = await n.start(
spawn_router_stream_alerts,
order_mode,
symbol,
)
# wait for router to come up before setting
# enabling send channel on chart
linked_charts._to_ems = to_ems_chan
# probably where we'll eventually start the user input loop
await trio.sleep_forever()
async def chart_from_quotes(
@ -1019,7 +1029,7 @@ async def chart_from_quotes(
chart,
# determine precision/decimal lengths
digits=max(float_digits(last), 2),
size_digits=min(float_digits(volume), 3)
size_digits=min(float_digits(last), 3)
)
# TODO:

View File

@ -157,7 +157,7 @@ class L1Labels:
self,
chart: 'ChartPlotWidget', # noqa
digits: int = 2,
size_digits: int = 0,
size_digits: int = 3,
font_size_inches: float = _down_2_font_inches_we_like,
) -> None:

View File

@ -17,8 +17,9 @@
"""
UX interaction customs.
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Optional
from typing import Optional, Dict, Callable
import uuid
import pyqtgraph as pg
@ -29,7 +30,7 @@ import numpy as np
from ..log import get_logger
from ._style import _min_points_to_show, hcolor, _font
from ._graphics._lines import level_line, LevelLine
from .._ems import _lines
from .._ems import get_orders, OrderBook
log = get_logger(__name__)
@ -198,10 +199,17 @@ class SelectRect(QtGui.QGraphicsRectItem):
self.hide()
# global store of order-lines graphics
# keyed by uuid4 strs - used to sync draw
# order lines **after** the order is 100%
# active in emsd
_order_lines: Dict[str, LevelLine] = {}
@dataclass
class LineEditor:
view: 'ChartView'
_lines: field(default_factory=dict)
_order_lines: field(default_factory=_order_lines)
chart: 'ChartPlotWidget' = None # type: ignore # noqa
_active_staged_line: LevelLine = None
_stage_line: LevelLine = None
@ -223,6 +231,7 @@ class LineEditor:
line = level_line(
chart,
level=y,
digits=chart._lc.symbol.digits(),
color=color,
# don't highlight the "staging" line
@ -264,46 +273,48 @@ class LineEditor:
self._stage_line.hide()
self._stage_line.label.hide()
# if line:
# line.delete()
self._active_staged_line = None
# show the crosshair y line
hl = cursor.graphics[chart]['hl']
hl.show()
def commit_line(self) -> LevelLine:
def create_line(self, uuid: str) -> LevelLine:
line = self._active_staged_line
if line:
chart = self.chart._cursor.active_plot
if not line:
raise RuntimeError("No line commit is currently staged!?")
y = chart._cursor._datum_xy[1]
chart = self.chart._cursor.active_plot
y = chart._cursor._datum_xy[1]
# XXX: should make this an explicit attr
# it's assigned inside ``.add_plot()``
lc = self.view.linked_charts
line = level_line(
chart,
level=y,
color='alert_yellow',
digits=chart._lc.symbol.digits(),
show_label=False,
)
oid = str(uuid.uuid4())
lc._to_router.send_nowait({
'chart': lc,
'type': 'alert',
'price': y,
'oid': oid,
# 'symbol': lc.chart.name,
# 'brokers': lc.symbol.brokers,
# 'price': y,
})
# register for later lookup/deletion
self._order_lines[uuid] = line
return line, y
line = level_line(
chart,
level=y,
color='alert_yellow',
)
# register for later
_lines[oid] = line
def commit_line(self, uuid: str) -> LevelLine:
"""Commit a "staged line" to view.
log.debug(f'clicked y: {y}')
Submits the line graphic under the cursor as a (new) permanent
graphic in view.
"""
line = self._order_lines[uuid]
line.label.show()
# TODO: other flashy things to indicate the order is active
log.debug(f'Level active for level: {line.value()}')
return line
def remove_line(
self,
@ -316,18 +327,114 @@ class LineEditor:
cursor position.
"""
# Delete any hoverable under the cursor
cursor = self.chart._cursor
if line:
# If line is passed delete it
line.delete()
elif uuid:
# try to look up line from our registry
self._order_lines.pop(uuid).delete()
else:
# Delete any hoverable under the cursor
cursor = self.chart._cursor
for item in cursor._hovered:
# hovered items must also offer
# a ``.delete()`` method
item.delete()
@dataclass
class ArrowEditor:
chart: 'ChartPlotWidget' # noqa
_arrows: field(default_factory=dict)
def add(
self,
uid: str,
x: float,
y: float,
color='default',
pointing: str = 'up',
) -> pg.ArrowItem:
"""Add an arrow graphic to view at given (x, y).
"""
yb = pg.mkBrush(hcolor('alert_yellow'))
angle = 90 if pointing == 'up' else -90
arrow = pg.ArrowItem(
angle=angle,
baseAngle=0,
headLen=5,
headWidth=2,
tailLen=None,
brush=yb,
)
arrow.setPos(x, y)
self._arrows[uid] = arrow
# render to view
self.chart.plotItem.addItem(arrow)
return arrow
def remove(self, arrow) -> bool:
self.chart.plotItem.removeItem(arrow)
@dataclass
class OrderMode:
"""Major mode for placing orders on a chart view.
"""
chart: 'ChartPlotWidget'
book: OrderBook
lines: LineEditor
arrows: ArrowEditor
key_map: Dict[str, Callable] = field(default_factory=dict)
def uuid(self) -> str:
return str(uuid.uuid4())
@asynccontextmanager
async def open_order_mode(
chart,
):
# global _order_lines
view = chart._vb
book = get_orders()
lines = LineEditor(view=view, _order_lines=_order_lines, chart=chart)
arrows = ArrowEditor(chart, {})
log.info("Opening order mode")
mode = OrderMode(chart, book, lines, arrows)
view.mode = mode
# # setup local ui event streaming channels for request/resp
# # streamging with EMS daemon
# global _to_ems, _from_order_book
# _to_ems, _from_order_book = trio.open_memory_channel(100)
try:
yield mode
finally:
# XXX special teardown handling like for ex.
# - cancelling orders if needed?
# - closing positions if desired?
# - switching special condition orders to safer/more reliable variants
log.info("Closing order mode")
class ChartView(ViewBox):
"""Price chart view box with interaction behaviors you'd expect from
any interactive platform:
@ -336,6 +443,7 @@ class ChartView(ViewBox):
- vertical scrolling on y-axis
- zoom on x to most recent in view datum
- zoom on right-click-n-drag to cursor position
"""
def __init__(
self,
@ -350,9 +458,11 @@ class ChartView(ViewBox):
self.addItem(self.select_box, ignoreBounds=True)
self._chart: 'ChartPlotWidget' = None # noqa
self._lines_editor = LineEditor(view=self, _lines=_lines)
# self._lines_editor = LineEditor(view=self, _lines=_lines)
self.mode = None
# kb ctrls processing
self._key_buffer = []
self._active_staged_line: LevelLine = None # noqa
@property
def chart(self) -> 'ChartPlotWidget': # type: ignore # noqa
@ -362,7 +472,7 @@ class ChartView(ViewBox):
def chart(self, chart: 'ChartPlotWidget') -> None: # type: ignore # noqa
self._chart = chart
self.select_box.chart = chart
self._lines_editor.chart = chart
# self._lines_editor.chart = chart
def wheelEvent(self, ev, axis=None):
"""Override "center-point" location for scrolling.
@ -533,8 +643,27 @@ class ChartView(ViewBox):
ev.accept()
# commit the "staged" line under the cursor
self._lines_editor.commit_line()
# self._lines_editor.commit_line()
# send order to EMS
# register the "staged" line under the cursor
# to be displayed when above order ack arrives
# (means the line graphic doesn't show on screen until the
# order is live in the emsd).
mode = self.mode
uuid = mode.uuid()
# make line graphic
line, y = mode.lines.create_line(uuid)
# send order cmd to ems
mode.book.alert(
uuid=uuid,
symbol=mode.chart._lc._symbol,
price=y
)
def keyReleaseEvent(self, ev):
"""
@ -557,7 +686,8 @@ class ChartView(ViewBox):
if text == 'a':
# draw "staged" line under cursor position
self._lines_editor.unstage_line()
# self._lines_editor.unstage_line()
self.mode.lines.unstage_line()
def keyPressEvent(self, ev):
"""
@ -580,12 +710,11 @@ class ChartView(ViewBox):
# ctl
if mods == QtCore.Qt.ControlModifier:
# print("CTRL")
# TODO: ctrl-c as cancel?
# https://forum.qt.io/topic/532/how-to-catch-ctrl-c-on-a-widget/9
# if ev.text() == 'c':
# self.rbScaleBox.hide()
pass
print(f"CTRL + key:{key} + text:{text}")
# alt
if mods == QtCore.Qt.AltModifier:
@ -603,13 +732,16 @@ class ChartView(ViewBox):
elif text == 'a':
# add a line at the current cursor
self._lines_editor.stage_line()
# self._lines_editor.stage_line()
self.mode.lines.stage_line()
elif text == 'd':
# delete any lines under the cursor
self._lines_editor.remove_line()
# self._lines_editor.remove_line()
self.mode.lines.remove_line()
# Leaving this for light reference purposes
# XXX: Leaving this for light reference purposes, there
# seems to be some work to at least gawk at for history mgmt.
# Key presses are used only when mouse mode is RectMode
# The following events are implemented: