diff --git a/piker/_ems.py b/piker/_ems.py index 50578da6..8ede8f31 100644 --- a/piker/_ems.py +++ b/piker/_ems.py @@ -18,12 +18,11 @@ In suit parlance: "Execution management systems" """ +import time from dataclasses import dataclass, field from typing import ( AsyncIterator, Dict, Callable, Tuple, - Any, ) -# import uuid import trio from trio_typing import TaskStatus @@ -71,11 +70,6 @@ class OrderBook: symbol: 'Symbol', price: float ) -> str: - # XXX: should make this an explicit attr - # it's assigned inside ``.add_plot()`` - # lc = self.view.linked_charts - - # uid = str(uuid.uuid4()) cmd = { 'msg': 'alert', 'price': price, @@ -86,17 +80,22 @@ class OrderBook: self._sent_orders[uuid] = cmd self._to_ems.send_nowait(cmd) - async def buy(self, price: float) -> str: + def buy(self, price: float) -> str: ... - async def sell(self, price: float) -> str: + def sell(self, price: float) -> str: ... - async def cancel(self, oid: str) -> bool: + def cancel(self, uuid: str) -> bool: """Cancel an order (or alert) from the EMS. """ - ... + cmd = { + 'msg': 'cancel', + 'oid': uuid, + } + self._sent_orders[uuid] = cmd + self._to_ems.send_nowait(cmd) # higher level operations @@ -138,9 +137,7 @@ async def send_order_cmds(): "pushed" from the parent to the EMS actor. """ - global _from_order_book - # book = get_orders() async for cmd in _from_order_book: @@ -148,27 +145,6 @@ async def send_order_cmds(): log.info(f'sending order cmd: {cmd}') yield cmd - # lc = order['chart'] - # symbol = order['symol'] - # msg = order['msg'] - # price = order['price'] - # oid = order['oid'] - - # TODO - # oid = str(uuid.uuid4()) - - # cmd = { - # 'price': price, - # 'action': 'alert', - # 'symbol': symbol.key, - # 'brokers': symbol.brokers, - # 'msg': msg, - # 'price': price, - # 'oid': oid, - # } - - # book._sent_orders[oid] = cmd - # TODO: numba all of this def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: @@ -210,13 +186,13 @@ class _ExecBook: # levels which have an executable action (eg. alert, order, signal) orders: Dict[ Tuple[str, str], - Tuple[ - # predicates - Callable[[float], bool], - - # actions - Callable[[float], Dict[str, Any]], - + Dict[ + str, # uuid + Tuple[ + Callable[[float], bool], # predicate + str, # name + dict, # cmd / msg type + ] ] ] = field(default_factory=dict) @@ -273,6 +249,7 @@ async def exec_orders( # XXX: optimize this for speed ############################## + start = time.time() for sym, quote in quotes.items(): execs = book.orders.get((broker, sym)) @@ -289,7 +266,7 @@ async def exec_orders( if not execs: continue - for oid, pred, name, cmd in tuple(execs): + for oid, (pred, name, cmd) in tuple(execs.items()): # push trigger msg back to parent as an "alert" # (mocking for eg. a "fill") @@ -299,25 +276,19 @@ async def exec_orders( cmd['index'] = feed.shm._last.value - 1 # current shm array index cmd['trigger_price'] = price + cmd['msg'] = 'executed' await ctx.send_yield(cmd) - # await ctx.send_yield({ - # 'type': 'alert', - # 'price': price, - # # current shm array index - # 'index': feed.shm._last.value - 1, - # 'name': name, - # 'oid': oid, - # }) print( f"GOT ALERT FOR {exec_price} @ \n{tick}\n") print(f'removing pred for {oid}') - execs.remove((oid, pred, name, cmd)) + pred, name, cmd = execs.pop(oid) print(f'execs are {execs}') + print(f'execs scan took: {time.time() - start}') # feed teardown @@ -333,6 +304,8 @@ async def stream_and_route(ctx, ui_name): actor = tractor.current_actor() book = get_book() + _active_execs: Dict[str, (str, str)] = {} + # new router entry point async with tractor.wait_for_actor(ui_name) as portal: @@ -341,52 +314,63 @@ async def stream_and_route(ctx, ui_name): async for cmd in await portal.run(send_order_cmds): + log.info(f'{cmd} received in {actor.uid}') msg = cmd['msg'] - - if msg == 'cancel': - # TODO: - pass - - trigger_price = cmd['price'] - sym = cmd['symbol'] - brokers = cmd['brokers'] oid = cmd['oid'] - if msg == 'alert': - log.info(f'Alert {cmd} received in {actor.uid}') + if msg == 'cancel': + # destroy exec + pred, name, cmd = book.orders[_active_execs[oid]].pop(oid) - broker = brokers[0] - last = book.lasts.get((broker, sym)) + # ack-cmdond that order is live + await ctx.send_yield({'msg': 'cancelled', 'oid': oid}) - if last is None: # spawn new brokerd feed task + continue - quote = await n.start( - exec_orders, - ctx, - # TODO: eventually support N-brokers - broker, - sym, - trigger_price, - ) - print(f"received first quote {quote}") + elif msg in ('alert', 'buy', 'sell',): - last = book.lasts[(broker, sym)] - print(f'Known last is {last}') + trigger_price = cmd['price'] + sym = cmd['symbol'] + brokers = cmd['brokers'] - # Auto-gen scanner predicate: - # we automatically figure out what the alert check condition - # should be based on the current first price received from the - # feed, instead of being like every other shitty tina platform - # that makes the user choose the predicate operator. - pred, name = mk_check(trigger_price, last) + broker = brokers[0] + last = book.lasts.get((broker, sym)) - # create list of executions on first entry - book.orders.setdefault((broker, sym), []).append( - (oid, pred, name, cmd) - ) + if last is None: # spawn new brokerd feed task - # ack-respond that order is live - await ctx.send_yield({'msg': 'ack', 'oid': oid}) + quote = await n.start( + exec_orders, + ctx, + # TODO: eventually support N-brokers + broker, + sym, + trigger_price, + ) + print(f"received first quote {quote}") + + last = book.lasts[(broker, sym)] + print(f'Known last is {last}') + + # Auto-gen scanner predicate: + # we automatically figure out what the alert check + # condition should be based on the current first + # price received from the feed, instead of being + # like every other shitty tina platform that makes + # the user choose the predicate operator. + pred, name = mk_check(trigger_price, last) + + # create list of executions on first entry + book.orders.setdefault( + (broker, sym), {})[oid] = (pred, name, cmd) + + # reverse lookup for cancellations + _active_execs[oid] = (broker, sym) + + # ack-cmdond that order is live + await ctx.send_yield({ + 'msg': 'active', + 'oid': oid + }) # continue and wait on next order cmd @@ -410,7 +394,7 @@ async def spawn_router_stream_alerts( portal = await n.start_actor( subactor_name, - rpc_module_paths=[__name__], + enable_modules=[__name__], ) stream = await portal.run( stream_and_route, @@ -424,51 +408,52 @@ async def spawn_router_stream_alerts( # begin the trigger-alert stream # this is where we receive **back** messages # about executions **from** the EMS actor - async for alert in stream: + async for msg in stream: # delete the line from view - oid = alert['oid'] - msg_type = alert['msg'] + oid = msg['oid'] + resp = msg['msg'] - if msg_type == 'ack': - print(f"order accepted: {alert}") + if resp in ('active',): + print(f"order accepted: {msg}") # show line label once order is live order_mode.lines.commit_line(oid) continue - order_mode.arrows.add( - oid, - alert['index'], - alert['price'], - pointing='up' if alert['name'] == 'up' else 'down' - ) + elif resp in ('cancelled',): - # print(f'_lines: {_lines}') - print(f'deleting line with oid: {oid}') + # delete level from view + order_mode.lines.remove_line(uuid=oid) + print(f'deleting line with oid: {oid}') - # delete level from view - order_mode.lines.remove_line(uuid=oid) + elif resp in ('executed',): - # chart._vb._lines_editor - # _lines.pop(oid).delete() + order_mode.lines.remove_line(uuid=oid) + print(f'deleting line with oid: {oid}') - # TODO: this in another task? - # not sure if this will ever be a bottleneck, - # we probably could do graphics stuff first tho? + order_mode.arrows.add( + oid, + msg['index'], + msg['price'], + pointing='up' if msg['name'] == 'up' else 'down' + ) - # XXX: linux only for now - result = await trio.run_process( - [ - 'notify-send', - '-u', 'normal', - '-t', '10000', - 'piker', - f'alert: {alert}', - ], - ) - log.runtime(result) + # DESKTOP NOTIFICATIONS + # + # TODO: this in another task? + # not sure if this will ever be a bottleneck, + # we probably could do graphics stuff first tho? - # do we need this? - # await _from_ems.put(alert) + # XXX: linux only for now + result = await trio.run_process( + [ + 'notify-send', + '-u', 'normal', + '-t', '10000', + 'piker', + f'alert: {msg}', + ], + ) + log.runtime(result) diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index ba779c34..02c95230 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -308,6 +308,7 @@ class LineEditor: """ line = self._order_lines[uuid] + line.oid = uuid line.label.show() # TODO: other flashy things to indicate the order is active @@ -316,6 +317,13 @@ class LineEditor: return line + def lines_under_cursor(self): + """Get the line(s) under the cursor position. + + """ + # Delete any hoverable under the cursor + return self.chart._cursor._hovered + def remove_line( self, line: LevelLine = None, @@ -328,21 +336,17 @@ class LineEditor: """ if line: - # If line is passed delete it - line.delete() + uuid = line.oid - elif uuid: - # try to look up line from our registry - self._order_lines.pop(uuid).delete() + # try to look up line from our registry + line = self._order_lines.pop(uuid) - else: - # Delete any hoverable under the cursor - cursor = self.chart._cursor + # if hovered remove from cursor set + hovered = self.chart._cursor._hovered + if line in hovered: + hovered.remove(line) - for item in cursor._hovered: - # hovered items must also offer - # a ``.delete()`` method - item.delete() + line.delete() @dataclass @@ -392,10 +396,15 @@ class OrderMode: """Major mode for placing orders on a chart view. """ - chart: 'ChartPlotWidget' + chart: 'ChartPlotWidget' # type: ignore # noqa book: OrderBook lines: LineEditor arrows: ArrowEditor + _arrow_colors = { + 'alert': 'alert_yellow', + 'buy': 'buy_green', + 'sell': 'sell_red', + } key_map: Dict[str, Callable] = field(default_factory=dict) @@ -664,7 +673,6 @@ class ChartView(ViewBox): price=y ) - def keyReleaseEvent(self, ev): """ Key release to normally to trigger release of input mode @@ -686,7 +694,6 @@ class ChartView(ViewBox): if text == 'a': # draw "staged" line under cursor position - # self._lines_editor.unstage_line() self.mode.lines.unstage_line() def keyPressEvent(self, ev): @@ -732,13 +739,14 @@ class ChartView(ViewBox): elif text == 'a': # add a line at the current cursor - # self._lines_editor.stage_line() self.mode.lines.stage_line() elif text == 'd': + # delete any lines under the cursor - # self._lines_editor.remove_line() - self.mode.lines.remove_line() + mode = self.mode + for line in mode.lines.lines_under_cursor(): + mode.book.cancel(uuid=line.oid) # XXX: Leaving this for light reference purposes, there # seems to be some work to at least gawk at for history mgmt.