First draft: relay open orders through ems and display on chart

open_order_loading
Tyler Goodlet 2022-08-05 14:51:15 -04:00
parent 9e36dbe47f
commit 682a0191ef
4 changed files with 170 additions and 105 deletions

View File

@ -149,10 +149,14 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd.symbol == symbol_key: sym = cmd.symbol
log.info(f'Send order cmd:\n{pformat(cmd)}') msg = pformat(cmd)
if sym == symbol_key:
log.info(f'Send order cmd:\n{msg}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)
else:
log.warning(f'Ignoring unmatched order cmd for {sym}: {msg}')
@acm @acm

View File

@ -188,9 +188,9 @@ async def clear_dark_triggers(
tuple(execs.items()) tuple(execs.items())
): ):
if ( if (
not pred or not pred
ttype not in tf or or ttype not in tf
not pred(price) or not pred(price)
): ):
# log.runtime( # log.runtime(
# f'skipping quote for {sym} ' # f'skipping quote for {sym} '
@ -345,7 +345,7 @@ class Router(Struct):
already exists. already exists.
''' '''
relay = self.relays.get(feed.mod.name) relay: TradesRelay = self.relays.get(feed.mod.name)
if ( if (
relay is None relay is None
@ -452,7 +452,6 @@ async def open_brokerd_trades_dialogue(
async with ( async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream, brokerd_ctx.open_stream() as brokerd_trades_stream,
): ):
# XXX: really we only want one stream per `emsd` actor # XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're # to relay global `brokerd` order events unless we're
@ -718,6 +717,43 @@ async def translate_and_relay_brokerd_events(
# one of {submitted, cancelled} # one of {submitted, cancelled}
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# unknown valid BrokerdStatus
case {
'name': 'status',
'status': status,
'reqid': reqid, # brokerd generated order-request id
'broker_details': details,
}:
# TODO: we probably want some kind of "tagging" system
# for external order submissions like this eventually
# to be able to more formally handle multi-player
# trading...
if status == 'submitted':
msg = BrokerdStatus(**brokerd_msg)
log.info('Relaying existing open order:\n {brokerd_msg}')
# use backend request id as our ems id though this
# may end up with collisions?
broker = details['name']
# oid = f'{broker}-{reqid}'
oid = reqid
book._ems_entries[oid] = msg
# attempt to avoid collisions
msg.reqid = oid
resp = 'broker_submitted'
# register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid
else:
log.error(
f'Unknown status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
continue
# BrokerdFill # BrokerdFill
case { case {
'name': 'fill', 'name': 'fill',
@ -731,56 +767,14 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_filled' resp = 'broker_filled'
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
# unknown valid message case?
# case {
# 'name': name,
# 'symbol': sym,
# 'reqid': reqid, # brokerd generated order-request id
# # 'oid': oid, # ems order-dialog id
# 'broker_details': details,
# } if (
# book._ems2brokerd_ids.inverse.get(reqid) is None
# ):
# # TODO: pretty sure we can drop this now?
# # XXX: paper clearing special cases
# # paper engine race case: ``Client.submit_limit()`` hasn't
# # returned yet and provided an output reqid to register
# # locally, so we need to retreive the oid that was already
# # packed at submission since we already know it ahead of
# # time
# paper = details.get('paper_info')
# ext = details.get('external')
# if paper:
# # paperboi keeps the ems id up front
# oid = paper['oid']
# elif ext:
# # may be an order msg specified as "external" to the
# # piker ems flow (i.e. generated by some other
# # external broker backend client (like tws for ib)
# log.error(f"External trade event {name}@{ext}")
# else:
# # something is out of order, we don't have an oid for
# # this broker-side message.
# log.error(
# f'Unknown oid: {oid} for msg {name}:\n'
# f'{pformat(brokerd_msg)}\n'
# 'Unable to relay message to client side!?'
# )
# continue
case _: case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid') raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow # retrieve existing live flow
entry = book._ems_entries[oid] entry = book._ems_entries[oid]
assert entry.oid == oid
if getattr(entry, 'oid', None):
assert entry.oid == oid
old_reqid = entry.reqid old_reqid = entry.reqid
if old_reqid and old_reqid != reqid: if old_reqid and old_reqid != reqid:
log.warning( log.warning(
@ -803,7 +797,32 @@ async def translate_and_relay_brokerd_events(
) )
except KeyError: except KeyError:
log.error( log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}') f'Received `brokerd` msg for unknown client oid: {oid}')
else:
# existing open order relay
assert oid == entry.reqid
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
# TODO: do we want this to keep things cleaned up? # TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the # it might require a special status from brokerd to affirm the

View File

@ -194,6 +194,10 @@ class BrokerdStatus(Struct):
# } # }
status: str status: str
# +ve is buy, -ve is sell
size: float = 0.0
price: float = 0.0
filled: float = 0.0 filled: float = 0.0
reason: str = '' reason: str = ''
remaining: float = 0.0 remaining: float = 0.0

View File

@ -152,10 +152,7 @@ class OrderMode:
def line_from_order( def line_from_order(
self, self,
order: Order, order: Order,
symbol: Symbol,
**line_kwargs, **line_kwargs,
) -> LevelLine: ) -> LevelLine:
@ -173,8 +170,7 @@ class OrderMode:
color=self._colors[order.action], color=self._colors[order.action],
dotted=True if ( dotted=True if (
order.exec_mode == 'dark' and order.exec_mode == 'dark' and order.action != 'alert'
order.action != 'alert'
) else False, ) else False,
**line_kwargs, **line_kwargs,
@ -236,7 +232,6 @@ class OrderMode:
line = self.line_from_order( line = self.line_from_order(
order, order,
symbol,
show_markers=True, show_markers=True,
# just for the stage line to avoid # just for the stage line to avoid
@ -262,6 +257,8 @@ class OrderMode:
def submit_order( def submit_order(
self, self,
send_msg: bool = True,
order: Optional[Order] = None,
) -> OrderDialog: ) -> OrderDialog:
''' '''
@ -269,18 +266,19 @@ class OrderMode:
represent the order on a chart. represent the order on a chart.
''' '''
if not order:
staged = self._staged_order staged = self._staged_order
symbol: Symbol = staged.symbol
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# symbol: Symbol = staged.symbol
# format order data for ems # format order data for ems
order = staged.copy() order = staged.copy()
order.oid = oid order.oid = oid
order.symbol = symbol.front_fqsn()
order.symbol = order.symbol.front_fqsn()
line = self.line_from_order( line = self.line_from_order(
order, order,
symbol,
show_markers=True, show_markers=True,
only_show_markers_on_hover=True, only_show_markers_on_hover=True,
@ -298,17 +296,17 @@ class OrderMode:
# color once the submission ack arrives. # color once the submission ack arrives.
self.lines.submit_line( self.lines.submit_line(
line=line, line=line,
uuid=oid, uuid=order.oid,
) )
dialog = OrderDialog( dialog = OrderDialog(
uuid=oid, uuid=order.oid,
order=order, order=order,
symbol=symbol, symbol=order.symbol,
line=line, line=line,
last_status_close=self.multistatus.open_status( last_status_close=self.multistatus.open_status(
f'submitting {self._trigger_type}-{order.action}', f'submitting {order.exec_mode}-{order.action}',
final_msg=f'submitted {self._trigger_type}-{order.action}', final_msg=f'submitted {order.exec_mode}-{order.action}',
clear_on_next=True, clear_on_next=True,
) )
) )
@ -318,14 +316,21 @@ class OrderMode:
# enter submission which will be popped once a response # enter submission which will be popped once a response
# from the EMS is received to move the order to a different# status # from the EMS is received to move the order to a different# status
self.dialogs[oid] = dialog self.dialogs[order.oid] = dialog
# hook up mouse drag handlers # hook up mouse drag handlers
line._on_drag_start = self.order_line_modify_start line._on_drag_start = self.order_line_modify_start
line._on_drag_end = self.order_line_modify_complete line._on_drag_end = self.order_line_modify_complete
# send order cmd to ems # send order cmd to ems
if send_msg:
self.book.send(order) self.book.send(order)
else:
# just register for control over this order
# TODO: some kind of mini-perms system here based on
# an out-of-band tagging/auth sub-sys for multiplayer
# order control?
self.book._sent_orders[order.oid] = order
return dialog return dialog
@ -502,7 +507,7 @@ class OrderMode:
oid = dialog.uuid oid = dialog.uuid
cancel_status_close = self.multistatus.open_status( cancel_status_close = self.multistatus.open_status(
f'cancelling order {oid[:6]}', f'cancelling order {oid}',
group_key=key, group_key=key,
) )
dialog.last_status_close = cancel_status_close dialog.last_status_close = cancel_status_close
@ -596,10 +601,10 @@ async def open_order_mode(
sym = msg['symbol'] sym = msg['symbol']
if ( if (
sym == symkey or (sym == symkey) or (
# mega-UGH, i think we need to fix the FQSN stuff sooner # mega-UGH, i think we need to fix the FQSN
# then later.. # stuff sooner then later..
sym == symkey.removesuffix(f'.{broker}') sym == symkey.removesuffix(f'.{broker}'))
): ):
pps_by_account[acctid] = msg pps_by_account[acctid] = msg
@ -814,15 +819,48 @@ async def process_trades_and_update_ui(
continue continue
resp = msg['resp'] resp = msg['resp']
oid = msg['oid'] oid = str(msg['oid'])
dialog = mode.dialogs.get(oid) dialog = mode.dialogs.get(oid)
if dialog is None: if dialog is None:
log.warning(f'received msg for untracked dialog:\n{fmsg}') log.warning(f'received msg for untracked dialog:\n{fmsg}')
# TODO: enable pure tracking / mirroring of dialogs size = msg['brokerd_msg']['size']
# is desired. if size >= 0:
continue action = 'buy'
else:
action = 'sell'
acct = msg['brokerd_msg']['account']
price = msg['brokerd_msg']['price']
deats = msg['brokerd_msg']['broker_details']
fqsn = (
deats['fqsn'] + '.' + deats['name']
)
symbol = Symbol.from_fqsn(
fqsn=fqsn,
info={},
)
# map to order composite-type
order = Order(
action=action,
price=price,
account=acct,
size=size,
symbol=symbol,
brokers=symbol.brokers,
oid=oid,
exec_mode='live', # dark or live
)
dialog = mode.submit_order(
send_msg=False,
order=order,
)
# # TODO: enable pure tracking / mirroring of dialogs
# # is desired.
# continue
# record message to dialog tracking # record message to dialog tracking
dialog.msgs[oid] = msg dialog.msgs[oid] = msg