From 919ecab732a74be0ea15352c082472243e841e89 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 7 Mar 2021 13:12:39 -0500 Subject: [PATCH] Support order modification in ems request loop --- piker/exchange/_ems.py | 71 ++++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/piker/exchange/_ems.py b/piker/exchange/_ems.py index a3938b06..5ebf1fa3 100644 --- a/piker/exchange/_ems.py +++ b/piker/exchange/_ems.py @@ -138,6 +138,7 @@ async def execute_triggers( """ # this stream may eventually contain multiple symbols + # XXX: optimize this for speed! async for quotes in stream: # TODO: numba all this! @@ -183,6 +184,14 @@ async def execute_triggers( reqid = await client.submit_limit( oid=oid, + + # this is a brand new order request for the + # underlying broker so we set out "broker request + # id" (brid) as nothing so that the broker + # client knows that we aren't trying to modify + # an existing order. + brid=None, + symbol=sym, action=cmd['action'], price=submit_price, @@ -275,11 +284,6 @@ async def exec_loop( # return control to parent task task_status.started((first_quote, feed, client)) - ############################## - # begin price actions sequence - # XXX: optimize this for speed - ############################## - # shield this field so the remote brokerd does not get cancelled stream = feed.stream with stream.shield(): @@ -346,6 +350,7 @@ async def process_broker_trades( async for event in trades_stream: name, msg = event['local_trades'] + log.info(f'Received broker trade event:\n{pformat(msg)}') # Get the broker (order) request id, this **must** be normalized @@ -413,7 +418,6 @@ async def process_broker_trades( status = msg['status'].lower() if status == 'filled': - # await tractor.breakpoint() # conditional execution is fully complete, no more # fills for the noted order @@ -445,7 +449,7 @@ async def process_broker_trades( @tractor.stream -async def _ems_main( +async def _emsd_main( ctx: tractor.Context, client_actor_name: str, broker: str, @@ -467,7 +471,7 @@ async def _ems_main( streamed back up to the original calling task in the same client. The task tree is: - - ``_ems_main()``: + - ``_emsd_main()``: accepts order cmds, registers execs with exec loop - ``exec_loop()``: @@ -479,7 +483,7 @@ async def _ems_main( """ from ._client import send_order_cmds - book = get_dark_book(broker) + dark_book = get_dark_book(broker) # get a portal back to the client async with tractor.wait_for_actor(client_actor_name) as portal: @@ -502,7 +506,7 @@ async def _ems_main( process_broker_trades, ctx, feed, - book, + dark_book, ) # connect back to the calling actor (the one that is @@ -516,12 +520,13 @@ async def _ems_main( action = cmd['action'] oid = cmd['oid'] + brid = dark_book._broker2ems_ids.inverse.get(oid) + # TODO: can't wait for this stuff to land in 3.10 # https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings if action in ('cancel',): # check for live-broker order - brid = book._broker2ems_ids.inverse.get(oid) if brid: log.info("Submitting cancel for live order") await client.submit_cancel(reqid=brid) @@ -529,7 +534,7 @@ async def _ems_main( # check for EMS active exec else: try: - book.orders[symbol].pop(oid, None) + dark_book.orders[symbol].pop(oid, None) await ctx.send_yield({ 'resp': 'dark_cancelled', @@ -547,30 +552,43 @@ async def _ems_main( exec_mode = cmd.get('exec_mode', _mode) broker = brokers[0] - last = book.lasts[(broker, sym)] + last = dark_book.lasts[(broker, sym)] if exec_mode == 'live' and action in ('buy', 'sell',): # register broker id for ems id order_id = await client.submit_limit( - oid=oid, # no ib support for this + + oid=oid, # no ib support for oids... + + # if this is None, creates a new order + # otherwise will modify any existing one + brid=brid, + symbol=sym, action=action, price=trigger_price, size=size, ) - book._broker2ems_ids[order_id] = oid + + if brid: + assert dark_book._broker2ems_ids[brid] == oid + + # if we already had a broker order id then + # this is likely an order update commmand. + log.info(f"Modifying order: {brid}") + + else: + dark_book._broker2ems_ids[order_id] = oid # XXX: the trades data broker response loop # (``process_broker_trades()`` above) will # handle sending the ems side acks back to # the cmd sender from here - elif exec_mode in ('dark', 'paper') or action in ('alert'): - - # TODO: if the predicate resolves immediately send the - # execution to the broker asap? Or no? - + elif exec_mode in ('dark', 'paper') or ( + action in ('alert') + ): # submit order to local EMS # Auto-gen scanner predicate: @@ -581,7 +599,7 @@ async def _ems_main( # the user choose the predicate operator. pred = mk_check(trigger_price, last) - mt = feed.symbols[sym].tick_size + min_tick = feed.symbols[sym].tick_size if action == 'buy': tickfilter = ('ask', 'last', 'trade') @@ -590,12 +608,12 @@ async def _ems_main( # TODO: we probably need to scale this based # on some near term historical spread # measure? - abs_diff_away = 3 * mt + abs_diff_away = 3 * min_tick elif action == 'sell': tickfilter = ('bid', 'last', 'trade') percent_away = -0.005 - abs_diff_away = -3 * mt + abs_diff_away = -3 * min_tick else: # alert tickfilter = ('trade', 'utrade', 'last') @@ -603,7 +621,10 @@ async def _ems_main( abs_diff_away = 0 # submit execution/order to EMS scan loop - book.orders.setdefault( + # FYI: this may result in an override of an existing + # dark book entry if the order id already exists + + dark_book.orders.setdefault( sym, {} )[oid] = ( pred, @@ -612,6 +633,8 @@ async def _ems_main( percent_away, abs_diff_away ) + # TODO: if the predicate resolves immediately send the + # execution to the broker asap? Or no? # ack-response that order is live in EMS await ctx.send_yield({