Handle too-fast-edits, add `ChainMap` msg tracing

Since our ems doesn't actually do blocking style client-side submission
updates, thus resulting in the client being able to update an existing
order's state before knowing its current state, we can run into race
conditions where for some backends an order is updated using the wrong
order id. For kraken we manually implement detecting this race (lol, for
now anyway) such that when a new client side edit comes in before the
new `txid` is known, we simply expect the handler loop to cancel the
order. Further this adds cancellation on arbitrary status errors, like
rate limits.

Also this adds 2 leg (ems <-> brokerd <-> kraken) msg tracing using
a `collections.ChainMap` which is likely going to end up being the POC
for a more general data structure recommended for backends that need to
trace msg flow for translation with the ems.
kraken_ws_orders
Tyler Goodlet 2022-07-13 16:08:47 -04:00
parent 5b135fad61
commit 2386270cad
1 changed files with 100 additions and 43 deletions

View File

@ -18,12 +18,14 @@
Order api and machinery Order api and machinery
''' '''
from collections import ChainMap, defaultdict
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
contextmanager as cm, contextmanager as cm,
) )
from functools import partial from functools import partial
from itertools import chain, count import itertools
from itertools import count
from pprint import pformat from pprint import pformat
import time import time
from typing import ( from typing import (
@ -73,13 +75,17 @@ MsgUnion = Union[
] ]
class TooFastEdit(Exception):
'Edit requests faster then api submissions'
async def handle_order_requests( async def handle_order_requests(
ws: NoBsWs, ws: NoBsWs,
client: Client, client: Client,
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
token: str, token: str,
emsflow: dict[str, list[MsgUnion]], apiflows: dict[int, ChainMap[dict[str, dict]]],
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str], reqids2txids: dict[int, str],
@ -101,7 +107,6 @@ async def handle_order_requests(
'action': 'cancel', 'action': 'cancel',
}: }:
cancel = BrokerdCancel(**msg) cancel = BrokerdCancel(**msg)
# last = emsflow[cancel.oid]
reqid = ids[cancel.oid] reqid = ids[cancel.oid]
txid = reqids2txids[reqid] txid = reqids2txids[reqid]
@ -126,7 +131,21 @@ async def handle_order_requests(
if order.oid in ids: if order.oid in ids:
ep = 'editOrder' ep = 'editOrder'
reqid = ids[order.oid] # integer not txid reqid = ids[order.oid] # integer not txid
txid = reqids2txids[reqid] try:
txid = reqids2txids.pop(reqid)
except KeyError:
reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
f'TooFastEdit reqid:{reqid}, cancelling..'
),
)
)
extra = { extra = {
'orderid': txid, # txid 'orderid': txid, # txid
} }
@ -176,7 +195,7 @@ async def handle_order_requests(
await ws.send_msg(req) await ws.send_msg(req)
# placehold for sanity checking in relay loop # placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order) apiflows[reqid].maps.append(msg)
case _: case _:
account = msg.get('account') account = msg.get('account')
@ -328,10 +347,10 @@ async def trades_dialogue(
aclosing(stream_messages(ws)) as stream, aclosing(stream_messages(ws)) as stream,
): ):
# task local msg dialog tracking # task local msg dialog tracking
emsflow: dict[ apiflows: defaultdict[
str, int,
list[MsgUnion], ChainMap[dict[str, dict]],
] = {} ] = defaultdict(ChainMap)
# 2way map for ems ids to kraken int reqids.. # 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict() ids: bidict[str, int] = bidict()
@ -344,7 +363,7 @@ async def trades_dialogue(
client, client,
ems_stream, ems_stream,
token, token,
emsflow, apiflows,
ids, ids,
reqids2txids, reqids2txids,
) )
@ -354,7 +373,7 @@ async def trades_dialogue(
ws, ws,
stream, stream,
ems_stream, ems_stream,
emsflow, apiflows,
ids, ids,
reqids2txids, reqids2txids,
trans, trans,
@ -368,7 +387,7 @@ async def handle_order_updates(
ws: NoBsWs, ws: NoBsWs,
ws_stream: AsyncIterator, ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]], apiflows: dict[int, ChainMap[dict[str, dict]]],
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: bidict[int, str], reqids2txids: bidict[int, str],
trans: set[pp.Transaction], trans: set[pp.Transaction],
@ -478,7 +497,6 @@ async def handle_order_updates(
continue continue
# update ledger and position tracking # update ledger and position tracking
await tractor.breakpoint()
trans: set[pp.Transaction] trans: set[pp.Transaction]
with open_ledger( with open_ledger(
acctid, acctid,
@ -500,7 +518,7 @@ async def handle_order_updates(
# emit any new pp msgs to ems # emit any new pp msgs to ems
for pos in filter( for pos in filter(
bool, bool,
chain(active.values(), closed.values()), itertools.chain(active.values(), closed.values()),
): ):
pp_msg = BrokerdPosition( pp_msg = BrokerdPosition(
broker='kraken', broker='kraken',
@ -611,16 +629,40 @@ async def handle_order_updates(
if ( if (
status == 'open' status == 'open'
and oid is None # a non-ems-active order and (
):
# TODO: handle these and relay them # TOO fast edit handled by the
# through the EMS to the client / UI # request handler task.
# side! (toofast := isinstance(
log.warning( reqids2txids.get(reqid),
f'Received active order {txid}:\n' TooFastEdit
f'{update_msg}\n' ))
'Cancelling order for now!..'
# pre-existing open order NOT from
# this EMS session.
or (noid := oid is None)
) )
):
if toofast:
# TODO: don't even allow this case
# by not moving the client side line
# until an edit confirmation
# arrives...
log.warning(
f'Received too fast edit {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
elif noid: # a non-ems-active order
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
f'Rx unknown active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel: # call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder # https://docs.kraken.com/websockets/#message-cancelOrder
@ -660,10 +702,7 @@ async def handle_order_updates(
), ),
) )
# TODO: use collections.ChainMap here apiflows[reqid].maps.append(update_msg)
# msgs = emsflow[oid]
# msgs.append(resp)
await ems_stream.send(resp) await ems_stream.send(resp)
# fill event. # fill event.
@ -735,28 +774,47 @@ async def handle_order_updates(
) )
continue continue
lasttxid = reqids2txids.get(reqid)
txid = rest.get('txid') txid = rest.get('txid')
if txid:
# XXX: we **must** do this mapping for edit order
# status updates since the `openOrders` sub above
# never relays back the correct client-side `reqid`
# that is put in the order request..
reqids2txids[reqid] = txid
msgs = emsflow[oid] # update the msg chain
last = msgs[-1] chain = apiflows[reqid]
chain.maps.append(event)
# pretxid = chain['txid']
# print(f'pretxid: {pretxid}')
resps, errored = process_status( resps, errored = process_status(
event, event,
oid, oid,
token, token,
msgs, chain,
last,
) )
if resps: if resps:
msgs.extend(resps)
for resp in resps: for resp in resps:
await ems_stream.send(resp) await ems_stream.send(resp)
if txid:
if (
isinstance(lasttxid, TooFastEdit)
or errored
):
# client was editting too quickly
# so we instead cancel this order
print("SENDING CANCEL")
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid or 0,
'txid': [txid],
})
else:
# XXX: we **must** do this mapping for edit order
# status updates since the `openOrders` sub above
# never relays back the correct client-side `reqid`
# that is put in the order request..
reqids2txids[reqid] = txid
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')
@ -765,8 +823,7 @@ def process_status(
event: dict[str, str], event: dict[str, str],
oid: str, oid: str,
token: str, token: str,
msgs: list[MsgUnion], chain: ChainMap,
last: MsgUnion,
) -> tuple[list[MsgUnion], bool]: ) -> tuple[list[MsgUnion], bool]:
''' '''
@ -782,7 +839,7 @@ def process_status(
'errorMessage': errmsg, 'errorMessage': errmsg,
}: }:
# any of ``{'add', 'edit', 'cancel'}`` # any of ``{'add', 'edit', 'cancel'}``
action = etype.rstrip('OrderStatus') action = etype.removesuffix('OrderStatus')
log.error( log.error(
f'Failed to {action} order {reqid}:\n' f'Failed to {action} order {reqid}:\n'
f'{errmsg}' f'{errmsg}'
@ -791,7 +848,7 @@ def process_status(
oid=oid, oid=oid,
# XXX: use old reqid in case it changed? # XXX: use old reqid in case it changed?
reqid=reqid, reqid=reqid,
symbol=getattr(last, 'symbol', 'N/A'), symbol=chain.get('symbol', 'N/A'),
reason=f'Failed {action}:\n{errmsg}', reason=f'Failed {action}:\n{errmsg}',
broker_details=event broker_details=event
@ -842,7 +899,7 @@ def process_status(
# 'txid': txids, # 'txid': txids,
**rest, **rest,
}: }:
for txid in rest.get('txid', [last.reqid]): for txid in rest.get('txid', [chain['reqid']]):
log.info( log.info(
f'Cancelling order {oid}[requid={reqid}]:\n' f'Cancelling order {oid}[requid={reqid}]:\n'
f'brokerd reqid: {reqid}\n' f'brokerd reqid: {reqid}\n'