Factor msg loop into new func: `handle_order_updates()`

tractor_typed_msg_hackin
Tyler Goodlet 2022-07-05 11:48:10 -04:00
parent 9723d2737a
commit 4d19c0f910
1 changed files with 351 additions and 323 deletions

View File

@ -329,10 +329,41 @@ async def trades_dialogue(
ids, ids,
) )
# process and relay trades events to ems # enter relay loop
# https://docs.kraken.com/websockets/#message-ownTrades await handle_order_updates(
ws,
ems_stream,
emsflow,
ids,
trans,
acctid,
acc_name,
token,
)
async def handle_order_updates(
ws: NoBsWs,
ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
trans: list[pp.Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
Main msg handling loop for all things order management.
This code is broken out to make the context explicit and state variables
defined in the signature clear to the reader.
'''
async for msg in stream_messages(ws): async for msg in stream_messages(ws):
match msg: match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
case [ case [
trades_msgs, trades_msgs,
'ownTrades', 'ownTrades',
@ -405,7 +436,7 @@ async def trades_dialogue(
t.bsuid for t in trans), t.bsuid for t in trans),
) )
# emit pp msgs # emit any new pp msgs to ems
for pos in filter( for pos in filter(
bool, bool,
chain(active.values(), closed.values()), chain(active.values(), closed.values()),
@ -432,19 +463,16 @@ async def trades_dialogue(
) )
await ems_stream.send(pp_msg.dict()) await ems_stream.send(pp_msg.dict())
# process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders
case [ case [
order_msgs, order_msgs,
'openOrders', 'openOrders',
{'sequence': seq}, {'sequence': seq},
]: ]:
# TODO: async order update handling which we
# should remove from `handle_order_requests()`
# above:
# https://github.com/pikers/piker/issues/293
# https://github.com/pikers/piker/issues/310
for order_msg in order_msgs: for order_msg in order_msgs:
log.info( log.info(
'Order msg update_{seq}:\n' f'Order msg update_{seq}:\n'
f'{pformat(order_msg)}' f'{pformat(order_msg)}'
) )
txid, update_msg = list(order_msg.items())[0] txid, update_msg = list(order_msg.items())[0]