Deliver existing dialog (msgs) to every EMS client
Ideally every client that connects to the ems can know its state (immediately) meaning relay all the order dialogs that are currently active. This adds full (hacky WIP) support to receive those dialog (msgs) from the `open_ems()` startup values via the `.started()` msg from `_emsd_main()`. Further this adds support to the order mode chart-UI to display existing (live) orders on the chart during startup. Details include, - add a `OrderMode.load_unknown_dialog_from_msg()` for processing and displaying a ``BrokerdStatus`` (for now) msg from the EMS that was not previously created by the current ems client and registering and displaying it on the chart. - break out the ems msg processing into a new `order_mode.process_trade_msg()` func so that it can be called on the startup dialog-msg set as well as eventually used a more general low level auto-strat API (eg. when we get to displaying auto-strat and group trading automatically on an observing chart UI. - hackyness around msg-processing for the dialogs delivery since we're technically delivering `BrokerdStatus` msgs when the client-side processing technically expects `Status` msgs.. we'll rectify this soon!open_order_loading
parent
1cfa04927d
commit
2548aae73d
|
@ -224,11 +224,19 @@ async def open_ems(
|
|||
fqsn=fqsn,
|
||||
exec_mode=mode,
|
||||
|
||||
) as (ctx, (positions, accounts)),
|
||||
) as (
|
||||
ctx,
|
||||
(
|
||||
positions,
|
||||
accounts,
|
||||
dialogs,
|
||||
)
|
||||
),
|
||||
|
||||
# open 2-way trade command stream
|
||||
ctx.open_stream() as trades_stream,
|
||||
):
|
||||
# start sync code order msg delivery task
|
||||
async with trio.open_nursery() as n:
|
||||
n.start_soon(
|
||||
relay_order_cmds_from_sync_code,
|
||||
|
@ -236,4 +244,10 @@ async def open_ems(
|
|||
trades_stream
|
||||
)
|
||||
|
||||
yield book, trades_stream, positions, accounts
|
||||
yield (
|
||||
book,
|
||||
trades_stream,
|
||||
positions,
|
||||
accounts,
|
||||
dialogs,
|
||||
)
|
||||
|
|
|
@ -517,6 +517,48 @@ class OrderMode:
|
|||
|
||||
return ids
|
||||
|
||||
def load_unknown_dialog_from_msg(
|
||||
self,
|
||||
msg: dict,
|
||||
|
||||
) -> OrderDialog:
|
||||
|
||||
oid = str(msg['oid'])
|
||||
size = msg['brokerd_msg']['size']
|
||||
if size >= 0:
|
||||
action = 'buy'
|
||||
else:
|
||||
action = 'sell'
|
||||
|
||||
acct = msg['brokerd_msg']['account']
|
||||
price = msg['brokerd_msg']['price']
|
||||
deats = msg['brokerd_msg']['broker_details']
|
||||
fqsn = (
|
||||
deats['fqsn'] + '.' + deats['name']
|
||||
)
|
||||
symbol = Symbol.from_fqsn(
|
||||
fqsn=fqsn,
|
||||
info={},
|
||||
)
|
||||
# map to order composite-type
|
||||
order = Order(
|
||||
action=action,
|
||||
price=price,
|
||||
account=acct,
|
||||
size=size,
|
||||
symbol=symbol,
|
||||
brokers=symbol.brokers,
|
||||
oid=oid,
|
||||
exec_mode='live', # dark or live
|
||||
)
|
||||
|
||||
dialog = self.submit_order(
|
||||
send_msg=False,
|
||||
order=order,
|
||||
)
|
||||
assert self.dialogs[oid] == dialog
|
||||
return dialog
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def open_order_mode(
|
||||
|
@ -554,6 +596,7 @@ async def open_order_mode(
|
|||
trades_stream,
|
||||
position_msgs,
|
||||
brokerd_accounts,
|
||||
ems_dialog_msgs,
|
||||
),
|
||||
trio.open_nursery() as tn,
|
||||
|
||||
|
@ -760,195 +803,183 @@ async def open_order_mode(
|
|||
# to handle input since the ems connection is ready
|
||||
started.set()
|
||||
|
||||
for oid, msg in ems_dialog_msgs.items():
|
||||
|
||||
# HACK ALERT: ensure a resp field is filled out since
|
||||
# techincally the call below expects a ``Status``. TODO:
|
||||
# parse into proper ``Status`` equivalents ems-side?
|
||||
msg.setdefault('resp', msg['broker_details']['resp'])
|
||||
msg.setdefault('oid', msg['broker_details']['oid'])
|
||||
msg['brokerd_msg'] = msg
|
||||
|
||||
await process_trade_msg(
|
||||
mode,
|
||||
book,
|
||||
msg,
|
||||
)
|
||||
|
||||
tn.start_soon(
|
||||
process_trades_and_update_ui,
|
||||
tn,
|
||||
feed,
|
||||
mode,
|
||||
trades_stream,
|
||||
mode,
|
||||
book,
|
||||
)
|
||||
|
||||
yield mode
|
||||
|
||||
|
||||
async def process_trades_and_update_ui(
|
||||
|
||||
n: trio.Nursery,
|
||||
feed: Feed,
|
||||
mode: OrderMode,
|
||||
trades_stream: tractor.MsgStream,
|
||||
mode: OrderMode,
|
||||
book: OrderBook,
|
||||
|
||||
) -> None:
|
||||
|
||||
get_index = mode.chart.get_index
|
||||
global _pnl_tasks
|
||||
|
||||
# this is where we receive **back** messages
|
||||
# about executions **from** the EMS actor
|
||||
async for msg in trades_stream:
|
||||
await process_trade_msg(
|
||||
mode,
|
||||
book,
|
||||
msg,
|
||||
)
|
||||
|
||||
fmsg = pformat(msg)
|
||||
log.info(f'Received order msg:\n{fmsg}')
|
||||
|
||||
name = msg['name']
|
||||
if name in (
|
||||
'position',
|
||||
async def process_trade_msg(
|
||||
mode: OrderMode,
|
||||
book: OrderBook,
|
||||
msg: dict,
|
||||
|
||||
) -> None:
|
||||
|
||||
get_index = mode.chart.get_index
|
||||
fmsg = pformat(msg)
|
||||
log.info(f'Received order msg:\n{fmsg}')
|
||||
name = msg['name']
|
||||
if name in (
|
||||
'position',
|
||||
):
|
||||
sym = mode.chart.linked.symbol
|
||||
pp_msg_symbol = msg['symbol'].lower()
|
||||
fqsn = sym.front_fqsn()
|
||||
broker, key = sym.front_feed()
|
||||
if (
|
||||
pp_msg_symbol == fqsn
|
||||
or pp_msg_symbol == fqsn.removesuffix(f'.{broker}')
|
||||
):
|
||||
sym = mode.chart.linked.symbol
|
||||
pp_msg_symbol = msg['symbol'].lower()
|
||||
fqsn = sym.front_fqsn()
|
||||
broker, key = sym.front_feed()
|
||||
if (
|
||||
pp_msg_symbol == fqsn
|
||||
or pp_msg_symbol == fqsn.removesuffix(f'.{broker}')
|
||||
):
|
||||
log.info(f'{fqsn} matched pp msg: {fmsg}')
|
||||
tracker = mode.trackers[msg['account']]
|
||||
tracker.live_pp.update_from_msg(msg)
|
||||
# update order pane widgets
|
||||
tracker.update_from_pp()
|
||||
mode.pane.update_status_ui(tracker)
|
||||
log.info(f'{fqsn} matched pp msg: {fmsg}')
|
||||
tracker = mode.trackers[msg['account']]
|
||||
tracker.live_pp.update_from_msg(msg)
|
||||
# update order pane widgets
|
||||
tracker.update_from_pp()
|
||||
mode.pane.update_status_ui(tracker)
|
||||
|
||||
if tracker.live_pp.size:
|
||||
# display pnl
|
||||
mode.pane.display_pnl(tracker)
|
||||
if tracker.live_pp.size:
|
||||
# display pnl
|
||||
mode.pane.display_pnl(tracker)
|
||||
|
||||
# short circuit to next msg to avoid
|
||||
# unnecessary msg content lookups
|
||||
continue
|
||||
# short circuit to next msg to avoid
|
||||
# unnecessary msg content lookups
|
||||
return
|
||||
# continue
|
||||
|
||||
resp = msg['resp']
|
||||
oid = str(msg['oid'])
|
||||
dialog = mode.dialogs.get(oid)
|
||||
resp = msg['resp']
|
||||
oid = str(msg['oid'])
|
||||
dialog = mode.dialogs.get(oid)
|
||||
|
||||
if dialog is None:
|
||||
log.warning(f'received msg for untracked dialog:\n{fmsg}')
|
||||
if dialog is None:
|
||||
log.warning(
|
||||
f'received msg for untracked dialog:\n{fmsg}'
|
||||
)
|
||||
dialog = mode.load_unknown_dialog_from_msg(msg)
|
||||
|
||||
size = msg['brokerd_msg']['size']
|
||||
if size >= 0:
|
||||
action = 'buy'
|
||||
else:
|
||||
action = 'sell'
|
||||
# record message to dialog tracking
|
||||
dialog.msgs[oid] = msg
|
||||
|
||||
acct = msg['brokerd_msg']['account']
|
||||
price = msg['brokerd_msg']['price']
|
||||
deats = msg['brokerd_msg']['broker_details']
|
||||
fqsn = (
|
||||
deats['fqsn'] + '.' + deats['name']
|
||||
)
|
||||
symbol = Symbol.from_fqsn(
|
||||
fqsn=fqsn,
|
||||
info={},
|
||||
)
|
||||
# map to order composite-type
|
||||
order = Order(
|
||||
action=action,
|
||||
price=price,
|
||||
account=acct,
|
||||
size=size,
|
||||
symbol=symbol,
|
||||
brokers=symbol.brokers,
|
||||
oid=oid,
|
||||
exec_mode='live', # dark or live
|
||||
)
|
||||
# response to 'action' request (buy/sell)
|
||||
if resp in (
|
||||
'dark_submitted',
|
||||
'broker_submitted'
|
||||
):
|
||||
# show line label once order is live
|
||||
mode.on_submit(oid)
|
||||
|
||||
dialog = mode.submit_order(
|
||||
send_msg=False,
|
||||
order=order,
|
||||
)
|
||||
# resp to 'cancel' request or error condition
|
||||
# for action request
|
||||
elif resp in (
|
||||
'broker_inactive',
|
||||
'broker_errored',
|
||||
):
|
||||
# delete level line from view
|
||||
mode.on_cancel(oid)
|
||||
broker_msg = msg['brokerd_msg']
|
||||
log.error(
|
||||
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
|
||||
)
|
||||
|
||||
# # TODO: enable pure tracking / mirroring of dialogs
|
||||
# # is desired.
|
||||
elif resp in (
|
||||
'broker_cancelled',
|
||||
'dark_cancelled'
|
||||
):
|
||||
# delete level line from view
|
||||
mode.on_cancel(oid)
|
||||
broker_msg = msg['brokerd_msg']
|
||||
log.cancel(
|
||||
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
|
||||
)
|
||||
|
||||
elif resp in (
|
||||
'dark_triggered'
|
||||
):
|
||||
log.info(f'Dark order triggered for {fmsg}')
|
||||
|
||||
elif resp in (
|
||||
'alert_triggered'
|
||||
):
|
||||
# should only be one "fill" for an alert
|
||||
# add a triangle and remove the level line
|
||||
mode.on_fill(
|
||||
oid,
|
||||
price=msg['trigger_price'],
|
||||
arrow_index=get_index(time.time()),
|
||||
)
|
||||
mode.lines.remove_line(uuid=oid)
|
||||
await mode.on_exec(oid, msg)
|
||||
|
||||
# response to completed 'action' request for buy/sell
|
||||
elif resp in (
|
||||
'broker_executed',
|
||||
):
|
||||
# right now this is just triggering a system alert
|
||||
await mode.on_exec(oid, msg)
|
||||
|
||||
if msg['brokerd_msg']['remaining'] == 0:
|
||||
mode.lines.remove_line(uuid=oid)
|
||||
|
||||
# each clearing tick is responded individually
|
||||
elif resp in (
|
||||
'broker_filled',
|
||||
):
|
||||
known_order = book._sent_orders.get(oid)
|
||||
if not known_order:
|
||||
log.warning(f'order {oid} is unknown')
|
||||
return
|
||||
# continue
|
||||
|
||||
# record message to dialog tracking
|
||||
dialog.msgs[oid] = msg
|
||||
action = known_order.action
|
||||
details = msg['brokerd_msg']
|
||||
|
||||
# response to 'action' request (buy/sell)
|
||||
if resp in (
|
||||
'dark_submitted',
|
||||
'broker_submitted'
|
||||
):
|
||||
# TODO: some kinda progress system
|
||||
mode.on_fill(
|
||||
oid,
|
||||
price=details['price'],
|
||||
pointing='up' if action == 'buy' else 'down',
|
||||
|
||||
# show line label once order is live
|
||||
mode.on_submit(oid)
|
||||
# TODO: put the actual exchange timestamp
|
||||
arrow_index=get_index(details['broker_time']),
|
||||
)
|
||||
|
||||
# resp to 'cancel' request or error condition
|
||||
# for action request
|
||||
elif resp in (
|
||||
'broker_inactive',
|
||||
'broker_errored',
|
||||
):
|
||||
# delete level line from view
|
||||
mode.on_cancel(oid)
|
||||
broker_msg = msg['brokerd_msg']
|
||||
log.error(
|
||||
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
|
||||
)
|
||||
|
||||
elif resp in (
|
||||
'broker_cancelled',
|
||||
'dark_cancelled'
|
||||
):
|
||||
# delete level line from view
|
||||
mode.on_cancel(oid)
|
||||
broker_msg = msg['brokerd_msg']
|
||||
log.cancel(
|
||||
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
|
||||
)
|
||||
|
||||
elif resp in (
|
||||
'dark_triggered'
|
||||
):
|
||||
log.info(f'Dark order triggered for {fmsg}')
|
||||
|
||||
elif resp in (
|
||||
'alert_triggered'
|
||||
):
|
||||
# should only be one "fill" for an alert
|
||||
# add a triangle and remove the level line
|
||||
mode.on_fill(
|
||||
oid,
|
||||
price=msg['trigger_price'],
|
||||
arrow_index=get_index(time.time()),
|
||||
)
|
||||
mode.lines.remove_line(uuid=oid)
|
||||
await mode.on_exec(oid, msg)
|
||||
|
||||
# response to completed 'action' request for buy/sell
|
||||
elif resp in (
|
||||
'broker_executed',
|
||||
):
|
||||
# right now this is just triggering a system alert
|
||||
await mode.on_exec(oid, msg)
|
||||
|
||||
if msg['brokerd_msg']['remaining'] == 0:
|
||||
mode.lines.remove_line(uuid=oid)
|
||||
|
||||
# each clearing tick is responded individually
|
||||
elif resp in (
|
||||
'broker_filled',
|
||||
):
|
||||
|
||||
known_order = book._sent_orders.get(oid)
|
||||
if not known_order:
|
||||
log.warning(f'order {oid} is unknown')
|
||||
continue
|
||||
|
||||
action = known_order.action
|
||||
details = msg['brokerd_msg']
|
||||
|
||||
# TODO: some kinda progress system
|
||||
mode.on_fill(
|
||||
oid,
|
||||
price=details['price'],
|
||||
pointing='up' if action == 'buy' else 'down',
|
||||
|
||||
# TODO: put the actual exchange timestamp
|
||||
arrow_index=get_index(details['broker_time']),
|
||||
)
|
||||
|
||||
# TODO: how should we look this up?
|
||||
# tracker = mode.trackers[msg['account']]
|
||||
# tracker.live_pp.fills.append(msg)
|
||||
# TODO: how should we look this up?
|
||||
# tracker = mode.trackers[msg['account']]
|
||||
# tracker.live_pp.fills.append(msg)
|
||||
|
|
Loading…
Reference in New Issue