Deliver existing dialog (msgs) to every EMS client

Ideally every client that connects to the ems can know its state
(immediately) meaning relay all the order dialogs that are currently
active. This adds full (hacky WIP) support to receive those dialog
(msgs) from the `open_ems()` startup values via the `.started()` msg
from `_emsd_main()`.

Further this adds support to the order mode chart-UI to display existing
(live) orders on the chart during startup. Details include,

- add a `OrderMode.load_unknown_dialog_from_msg()` for processing and
  displaying a ``BrokerdStatus`` (for now) msg from the EMS that was not
  previously created by the current ems client and registering and
  displaying it on the chart.
- break out the ems msg processing into a new
  `order_mode.process_trade_msg()` func so that it can be called on the
  startup dialog-msg set as well as eventually used a more general low
  level auto-strat API (eg. when we get to displaying auto-strat and
  group trading automatically on an observing chart UI.
- hackyness around msg-processing for the dialogs delivery since we're
  technically delivering `BrokerdStatus` msgs when the client-side
  processing technically expects `Status` msgs.. we'll rectify this
  soon!
dict_differ
Tyler Goodlet 2022-08-05 20:39:00 -04:00
parent aa204228ab
commit 5ac5743c66
2 changed files with 205 additions and 160 deletions

View File

@ -224,11 +224,19 @@ async def open_ems(
fqsn=fqsn, fqsn=fqsn,
exec_mode=mode, exec_mode=mode,
) as (ctx, (positions, accounts)), ) as (
ctx,
(
positions,
accounts,
dialogs,
)
),
# open 2-way trade command stream # open 2-way trade command stream
ctx.open_stream() as trades_stream, ctx.open_stream() as trades_stream,
): ):
# start sync code order msg delivery task
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon( n.start_soon(
relay_order_cmds_from_sync_code, relay_order_cmds_from_sync_code,
@ -236,4 +244,10 @@ async def open_ems(
trades_stream trades_stream
) )
yield book, trades_stream, positions, accounts yield (
book,
trades_stream,
positions,
accounts,
dialogs,
)

View File

@ -517,6 +517,48 @@ class OrderMode:
return ids return ids
def load_unknown_dialog_from_msg(
self,
msg: dict,
) -> OrderDialog:
oid = str(msg['oid'])
size = msg['brokerd_msg']['size']
if size >= 0:
action = 'buy'
else:
action = 'sell'
acct = msg['brokerd_msg']['account']
price = msg['brokerd_msg']['price']
deats = msg['brokerd_msg']['broker_details']
fqsn = (
deats['fqsn'] + '.' + deats['name']
)
symbol = Symbol.from_fqsn(
fqsn=fqsn,
info={},
)
# map to order composite-type
order = Order(
action=action,
price=price,
account=acct,
size=size,
symbol=symbol,
brokers=symbol.brokers,
oid=oid,
exec_mode='live', # dark or live
)
dialog = self.submit_order(
send_msg=False,
order=order,
)
assert self.dialogs[oid] == dialog
return dialog
@asynccontextmanager @asynccontextmanager
async def open_order_mode( async def open_order_mode(
@ -554,6 +596,7 @@ async def open_order_mode(
trades_stream, trades_stream,
position_msgs, position_msgs,
brokerd_accounts, brokerd_accounts,
ems_dialog_msgs,
), ),
trio.open_nursery() as tn, trio.open_nursery() as tn,
@ -760,195 +803,183 @@ async def open_order_mode(
# to handle input since the ems connection is ready # to handle input since the ems connection is ready
started.set() started.set()
for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side?
msg.setdefault('resp', msg['broker_details']['resp'])
msg.setdefault('oid', msg['broker_details']['oid'])
msg['brokerd_msg'] = msg
await process_trade_msg(
mode,
book,
msg,
)
tn.start_soon( tn.start_soon(
process_trades_and_update_ui, process_trades_and_update_ui,
tn,
feed,
mode,
trades_stream, trades_stream,
mode,
book, book,
) )
yield mode yield mode
async def process_trades_and_update_ui( async def process_trades_and_update_ui(
n: trio.Nursery,
feed: Feed,
mode: OrderMode,
trades_stream: tractor.MsgStream, trades_stream: tractor.MsgStream,
mode: OrderMode,
book: OrderBook, book: OrderBook,
) -> None: ) -> None:
get_index = mode.chart.get_index
global _pnl_tasks
# this is where we receive **back** messages # this is where we receive **back** messages
# about executions **from** the EMS actor # about executions **from** the EMS actor
async for msg in trades_stream: async for msg in trades_stream:
await process_trade_msg(
mode,
book,
msg,
)
fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}')
name = msg['name'] async def process_trade_msg(
if name in ( mode: OrderMode,
'position', book: OrderBook,
msg: dict,
) -> None:
get_index = mode.chart.get_index
fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}')
name = msg['name']
if name in (
'position',
):
sym = mode.chart.linked.symbol
pp_msg_symbol = msg['symbol'].lower()
fqsn = sym.front_fqsn()
broker, key = sym.front_feed()
if (
pp_msg_symbol == fqsn
or pp_msg_symbol == fqsn.removesuffix(f'.{broker}')
): ):
sym = mode.chart.linked.symbol log.info(f'{fqsn} matched pp msg: {fmsg}')
pp_msg_symbol = msg['symbol'].lower() tracker = mode.trackers[msg['account']]
fqsn = sym.front_fqsn() tracker.live_pp.update_from_msg(msg)
broker, key = sym.front_feed() # update order pane widgets
if ( tracker.update_from_pp()
pp_msg_symbol == fqsn mode.pane.update_status_ui(tracker)
or pp_msg_symbol == fqsn.removesuffix(f'.{broker}')
):
log.info(f'{fqsn} matched pp msg: {fmsg}')
tracker = mode.trackers[msg['account']]
tracker.live_pp.update_from_msg(msg)
# update order pane widgets
tracker.update_from_pp()
mode.pane.update_status_ui(tracker)
if tracker.live_pp.size: if tracker.live_pp.size:
# display pnl # display pnl
mode.pane.display_pnl(tracker) mode.pane.display_pnl(tracker)
# short circuit to next msg to avoid # short circuit to next msg to avoid
# unnecessary msg content lookups # unnecessary msg content lookups
continue return
# continue
resp = msg['resp'] resp = msg['resp']
oid = str(msg['oid']) oid = str(msg['oid'])
dialog = mode.dialogs.get(oid) dialog = mode.dialogs.get(oid)
if dialog is None: if dialog is None:
log.warning(f'received msg for untracked dialog:\n{fmsg}') log.warning(
f'received msg for untracked dialog:\n{fmsg}'
)
dialog = mode.load_unknown_dialog_from_msg(msg)
size = msg['brokerd_msg']['size'] # record message to dialog tracking
if size >= 0: dialog.msgs[oid] = msg
action = 'buy'
else:
action = 'sell'
acct = msg['brokerd_msg']['account'] # response to 'action' request (buy/sell)
price = msg['brokerd_msg']['price'] if resp in (
deats = msg['brokerd_msg']['broker_details'] 'dark_submitted',
fqsn = ( 'broker_submitted'
deats['fqsn'] + '.' + deats['name'] ):
) # show line label once order is live
symbol = Symbol.from_fqsn( mode.on_submit(oid)
fqsn=fqsn,
info={},
)
# map to order composite-type
order = Order(
action=action,
price=price,
account=acct,
size=size,
symbol=symbol,
brokers=symbol.brokers,
oid=oid,
exec_mode='live', # dark or live
)
dialog = mode.submit_order( # resp to 'cancel' request or error condition
send_msg=False, # for action request
order=order, elif resp in (
) 'broker_inactive',
'broker_errored',
):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
# # TODO: enable pure tracking / mirroring of dialogs elif resp in (
# # is desired. 'broker_cancelled',
'dark_cancelled'
):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.cancel(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
elif resp in (
'dark_triggered'
):
log.info(f'Dark order triggered for {fmsg}')
elif resp in (
'alert_triggered'
):
# should only be one "fill" for an alert
# add a triangle and remove the level line
mode.on_fill(
oid,
price=msg['trigger_price'],
arrow_index=get_index(time.time()),
)
mode.lines.remove_line(uuid=oid)
await mode.on_exec(oid, msg)
# response to completed 'action' request for buy/sell
elif resp in (
'broker_executed',
):
# right now this is just triggering a system alert
await mode.on_exec(oid, msg)
if msg['brokerd_msg']['remaining'] == 0:
mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually
elif resp in (
'broker_filled',
):
known_order = book._sent_orders.get(oid)
if not known_order:
log.warning(f'order {oid} is unknown')
return
# continue # continue
# record message to dialog tracking action = known_order.action
dialog.msgs[oid] = msg details = msg['brokerd_msg']
# response to 'action' request (buy/sell) # TODO: some kinda progress system
if resp in ( mode.on_fill(
'dark_submitted', oid,
'broker_submitted' price=details['price'],
): pointing='up' if action == 'buy' else 'down',
# show line label once order is live # TODO: put the actual exchange timestamp
mode.on_submit(oid) arrow_index=get_index(details['broker_time']),
)
# resp to 'cancel' request or error condition # TODO: how should we look this up?
# for action request # tracker = mode.trackers[msg['account']]
elif resp in ( # tracker.live_pp.fills.append(msg)
'broker_inactive',
'broker_errored',
):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
elif resp in (
'broker_cancelled',
'dark_cancelled'
):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.cancel(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
elif resp in (
'dark_triggered'
):
log.info(f'Dark order triggered for {fmsg}')
elif resp in (
'alert_triggered'
):
# should only be one "fill" for an alert
# add a triangle and remove the level line
mode.on_fill(
oid,
price=msg['trigger_price'],
arrow_index=get_index(time.time()),
)
mode.lines.remove_line(uuid=oid)
await mode.on_exec(oid, msg)
# response to completed 'action' request for buy/sell
elif resp in (
'broker_executed',
):
# right now this is just triggering a system alert
await mode.on_exec(oid, msg)
if msg['brokerd_msg']['remaining'] == 0:
mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually
elif resp in (
'broker_filled',
):
known_order = book._sent_orders.get(oid)
if not known_order:
log.warning(f'order {oid} is unknown')
continue
action = known_order.action
details = msg['brokerd_msg']
# TODO: some kinda progress system
mode.on_fill(
oid,
price=details['price'],
pointing='up' if action == 'buy' else 'down',
# TODO: put the actual exchange timestamp
arrow_index=get_index(details['broker_time']),
)
# TODO: how should we look this up?
# tracker = mode.trackers[msg['account']]
# tracker.live_pp.fills.append(msg)