Support order modification in ems request loop

basic_orders
Tyler Goodlet 2021-03-07 13:12:39 -05:00
parent a43ab1b983
commit 919ecab732
1 changed files with 47 additions and 24 deletions

View File

@ -138,6 +138,7 @@ async def execute_triggers(
""" """
# this stream may eventually contain multiple symbols # this stream may eventually contain multiple symbols
# XXX: optimize this for speed!
async for quotes in stream: async for quotes in stream:
# TODO: numba all this! # TODO: numba all this!
@ -183,6 +184,14 @@ async def execute_triggers(
reqid = await client.submit_limit( reqid = await client.submit_limit(
oid=oid, oid=oid,
# this is a brand new order request for the
# underlying broker so we set out "broker request
# id" (brid) as nothing so that the broker
# client knows that we aren't trying to modify
# an existing order.
brid=None,
symbol=sym, symbol=sym,
action=cmd['action'], action=cmd['action'],
price=submit_price, price=submit_price,
@ -275,11 +284,6 @@ async def exec_loop(
# return control to parent task # return control to parent task
task_status.started((first_quote, feed, client)) task_status.started((first_quote, feed, client))
##############################
# begin price actions sequence
# XXX: optimize this for speed
##############################
# shield this field so the remote brokerd does not get cancelled # shield this field so the remote brokerd does not get cancelled
stream = feed.stream stream = feed.stream
with stream.shield(): with stream.shield():
@ -346,6 +350,7 @@ async def process_broker_trades(
async for event in trades_stream: async for event in trades_stream:
name, msg = event['local_trades'] name, msg = event['local_trades']
log.info(f'Received broker trade event:\n{pformat(msg)}') log.info(f'Received broker trade event:\n{pformat(msg)}')
# Get the broker (order) request id, this **must** be normalized # Get the broker (order) request id, this **must** be normalized
@ -413,7 +418,6 @@ async def process_broker_trades(
status = msg['status'].lower() status = msg['status'].lower()
if status == 'filled': if status == 'filled':
# await tractor.breakpoint()
# conditional execution is fully complete, no more # conditional execution is fully complete, no more
# fills for the noted order # fills for the noted order
@ -445,7 +449,7 @@ async def process_broker_trades(
@tractor.stream @tractor.stream
async def _ems_main( async def _emsd_main(
ctx: tractor.Context, ctx: tractor.Context,
client_actor_name: str, client_actor_name: str,
broker: str, broker: str,
@ -467,7 +471,7 @@ async def _ems_main(
streamed back up to the original calling task in the same client. streamed back up to the original calling task in the same client.
The task tree is: The task tree is:
- ``_ems_main()``: - ``_emsd_main()``:
accepts order cmds, registers execs with exec loop accepts order cmds, registers execs with exec loop
- ``exec_loop()``: - ``exec_loop()``:
@ -479,7 +483,7 @@ async def _ems_main(
""" """
from ._client import send_order_cmds from ._client import send_order_cmds
book = get_dark_book(broker) dark_book = get_dark_book(broker)
# get a portal back to the client # get a portal back to the client
async with tractor.wait_for_actor(client_actor_name) as portal: async with tractor.wait_for_actor(client_actor_name) as portal:
@ -502,7 +506,7 @@ async def _ems_main(
process_broker_trades, process_broker_trades,
ctx, ctx,
feed, feed,
book, dark_book,
) )
# connect back to the calling actor (the one that is # connect back to the calling actor (the one that is
@ -516,12 +520,13 @@ async def _ems_main(
action = cmd['action'] action = cmd['action']
oid = cmd['oid'] oid = cmd['oid']
brid = dark_book._broker2ems_ids.inverse.get(oid)
# TODO: can't wait for this stuff to land in 3.10 # TODO: can't wait for this stuff to land in 3.10
# https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings # https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings
if action in ('cancel',): if action in ('cancel',):
# check for live-broker order # check for live-broker order
brid = book._broker2ems_ids.inverse.get(oid)
if brid: if brid:
log.info("Submitting cancel for live order") log.info("Submitting cancel for live order")
await client.submit_cancel(reqid=brid) await client.submit_cancel(reqid=brid)
@ -529,7 +534,7 @@ async def _ems_main(
# check for EMS active exec # check for EMS active exec
else: else:
try: try:
book.orders[symbol].pop(oid, None) dark_book.orders[symbol].pop(oid, None)
await ctx.send_yield({ await ctx.send_yield({
'resp': 'dark_cancelled', 'resp': 'dark_cancelled',
@ -547,30 +552,43 @@ async def _ems_main(
exec_mode = cmd.get('exec_mode', _mode) exec_mode = cmd.get('exec_mode', _mode)
broker = brokers[0] broker = brokers[0]
last = book.lasts[(broker, sym)] last = dark_book.lasts[(broker, sym)]
if exec_mode == 'live' and action in ('buy', 'sell',): if exec_mode == 'live' and action in ('buy', 'sell',):
# register broker id for ems id # register broker id for ems id
order_id = await client.submit_limit( order_id = await client.submit_limit(
oid=oid, # no ib support for this
oid=oid, # no ib support for oids...
# if this is None, creates a new order
# otherwise will modify any existing one
brid=brid,
symbol=sym, symbol=sym,
action=action, action=action,
price=trigger_price, price=trigger_price,
size=size, size=size,
) )
book._broker2ems_ids[order_id] = oid
if brid:
assert dark_book._broker2ems_ids[brid] == oid
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(f"Modifying order: {brid}")
else:
dark_book._broker2ems_ids[order_id] = oid
# XXX: the trades data broker response loop # XXX: the trades data broker response loop
# (``process_broker_trades()`` above) will # (``process_broker_trades()`` above) will
# handle sending the ems side acks back to # handle sending the ems side acks back to
# the cmd sender from here # the cmd sender from here
elif exec_mode in ('dark', 'paper') or action in ('alert'): elif exec_mode in ('dark', 'paper') or (
action in ('alert')
# TODO: if the predicate resolves immediately send the ):
# execution to the broker asap? Or no?
# submit order to local EMS # submit order to local EMS
# Auto-gen scanner predicate: # Auto-gen scanner predicate:
@ -581,7 +599,7 @@ async def _ems_main(
# the user choose the predicate operator. # the user choose the predicate operator.
pred = mk_check(trigger_price, last) pred = mk_check(trigger_price, last)
mt = feed.symbols[sym].tick_size min_tick = feed.symbols[sym].tick_size
if action == 'buy': if action == 'buy':
tickfilter = ('ask', 'last', 'trade') tickfilter = ('ask', 'last', 'trade')
@ -590,12 +608,12 @@ async def _ems_main(
# TODO: we probably need to scale this based # TODO: we probably need to scale this based
# on some near term historical spread # on some near term historical spread
# measure? # measure?
abs_diff_away = 3 * mt abs_diff_away = 3 * min_tick
elif action == 'sell': elif action == 'sell':
tickfilter = ('bid', 'last', 'trade') tickfilter = ('bid', 'last', 'trade')
percent_away = -0.005 percent_away = -0.005
abs_diff_away = -3 * mt abs_diff_away = -3 * min_tick
else: # alert else: # alert
tickfilter = ('trade', 'utrade', 'last') tickfilter = ('trade', 'utrade', 'last')
@ -603,7 +621,10 @@ async def _ems_main(
abs_diff_away = 0 abs_diff_away = 0
# submit execution/order to EMS scan loop # submit execution/order to EMS scan loop
book.orders.setdefault( # FYI: this may result in an override of an existing
# dark book entry if the order id already exists
dark_book.orders.setdefault(
sym, {} sym, {}
)[oid] = ( )[oid] = (
pred, pred,
@ -612,6 +633,8 @@ async def _ems_main(
percent_away, percent_away,
abs_diff_away abs_diff_away
) )
# TODO: if the predicate resolves immediately send the
# execution to the broker asap? Or no?
# ack-response that order is live in EMS # ack-response that order is live in EMS
await ctx.send_yield({ await ctx.send_yield({