diff --git a/piker/exchange/_ems.py b/piker/exchange/_ems.py index 5ebf1fa3..2ae11ae0 100644 --- a/piker/exchange/_ems.py +++ b/piker/exchange/_ems.py @@ -448,6 +448,145 @@ async def process_broker_trades( await ctx.send_yield(resp) +async def process_order_cmds( + ctx: tractor.Context, + cmd_stream: 'tractor.ReceiveStream', # noqa + symbol: str, + feed: 'Feed', # noqa + client: 'Client', # noqa + dark_book: _DarkBook, +) -> None: + + async for cmd in cmd_stream: + + log.info(f'Received order cmd:\n{pformat(cmd)}') + + action = cmd['action'] + oid = cmd['oid'] + + brid = dark_book._broker2ems_ids.inverse.get(oid) + + # TODO: can't wait for this stuff to land in 3.10 + # https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings + if action in ('cancel',): + + # check for live-broker order + if brid: + log.info("Submitting cancel for live order") + await client.submit_cancel(reqid=brid) + + # check for EMS active exec + else: + try: + dark_book.orders[symbol].pop(oid, None) + + await ctx.send_yield({ + 'resp': 'dark_cancelled', + 'oid': oid + }) + except KeyError: + log.exception(f'No dark order for {symbol}?') + + elif action in ('alert', 'buy', 'sell',): + + sym = cmd['symbol'] + trigger_price = cmd['price'] + size = cmd['size'] + brokers = cmd['brokers'] + exec_mode = cmd['exec_mode'] + + broker = brokers[0] + last = dark_book.lasts[(broker, sym)] + + if exec_mode == 'live' and action in ('buy', 'sell',): + + # register broker id for ems id + order_id = await client.submit_limit( + + oid=oid, # no ib support for oids... + + # if this is None, creates a new order + # otherwise will modify any existing one + brid=brid, + + symbol=sym, + action=action, + price=trigger_price, + size=size, + ) + + if brid: + assert dark_book._broker2ems_ids[brid] == oid + + # if we already had a broker order id then + # this is likely an order update commmand. + log.info(f"Modifying order: {brid}") + + else: + dark_book._broker2ems_ids[order_id] = oid + + # XXX: the trades data broker response loop + # (``process_broker_trades()`` above) will + # handle sending the ems side acks back to + # the cmd sender from here + + elif exec_mode in ('dark', 'paper') or ( + action in ('alert') + ): + # submit order to local EMS + + # Auto-gen scanner predicate: + # we automatically figure out what the alert check + # condition should be based on the current first + # price received from the feed, instead of being + # like every other shitty tina platform that makes + # the user choose the predicate operator. + pred = mk_check(trigger_price, last) + + min_tick = feed.symbols[sym].tick_size + + if action == 'buy': + tickfilter = ('ask', 'last', 'trade') + percent_away = 0.005 + + # TODO: we probably need to scale this based + # on some near term historical spread + # measure? + abs_diff_away = 3 * min_tick + + elif action == 'sell': + tickfilter = ('bid', 'last', 'trade') + percent_away = -0.005 + abs_diff_away = -3 * min_tick + + else: # alert + tickfilter = ('trade', 'utrade', 'last') + percent_away = 0 + abs_diff_away = 0 + + # submit execution/order to EMS scan loop + # FYI: this may result in an override of an existing + # dark book entry if the order id already exists + + dark_book.orders.setdefault( + sym, {} + )[oid] = ( + pred, + tickfilter, + cmd, + percent_away, + abs_diff_away + ) + # TODO: if the predicate resolves immediately send the + # execution to the broker asap? Or no? + + # ack-response that order is live in EMS + await ctx.send_yield({ + 'resp': 'dark_submitted', + 'oid': oid + }) + + @tractor.stream async def _emsd_main( ctx: tractor.Context, @@ -513,133 +652,14 @@ async def _emsd_main( # acting as an EMS client and will submit orders) to # receive requests pushed over a tractor stream # using (for now) an async generator. - async for cmd in await portal.run(send_order_cmds): + order_stream = await portal.run(send_order_cmds) - log.info(f'Received order cmd:\n{pformat(cmd)}') - - action = cmd['action'] - oid = cmd['oid'] - - brid = dark_book._broker2ems_ids.inverse.get(oid) - - # TODO: can't wait for this stuff to land in 3.10 - # https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings - if action in ('cancel',): - - # check for live-broker order - if brid: - log.info("Submitting cancel for live order") - await client.submit_cancel(reqid=brid) - - # check for EMS active exec - else: - try: - dark_book.orders[symbol].pop(oid, None) - - await ctx.send_yield({ - 'resp': 'dark_cancelled', - 'oid': oid - }) - except KeyError: - log.exception(f'No dark order for {symbol}?') - - elif action in ('alert', 'buy', 'sell',): - - sym = cmd['symbol'] - trigger_price = cmd['price'] - size = cmd['size'] - brokers = cmd['brokers'] - exec_mode = cmd.get('exec_mode', _mode) - - broker = brokers[0] - last = dark_book.lasts[(broker, sym)] - - if exec_mode == 'live' and action in ('buy', 'sell',): - - # register broker id for ems id - order_id = await client.submit_limit( - - oid=oid, # no ib support for oids... - - # if this is None, creates a new order - # otherwise will modify any existing one - brid=brid, - - symbol=sym, - action=action, - price=trigger_price, - size=size, - ) - - if brid: - assert dark_book._broker2ems_ids[brid] == oid - - # if we already had a broker order id then - # this is likely an order update commmand. - log.info(f"Modifying order: {brid}") - - else: - dark_book._broker2ems_ids[order_id] = oid - - # XXX: the trades data broker response loop - # (``process_broker_trades()`` above) will - # handle sending the ems side acks back to - # the cmd sender from here - - elif exec_mode in ('dark', 'paper') or ( - action in ('alert') - ): - # submit order to local EMS - - # Auto-gen scanner predicate: - # we automatically figure out what the alert check - # condition should be based on the current first - # price received from the feed, instead of being - # like every other shitty tina platform that makes - # the user choose the predicate operator. - pred = mk_check(trigger_price, last) - - min_tick = feed.symbols[sym].tick_size - - if action == 'buy': - tickfilter = ('ask', 'last', 'trade') - percent_away = 0.005 - - # TODO: we probably need to scale this based - # on some near term historical spread - # measure? - abs_diff_away = 3 * min_tick - - elif action == 'sell': - tickfilter = ('bid', 'last', 'trade') - percent_away = -0.005 - abs_diff_away = -3 * min_tick - - else: # alert - tickfilter = ('trade', 'utrade', 'last') - percent_away = 0 - abs_diff_away = 0 - - # submit execution/order to EMS scan loop - # FYI: this may result in an override of an existing - # dark book entry if the order id already exists - - dark_book.orders.setdefault( - sym, {} - )[oid] = ( - pred, - tickfilter, - cmd, - percent_away, - abs_diff_away - ) - # TODO: if the predicate resolves immediately send the - # execution to the broker asap? Or no? - - # ack-response that order is live in EMS - await ctx.send_yield({ - 'resp': 'dark_submitted', - 'oid': oid - }) - - # continue and wait on next order cmd + # start inbound order request processing + await process_order_cmds( + ctx, + order_stream, + symbol, + feed, + client, + dark_book, + )