Support multiple client dialogues active on one brokerd trades dialogue

backup_asyncify_input_modes
Tyler Goodlet 2021-06-26 16:00:04 -04:00
parent d6d7c24320
commit 1edccf37d9
1 changed files with 52 additions and 34 deletions

View File

@ -282,6 +282,7 @@ class _Router(BaseModel):
books: dict[str, _DarkBook] = {} books: dict[str, _DarkBook] = {}
# order id to client stream map # order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {} dialogues: dict[str, list[tractor.MsgStream]] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors # brokername to trades-dialogues streams with ``brokerd`` actors
@ -425,8 +426,6 @@ async def open_brokerd_trades_dialogue(
# normalizing them to EMS messages and relaying back to # normalizing them to EMS messages and relaying back to
# the piker order client set. # the piker order client set.
# with brokerd_trades_stream.shield():
relay = TradesRelay( relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream, brokerd_dialogue=brokerd_trades_stream,
positions=positions, positions=positions,
@ -450,12 +449,8 @@ async def open_brokerd_trades_dialogue(
# the ``brokerd`` task either dies or is cancelled # the ``brokerd`` task either dies or is cancelled
finally: finally:
# context must have been closed # parent context must have been closed
# remove from cache so next client will respawn if needed # remove from cache so next client will respawn if needed
# print('BROKERD DIALOGUE KILLED!!?!?!')
# with trio.CancelScope(shield=True):
# await tractor.breakpoint()
# raise
_router.relays.pop(broker) _router.relays.pop(broker)
@ -521,13 +516,12 @@ async def translate_and_relay_brokerd_events(
pos_msg = BrokerdPosition(**brokerd_msg).dict() pos_msg = BrokerdPosition(**brokerd_msg).dict()
# keep up to date locally in ``emsd`` # keep up to date locally in ``emsd``
relay.positions.update(pos_msg) relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
# relay through position msgs immediately by # relay through position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
for oid, ems_client_order_stream in router.dialogues.items(): for client_stream in router.clients:
await client_stream.send(pos_msg)
await ems_client_order_stream.send(pos_msg)
continue continue
@ -682,6 +676,7 @@ async def translate_and_relay_brokerd_events(
# Create and relay response status message # Create and relay response status message
# to requesting EMS client # to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid] ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send( await ems_client_order_stream.send(
Status( Status(
@ -692,6 +687,9 @@ async def translate_and_relay_brokerd_events(
brokerd_msg=broker_details, brokerd_msg=broker_details,
).dict() ).dict()
) )
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# TODO: do we want this to keep things cleaned up? # TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the # it might require a special status from brokerd to affirm the
@ -712,6 +710,8 @@ async def process_client_order_cmds(
) -> None: ) -> None:
client_dialogues = router.dialogues
# cmd: dict # cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
@ -720,14 +720,16 @@ async def process_client_order_cmds(
action = cmd['action'] action = cmd['action']
oid = cmd['oid'] oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id # register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be # such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory # routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs). # others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
router.dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid) live_entry = dark_book._ems_entries.get(oid)
@ -1027,6 +1029,8 @@ async def _emsd_main(
# start inbound (from attached client) order request processing # start inbound (from attached client) order request processing
try: try:
_router.clients.add(ems_client_order_stream)
await process_client_order_cmds( await process_client_order_cmds(
ems_client_order_stream, ems_client_order_stream,
@ -1039,12 +1043,26 @@ async def _emsd_main(
dark_book, dark_book,
_router, _router,
) )
finally: finally:
pass # remove client from "registry"
# for oid, client_stream in _router.dialogs.copy().items(): _router.clients.remove(ems_client_order_stream)
# if client_stream is ems_client_order_stream:
# # TODO: we need a placeholder for sending dialogues = _router.dialogues
# # the updates to an alert system inside
# # ``emsd`` ?? for oid, client_stream in dialogues.items():
# print(f'popping order for stream {oid}')
# _router.dialogs.pop(oid) if client_stream == ems_client_order_stream:
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.).