From 140f3231e741786bd8c69a6af3a7503a92a6f8f1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Jan 2021 21:24:14 -0500 Subject: [PATCH] Add basic client-side order entry to EMS --- piker/_ems.py | 77 ++++++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/piker/_ems.py b/piker/_ems.py index aa1bdac5..a95d2aa4 100644 --- a/piker/_ems.py +++ b/piker/_ems.py @@ -18,7 +18,7 @@ In suit parlance: "Execution management systems" """ -# import time +import time from dataclasses import dataclass, field from typing import ( AsyncIterator, Dict, Callable, Tuple, @@ -59,19 +59,20 @@ class OrderBook: _to_ems: trio.abc.SendChannel = _to_ems _from_order_book: trio.abc.ReceiveChannel = _from_order_book - def on_fill(self, uuid: str) -> None: - cmd = self._sent_orders[uuid] - log.info(f"Order executed: {cmd}") - self._confirmed_orders[uuid] = cmd + # def on_fill(self, uuid: str) -> None: + # cmd = self._sent_orders[uuid] + # log.info(f"Order executed: {cmd}") + # self._confirmed_orders[uuid] = cmd - def alert( + def send( self, uuid: str, symbol: 'Symbol', - price: float + price: float, + action: str, ) -> str: cmd = { - 'msg': 'alert', + 'msg': action, 'price': price, 'symbol': symbol.key, 'brokers': symbol.brokers, @@ -80,12 +81,6 @@ class OrderBook: self._sent_orders[uuid] = cmd self._to_ems.send_nowait(cmd) - def buy(self, price: float) -> str: - ... - - def sell(self, price: float) -> str: - ... - def cancel(self, uuid: str) -> bool: """Cancel an order (or alert) from the EMS. @@ -218,11 +213,10 @@ def get_book() -> _ExecBook: return _book -async def exec_orders( +async def exec_loop( ctx: tractor.Context, broker: str, symbol: str, - exec_price: float, task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, ) -> AsyncIterator[dict]: @@ -239,7 +233,9 @@ async def exec_orders( book = get_book() book.lasts[(broker, symbol)] = first_quote[symbol]['last'] - task_status.started((first_quote, feed)) + client = feed.mod.get_client_proxy(feed._brokerd_portal) + + task_status.started((first_quote, feed, client)) # shield this field so the remote brokerd does not get cancelled stream = feed.stream @@ -275,21 +271,24 @@ async def exec_orders( # (mocking for eg. a "fill") if pred(price): - cmd['name'] = name - cmd['index'] = feed.shm._last.value - 1 - # current shm array index - cmd['trigger_price'] = price - cmd['msg'] = 'executed' + resp = { + 'msg': 'executed', + 'name': name, + 'time_ns': time.time_ns(), + # current shm array index + 'index': feed.shm._last.value - 1, + 'exec_price': price, + } - await ctx.send_yield(cmd) + await ctx.send_yield(resp) print( - f"GOT ALERT FOR {exec_price} @ \n{tick}\n") + f"GOT ALERT FOR {name} @ \n{tick}\n") - print(f'removing pred for {oid}') + log.info(f'removing pred for {oid}') pred, name, cmd = execs.pop(oid) - print(f'execs are {execs}') + log.debug(f'execs are {execs}') # print(f'execs scan took: {time.time() - start}') # feed teardown @@ -335,7 +334,7 @@ async def stream_and_route(ctx, ui_name): # destroy exec pred, name, cmd = book.orders[_active_execs[oid]].pop(oid) - # ack-cmdond that order is live + # ack-cmd that order is live await ctx.send_yield({'msg': 'cancelled', 'oid': oid}) continue @@ -351,24 +350,26 @@ async def stream_and_route(ctx, ui_name): if last is None: # spawn new brokerd feed task - quote, feed = await n.start( - exec_orders, + quote, feed, client = await n.start( + exec_loop, ctx, - # TODO: eventually support N-brokers + + # TODO: eventually support N-brokers? broker, sym, + trigger_price, ) + # TODO: eventually support N-brokers n.start_soon( receive_trade_updates, ctx, - # TODO: eventually support N-brokers feed, ) - last = book.lasts[(broker, sym)] + print(f'Known last is {last}') # Auto-gen scanner predicate: @@ -379,6 +380,11 @@ async def stream_and_route(ctx, ui_name): # the user choose the predicate operator. pred, name = mk_check(trigger_price, last) + # if the predicate resolves immediately send the + # execution to the broker asap + if pred(last): + # send order + print("ORDER FILLED IMMEDIATELY!?!?!?!") # create list of executions on first entry book.orders.setdefault( @@ -387,7 +393,7 @@ async def stream_and_route(ctx, ui_name): # reverse lookup for cancellations _active_execs[oid] = (broker, sym) - # ack-cmdond that order is live + # ack-response that order is live here await ctx.send_yield({ 'msg': 'active', 'oid': oid @@ -451,14 +457,15 @@ async def spawn_router_stream_alerts( elif resp in ('executed',): - order_mode.lines.remove_line(uuid=oid) + line = order_mode.lines.remove_line(uuid=oid) print(f'deleting line with oid: {oid}') order_mode.arrows.add( oid, msg['index'], msg['price'], - pointing='up' if msg['name'] == 'up' else 'down' + pointing='up' if msg['name'] == 'up' else 'down', + color=line.color ) # DESKTOP NOTIFICATIONS