From 6e58f31fd8f74b9e236062824d408be3c0d9461f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 12:14:45 -0400 Subject: [PATCH] Port EMS to typed messaging + bidir streaming This moves the entire clearing system to use typed messages using `pydantic.BaseModel` such that the streamed request-response order submission protocols can be explicitly viewed in terms of message schema, flow, and sequencing. Using the explicit message formats we can now dig into simplifying and normalizing across broker provider apis to get the best uniformity and simplicity. The order submission sequence is now fully async: an order request is expected to be explicitly acked with a new message and if cancellation is requested by the client before the ack arrives, the cancel message is stashed and then later sent immediately on receipt of the order submission's ack from the backend broker. Backend brokers are now controlled using a 2-way request-response streaming dialogue which is fully api agnostic of the clearing system's core processing; This leverages the new bi-directional streaming apis from `tractor`. The clearing core (emsd) was also simplified by moving the paper engine to it's own sub-actor and making it api-symmetric with expected `brokerd` endpoints. A couple of the ems status messages were changed/added: 'dark_executed' -> 'dark_triggered' added 'alert_triggered' More cleaning of old code to come! --- piker/clearing/_client.py | 77 ++-- piker/clearing/_ems.py | 872 +++++++++++++++++++++++++------------- piker/ui/order_mode.py | 52 ++- 3 files changed, 653 insertions(+), 348 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 28fc54fa..47c79636 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -19,34 +19,23 @@ Orders and execution client API. """ from contextlib import asynccontextmanager -from typing import Dict, Tuple, List +from typing import Dict from pprint import pformat from dataclasses import dataclass, field import trio import tractor -# import msgspec from ..data._source import Symbol from ..log import get_logger from ._ems import _emsd_main from .._daemon import maybe_open_emsd +from ._messages import Order, Cancel log = get_logger(__name__) -# TODO: some kinda validation like this -# class Order(msgspec.Struct): -# action: str -# price: float -# size: float -# symbol: str -# brokers: List[str] -# oid: str -# exec_mode: str - - @dataclass class OrderBook: """Buy-side (client-side ?) order book ctl and tracking. @@ -64,31 +53,34 @@ class OrderBook: _to_ems: trio.abc.SendChannel _from_order_book: trio.abc.ReceiveChannel - _sent_orders: Dict[str, dict] = field(default_factory=dict) + _sent_orders: Dict[str, Order] = field(default_factory=dict) _ready_to_receive: trio.Event = trio.Event() def send( + self, uuid: str, symbol: str, - brokers: List[str], + brokers: list[str], price: float, size: float, action: str, exec_mode: str, + ) -> dict: - cmd = { - 'action': action, - 'price': price, - 'size': size, - 'symbol': symbol, - 'brokers': brokers, - 'oid': uuid, - 'exec_mode': exec_mode, # dark or live - } - self._sent_orders[uuid] = cmd - self._to_ems.send_nowait(cmd) - return cmd + msg = Order( + action=action, + price=price, + size=size, + symbol=symbol, + brokers=brokers, + oid=uuid, + exec_mode=exec_mode, # dark or live + ) + + self._sent_orders[uuid] = msg + self._to_ems.send_nowait(msg.dict()) + return msg def update( self, @@ -98,28 +90,27 @@ class OrderBook: cmd = self._sent_orders[uuid] msg = cmd.dict() msg.update(data) - self._sent_orders[uuid] = OrderMsg(**msg) + self._sent_orders[uuid] = Order(**msg) self._to_ems.send_nowait(msg) return cmd def cancel(self, uuid: str) -> bool: - """Cancel an order (or alert) from the EMS. + """Cancel an order (or alert) in the EMS. """ cmd = self._sent_orders[uuid] - msg = { - 'action': 'cancel', - 'oid': uuid, - 'symbol': cmd['symbol'], - } - self._to_ems.send_nowait(msg) + msg = Cancel( + oid=uuid, + symbol=cmd.symbol, + ) + self._to_ems.send_nowait(msg.dict()) _orders: OrderBook = None def get_orders( - emsd_uid: Tuple[str, str] = None + emsd_uid: tuple[str, str] = None ) -> OrderBook: """" OrderBook singleton factory per actor. @@ -139,7 +130,10 @@ def get_orders( return _orders +# TODO: we can get rid of this relay loop once we move +# order_mode inputs to async code! async def relay_order_cmds_from_sync_code( + symbol_key: str, to_ems_stream: tractor.MsgStream, @@ -184,7 +178,8 @@ async def relay_order_cmds_from_sync_code( async def open_ems( broker: str, symbol: Symbol, -) -> None: + +) -> (OrderBook, tractor.MsgStream, dict): """Spawn an EMS daemon and begin sending orders and receiving alerts. @@ -232,9 +227,9 @@ async def open_ems( broker=broker, symbol=symbol.key, - # TODO: ``first`` here should be the active orders/execs - # persistent on the ems so that loca UI's can be populated. - ) as (ctx, first), + # TODO: ``first`` here should be the active orders/execs + # persistent on the ems so that loca UI's can be populated. + ) as (ctx, positions), # open 2-way trade command stream ctx.open_stream() as trades_stream, @@ -246,4 +241,4 @@ async def open_ems( trades_stream ) - yield book, trades_stream + yield book, trades_stream, positions diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 50a44426..cd0795b3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -32,7 +32,12 @@ import tractor from .. import data from ..log import get_logger from ..data._normalize import iterticks -from ._paper_engine import PaperBoi, simulate_fills +from . import _paper_engine as paper +from ._messages import ( + Status, Order, + BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, + BrokerdFill, BrokerdError, BrokerdPosition, +) log = get_logger(__name__) @@ -106,8 +111,9 @@ class _DarkBook: float ] = field(default_factory=dict) - # mapping of broker order ids to piker ems ids - _broker2ems_ids: dict[str, str] = field(default_factory=bidict) + # mapping of piker ems order ids to current brokerd order flow message + _ems_entries: dict[str, str] = field(default_factory=dict) + _ems2brokerd_ids: dict[str, str] = field(default_factory=bidict) # XXX: this is in place to prevent accidental positions that are too @@ -117,13 +123,20 @@ class _DarkBook: _DEFAULT_SIZE: float = 1.0 -async def execute_triggers( +async def clear_dark_triggers( + + # ctx: tractor.Context, + brokerd_orders_stream: tractor.MsgStream, + ems_client_order_stream: tractor.MsgStream, + quote_stream: tractor.ReceiveMsgStream, # noqa + broker: str, symbol: str, - stream: 'tractor.ReceiveStream', # noqa - ctx: tractor.Context, - client: 'Client', # noqa + # client: 'Client', # noqa + # order_msg_stream: 'Client', # noqa + book: _DarkBook, + ) -> None: """Core dark order trigger loop. @@ -133,7 +146,7 @@ async def execute_triggers( """ # this stream may eventually contain multiple symbols # XXX: optimize this for speed! - async for quotes in stream: + async for quotes in quote_stream: # TODO: numba all this! @@ -169,9 +182,15 @@ async def execute_triggers( # majority of iterations will be non-matches continue - action = cmd['action'] + action: str = cmd['action'] + symbol: str = cmd['symbol'] - if action != 'alert': + if action == 'alert': + # nothing to do but relay a status + # message back to the requesting ems client + resp = 'alert_triggered' + + else: # executable order submission # submit_price = price + price*percent_away @@ -181,47 +200,89 @@ async def execute_triggers( f'Dark order triggered for price {price}\n' f'Submitting order @ price {submit_price}') - reqid = await client.submit_limit( + # TODO: port to BrokerdOrder message sending + msg = BrokerdOrder( + action=cmd['action'], oid=oid, + time_ns=time.time_ns(), + # this is a brand new order request for the - # underlying broker so we set out "broker request - # id" (brid) as nothing so that the broker - # client knows that we aren't trying to modify - # an existing order. - brid=None, + # underlying broker so we set a "broker + # request id" (brid) to "nothing" so that the + # broker client knows that we aren't trying + # to modify an existing order-request. + reqid=None, symbol=sym, - action=cmd['action'], price=submit_price, size=cmd['size'], ) + await brokerd_orders_stream.send(msg.dict()) + # mark this entry as having send an order request + book._ems_entries[oid] = msg - # register broker request id to ems id - book._broker2ems_ids[reqid] = oid + resp = 'dark_triggered' - else: - # alerts have no broker request id - reqid = '' + # an internal brokerd-broker specific + # order-request id is expected to be generated - resp = { - 'resp': 'dark_executed', - 'time_ns': time.time_ns(), - 'trigger_price': price, + # reqid = await client.submit_limit( - 'cmd': cmd, # original request message + # oid=oid, - 'broker_reqid': reqid, - 'broker': broker, - 'oid': oid, # piker order id + # # this is a brand new order request for the + # # underlying broker so we set a "broker + # # request id" (brid) to "nothing" so that the + # # broker client knows that we aren't trying + # # to modify an existing order-request. + # brid=None, - } + # symbol=sym, + # action=cmd['action'], + # price=submit_price, + # size=cmd['size'], + # ) + + # # register broker request id to ems id + + # else: + # # alerts have no broker request id + # reqid = '' + + # resp = { + # 'resp': 'dark_executed', + # 'cmd': cmd, # original request message + + # 'time_ns': time.time_ns(), + # 'trigger_price': price, + + # 'broker_reqid': reqid, + # 'broker': broker, + # 'oid': oid, # piker order id + + # } + msg = Status( + oid=oid, # piker order id + resp=resp, + time_ns=time.time_ns(), + + symbol=symbol, + trigger_price=price, + + # broker_reqid=reqid, + broker_details={'name': broker}, + + cmd=cmd, # original request message + + ).dict() # remove exec-condition from set log.info(f'removing pred for {oid}') execs.pop(oid) - await ctx.send_yield(resp) + # await ctx.send_yield(resp) + await ems_client_order_stream.send(msg) else: # condition scan loop complete log.debug(f'execs are {execs}') @@ -231,78 +292,49 @@ async def execute_triggers( # print(f'execs scan took: {time.time() - start}') -async def exec_loop( +# async def start_clearing( - ctx: tractor.Context, - feed: 'Feed', # noqa - broker: str, - symbol: str, - _exec_mode: str, - task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, +# # ctx: tractor.Context, +# brokerd_order_stream: tractor.MsgStream, +# quote_stream: tractor.MsgStream, -) -> AsyncIterator[dict]: - """Main scan loop for order execution conditions and submission - to brokers. +# # client: 'Client', - """ - global _router +# # feed: 'Feed', # noqa +# broker: str, +# symbol: str, +# _exec_mode: str, - # XXX: this should be initial price quote from target provider - first_quote = await feed.receive() +# book: _DarkBook, - book = _router.get_dark_book(broker) - book.lasts[(broker, symbol)] = first_quote[symbol]['last'] +# # task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, - # TODO: wrap this in a more re-usable general api - client_factory = getattr(feed.mod, 'get_client_proxy', None) +# ) -> AsyncIterator[dict]: +# """Main scan loop for order execution conditions and submission +# to brokers. - if client_factory is not None and _exec_mode != 'paper': +# """ +# async with trio.open_nursery() as n: - # we have an order API for this broker - client = client_factory(feed._brokerd_portal) +# # trigger scan and exec loop +# n.start_soon( +# trigger_executions, - else: - # force paper mode - log.warning(f'Entering paper trading mode for {broker}') +# brokerd_order_stream, +# quote_stream, - client = PaperBoi( - broker, - *trio.open_memory_channel(100), - _buys={}, - _sells={}, +# broker, +# symbol, +# book +# # ctx, +# # client, +# ) - _reqids={}, - ) - - # for paper mode we need to mock this trades response feed - # so we pass a duck-typed feed-looking mem chan which is fed - # fill and submission events from the exec loop - feed._trade_stream = client.trade_stream - - # init the trades stream - client._to_trade_stream.send_nowait({'local_trades': 'start'}) - - _exec_mode = 'paper' - - # return control to parent task - task_status.started((first_quote, feed, client)) - - stream = feed.stream - async with trio.open_nursery() as n: - n.start_soon( - execute_triggers, - broker, - symbol, - stream, - ctx, - client, - book - ) - - if _exec_mode == 'paper': - # TODO: make this an actual broadcast channels as in: - # https://github.com/python-trio/trio/issues/987 - n.start_soon(simulate_fills, stream, client) +# # # paper engine simulator task +# # if _exec_mode == 'paper': +# # # TODO: make this an actual broadcast channels as in: +# # # https://github.com/python-trio/trio/issues/987 +# # n.start_soon(simulate_fills, quote_stream, client) # TODO: lots of cases still to handle @@ -315,11 +347,17 @@ async def exec_loop( # reqId 1550: Order held while securities are located.'), # status='PreSubmitted', message='')], -async def process_broker_trades( - ctx: tractor.Context, - feed: 'Feed', # noqa +async def translate_and_relay_brokerd_events( + + # ctx: tractor.Context, + broker: str, + ems_client_order_stream: tractor.MsgStream, + brokerd_trades_stream: tractor.MsgStream, book: _DarkBook, + + # feed: 'Feed', # noqa task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, + ) -> AsyncIterator[dict]: """Trades update loop - receive updates from broker, convert to EMS responses, transmit to ordering client(s). @@ -339,198 +377,336 @@ async def process_broker_trades( {'presubmitted', 'submitted', 'cancelled', 'inactive'} """ - broker = feed.mod.name + # broker = feed.mod.name # TODO: make this a context # in the paper engine case this is just a mem receive channel - async with feed.receive_trades_data() as trades_stream: + # async with feed.receive_trades_data() as brokerd_trades_stream: - first = await trades_stream.__anext__() + # first = await brokerd_trades_stream.__anext__() - # startup msg expected as first from broker backend - assert first['local_trades'] == 'start' - task_status.started() + # startup msg expected as first from broker backend + # assert first['local_trades'] == 'start' + # task_status.started() - async for event in trades_stream: + async for brokerd_msg in brokerd_trades_stream: - name, msg = event['local_trades'] + # name, msg = event['local_trades'] + name = brokerd_msg['name'] - log.info(f'Received broker trade event:\n{pformat(msg)}') + log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}') - if name == 'position': - msg['resp'] = 'position' + if name == 'position': + # msg['resp'] = 'position' + + # relay through position msgs immediately + await ems_client_order_stream.send( + BrokerdPosition(**brokerd_msg).dict() + ) + continue + + # Get the broker (order) request id, this **must** be normalized + # into messaging provided by the broker backend + reqid = brokerd_msg['reqid'] + + # all piker originated requests will have an ems generated oid field + oid = brokerd_msg.get( + 'oid', + book._ems2brokerd_ids.inverse.get(reqid) + ) + + if oid is None: + + # XXX: paper clearing special cases + # paper engine race case: ``Client.submit_limit()`` hasn't + # returned yet and provided an output reqid to register + # locally, so we need to retreive the oid that was already + # packed at submission since we already know it ahead of + # time + paper = brokerd_msg['broker_details'].get('paper_info') + if paper: + # paperboi keeps the ems id up front + oid = paper['oid'] + + else: + # may be an order msg specified as "external" to the + # piker ems flow (i.e. generated by some other + # external broker backend client (like tws for ib) + ext = brokerd_msg.get('external') + if ext: + log.error(f"External trade event {ext}") + + continue + else: + # check for existing live flow entry + entry = book._ems_entries.get(oid) + + # initial response to brokerd order request + if name == 'ack': + + # register the brokerd request id (that was likely + # generated internally) with our locall ems order id for + # reverse lookup later. a BrokerdOrderAck **must** be + # sent after an order request in order to establish this + # id mapping. + book._ems2brokerd_ids[oid] = reqid + + # new order which has not yet be registered into the + # local ems book, insert it now and handle 2 cases: + + # - the order has previously been requested to be + # cancelled by the ems controlling client before we + # received this ack, in which case we relay that cancel + # signal **asap** to the backend broker + if entry.action == 'cancel': + # assign newly providerd broker backend request id + entry.reqid = reqid + + # tell broker to cancel immediately + await brokerd_trades_stream.send(entry.dict()) + + # - the order is now active and will be mirrored in + # our book -> registered as live flow + else: + # update the flow with the ack msg + book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) - # relay through - await ctx.send_yield(msg) continue - # Get the broker (order) request id, this **must** be normalized - # into messaging provided by the broker backend - reqid = msg['reqid'] + # a live flow now exists + oid = entry.oid - # make response packet to EMS client(s) - oid = book._broker2ems_ids.get(reqid) + # make response packet to EMS client(s) + # reqid = book._ems_entries.get(oid) - if oid is None: - # paper engine race case: ``Client.submit_limit()`` hasn't - # returned yet and provided an output reqid to register - # locally, so we need to retreive the oid that was already - # packed at submission since we already know it ahead of - # time - paper = msg.get('paper_info') - if paper: - oid = paper['oid'] + # # msg is for unknown emsd order id + # if oid is None: + # oid = msg['oid'] + # # XXX: paper clearing special cases + # # paper engine race case: ``Client.submit_limit()`` hasn't + # # returned yet and provided an output reqid to register + # # locally, so we need to retreive the oid that was already + # # packed at submission since we already know it ahead of + # # time + # paper = msg.get('paper_info') + # if paper: + # oid = paper['oid'] + + # else: + # msg.get('external') + # if not msg: + # log.error(f"Unknown trade event {event}") + + # continue + + # resp = { + # 'resp': None, # placeholder + # 'oid': oid + # } + resp = None + broker_details = {} + + if name in ( + 'error', + ): + # TODO: figure out how this will interact with EMS clients + # for ex. on an error do we react with a dark orders + # management response, like cancelling all dark orders? + + # This looks like a supervision policy for pending orders on + # some unexpected failure - something we need to think more + # about. In most default situations, with composed orders + # (ex. brackets), most brokers seem to use a oca policy. + + msg = BrokerdError(**brokerd_msg) + + # XXX should we make one when it's blank? + log.error(pformat(msg)) + + # TODO: getting this bs, prolly need to handle status messages + # 'Market data farm connection is OK:usfarm.nj' + + # another stupid ib error to handle + # if 10147 in message: cancel + + # don't relay message to order requester client + continue + + elif name in ( + 'status', + ): + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) + + # everyone doin camel case + msg = BrokerdStatus(**brokerd_msg) + # status = msg['status'].lower() + + if msg.status == 'filled': + + # conditional execution is fully complete, no more + # fills for the noted order + if not msg.remaining: + + resp = 'broker_executed' + + log.info(f'Execution for {oid} is complete!') + + + # just log it else: - msg.get('external') - if not msg: - log.error(f"Unknown trade event {event}") - - continue - - resp = { - 'resp': None, # placeholder - 'oid': oid - } - - if name in ( - 'error', - ): - # TODO: figure out how this will interact with EMS clients - # for ex. on an error do we react with a dark orders - # management response, like cancelling all dark orders? - - # This looks like a supervision policy for pending orders on - # some unexpected failure - something we need to think more - # about. In most default situations, with composed orders - # (ex. brackets), most brokers seem to use a oca policy. - - message = msg['message'] - - # XXX should we make one when it's blank? - log.error(pformat(message)) - - # TODO: getting this bs, prolly need to handle status messages - # 'Market data farm connection is OK:usfarm.nj' - - # another stupid ib error to handle - # if 10147 in message: cancel - - # don't relay message to order requester client - continue - - elif name in ( - 'status', - ): - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # everyone doin camel case - status = msg['status'].lower() - - if status == 'filled': - - # conditional execution is fully complete, no more - # fills for the noted order - if not msg['remaining']: - - resp['resp'] = 'broker_executed' - - log.info(f'Execution for {oid} is complete!') - - # just log it - else: - log.info(f'{broker} filled {msg}') - - else: - # one of (submitted, cancelled) - resp['resp'] = 'broker_' + status - - elif name in ( - 'fill', - ): - # proxy through the "fill" result(s) - resp['resp'] = 'broker_filled' - resp.update(msg) - - log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') - - # respond to requesting client - await ctx.send_yield(resp) + log.info(f'{broker} filled {msg}') -async def process_order_cmds( + else: + # one of {submitted, cancelled} + resp = 'broker_' + msg.status + + # pass the BrokerdStatus msg inside the broker details field + broker_details = msg.dict() + + elif name in ( + 'fill', + ): + msg = BrokerdFill(**brokerd_msg) + + # proxy through the "fill" result(s) + resp = 'broker_filled' + broker_details = msg.dict() + + log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') + + else: + raise ValueError(f'Brokerd message {brokerd_msg} is invalid') + + # Create and relay EMS response status message + resp = Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=broker_details, + ) + # relay response to requesting EMS client + await ems_client_order_stream.send(resp.dict()) + + +async def process_client_order_cmds( + + # ctx: tractor.Context, + client_order_stream: tractor.MsgStream, # noqa + brokerd_order_stream: tractor.MsgStream, - ctx: tractor.Context, - cmd_stream: 'tractor.ReceiveStream', # noqa symbol: str, feed: 'Feed', # noqa - client: 'Client', # noqa + # client: 'Client', # noqa dark_book: _DarkBook, ) -> None: - async for cmd in cmd_stream: + # cmd: dict + async for cmd in client_order_stream: log.info(f'Received order cmd:\n{pformat(cmd)}') action = cmd['action'] oid = cmd['oid'] - - brid = dark_book._broker2ems_ids.inverse.get(oid) + reqid = dark_book._ems2brokerd_ids.inverse.get(oid) + live_entry = dark_book._ems_entries.get(oid) # TODO: can't wait for this stuff to land in 3.10 # https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings if action in ('cancel',): # check for live-broker order - if brid: - log.info("Submitting cancel for live order") - await client.submit_cancel(reqid=brid) + if live_entry: + + msg = BrokerdCancel( + oid=oid, + reqid=reqid or live_entry.reqid, + time_ns=time.time_ns(), + ) + + # send cancel to brokerd immediately! + log.info("Submitting cancel for live order") + + # NOTE: cancel response will be relayed back in messages + # from corresponding broker + # await client.submit_cancel(reqid=reqid) + await brokerd_order_stream.send(msg.dict()) - # check for EMS active exec else: + # might be a cancel for order that hasn't been acked yet + # by brokerd so register a cancel for then the order + # does show up later + dark_book._ems_entries[oid] = msg + + # check for EMS active exec try: + # remove from dark book clearing dark_book.orders[symbol].pop(oid, None) - # TODO: move these to `tractor.MsgStream` - await ctx.send_yield({ - 'resp': 'dark_cancelled', - 'oid': oid - }) + # tell client side that we've cancelled the + # dark-trigger order + await client_order_stream.send( + Status( + resp='dark_cancelled', + oid=oid, + time_ns=time.time_ns(), + ).dict() + ) + except KeyError: log.exception(f'No dark order for {symbol}?') + # TODO: 3.10 struct-pattern matching and unpacking here elif action in ('alert', 'buy', 'sell',): - sym = cmd['symbol'] - trigger_price = cmd['price'] - size = cmd['size'] - brokers = cmd['brokers'] - exec_mode = cmd['exec_mode'] + msg = Order(**cmd) - broker = brokers[0] + # sym = cmd['symbol'] + # trigger_price = cmd['price'] + # size = cmd['size'] + # brokers = cmd['brokers'] + # exec_mode = cmd['exec_mode'] + + sym = msg.symbol + trigger_price = msg.price + size = msg.size + exec_mode = msg.exec_mode + broker = msg.brokers[0] if exec_mode == 'live' and action in ('buy', 'sell',): - # register broker id for ems id - order_id = await client.submit_limit( + if live_entry is not None: + # sanity check on emsd id + assert live_entry.oid == oid + + # if we already had a broker order id then + # this is likely an order update commmand. + log.info(f"Modifying order: {live_entry.reqid}") + + # TODO: port to BrokerdOrder message sending + # register broker id for ems id + msg = BrokerdOrder( oid=oid, # no ib support for oids... + time_ns=time.time_ns(), # if this is None, creates a new order # otherwise will modify any existing one - brid=brid, + reqid=reqid, symbol=sym, action=action, @@ -538,25 +714,38 @@ async def process_order_cmds( size=size, ) - if brid: - assert dark_book._broker2ems_ids[brid] == oid - - # if we already had a broker order id then - # this is likely an order update commmand. - log.info(f"Modifying order: {brid}") - - else: - dark_book._broker2ems_ids[order_id] = oid - + # send request to backend # XXX: the trades data broker response loop - # (``process_broker_trades()`` above) will - # handle sending the ems side acks back to - # the cmd sender from here + # (``translate_and_relay_brokerd_events()`` above) will + # handle relaying the ems side responses back to + # the client/cmd sender from this request + print(f'sending live order {msg}') + await brokerd_order_stream.send(msg.dict()) + + # order_id = await client.submit_limit( + + # oid=oid, # no ib support for oids... + + # # if this is None, creates a new order + # # otherwise will modify any existing one + # brid=brid, + + # symbol=sym, + # action=action, + # price=trigger_price, + # size=size, + # ) + + # an immediate response should be brokerd ack with order + # id but we register our request as part of the flow + dark_book._ems_entries[oid] = msg elif exec_mode in ('dark', 'paper') or ( action in ('alert') ): - # submit order to local EMS + # submit order to local EMS book and scan loop, + # effectively a local clearing engine, which + # scans for conditions and triggers matching executions # Auto-gen scanner predicate: # we automatically figure out what the alert check @@ -590,8 +779,10 @@ async def process_order_cmds( abs_diff_away = 0 # submit execution/order to EMS scan loop - # FYI: this may result in an override of an existing + + # NOTE: this may result in an override of an existing # dark book entry if the order id already exists + dark_book.orders.setdefault( sym, {} )[oid] = ( @@ -601,14 +792,27 @@ async def process_order_cmds( percent_away, abs_diff_away ) + # TODO: if the predicate resolves immediately send the # execution to the broker asap? Or no? # ack-response that order is live in EMS - await ctx.send_yield({ - 'resp': 'dark_submitted', - 'oid': oid - }) + # await ctx.send_yield( + # {'resp': 'dark_submitted', + # 'oid': oid} + # ) + if action == 'alert': + resp = 'alert_submitted' + else: + resp = 'dark_submitted' + + await client_order_stream.send( + Status( + resp=resp, + oid=oid, + time_ns=time.time_ns(), + ).dict() + ) @tractor.context @@ -618,7 +822,8 @@ async def _emsd_main( # client_actor_name: str, broker: str, symbol: str, - _mode: str = 'dark', # ('paper', 'dark', 'live') + _exec_mode: str = 'dark', # ('paper', 'dark', 'live') + loglevel: str = 'info', ) -> None: """EMS (sub)actor entrypoint providing the @@ -635,15 +840,23 @@ async def _emsd_main( received in a stream from that client actor and then responses are streamed back up to the original calling task in the same client. - The task tree is: + The primary ``emsd`` task tree is: + - ``_emsd_main()``: - accepts order cmds, registers execs with exec loop - - - ``exec_loop()``: - run (dark) conditions on inputs and trigger broker submissions - - - ``process_broker_trades()``: - accept normalized trades responses, process and relay to ems client(s) + sets up brokerd feed, order feed with ems client, trades dialogue with + brokderd trading api. + | + - ``start_clearing()``: + run (dark) conditions on inputs and trigger broker submissions + | + - ``translate_and_relay_brokerd_events()``: + accept normalized trades responses from brokerd, process and + relay to ems client(s); this is a effectively a "trade event + reponse" proxy-broker. + | + - ``process_client_order_cmds()``: + accepts order cmds from requesting piker clients, registers + execs with exec loop """ # from ._client import send_order_cmds @@ -651,49 +864,140 @@ async def _emsd_main( global _router dark_book = _router.get_dark_book(broker) + ems_ctx = ctx + + cached_feed = _router.feeds.get((broker, symbol)) + if cached_feed: + # TODO: use cached feeds per calling-actor + log.warning(f'Opening duplicate feed for {(broker, symbol)}') + # spawn one task per broker feed - async with trio.open_nursery() as n: + async with ( + trio.open_nursery() as n, # TODO: eventually support N-brokers - async with data.open_feed( + data.open_feed( broker, [symbol], - loglevel='info', - ) as feed: + loglevel=loglevel, + ) as feed, + ): + if not cached_feed: + _router.feeds[(broker, symbol)] = feed - # get a portal back to the client - # async with tractor.wait_for_actor(client_actor_name) as portal: + # XXX: this should be initial price quote from target provider + first_quote = await feed.receive() - await ctx.started() + # open a stream with the brokerd backend for order + # flow dialogue - # establish 2-way stream with requesting order-client - async with ctx.open_stream() as order_stream: + book = _router.get_dark_book(broker) + book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + + trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) + portal = feed._brokerd_portal + + if trades_endpoint is None or _exec_mode == 'paper': + + # load the paper trading engine + _exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine inside the brokerd + # actor to simulate the real load it'll likely be under + # when also pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + broker=broker, + symbol=symbol, + loglevel=loglevel, + ) + + # for paper mode we need to mock this trades response feed + # so we pass a duck-typed feed-looking mem chan which is fed + # fill and submission events from the exec loop + # feed._trade_stream = client.trade_stream + + # init the trades stream + # client._to_trade_stream.send_nowait({'local_trades': 'start'}) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + async with ( + open_trades_endpoint as (brokerd_ctx, positions), + brokerd_ctx.open_stream() as brokerd_trades_stream, + ): + + # if trades_endpoint is not None and _exec_mode != 'paper': + + # # TODO: open a bidir stream here? + # # we have an order API for this broker + # client = client_factory(feed._brokerd_portal) + + # else: + + # return control to parent task + # task_status.started((first_quote, feed, client)) + + # stream = feed.stream + + # start the real-time clearing condition scan loop and + # paper engine simulator. + + # n.start_soon( + # start_clearing, + # brokerd_trades_stream, + # feed.stream, # quote stream + # # client, + # broker, + # symbol, + # _exec_mode, + # book, + # ) + + # signal to client that we're started + # TODO: we could send back **all** brokerd positions here? + await ems_ctx.started(positions) + + # establish 2-way stream with requesting order-client and + # begin handling inbound order requests and updates + async with ems_ctx.open_stream() as ems_client_order_stream: + + # trigger scan and exec loop + n.start_soon( + clear_dark_triggers, + + brokerd_trades_stream, + ems_client_order_stream, + feed.stream, - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, - feed, broker, symbol, - _mode, + book + # ctx, + # client, ) # begin processing order events from the target brokerd backend - await n.start( - process_broker_trades, - ctx, - feed, + n.start_soon( + + translate_and_relay_brokerd_events, + broker, + ems_client_order_stream, + brokerd_trades_stream, dark_book, ) # start inbound (from attached client) order request processing - await process_order_cmds( - ctx, - order_stream, + await process_client_order_cmds( + ems_client_order_stream, + brokerd_trades_stream, symbol, feed, - client, dark_book, ) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index a49858f4..c3c4016a 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -127,9 +127,9 @@ class OrderMode: """ line = self.lines.commit_line(uuid) - req_msg = self.book._sent_orders.get(uuid) - if req_msg: - req_msg.ack_time_ns = time.time_ns() + # req_msg = self.book._sent_orders.get(uuid) + # if req_msg: + # req_msg.ack_time_ns = time.time_ns() return line @@ -317,10 +317,14 @@ async def start_order_mode( # spawn EMS actor-service async with ( - open_ems(brokername, symbol) as (book, trades_stream), + open_ems(brokername, symbol) as (book, trades_stream, positions), open_order_mode(symbol, chart, book) as order_mode ): + # update any exising positions + for sym, msg in positions.items(): + order_mode.on_position_update(msg) + def get_index(time: float): # XXX: not sure why the time is so off here @@ -343,16 +347,15 @@ async def start_order_mode( fmsg = pformat(msg) log.info(f'Received order msg:\n{fmsg}') - resp = msg['resp'] - - if resp in ( + name = msg['name'] + if name in ( 'position', ): # show line label once order is live order_mode.on_position_update(msg) continue - # delete the line from view + resp = msg['resp'] oid = msg['oid'] # response to 'action' request (buy/sell) @@ -375,21 +378,21 @@ async def start_order_mode( order_mode.on_cancel(oid) elif resp in ( - 'dark_executed' + 'dark_triggered' ): log.info(f'Dark order triggered for {fmsg}') - # for alerts add a triangle and remove the - # level line - if msg['cmd']['action'] == 'alert': - - # should only be one "fill" for an alert - order_mode.on_fill( - oid, - price=msg['trigger_price'], - arrow_index=get_index(time.time()) - ) - await order_mode.on_exec(oid, msg) + elif resp in ( + 'alert_triggered' + ): + # should only be one "fill" for an alert + # add a triangle and remove the level line + order_mode.on_fill( + oid, + price=msg['trigger_price'], + arrow_index=get_index(time.time()) + ) + await order_mode.on_exec(oid, msg) # response to completed 'action' request for buy/sell elif resp in ( @@ -400,12 +403,15 @@ async def start_order_mode( # each clearing tick is responded individually elif resp in ('broker_filled',): - action = msg['action'] + action = book._sent_orders[oid].action + details = msg['brokerd_msg'] # TODO: some kinda progress system order_mode.on_fill( oid, - price=msg['price'], - arrow_index=get_index(msg['broker_time']), + price=details['price'], pointing='up' if action == 'buy' else 'down', + + # TODO: put the actual exchange timestamp + arrow_index=get_index(details['broker_time']), )