Start brokerd relay loop after opening client stream

In order to avoid missed existing order message emissions on startup we
need to be sure the client side stream is registered with the router
first. So break out the starting of the
`translate_and_relay_brokerd_events()` task until inside the client
stream block and start the task using the dark clearing loop nursery.

Also, ensure `oid` (and thus for `ib` the equivalent re-used `reqid`)
are cast to `str` before registering the dark book. Deliver the dark
book entries as part of the `_emsd_main()` context `.started()` values.
dict_differ
Tyler Goodlet 2022-08-05 18:29:40 -04:00
parent 334f512ad3
commit 0bd8f2bcd9
1 changed files with 47 additions and 34 deletions

View File

@ -501,14 +501,9 @@ async def open_brokerd_trades_dialogue(
task_status.started(relay) task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until # this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled # the ``brokerd`` task either dies or is cancelled
await trio.sleep_forever()
finally: finally:
# parent context must have been closed # parent context must have been closed
@ -567,8 +562,8 @@ async def translate_and_relay_brokerd_events(
{'presubmitted', 'submitted', 'cancelled', 'inactive'} {'presubmitted', 'submitted', 'cancelled', 'inactive'}
''' '''
book = router.get_dark_book(broker) book: _DarkBook = router.get_dark_book(broker)
relay = router.relays[broker] relay: TradesRelay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream assert relay.brokerd_dialogue == brokerd_trades_stream
@ -731,14 +726,16 @@ async def translate_and_relay_brokerd_events(
if status == 'submitted': if status == 'submitted':
msg = BrokerdStatus(**brokerd_msg) msg = BrokerdStatus(**brokerd_msg)
log.info('Relaying existing open order:\n {brokerd_msg}') log.info(
f'Relaying existing open order:\n {brokerd_msg}'
)
# use backend request id as our ems id though this # use backend request id as our ems id though this
# may end up with collisions? # may end up with collisions?
broker = details['name'] broker = details['name']
# oid = f'{broker}-{reqid}' oid = str(reqid)
oid = reqid
book._ems_entries[oid] = msg book._ems_entries[oid] = msg
# attempt to avoid collisions # attempt to avoid collisions
msg.reqid = oid msg.reqid = oid
resp = 'broker_submitted' resp = 'broker_submitted'
@ -848,7 +845,9 @@ async def process_client_order_cmds(
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}') log.info(f'Received order cmd:\n{pformat(cmd)}')
oid = cmd['oid'] # CAWT DAMN we need struct support!
oid = str(cmd['oid'])
# register this stream as an active dialogue for this order id # register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be # such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory # routed (relayed) to **just** that client stream (and in theory
@ -892,24 +891,24 @@ async def process_client_order_cmds(
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if not live_entry: } if not live_entry:
try: # try:
# remove from dark book clearing # remove from dark book clearing
dark_book.orders[symbol].pop(oid, None) dark_book.orders[symbol].pop(oid, None)
# tell client side that we've cancelled the # tell client side that we've cancelled the
# dark-trigger order # dark-trigger order
await client_order_stream.send( await client_order_stream.send(
Status( Status(
resp='dark_cancelled', resp='dark_cancelled',
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
)
) )
# de-register this client dialogue )
router.dialogues.pop(oid) # de-register this client dialogue
router.dialogues.pop(oid)
except KeyError: # except KeyError:
log.exception(f'No dark order for {symbol}?') # log.exception(f'No dark order for {symbol}?')
# live order submission # live order submission
case { case {
@ -932,8 +931,11 @@ async def process_client_order_cmds(
sym = fqsn.replace(f'.{broker}', '') sym = fqsn.replace(f'.{broker}', '')
if live_entry is not None: if live_entry is not None:
# sanity check on emsd id # sanity check on emsd id, but it won't work
assert live_entry.oid == oid # for pre-existing orders that we load since
# the only msg will be a ``BrokerdStatus``
# assert live_entry.oid == oid
reqid = live_entry.reqid reqid = live_entry.reqid
# if we already had a broker order id then # if we already had a broker order id then
# this is likely an order update commmand. # this is likely an order update commmand.
@ -1118,10 +1120,9 @@ async def _emsd_main(
): ):
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
first_quote = feed.first_quotes[fqsn] first_quote: dict = feed.first_quotes[fqsn]
book: _DarkBook = _router.get_dark_book(broker)
book = _router.get_dark_book(broker) book.lasts[fqsn]: float = first_quote['last']
book.lasts[fqsn] = first_quote['last']
# open a stream with the brokerd backend for order # open a stream with the brokerd backend for order
# flow dialogue # flow dialogue
@ -1148,12 +1149,25 @@ async def _emsd_main(
await ems_ctx.started(( await ems_ctx.started((
relay.positions, relay.positions,
list(relay.accounts), list(relay.accounts),
book._ems_entries,
)) ))
# establish 2-way stream with requesting order-client and # establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates # begin handling inbound order requests and updates
async with ems_ctx.open_stream() as ems_client_order_stream: async with ems_ctx.open_stream() as ems_client_order_stream:
# register the client side before startingn the
# brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup.
_router.clients.add(ems_client_order_stream)
n.start_soon(
translate_and_relay_brokerd_events,
broker,
brokerd_stream,
_router,
)
# trigger scan and exec loop # trigger scan and exec loop
n.start_soon( n.start_soon(
clear_dark_triggers, clear_dark_triggers,
@ -1168,7 +1182,6 @@ async def _emsd_main(
# start inbound (from attached client) order request processing # start inbound (from attached client) order request processing
try: try:
_router.clients.add(ems_client_order_stream)
# main entrypoint, run here until cancelled. # main entrypoint, run here until cancelled.
await process_client_order_cmds( await process_client_order_cmds(