Factor msg loop into new func: `handle_order_updates()`

kraken_ws_orders
Tyler Goodlet 2022-07-05 11:48:10 -04:00
parent 84cab1327d
commit d9b4c4a413
1 changed files with 351 additions and 323 deletions

View File

@ -328,355 +328,383 @@ async def trades_dialogue(
ids, ids,
) )
# process and relay trades events to ems # enter relay loop
await handle_order_updates(
ws,
ems_stream,
emsflow,
ids,
trans,
acctid,
acc_name,
token,
)
async def handle_order_updates(
ws: NoBsWs,
ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
trans: list[pp.Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
Main msg handling loop for all things order management.
This code is broken out to make the context explicit and state variables
defined in the signature clear to the reader.
'''
async for msg in stream_messages(ws):
match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades # https://docs.kraken.com/websockets/#message-ownTrades
async for msg in stream_messages(ws): case [
match msg: trades_msgs,
case [ 'ownTrades',
trades_msgs, {'sequence': seq},
'ownTrades', ]:
{'sequence': seq}, # flatten msgs for processing
]: trades = {
# flatten msgs for processing tid: trade
trades = { for entry in trades_msgs
tid: trade for (tid, trade) in entry.items()
for entry in trades_msgs
for (tid, trade) in entry.items()
# only emit entries which are already not-in-ledger # only emit entries which are already not-in-ledger
if tid not in {r.tid for r in trans} if tid not in {r.tid for r in trans}
} }
for tid, trade in trades.items(): for tid, trade in trades.items():
# parse-cast # parse-cast
reqid = trade['ordertxid'] reqid = trade['ordertxid']
action = trade['type'] action = trade['type']
price = float(trade['price']) price = float(trade['price'])
size = float(trade['vol']) size = float(trade['vol'])
broker_time = float(trade['time']) broker_time = float(trade['time'])
# send a fill msg for gui update # send a fill msg for gui update
fill_msg = BrokerdFill( fill_msg = BrokerdFill(
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
action=action, action=action,
size=size, size=size,
price=price, price=price,
# TODO: maybe capture more msg data # TODO: maybe capture more msg data
# i.e fees? # i.e fees?
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=broker_time broker_time=broker_time
) )
await ems_stream.send(fill_msg) await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus( filled_msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
account=acc_name, account=acc_name,
status='filled', status='filled',
filled=size, filled=size,
reason='Order filled by kraken', reason='Order filled by kraken',
broker_details={ broker_details={
'name': 'kraken', 'name': 'kraken',
'broker_time': broker_time 'broker_time': broker_time
}, },
# TODO: figure out if kraken gives a count # TODO: figure out if kraken gives a count
# of how many units of underlying were # of how many units of underlying were
# filled. Alternatively we can decrement # filled. Alternatively we can decrement
# this value ourselves by associating and # this value ourselves by associating and
# calcing from the diff with the original # calcing from the diff with the original
# client-side request, see: # client-side request, see:
# https://github.com/pikers/piker/issues/296 # https://github.com/pikers/piker/issues/296
remaining=0, remaining=0,
) )
await ems_stream.send(filled_msg) await ems_stream.send(filled_msg)
# update ledger and position tracking # update ledger and position tracking
trans = await update_ledger(acctid, trades) trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf( active, closed = pp.update_pps_conf(
'kraken', 'kraken',
acctid, acctid,
trade_records=trans, trade_records=trans,
ledger_reload={}.fromkeys( ledger_reload={}.fromkeys(
t.bsuid for t in trans), t.bsuid for t in trans),
) )
# emit pp msgs # emit any new pp msgs to ems
for pos in filter( for pos in filter(
bool, bool,
chain(active.values(), closed.values()), chain(active.values(), closed.values()),
): ):
pp_msg = BrokerdPosition( pp_msg = BrokerdPosition(
broker='kraken', broker='kraken',
# XXX: ok so this is annoying, we're # XXX: ok so this is annoying, we're
# relaying an account name with the # relaying an account name with the
# backend suffix prefixed but when # backend suffix prefixed but when
# reading accounts from ledgers we # reading accounts from ledgers we
# don't need it and/or it's prefixed # don't need it and/or it's prefixed
# in the section table.. we should # in the section table.. we should
# just strip this from the message # just strip this from the message
# right since `.broker` is already # right since `.broker` is already
# included? # included?
account=f'kraken.{acctid}', account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(), symbol=pos.symbol.front_fqsn(),
size=pos.size, size=pos.size,
avg_price=pos.be_price, avg_price=pos.be_price,
# TODO # TODO
# currency='' # currency=''
) )
await ems_stream.send(pp_msg) await ems_stream.send(pp_msg)
case [ # process and relay order state change events
order_msgs, # https://docs.kraken.com/websockets/#message-openOrders
'openOrders', case [
{'sequence': seq}, order_msgs,
]: 'openOrders',
# TODO: async order update handling which we {'sequence': seq},
# should remove from `handle_order_requests()` ]:
# above: for order_msg in order_msgs:
# https://github.com/pikers/piker/issues/293 log.info(
# https://github.com/pikers/piker/issues/310 f'Order msg update_{seq}:\n'
for order_msg in order_msgs: f'{pformat(order_msg)}'
log.info( )
'Order msg update_{seq}:\n' txid, update_msg = list(order_msg.items())[0]
f'{pformat(order_msg)}' match update_msg:
) case {
txid, update_msg = list(order_msg.items())[0] 'cancel_reason': 'Order replaced',
match update_msg: 'status': status,
case { 'userref': reqid,
'cancel_reason': 'Order replaced', **rest,
'status': status, }:
'userref': reqid, # we ignore internal order updates
**rest, # triggered by kraken's "edit"
}: # endpoint.
# we ignore internal order updates continue
# triggered by kraken's "edit"
# endpoint.
continue
case { case {
'status': status, 'status': status,
'userref': reqid, 'userref': reqid,
**rest, **rest,
# XXX: eg. of remaining msg schema: # XXX: eg. of remaining msg schema:
# 'avg_price': _, # 'avg_price': _,
# 'cost': _, # 'cost': _,
# 'descr': { # 'descr': {
# 'close': None, # 'close': None,
# 'leverage': None, # 'leverage': None,
# 'order': descr, # 'order': descr,
# 'ordertype': 'limit', # 'ordertype': 'limit',
# 'pair': 'XMR/EUR', # 'pair': 'XMR/EUR',
# 'price': '74.94000000', # 'price': '74.94000000',
# 'price2': '0.00000000', # 'price2': '0.00000000',
# 'type': 'buy' # 'type': 'buy'
# }, # },
# 'expiretm': None, # 'expiretm': None,
# 'fee': '0.00000000', # 'fee': '0.00000000',
# 'limitprice': '0.00000000', # 'limitprice': '0.00000000',
# 'misc': '', # 'misc': '',
# 'oflags': 'fciq', # 'oflags': 'fciq',
# 'opentm': '1656966131.337344', # 'opentm': '1656966131.337344',
# 'refid': None, # 'refid': None,
# 'starttm': None, # 'starttm': None,
# 'stopprice': '0.00000000', # 'stopprice': '0.00000000',
# 'timeinforce': 'GTC', # 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854', # 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm, # 0.0000 # 'vol_exec': exec_vlm, # 0.0000
}: }:
ems_status = { ems_status = {
'open': 'submitted', 'open': 'submitted',
'closed': 'cancelled', 'closed': 'cancelled',
'canceled': 'cancelled', 'canceled': 'cancelled',
# do we even need to forward # do we even need to forward
# this state to the ems? # this state to the ems?
'pending': 'pending', 'pending': 'pending',
}[status] }[status]
submit_vlm = rest.get('vol', 0) submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0) exec_vlm = rest.get('vol_exec', 0)
oid = ids.inverse[reqid] oid = ids.inverse[reqid]
msgs = emsflow[oid] msgs = emsflow[oid]
# send BrokerdStatus messages for all # send BrokerdStatus messages for all
# order state updates # order state updates
resp = BrokerdStatus(
reqid=txid,
time_ns=time.time_ns(), # cuz why not
account=f'kraken.{acctid}',
# everyone doin camel case..
status=ems_status, # force lower case
filled=exec_vlm,
reason='', # why held?
remaining=(
float(submit_vlm)
-
float(exec_vlm)
),
broker_details=dict(
{'name': 'kraken'}, **update_msg
),
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case _:
log.warning(
'Unknown orders msg:\n'
f'{txid}:{order_msg}'
)
case {
'event': etype,
'status': status,
'errorMessage': errmsg,
'reqid': reqid,
} if (
etype in {'addOrderStatus', 'editOrderStatus'}
and status == 'error'
):
log.error(
f'Failed to submit/edit order {reqid}:\n'
f'{errmsg}'
)
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdError(
oid=oid,
# use old reqid in case it changed?
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed submit:\n{errmsg}',
broker_details=resp
)
msgs.append(resp)
await ems_stream.send(resp.dict())
# if we rx any error cancel the order again
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [last.reqid], # txid from submission
})
case {
'event': 'addOrderStatus',
'status': status,
'reqid': reqid, # oid from ems side
# NOTE: in the case of an edit request this is
# a new value!
'txid': txid,
'descr': descr, # only on success?
# 'originaltxid': txid, # only on edits
# **rest,
}:
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
log.info(
f'Submitting order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'txid: {txid}\n'
)
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case {
'event': 'editOrderStatus',
'status': status,
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side `.reqid`.
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
msgs.append(resp)
await ems_stream.send(resp.dict())
# successful cancellation
case {
"event": "cancelOrderStatus",
"status": "ok",
'txid': txids,
'reqid': reqid,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
for txid in txids:
resp = BrokerdStatus( resp = BrokerdStatus(
reqid=txid, reqid=txid,
account=last.account, time_ns=time.time_ns(), # cuz why not
time_ns=time.time_ns(), account=f'kraken.{acctid}',
status='cancelled',
reason='Cancel success: {oid}@{txid}', # everyone doin camel case..
broker_details=resp, status=ems_status, # force lower case
filled=exec_vlm,
reason='', # why held?
remaining=(
float(submit_vlm)
-
float(exec_vlm)
),
broker_details=dict(
{'name': 'kraken'}, **update_msg
),
) )
msgs.append(resp) msgs.append(resp)
await ems_stream.send(resp.dict()) await ems_stream.send(resp.dict())
# failed cancel case _:
case { log.warning(
"event": "cancelOrderStatus", 'Unknown orders msg:\n'
"status": "error", f'{txid}:{order_msg}'
"errorMessage": errmsg, )
'reqid': reqid,
}:
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdError( case {
oid=oid, 'event': etype,
reqid=last.reqid, 'status': status,
symbol=last.symbol, 'errorMessage': errmsg,
reason=f'Failed order cancel {errmsg}', 'reqid': reqid,
broker_details=resp } if (
) etype in {'addOrderStatus', 'editOrderStatus'}
msgs.append(resp) and status == 'error'
await ems_stream.send(resp.dict()) ):
log.error(
f'Failed to submit/edit order {reqid}:\n'
f'{errmsg}'
)
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdError(
oid=oid,
# use old reqid in case it changed?
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed submit:\n{errmsg}',
broker_details=resp
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case _: # if we rx any error cancel the order again
log.warning(f'Unhandled trades msg: {msg}') await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [last.reqid], # txid from submission
})
case {
'event': 'addOrderStatus',
'status': status,
'reqid': reqid, # oid from ems side
# NOTE: in the case of an edit request this is
# a new value!
'txid': txid,
'descr': descr, # only on success?
# 'originaltxid': txid, # only on edits
# **rest,
}:
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
log.info(
f'Submitting order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'txid: {txid}\n'
)
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case {
'event': 'editOrderStatus',
'status': status,
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side `.reqid`.
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=txid, # kraken unique order id
account=last.account, # piker account
)
msgs.append(resp)
await ems_stream.send(resp.dict())
# successful cancellation
case {
"event": "cancelOrderStatus",
"status": "ok",
'txid': txids,
'reqid': reqid,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
for txid in txids:
resp = BrokerdStatus(
reqid=txid,
account=last.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=resp,
)
msgs.append(resp)
await ems_stream.send(resp.dict())
# failed cancel
case {
"event": "cancelOrderStatus",
"status": "error",
"errorMessage": errmsg,
'reqid': reqid,
}:
oid = ids.inverse[reqid]
msgs = emsflow[oid]
last = msgs[-1]
resp = BrokerdError(
oid=oid,
reqid=last.reqid,
symbol=last.symbol,
reason=f'Failed order cancel {errmsg}',
broker_details=resp
)
msgs.append(resp)
await ems_stream.send(resp.dict())
case _:
log.warning(f'Unhandled trades msg: {msg}')
def norm_trade_records( def norm_trade_records(