Start brokerd relay loop after opening client stream

In order to avoid missed existing order message emissions on startup we
need to be sure the client side stream is registered with the router
first. So break out the starting of the
`translate_and_relay_brokerd_events()` task until inside the client
stream block and start the task using the dark clearing loop nursery.

Also, ensure `oid` (and thus for `ib` the equivalent re-used `reqid`)
are cast to `str` before registering the dark book. Deliver the dark
book entries as part of the `_emsd_main()` context `.started()` values.
open_order_loading
Tyler Goodlet 2022-08-05 18:29:40 -04:00
parent 1510383738
commit e34ea94f9f
1 changed files with 47 additions and 34 deletions

View File

@ -501,14 +501,9 @@ async def open_brokerd_trades_dialogue(
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
await trio.sleep_forever()
finally:
# parent context must have been closed
@ -567,8 +562,8 @@ async def translate_and_relay_brokerd_events(
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
'''
book = router.get_dark_book(broker)
relay = router.relays[broker]
book: _DarkBook = router.get_dark_book(broker)
relay: TradesRelay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
@ -731,14 +726,16 @@ async def translate_and_relay_brokerd_events(
if status == 'submitted':
msg = BrokerdStatus(**brokerd_msg)
log.info('Relaying existing open order:\n {brokerd_msg}')
log.info(
f'Relaying existing open order:\n {brokerd_msg}'
)
# use backend request id as our ems id though this
# may end up with collisions?
broker = details['name']
# oid = f'{broker}-{reqid}'
oid = reqid
oid = str(reqid)
book._ems_entries[oid] = msg
# attempt to avoid collisions
msg.reqid = oid
resp = 'broker_submitted'
@ -848,7 +845,9 @@ async def process_client_order_cmds(
async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}')
oid = cmd['oid']
# CAWT DAMN we need struct support!
oid = str(cmd['oid'])
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
@ -892,7 +891,7 @@ async def process_client_order_cmds(
'action': 'cancel',
'oid': oid,
} if not live_entry:
try:
# try:
# remove from dark book clearing
dark_book.orders[symbol].pop(oid, None)
@ -908,8 +907,8 @@ async def process_client_order_cmds(
# de-register this client dialogue
router.dialogues.pop(oid)
except KeyError:
log.exception(f'No dark order for {symbol}?')
# except KeyError:
# log.exception(f'No dark order for {symbol}?')
# live order submission
case {
@ -932,8 +931,11 @@ async def process_client_order_cmds(
sym = fqsn.replace(f'.{broker}', '')
if live_entry is not None:
# sanity check on emsd id
assert live_entry.oid == oid
# sanity check on emsd id, but it won't work
# for pre-existing orders that we load since
# the only msg will be a ``BrokerdStatus``
# assert live_entry.oid == oid
reqid = live_entry.reqid
# if we already had a broker order id then
# this is likely an order update commmand.
@ -1118,10 +1120,9 @@ async def _emsd_main(
):
# XXX: this should be initial price quote from target provider
first_quote = feed.first_quotes[fqsn]
book = _router.get_dark_book(broker)
book.lasts[fqsn] = first_quote['last']
first_quote: dict = feed.first_quotes[fqsn]
book: _DarkBook = _router.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last']
# open a stream with the brokerd backend for order
# flow dialogue
@ -1148,12 +1149,25 @@ async def _emsd_main(
await ems_ctx.started((
relay.positions,
list(relay.accounts),
book._ems_entries,
))
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
async with ems_ctx.open_stream() as ems_client_order_stream:
# register the client side before startingn the
# brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup.
_router.clients.add(ems_client_order_stream)
n.start_soon(
translate_and_relay_brokerd_events,
broker,
brokerd_stream,
_router,
)
# trigger scan and exec loop
n.start_soon(
clear_dark_triggers,
@ -1168,7 +1182,6 @@ async def _emsd_main(
# start inbound (from attached client) order request processing
try:
_router.clients.add(ems_client_order_stream)
# main entrypoint, run here until cancelled.
await process_client_order_cmds(