diff --git a/piker/_ems.py b/piker/_ems.py index a95d2aa4..aad8f9b4 100644 --- a/piker/_ems.py +++ b/piker/_ems.py @@ -18,12 +18,14 @@ In suit parlance: "Execution management systems" """ +from pprint import pformat import time from dataclasses import dataclass, field from typing import ( AsyncIterator, Dict, Callable, Tuple, ) +from bidict import bidict import trio from trio_typing import TaskStatus import tractor @@ -54,7 +56,7 @@ class OrderBook: """ _sent_orders: Dict[str, dict] = field(default_factory=dict) - _confirmed_orders: Dict[str, dict] = field(default_factory=dict) + # _confirmed_orders: Dict[str, dict] = field(default_factory=dict) _to_ems: trio.abc.SendChannel = _to_ems _from_order_book: trio.abc.ReceiveChannel = _from_order_book @@ -72,7 +74,7 @@ class OrderBook: action: str, ) -> str: cmd = { - 'msg': action, + 'action': action, 'price': price, 'symbol': symbol.key, 'brokers': symbol.brokers, @@ -81,24 +83,20 @@ class OrderBook: self._sent_orders[uuid] = cmd self._to_ems.send_nowait(cmd) + async def modify(self, oid: str, price) -> bool: + ... + def cancel(self, uuid: str) -> bool: """Cancel an order (or alert) from the EMS. """ - cmd = { - 'msg': 'cancel', + cmd = self._sent_orders[uuid] + msg = { + 'action': 'cancel', 'oid': uuid, + 'symbol': cmd['symbol'], } - self._sent_orders[uuid] = cmd - self._to_ems.send_nowait(cmd) - - # higher level operations - - async def transmit_to_broker(self, price: float) -> str: - ... - - async def modify(self, oid: str, price) -> bool: - ... + self._to_ems.send_nowait(msg) _orders: OrderBook = None @@ -123,13 +121,16 @@ async def send_order_cmds(): """Order streaming task: deliver orders transmitted from UI to downstream consumers. - This is run in the UI actor (usually the one running Qt). - The UI simply delivers order messages to the above ``_to_ems`` - send channel (from sync code using ``.send_nowait()``), these values - are pulled from the channel here and send to any consumer(s). + This is run in the UI actor (usually the one running Qt but could be + any other client service code). This process simply delivers order + messages to the above ``_to_ems`` send channel (from sync code using + ``.send_nowait()``), these values are pulled from the channel here + and relayed to any consumer(s) that called this function using + a ``tractor`` portal. This effectively makes order messages look like they're being - "pushed" from the parent to the EMS actor. + "pushed" from the parent to the EMS where local sync code is likely + doing the pushing from some UI. """ global _from_order_book @@ -181,9 +182,12 @@ class _ExecBook: A singleton instance is created per EMS actor (for now). """ + broker: str + # levels which have an executable action (eg. alert, order, signal) orders: Dict[ - Tuple[str, str], + # Tuple[str, str], + str, # symbol Dict[ str, # uuid Tuple[ @@ -200,17 +204,21 @@ class _ExecBook: float ] = field(default_factory=dict) - -_book = None + # mapping of broker order ids to piker ems ids + _broker2ems_ids: Dict[str, str] = field(default_factory=bidict) -def get_book() -> _ExecBook: - global _book +_books: Dict[str, _ExecBook] = {} - if _book is None: - _book = _ExecBook() - return _book +def get_book(broker: str) -> _ExecBook: + + global _books + return _books.setdefault(broker, _ExecBook(broker)) + + +# def scan_quotes( +# quotes: dict, async def exec_loop( @@ -226,32 +234,38 @@ async def exec_loop( loglevel='info', ) as feed: - # TODO: get initial price - + # TODO: get initial price quote from target broker first_quote = await feed.receive() - - book = get_book() + book = get_book(broker) book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + # TODO: wrap this in a more re-usable general api client = feed.mod.get_client_proxy(feed._brokerd_portal) + # return control to parent task task_status.started((first_quote, feed, client)) + ############################## + # begin price actions sequence + # XXX: optimize this for speed + ############################## + # shield this field so the remote brokerd does not get cancelled stream = feed.stream - with stream.shield(): + + # this stream may eventually contain multiple + # symbols async for quotes in stream: - ############################## - # begin price actions sequence - # XXX: optimize this for speed - ############################## + # TODO: numba all this! # start = time.time() for sym, quote in quotes.items(): execs = book.orders.get((broker, sym)) + if not execs: + continue for tick in quote.get('ticks', ()): price = tick.get('price') @@ -262,29 +276,33 @@ async def exec_loop( # update to keep new cmds informed book.lasts[(broker, symbol)] = price - if not execs: - continue - for oid, (pred, name, cmd) in tuple(execs.items()): # push trigger msg back to parent as an "alert" # (mocking for eg. a "fill") if pred(price): + # register broker id for ems id + order_id = await client.submit_limit( + oid=oid, + symbol=sym, + action=cmd['action'], + price=round(price, 2), + ) + # resp = book._broker2ems_ids.setdefault( + book._broker2ems_ids[order_id] = oid + resp = { - 'msg': 'executed', + 'resp': 'submitted', 'name': name, - 'time_ns': time.time_ns(), + 'ems_trigger_time_ns': time.time_ns(), # current shm array index 'index': feed.shm._last.value - 1, - 'exec_price': price, + 'trigger_price': price, } await ctx.send_yield(resp) - print( - f"GOT ALERT FOR {name} @ \n{tick}\n") - log.info(f'removing pred for {oid}') pred, name, cmd = execs.pop(oid) @@ -294,116 +312,227 @@ async def exec_loop( # feed teardown +# XXX: right now this is very very ad-hoc to IB +# TODO: lots of cases still to handle +# - short-sale but securities haven't been located, in this case we +# should probably keep the order in some kind of weird state or cancel +# it outright? +# status='PendingSubmit', message=''), +# status='Cancelled', message='Error 404, reqId 1550: Order held while securities are located.'), +# status='PreSubmitted', message='')], + async def receive_trade_updates( ctx: tractor.Context, feed: 'Feed', # noqa + book: _ExecBook, + task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, ) -> AsyncIterator[dict]: - # await tractor.breakpoint() - print("TRADESZ") - async for update in await feed.recv_trades_data(): - log.info(update) + """Trades update loop - receive updates from broker, convert + to EMS responses, transmit to ordering client(s). + + This is where trade confirmations from the broker are processed + and appropriate responses relayed back to the original EMS client + actor. There is a messaging translation layer throughout. + + """ + trades_stream = await feed.recv_trades_data() + first = await trades_stream.__anext__() + + # startup msg + assert first['trade_events'] == 'started' + task_status.started() + + async for trade_event in trades_stream: + event = trade_event['trade_events'] + + try: + order = event['order'] + except KeyError: + + # Relay broker error messages + err = event['error'] + + # broker request id - must be normalized + # into error transmission by broker backend. + reqid = err['brid'] + + # TODO: handle updates! + oid = book._broker2ems_ids.get(reqid) + + # XXX should we make one when it's blank? + log.error(pformat(err['message'])) + + else: + log.info(f'Received broker trade event:\n{pformat(event)}') + + status = event['orderStatus']['status'] + reqid = order['orderId'] + + # TODO: handle updates! + oid = book._broker2ems_ids.get(reqid) + + if status in {'Cancelled'}: + resp = {'resp': 'cancelled'} + + elif status in {'Submitted'}: + # ack-response that order is live/submitted + # to the broker + resp = {'resp': 'submitted'} + + # elif status in {'Executed', 'Filled'}: + elif status in {'Filled'}: + + # order was filled by broker + fills = [] + for fill in event['fills']: + e = fill['execution'] + fills.append( + (e.time, e.price, e.shares, e.side) + ) + + resp = { + 'resp': 'executed', + 'fills': fills, + } + + else: # active in EMS + # ack-response that order is live in EMS + # (aka as a client side limit) + resp = {'resp': 'active'} + + # send response packet to EMS client(s) + resp['oid'] = oid + + await ctx.send_yield(resp) @tractor.stream -async def stream_and_route(ctx, ui_name): - """Order router (sub)actor entrypoint. +async def stream_and_route( + ctx: tractor.Context, + client_actor_name: str, + broker: str, + symbol: str, + mode: str = 'live', # ('paper', 'dark', 'live') +) -> None: + """EMS (sub)actor entrypoint. This is the daemon (child) side routine which starts an EMS runtime per broker/feed and and begins streaming back alerts - from executions back to subscribers. + from executions to order clients. """ actor = tractor.current_actor() - book = get_book() - - _active_execs: Dict[str, (str, str)] = {} + book = get_book(broker) # new router entry point - async with tractor.wait_for_actor(ui_name) as portal: + async with tractor.wait_for_actor(client_actor_name) as portal: # spawn one task per broker feed async with trio.open_nursery() as n: + # TODO: eventually support N-brokers + quote, feed, client = await n.start( + exec_loop, + ctx, + broker, + symbol, + ) + + # for paper mode we need to mock this trades response feed + await n.start( + receive_trade_updates, + ctx, + feed, + book, + ) + async for cmd in await portal.run(send_order_cmds): log.info(f'{cmd} received in {actor.uid}') - msg = cmd['msg'] + + action = cmd['action'] oid = cmd['oid'] + sym = cmd['symbol'] - if msg == 'cancel': - # destroy exec - pred, name, cmd = book.orders[_active_execs[oid]].pop(oid) + if action == 'cancel': - # ack-cmd that order is live - await ctx.send_yield({'msg': 'cancelled', 'oid': oid}) + # check for live-broker order + brid = book._broker2ems_ids.inverse[oid] + if brid: + log.info("Submitting cancel for live order") + await client.submit_cancel(oid=brid) - continue + # check for EMS active exec + else: + book.orders[symbol].pop(oid, None) + await ctx.send_yield( + {'action': 'cancelled', + 'oid': oid} + ) - elif msg in ('alert', 'buy', 'sell',): + elif action in ('alert', 'buy', 'sell',): trigger_price = cmd['price'] - sym = cmd['symbol'] brokers = cmd['brokers'] - broker = brokers[0] - last = book.lasts.get((broker, sym)) - - if last is None: # spawn new brokerd feed task - - quote, feed, client = await n.start( - exec_loop, - ctx, - - # TODO: eventually support N-brokers? - broker, - sym, - - trigger_price, - ) - - # TODO: eventually support N-brokers - n.start_soon( - receive_trade_updates, - ctx, - feed, - ) last = book.lasts[(broker, sym)] + # print(f'Known last is {last}') - print(f'Known last is {last}') + if action in ('buy', 'sell',): - # Auto-gen scanner predicate: - # we automatically figure out what the alert check - # condition should be based on the current first - # price received from the feed, instead of being - # like every other shitty tina platform that makes - # the user choose the predicate operator. - pred, name = mk_check(trigger_price, last) + # if the predicate resolves immediately send the + # execution to the broker asap + # if pred(last): + if mode == 'live': + # send order + log.warning("ORDER FILLED IMMEDIATELY!?!?!?!") + # IF SEND ORDER RIGHT AWAY CONDITION - # if the predicate resolves immediately send the - # execution to the broker asap - if pred(last): - # send order - print("ORDER FILLED IMMEDIATELY!?!?!?!") + # register broker id for ems id + order_id = await client.submit_limit( + oid=oid, + symbol=sym, + action=action, + price=round(trigger_price, 2), + ) + book._broker2ems_ids[order_id] = oid - # create list of executions on first entry - book.orders.setdefault( - (broker, sym), {})[oid] = (pred, name, cmd) + # book.orders[symbol][oid] = None - # reverse lookup for cancellations - _active_execs[oid] = (broker, sym) + # XXX: the trades data broker response loop + # (``receive_trade_updates()`` above) will + # handle sending the ems side acks back to + # the cmd sender from here - # ack-response that order is live here - await ctx.send_yield({ - 'msg': 'active', - 'oid': oid - }) + elif mode in {'dark', 'paper'}: + + # Auto-gen scanner predicate: + # we automatically figure out what the alert check + # condition should be based on the current first + # price received from the feed, instead of being + # like every other shitty tina platform that makes + # the user choose the predicate operator. + pred, name = mk_check(trigger_price, last) + + # submit execution/order to EMS scanner loop + # create list of executions on first entry + book.orders.setdefault( + (broker, sym), {} + )[oid] = (pred, name, cmd) + + # ack-response that order is live here + await ctx.send_yield({ + 'resp': 'ems_active', + 'oid': oid + }) # continue and wait on next order cmd -async def spawn_router_stream_alerts( +async def _ems_main( order_mode, + broker: str, symbol: Symbol, # lines: 'LinesEditor', task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, @@ -425,7 +554,10 @@ async def spawn_router_stream_alerts( ) stream = await portal.run( stream_and_route, - ui_name=actor.name + client_actor_name=actor.name, + broker=broker, + symbol=symbol.key, + ) async with tractor.wait_for_actor(subactor_name): @@ -439,49 +571,22 @@ async def spawn_router_stream_alerts( # delete the line from view oid = msg['oid'] - resp = msg['msg'] + resp = msg['resp'] - if resp in ('active',): - print(f"order accepted: {msg}") + # response to 'action' request (buy/sell) + if resp in ('ems_active', 'submitted'): + log.info(f"order accepted: {msg}") # show line label once order is live - order_mode.lines.commit_line(oid) - - continue + order_mode.on_submit(oid) + # response to 'cancel' request elif resp in ('cancelled',): # delete level from view - order_mode.lines.remove_line(uuid=oid) - print(f'deleting line with oid: {oid}') + order_mode.on_cancel(oid) + log.info(f'deleting line with oid: {oid}') + # response to 'action' request (buy/sell) elif resp in ('executed',): - - line = order_mode.lines.remove_line(uuid=oid) - print(f'deleting line with oid: {oid}') - - order_mode.arrows.add( - oid, - msg['index'], - msg['price'], - pointing='up' if msg['name'] == 'up' else 'down', - color=line.color - ) - - # DESKTOP NOTIFICATIONS - # - # TODO: this in another task? - # not sure if this will ever be a bottleneck, - # we probably could do graphics stuff first tho? - - # XXX: linux only for now - result = await trio.run_process( - [ - 'notify-send', - '-u', 'normal', - '-t', '10000', - 'piker', - f'alert: {msg}', - ], - ) - log.runtime(result) + await order_mode.on_exec(oid, msg) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 79dfc5f1..4b9c6579 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -119,6 +119,8 @@ class NonShittyWrapper(Wrapper): """ Get rid of datetime on executions. """ + # this is the IB server's execution time supposedly + # https://interactivebrokers.github.io/tws-api/classIBApi_1_1Execution.html#a2e05cace0aa52d809654c7248e052ef2 execu.time = execu.time.timestamp() return super().execDetails(reqId, contract, execu) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index cbdd7e9f..782aa933 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -89,7 +89,6 @@ async def maybe_spawn_brokerd( brokername: str, sleep: float = 0.5, loglevel: Optional[str] = None, - expose_mods: List = [], **tractor_kwargs, ) -> tractor._portal.Portal: """If no ``brokerd.{brokername}`` daemon-actor can be found, @@ -180,8 +179,14 @@ class Feed: if not self._trade_stream: self._trade_stream = await self._brokerd_portal.run( + self.mod.stream_trades, - topics=['all'], # do we need this? + + # do we need this? -> yes + # the broker side must declare this key + # in messages, though we could probably use + # more then one? + topics=['trade_events'], ) return self._trade_stream diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index dad829f4..9e97617f 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -59,7 +59,7 @@ from ..log import get_logger from ._exec import run_qtractor, current_screen from ._interaction import ChartView, open_order_mode from .. import fsp -from .._ems import spawn_router_stream_alerts +from .._ems import _ems_main log = get_logger(__name__) @@ -959,8 +959,9 @@ async def _async_main( # spawn EMS actor-service to_ems_chan = await n.start( - spawn_router_stream_alerts, + _ems_main, order_mode, + brokername, symbol, ) diff --git a/piker/ui/_graphics/_lines.py b/piker/ui/_graphics/_lines.py index 70e8e5ea..20c68399 100644 --- a/piker/ui/_graphics/_lines.py +++ b/piker/ui/_graphics/_lines.py @@ -202,6 +202,9 @@ class L1Labels: self.ask_label._size_br_from_str(self.max_value) +# TODO: probably worth investigating if we can +# make .boundingRect() faster: +# https://stackoverflow.com/questions/26156486/determine-bounding-rect-of-line-in-qt class LevelLine(pg.InfiniteLine): # TODO: fill in these slots for orders diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index 81d3ccf2..47db7493 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -17,8 +17,10 @@ """ UX interaction customs. """ +import time from contextlib import asynccontextmanager from dataclasses import dataclass, field +from pprint import pformat from typing import Optional, Dict, Callable import uuid @@ -427,6 +429,57 @@ class OrderMode: self._action = name self.lines.stage_line(color=self._colors[name]) + def on_submit(self, uuid: str) -> dict: + self.lines.commit_line(uuid) + req_msg = self.book._sent_orders.get(uuid) + req_msg['ack_time_ns'] = time.time_ns() + # self.book._confirmed_orders[uuid] = req_msg + return req_msg + + async def on_exec( + self, + uuid: str, + msg: Dict[str, str], + ) -> None: + + line = self.lines.remove_line(uuid=uuid) + log.debug(f'deleting line with oid: {uuid}') + + for fill in msg['fills']: + + self.arrows.add( + uuid, + msg['index'], + msg['price'], + pointing='up' if msg['action'] == 'buy' else 'down', + color=line.color + ) + + # DESKTOP NOTIFICATIONS + # + # TODO: this in another task? + # not sure if this will ever be a bottleneck, + # we probably could do graphics stuff first tho? + + # XXX: linux only for now + result = await trio.run_process( + [ + 'notify-send', + '-u', 'normal', + '-t', '10000', + 'piker', + f'alert: {msg}', + ], + ) + log.runtime(result) + + def on_cancel(self, uuid: str) -> None: + msg = self.book._sent_orders.pop(uuid, None) + if msg is not None: + self.lines.remove_line(uuid=uuid) + else: + log.warning(f'Received cancel for unsubmitted order {pformat(msg)}') + def submit_exec(self) -> None: """Send execution order to EMS.