First draft: relay open orders through ems and display on chart

dict_differ
Tyler Goodlet 2022-08-05 14:51:15 -04:00
parent 02980282cd
commit 0d332427e2
4 changed files with 170 additions and 105 deletions

View File

@ -149,10 +149,14 @@ async def relay_order_cmds_from_sync_code(
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
sym = cmd.symbol
msg = pformat(cmd)
if sym == symbol_key:
log.info(f'Send order cmd:\n{msg}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)
else:
log.warning(f'Ignoring unmatched order cmd for {sym}: {msg}')
@acm

View File

@ -188,9 +188,9 @@ async def clear_dark_triggers(
tuple(execs.items())
):
if (
not pred or
ttype not in tf or
not pred(price)
not pred
or ttype not in tf
or not pred(price)
):
# log.runtime(
# f'skipping quote for {sym} '
@ -345,7 +345,7 @@ class Router(Struct):
already exists.
'''
relay = self.relays.get(feed.mod.name)
relay: TradesRelay = self.relays.get(feed.mod.name)
if (
relay is None
@ -452,7 +452,6 @@ async def open_brokerd_trades_dialogue(
async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
@ -718,6 +717,43 @@ async def translate_and_relay_brokerd_events(
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
# unknown valid BrokerdStatus
case {
'name': 'status',
'status': status,
'reqid': reqid, # brokerd generated order-request id
'broker_details': details,
}:
# TODO: we probably want some kind of "tagging" system
# for external order submissions like this eventually
# to be able to more formally handle multi-player
# trading...
if status == 'submitted':
msg = BrokerdStatus(**brokerd_msg)
log.info('Relaying existing open order:\n {brokerd_msg}')
# use backend request id as our ems id though this
# may end up with collisions?
broker = details['name']
# oid = f'{broker}-{reqid}'
oid = reqid
book._ems_entries[oid] = msg
# attempt to avoid collisions
msg.reqid = oid
resp = 'broker_submitted'
# register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid
else:
log.error(
f'Unknown status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
continue
# BrokerdFill
case {
'name': 'fill',
@ -731,56 +767,14 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_filled'
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
# unknown valid message case?
# case {
# 'name': name,
# 'symbol': sym,
# 'reqid': reqid, # brokerd generated order-request id
# # 'oid': oid, # ems order-dialog id
# 'broker_details': details,
# } if (
# book._ems2brokerd_ids.inverse.get(reqid) is None
# ):
# # TODO: pretty sure we can drop this now?
# # XXX: paper clearing special cases
# # paper engine race case: ``Client.submit_limit()`` hasn't
# # returned yet and provided an output reqid to register
# # locally, so we need to retreive the oid that was already
# # packed at submission since we already know it ahead of
# # time
# paper = details.get('paper_info')
# ext = details.get('external')
# if paper:
# # paperboi keeps the ems id up front
# oid = paper['oid']
# elif ext:
# # may be an order msg specified as "external" to the
# # piker ems flow (i.e. generated by some other
# # external broker backend client (like tws for ib)
# log.error(f"External trade event {name}@{ext}")
# else:
# # something is out of order, we don't have an oid for
# # this broker-side message.
# log.error(
# f'Unknown oid: {oid} for msg {name}:\n'
# f'{pformat(brokerd_msg)}\n'
# 'Unable to relay message to client side!?'
# )
# continue
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow
entry = book._ems_entries[oid]
assert entry.oid == oid
if getattr(entry, 'oid', None):
assert entry.oid == oid
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
@ -803,7 +797,32 @@ async def translate_and_relay_brokerd_events(
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
f'Received `brokerd` msg for unknown client oid: {oid}')
else:
# existing open order relay
assert oid == entry.reqid
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the

View File

@ -194,6 +194,10 @@ class BrokerdStatus(Struct):
# }
status: str
# +ve is buy, -ve is sell
size: float = 0.0
price: float = 0.0
filled: float = 0.0
reason: str = ''
remaining: float = 0.0

View File

@ -152,10 +152,7 @@ class OrderMode:
def line_from_order(
self,
order: Order,
symbol: Symbol,
**line_kwargs,
) -> LevelLine:
@ -173,8 +170,7 @@ class OrderMode:
color=self._colors[order.action],
dotted=True if (
order.exec_mode == 'dark' and
order.action != 'alert'
order.exec_mode == 'dark' and order.action != 'alert'
) else False,
**line_kwargs,
@ -236,7 +232,6 @@ class OrderMode:
line = self.line_from_order(
order,
symbol,
show_markers=True,
# just for the stage line to avoid
@ -262,6 +257,8 @@ class OrderMode:
def submit_order(
self,
send_msg: bool = True,
order: Optional[Order] = None,
) -> OrderDialog:
'''
@ -269,18 +266,19 @@ class OrderMode:
represent the order on a chart.
'''
if not order:
staged = self._staged_order
symbol: Symbol = staged.symbol
oid = str(uuid.uuid4())
# symbol: Symbol = staged.symbol
# format order data for ems
order = staged.copy()
order.oid = oid
order.symbol = symbol.front_fqsn()
order.symbol = order.symbol.front_fqsn()
line = self.line_from_order(
order,
symbol,
show_markers=True,
only_show_markers_on_hover=True,
@ -298,17 +296,17 @@ class OrderMode:
# color once the submission ack arrives.
self.lines.submit_line(
line=line,
uuid=oid,
uuid=order.oid,
)
dialog = OrderDialog(
uuid=oid,
uuid=order.oid,
order=order,
symbol=symbol,
symbol=order.symbol,
line=line,
last_status_close=self.multistatus.open_status(
f'submitting {self._trigger_type}-{order.action}',
final_msg=f'submitted {self._trigger_type}-{order.action}',
f'submitting {order.exec_mode}-{order.action}',
final_msg=f'submitted {order.exec_mode}-{order.action}',
clear_on_next=True,
)
)
@ -318,14 +316,21 @@ class OrderMode:
# enter submission which will be popped once a response
# from the EMS is received to move the order to a different# status
self.dialogs[oid] = dialog
self.dialogs[order.oid] = dialog
# hook up mouse drag handlers
line._on_drag_start = self.order_line_modify_start
line._on_drag_end = self.order_line_modify_complete
# send order cmd to ems
if send_msg:
self.book.send(order)
else:
# just register for control over this order
# TODO: some kind of mini-perms system here based on
# an out-of-band tagging/auth sub-sys for multiplayer
# order control?
self.book._sent_orders[order.oid] = order
return dialog
@ -502,7 +507,7 @@ class OrderMode:
oid = dialog.uuid
cancel_status_close = self.multistatus.open_status(
f'cancelling order {oid[:6]}',
f'cancelling order {oid}',
group_key=key,
)
dialog.last_status_close = cancel_status_close
@ -596,10 +601,10 @@ async def open_order_mode(
sym = msg['symbol']
if (
sym == symkey or
# mega-UGH, i think we need to fix the FQSN stuff sooner
# then later..
sym == symkey.removesuffix(f'.{broker}')
(sym == symkey) or (
# mega-UGH, i think we need to fix the FQSN
# stuff sooner then later..
sym == symkey.removesuffix(f'.{broker}'))
):
pps_by_account[acctid] = msg
@ -653,7 +658,7 @@ async def open_order_mode(
# setup order mode sidepane widgets
form: FieldsForm = chart.sidepane
form.vbox.setSpacing(
int((1 + 5/8)*_font.px_size)
int((1 + 5 / 8) * _font.px_size)
)
from ._feedstatus import mk_feed_label
@ -814,15 +819,48 @@ async def process_trades_and_update_ui(
continue
resp = msg['resp']
oid = msg['oid']
oid = str(msg['oid'])
dialog = mode.dialogs.get(oid)
if dialog is None:
log.warning(f'received msg for untracked dialog:\n{fmsg}')
# TODO: enable pure tracking / mirroring of dialogs
# is desired.
continue
size = msg['brokerd_msg']['size']
if size >= 0:
action = 'buy'
else:
action = 'sell'
acct = msg['brokerd_msg']['account']
price = msg['brokerd_msg']['price']
deats = msg['brokerd_msg']['broker_details']
fqsn = (
deats['fqsn'] + '.' + deats['name']
)
symbol = Symbol.from_fqsn(
fqsn=fqsn,
info={},
)
# map to order composite-type
order = Order(
action=action,
price=price,
account=acct,
size=size,
symbol=symbol,
brokers=symbol.brokers,
oid=oid,
exec_mode='live', # dark or live
)
dialog = mode.submit_order(
send_msg=False,
order=order,
)
# # TODO: enable pure tracking / mirroring of dialogs
# # is desired.
# continue
# record message to dialog tracking
dialog.msgs[oid] = msg