diff --git a/piker/_ems.py b/piker/_ems.py index 93175e12..50578da6 100644 --- a/piker/_ems.py +++ b/piker/_ems.py @@ -20,12 +20,11 @@ In suit parlance: "Execution management systems" """ from dataclasses import dataclass, field from typing import ( - AsyncIterator, List, Dict, Callable, Tuple, + AsyncIterator, Dict, Callable, Tuple, Any, ) # import uuid -import pyqtgraph as pg import trio from trio_typing import TaskStatus import tractor @@ -33,32 +32,59 @@ import tractor from . import data from .log import get_logger from .data._source import Symbol -from .ui._style import hcolor log = get_logger(__name__) -_to_router: trio.abc.SendChannel = None -_from_ui: trio.abc.ReceiveChannel = None -_lines = {} - - -_local_book = {} +# setup local ui event streaming channels for request/resp +# streamging with EMS daemon +_to_ems, _from_order_book = trio.open_memory_channel(100) @dataclass -class OrderBoi: - """'Buy' (client ?) side order book ctl and tracking. +class OrderBook: + """Buy-side (client-side ?) order book ctl and tracking. - Mostly for keeping local state to match the EMS and use - events to trigger graphics updates. + A style similar to "model-view" is used here where this api is + provided as a supervised control for an EMS actor which does all the + hard/fast work of talking to brokers/exchanges to conduct + executions. + + Currently, mostly for keeping local state to match the EMS and use + received events to trigger graphics updates. """ - orders: Dict[str, dict] = field(default_factory=dict) - _cmds_from_ui: trio.abc.ReceiveChannel = _from_ui + _sent_orders: Dict[str, dict] = field(default_factory=dict) + _confirmed_orders: Dict[str, dict] = field(default_factory=dict) - async def alert(self, price: float) -> str: - ... + _to_ems: trio.abc.SendChannel = _to_ems + _from_order_book: trio.abc.ReceiveChannel = _from_order_book + + def on_fill(self, uuid: str) -> None: + cmd = self._sent_orders[uuid] + log.info(f"Order executed: {cmd}") + self._confirmed_orders[uuid] = cmd + + def alert( + self, + uuid: str, + symbol: 'Symbol', + price: float + ) -> str: + # XXX: should make this an explicit attr + # it's assigned inside ``.add_plot()`` + # lc = self.view.linked_charts + + # uid = str(uuid.uuid4()) + cmd = { + 'msg': 'alert', + 'price': price, + 'symbol': symbol.key, + 'brokers': symbol.brokers, + 'oid': uuid, + } + self._sent_orders[uuid] = cmd + self._to_ems.send_nowait(cmd) async def buy(self, price: float) -> str: ... @@ -66,21 +92,34 @@ class OrderBoi: async def sell(self, price: float) -> str: ... + async def cancel(self, oid: str) -> bool: + """Cancel an order (or alert) from the EMS. + + """ + ... + + # higher level operations + + async def transmit_to_broker(self, price: float) -> str: + ... + async def modify(self, oid: str, price) -> bool: ... - async def cancel(self, oid: str) -> bool: - ... + +_orders: OrderBook = None -_orders: OrderBoi = None +def get_orders(emsd_uid: Tuple[str, str] = None) -> OrderBook: + if emsd_uid is not None: + # TODO: read in target emsd's active book on startup + pass -def get_orders() -> OrderBoi: global _orders if _orders is None: - _orders = OrderBoi + _orders = OrderBook() return _orders @@ -91,48 +130,44 @@ async def send_order_cmds(): to downstream consumers. This is run in the UI actor (usually the one running Qt). - The UI simply delivers order messages to the above ``_to_router`` + The UI simply delivers order messages to the above ``_to_ems`` send channel (from sync code using ``.send_nowait()``), these values are pulled from the channel here and send to any consumer(s). + This effectively makes order messages look like they're being + "pushed" from the parent to the EMS actor. + """ - global _from_ui - async for order in _from_ui: + global _from_order_book + # book = get_orders() - lc = order['chart'] - symbol = lc.symbol - tp = order['type'] - price = order['price'] - oid = order['oid'] + async for cmd in _from_order_book: + + # send msg over IPC / wire + log.info(f'sending order cmd: {cmd}') + yield cmd + + # lc = order['chart'] + # symbol = order['symol'] + # msg = order['msg'] + # price = order['price'] + # oid = order['oid'] - print(f'oid: {oid}') # TODO # oid = str(uuid.uuid4()) - cmd = { - 'price': price, - 'action': 'alert', - 'symbol': symbol.key, - 'brokers': symbol.brokers, - 'type': tp, - 'price': price, - 'oid': oid, - } + # cmd = { + # 'price': price, + # 'action': 'alert', + # 'symbol': symbol.key, + # 'brokers': symbol.brokers, + # 'msg': msg, + # 'price': price, + # 'oid': oid, + # } - _local_book[oid] = cmd - - yield cmd - - -# streaming tasks which check for conditions per symbol per broker -_scan_tasks: Dict[str, List] = {} - -# levels which have an executable action (eg. alert, order, signal) -_levels: Dict[str, list] = {} - -# up to date last values from target streams -_last_values: Dict[str, float] = {} + # book._sent_orders[oid] = cmd # TODO: numba all of this @@ -146,26 +181,22 @@ def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: avoiding the case where the a predicate returns true immediately. """ + # str compares: + # https://stackoverflow.com/questions/46708708/compare-strings-in-numba-compiled-function if trigger_price >= known_last: def check_gt(price: float) -> bool: - if price >= trigger_price: - return True - else: - return False + return price >= trigger_price - return check_gt, 'gt' + return check_gt, 'down' elif trigger_price <= known_last: def check_lt(price: float) -> bool: - if price <= trigger_price: - return True - else: - return False + return price <= trigger_price - return check_lt, 'lt' + return check_lt, 'up' @dataclass @@ -173,9 +204,10 @@ class _ExecBook: """EMS-side execution book. Contains conditions for executions (aka "orders"). - A singleton instance is created per EMS actor. + A singleton instance is created per EMS actor (for now). """ + # levels which have an executable action (eg. alert, order, signal) orders: Dict[ Tuple[str, str], Tuple[ @@ -188,7 +220,7 @@ class _ExecBook: ] ] = field(default_factory=dict) - # most recent values + # tracks most recent values per symbol each from data feed lasts: Dict[ Tuple[str, str], float @@ -236,6 +268,11 @@ async def exec_orders( with stream.shield(): async for quotes in stream: + ############################## + # begin price actions sequence + # XXX: optimize this for speed + ############################## + for sym, quote in quotes.items(): execs = book.orders.get((broker, sym)) @@ -249,27 +286,36 @@ async def exec_orders( # update to keep new cmds informed book.lasts[(broker, symbol)] = price - # begin price actions sequence - if not execs: continue - for oid, pred, action in tuple(execs): + for oid, pred, name, cmd in tuple(execs): + # push trigger msg back to parent as an "alert" # (mocking for eg. a "fill") if pred(price): - name = action(price) - await ctx.send_yield({ - 'type': 'alert', - 'price': price, - # current shm array index - 'index': feed.shm._last.value - 1, - 'name': name, - 'oid': oid, - }) - execs.remove((oid, pred, action)) + + cmd['name'] = name + cmd['index'] = feed.shm._last.value - 1 + # current shm array index + cmd['trigger_price'] = price + + await ctx.send_yield(cmd) + # await ctx.send_yield({ + # 'type': 'alert', + # 'price': price, + # # current shm array index + # 'index': feed.shm._last.value - 1, + # 'name': name, + # 'oid': oid, + # }) + print( f"GOT ALERT FOR {exec_price} @ \n{tick}\n") + + print(f'removing pred for {oid}') + execs.remove((oid, pred, name, cmd)) + print(f'execs are {execs}') # feed teardown @@ -279,6 +325,10 @@ async def exec_orders( async def stream_and_route(ctx, ui_name): """Order router (sub)actor entrypoint. + This is the daemon (child) side routine which starts an EMS + runtime per broker/feed and and begins streaming back alerts + from executions back to subscribers. + """ actor = tractor.current_actor() book = get_book() @@ -291,19 +341,18 @@ async def stream_and_route(ctx, ui_name): async for cmd in await portal.run(send_order_cmds): - action = cmd.pop('action') + msg = cmd['msg'] - if action == 'cancel': + if msg == 'cancel': + # TODO: pass - tp = cmd.pop('type') - trigger_price = cmd['price'] sym = cmd['symbol'] brokers = cmd['brokers'] oid = cmd['oid'] - if tp == 'alert': + if msg == 'alert': log.info(f'Alert {cmd} received in {actor.uid}') broker = brokers[0] @@ -333,14 +382,17 @@ async def stream_and_route(ctx, ui_name): # create list of executions on first entry book.orders.setdefault((broker, sym), []).append( - (oid, pred, lambda p: name) + (oid, pred, name, cmd) ) + # ack-respond that order is live + await ctx.send_yield({'msg': 'ack', 'oid': oid}) + # continue and wait on next order cmd async def spawn_router_stream_alerts( - chart, + order_mode, symbol: Symbol, # lines: 'LinesEditor', task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, @@ -349,13 +401,11 @@ async def spawn_router_stream_alerts( alerts. """ - # setup local ui event streaming channels - global _from_ui, _to_router, _lines - _to_router, _from_ui = trio.open_memory_channel(100) actor = tractor.current_actor() - subactor_name = 'piker.ems' + subactor_name = 'emsd' + # TODO: add ``maybe_spawn_emsd()`` for this async with tractor.open_nursery() as n: portal = await n.start_actor( @@ -369,32 +419,40 @@ async def spawn_router_stream_alerts( async with tractor.wait_for_actor(subactor_name): # let parent task continue - task_status.started(_to_router) + task_status.started(_to_ems) + # begin the trigger-alert stream + # this is where we receive **back** messages + # about executions **from** the EMS actor async for alert in stream: - yb = pg.mkBrush(hcolor('alert_yellow')) - - angle = 90 if alert['name'] == 'lt' else -90 - - arrow = pg.ArrowItem( - angle=angle, - baseAngle=0, - headLen=5, - headWidth=2, - tailLen=None, - brush=yb, - ) - arrow.setPos(alert['index'], alert['price']) - chart.plotItem.addItem(arrow) - # delete the line from view oid = alert['oid'] - print(f'_lines: {_lines}') + msg_type = alert['msg'] + + if msg_type == 'ack': + print(f"order accepted: {alert}") + + # show line label once order is live + order_mode.lines.commit_line(oid) + + continue + + order_mode.arrows.add( + oid, + alert['index'], + alert['price'], + pointing='up' if alert['name'] == 'up' else 'down' + ) + + # print(f'_lines: {_lines}') print(f'deleting line with oid: {oid}') - chart._vb._lines_editor - _lines.pop(oid).delete() + # delete level from view + order_mode.lines.remove_line(uuid=oid) + + # chart._vb._lines_editor + # _lines.pop(oid).delete() # TODO: this in another task? # not sure if this will ever be a bottleneck, @@ -411,3 +469,6 @@ async def spawn_router_stream_alerts( ], ) log.runtime(result) + + # do we need this? + # await _from_ems.put(alert) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index c743b5f0..39211e61 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -57,7 +57,7 @@ from .. import data from ..data import maybe_open_shm_array from ..log import get_logger from ._exec import run_qtractor, current_screen -from ._interaction import ChartView +from ._interaction import ChartView, open_order_mode from .. import fsp from .._ems import spawn_router_stream_alerts @@ -301,6 +301,7 @@ class LinkedSplitCharts(QtGui.QWidget): array=array, parent=self.splitter, + linked_charts=self, axisItems={ 'bottom': xaxis, 'right': PriceAxis(linked_charts=self) @@ -310,9 +311,9 @@ class LinkedSplitCharts(QtGui.QWidget): **cpw_kwargs, ) - # give viewbox a reference to primary chart - # allowing for kb controls and interactions - # (see our custom view in `._interactions.py`) + # give viewbox as reference to chart + # allowing for kb controls and interactions on **this** widget + # (see our custom view mode in `._interactions.py`) cv.chart = cpw cpw.plotItem.vb.linked_charts = self @@ -375,6 +376,7 @@ class ChartPlotWidget(pg.PlotWidget): # the data view we generate graphics from name: str, array: np.ndarray, + linked_charts: LinkedSplitCharts, static_yrange: Optional[Tuple[float, float]] = None, cursor: Optional[Cursor] = None, **kwargs, @@ -390,6 +392,7 @@ class ChartPlotWidget(pg.PlotWidget): **kwargs ) self.name = name + self._lc = linked_charts # self.setViewportMargins(0, 0, 0, 0) self._ohlc = array # readonly view of ohlc data @@ -934,17 +937,6 @@ async def _async_main( wap_in_history, ) - # spawn EMS actor-service - router_send_chan = await n.start( - spawn_router_stream_alerts, - chart, - symbol, - ) - - # wait for router to come up before setting - # enabling send channel on chart - linked_charts._to_router = router_send_chan - # wait for a first quote before we start any update tasks quote = await feed.receive() @@ -958,8 +950,26 @@ async def _async_main( linked_charts ) - # probably where we'll eventually start the user input loop - await trio.sleep_forever() + async with open_order_mode( + chart, + ) as order_mode: + + # TODO: this should probably be implicitly spawned + # inside the above mngr? + + # spawn EMS actor-service + to_ems_chan = await n.start( + spawn_router_stream_alerts, + order_mode, + symbol, + ) + + # wait for router to come up before setting + # enabling send channel on chart + linked_charts._to_ems = to_ems_chan + + # probably where we'll eventually start the user input loop + await trio.sleep_forever() async def chart_from_quotes( @@ -1019,7 +1029,7 @@ async def chart_from_quotes( chart, # determine precision/decimal lengths digits=max(float_digits(last), 2), - size_digits=min(float_digits(volume), 3) + size_digits=min(float_digits(last), 3) ) # TODO: diff --git a/piker/ui/_graphics/_lines.py b/piker/ui/_graphics/_lines.py index c6bf1818..b697692b 100644 --- a/piker/ui/_graphics/_lines.py +++ b/piker/ui/_graphics/_lines.py @@ -157,7 +157,7 @@ class L1Labels: self, chart: 'ChartPlotWidget', # noqa digits: int = 2, - size_digits: int = 0, + size_digits: int = 3, font_size_inches: float = _down_2_font_inches_we_like, ) -> None: diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index fdaeb137..ba779c34 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -17,8 +17,9 @@ """ UX interaction customs. """ +from contextlib import asynccontextmanager from dataclasses import dataclass, field -from typing import Optional +from typing import Optional, Dict, Callable import uuid import pyqtgraph as pg @@ -29,7 +30,7 @@ import numpy as np from ..log import get_logger from ._style import _min_points_to_show, hcolor, _font from ._graphics._lines import level_line, LevelLine -from .._ems import _lines +from .._ems import get_orders, OrderBook log = get_logger(__name__) @@ -198,10 +199,17 @@ class SelectRect(QtGui.QGraphicsRectItem): self.hide() +# global store of order-lines graphics +# keyed by uuid4 strs - used to sync draw +# order lines **after** the order is 100% +# active in emsd +_order_lines: Dict[str, LevelLine] = {} + + @dataclass class LineEditor: view: 'ChartView' - _lines: field(default_factory=dict) + _order_lines: field(default_factory=_order_lines) chart: 'ChartPlotWidget' = None # type: ignore # noqa _active_staged_line: LevelLine = None _stage_line: LevelLine = None @@ -223,6 +231,7 @@ class LineEditor: line = level_line( chart, level=y, + digits=chart._lc.symbol.digits(), color=color, # don't highlight the "staging" line @@ -264,46 +273,48 @@ class LineEditor: self._stage_line.hide() self._stage_line.label.hide() - # if line: - # line.delete() - self._active_staged_line = None # show the crosshair y line hl = cursor.graphics[chart]['hl'] hl.show() - def commit_line(self) -> LevelLine: + def create_line(self, uuid: str) -> LevelLine: + line = self._active_staged_line - if line: - chart = self.chart._cursor.active_plot + if not line: + raise RuntimeError("No line commit is currently staged!?") - y = chart._cursor._datum_xy[1] + chart = self.chart._cursor.active_plot + y = chart._cursor._datum_xy[1] - # XXX: should make this an explicit attr - # it's assigned inside ``.add_plot()`` - lc = self.view.linked_charts + line = level_line( + chart, + level=y, + color='alert_yellow', + digits=chart._lc.symbol.digits(), + show_label=False, + ) - oid = str(uuid.uuid4()) - lc._to_router.send_nowait({ - 'chart': lc, - 'type': 'alert', - 'price': y, - 'oid': oid, - # 'symbol': lc.chart.name, - # 'brokers': lc.symbol.brokers, - # 'price': y, - }) + # register for later lookup/deletion + self._order_lines[uuid] = line + return line, y - line = level_line( - chart, - level=y, - color='alert_yellow', - ) - # register for later - _lines[oid] = line + def commit_line(self, uuid: str) -> LevelLine: + """Commit a "staged line" to view. - log.debug(f'clicked y: {y}') + Submits the line graphic under the cursor as a (new) permanent + graphic in view. + + """ + line = self._order_lines[uuid] + line.label.show() + + # TODO: other flashy things to indicate the order is active + + log.debug(f'Level active for level: {line.value()}') + + return line def remove_line( self, @@ -316,18 +327,114 @@ class LineEditor: cursor position. """ - # Delete any hoverable under the cursor - cursor = self.chart._cursor - if line: + # If line is passed delete it line.delete() + + elif uuid: + # try to look up line from our registry + self._order_lines.pop(uuid).delete() + else: + # Delete any hoverable under the cursor + cursor = self.chart._cursor + for item in cursor._hovered: # hovered items must also offer # a ``.delete()`` method item.delete() +@dataclass +class ArrowEditor: + + chart: 'ChartPlotWidget' # noqa + _arrows: field(default_factory=dict) + + def add( + self, + uid: str, + x: float, + y: float, + color='default', + pointing: str = 'up', + ) -> pg.ArrowItem: + """Add an arrow graphic to view at given (x, y). + + """ + yb = pg.mkBrush(hcolor('alert_yellow')) + + angle = 90 if pointing == 'up' else -90 + + arrow = pg.ArrowItem( + angle=angle, + baseAngle=0, + headLen=5, + headWidth=2, + tailLen=None, + brush=yb, + ) + arrow.setPos(x, y) + + self._arrows[uid] = arrow + + # render to view + self.chart.plotItem.addItem(arrow) + + return arrow + + def remove(self, arrow) -> bool: + self.chart.plotItem.removeItem(arrow) + + +@dataclass +class OrderMode: + """Major mode for placing orders on a chart view. + + """ + chart: 'ChartPlotWidget' + book: OrderBook + lines: LineEditor + arrows: ArrowEditor + + key_map: Dict[str, Callable] = field(default_factory=dict) + + def uuid(self) -> str: + return str(uuid.uuid4()) + + +@asynccontextmanager +async def open_order_mode( + chart, +): + # global _order_lines + + view = chart._vb + book = get_orders() + lines = LineEditor(view=view, _order_lines=_order_lines, chart=chart) + arrows = ArrowEditor(chart, {}) + + log.info("Opening order mode") + + mode = OrderMode(chart, book, lines, arrows) + view.mode = mode + + # # setup local ui event streaming channels for request/resp + # # streamging with EMS daemon + # global _to_ems, _from_order_book + # _to_ems, _from_order_book = trio.open_memory_channel(100) + + try: + yield mode + + finally: + # XXX special teardown handling like for ex. + # - cancelling orders if needed? + # - closing positions if desired? + # - switching special condition orders to safer/more reliable variants + log.info("Closing order mode") + + class ChartView(ViewBox): """Price chart view box with interaction behaviors you'd expect from any interactive platform: @@ -336,6 +443,7 @@ class ChartView(ViewBox): - vertical scrolling on y-axis - zoom on x to most recent in view datum - zoom on right-click-n-drag to cursor position + """ def __init__( self, @@ -350,9 +458,11 @@ class ChartView(ViewBox): self.addItem(self.select_box, ignoreBounds=True) self._chart: 'ChartPlotWidget' = None # noqa - self._lines_editor = LineEditor(view=self, _lines=_lines) + # self._lines_editor = LineEditor(view=self, _lines=_lines) + self.mode = None + + # kb ctrls processing self._key_buffer = [] - self._active_staged_line: LevelLine = None # noqa @property def chart(self) -> 'ChartPlotWidget': # type: ignore # noqa @@ -362,7 +472,7 @@ class ChartView(ViewBox): def chart(self, chart: 'ChartPlotWidget') -> None: # type: ignore # noqa self._chart = chart self.select_box.chart = chart - self._lines_editor.chart = chart + # self._lines_editor.chart = chart def wheelEvent(self, ev, axis=None): """Override "center-point" location for scrolling. @@ -533,8 +643,27 @@ class ChartView(ViewBox): ev.accept() - # commit the "staged" line under the cursor - self._lines_editor.commit_line() + # self._lines_editor.commit_line() + + # send order to EMS + + # register the "staged" line under the cursor + # to be displayed when above order ack arrives + # (means the line graphic doesn't show on screen until the + # order is live in the emsd). + mode = self.mode + uuid = mode.uuid() + + # make line graphic + line, y = mode.lines.create_line(uuid) + + # send order cmd to ems + mode.book.alert( + uuid=uuid, + symbol=mode.chart._lc._symbol, + price=y + ) + def keyReleaseEvent(self, ev): """ @@ -557,7 +686,8 @@ class ChartView(ViewBox): if text == 'a': # draw "staged" line under cursor position - self._lines_editor.unstage_line() + # self._lines_editor.unstage_line() + self.mode.lines.unstage_line() def keyPressEvent(self, ev): """ @@ -580,12 +710,11 @@ class ChartView(ViewBox): # ctl if mods == QtCore.Qt.ControlModifier: - # print("CTRL") # TODO: ctrl-c as cancel? # https://forum.qt.io/topic/532/how-to-catch-ctrl-c-on-a-widget/9 # if ev.text() == 'c': # self.rbScaleBox.hide() - pass + print(f"CTRL + key:{key} + text:{text}") # alt if mods == QtCore.Qt.AltModifier: @@ -603,13 +732,16 @@ class ChartView(ViewBox): elif text == 'a': # add a line at the current cursor - self._lines_editor.stage_line() + # self._lines_editor.stage_line() + self.mode.lines.stage_line() elif text == 'd': # delete any lines under the cursor - self._lines_editor.remove_line() + # self._lines_editor.remove_line() + self.mode.lines.remove_line() - # Leaving this for light reference purposes + # XXX: Leaving this for light reference purposes, there + # seems to be some work to at least gawk at for history mgmt. # Key presses are used only when mouse mode is RectMode # The following events are implemented: