Replace `TooFastEdit` sentinel with `set` tracker
Drop the pattern of storing a `TooFastEdit` exc instance in `reqids2txids` as a sentinel value; instead track affected reqids in a dedicated `toofastedit: set[int]` and check membership via `reqid in toofastedit`. Deats, - Comment out `TooFastEdit` class and its `reg_err_types()` call. - Add `toofastedit` param to both `handle_order_requests()` and `handle_order_updates()`, threaded from `open_trade_dialog()`. Also, - Use `partial()` with kwargs for the `tn.start_soon()` call to the order handler. - Add `await tractor.pause()` on the too-fast edit path for runtime debugging; will remove once confident this all works. - Expand comments explaining the cancel/edit race condition. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codekraken_stale_ws_token
parent
8cefc1bdf8
commit
d06bbec24f
|
|
@ -97,11 +97,11 @@ MsgUnion = Union[
|
|||
]
|
||||
|
||||
|
||||
class TooFastEdit(Exception):
|
||||
'Edit requests faster then api submissions'
|
||||
# class TooFastEdit(Exception):
|
||||
# 'Edit requests faster then api submissions'
|
||||
|
||||
|
||||
reg_err_types([TooFastEdit])
|
||||
# reg_err_types([TooFastEdit])
|
||||
|
||||
|
||||
# TODO: make this wrap the `api.Client` and `ws` instances
|
||||
|
|
@ -138,15 +138,16 @@ async def handle_order_requests(
|
|||
apiflows: OrderDialogs,
|
||||
ids: bidict[str, int],
|
||||
reqids2txids: dict[int, str],
|
||||
toofastedit: set[int],
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Process new order submission requests from the EMS
|
||||
and deliver acks or errors.
|
||||
`trio.Task` which handles order ctl requests from the EMS and
|
||||
deliver acks or errors back on that IPC dialog.
|
||||
|
||||
'''
|
||||
# XXX: UGH, let's unify this.. with ``msgspec``!!!
|
||||
msg: dict | Order
|
||||
msg: dict|Order
|
||||
async for msg in ems_order_stream:
|
||||
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||
match msg:
|
||||
|
|
@ -160,8 +161,14 @@ async def handle_order_requests(
|
|||
txid = reqids2txids[reqid]
|
||||
except KeyError:
|
||||
# XXX: not sure if this block ever gets hit now?
|
||||
# SEEMS TO on the race case with the update task?
|
||||
# - update dark order quickly after
|
||||
# triggered-submitted and then we have inavlid
|
||||
# value in `reqids2txids` sent over ws.send()??
|
||||
log.error('TOO FAST CANCEL/EDIT')
|
||||
reqids2txids[reqid] = TooFastEdit(reqid)
|
||||
# reqids2txids[reqid] = TooFastEdit(reqid)
|
||||
toofastedit.add(reqid)
|
||||
reqids2txids[reqid] = reqid
|
||||
await ems_order_stream.send(
|
||||
BrokerdError(
|
||||
oid=msg['oid'],
|
||||
|
|
@ -199,13 +206,16 @@ async def handle_order_requests(
|
|||
|
||||
# XXX: not sure if this block ever gets hit now?
|
||||
log.error('TOO FAST EDIT')
|
||||
reqids2txids[reqid] = TooFastEdit(reqid)
|
||||
# reqids2txids[reqid] = TooFastEdit(reqid)
|
||||
reqids2txids[reqid] = reqid
|
||||
toofastedit.add(reqid)
|
||||
await tractor.pause()
|
||||
await ems_order_stream.send(
|
||||
BrokerdError(
|
||||
oid=msg['oid'],
|
||||
symbol=msg['symbol'],
|
||||
reason=(
|
||||
f'TooFastEdit reqid:{reqid}, cancelling..'
|
||||
f'TooFastEdit reqid: {reqid}, cancelling..'
|
||||
),
|
||||
|
||||
)
|
||||
|
|
@ -665,6 +675,10 @@ async def open_trade_dialog(
|
|||
|
||||
token: str = await client.get_ws_token()
|
||||
|
||||
# XXX tracks EMS orders which are updated too quickly
|
||||
# on the emds side with sync-issues on the kraken side.
|
||||
toofastedit: set[int] = set()
|
||||
|
||||
ws: NoBsWs
|
||||
async with (
|
||||
ctx.open_stream() as ems_stream,
|
||||
|
|
@ -680,15 +694,16 @@ async def open_trade_dialog(
|
|||
trio.open_nursery() as tn,
|
||||
):
|
||||
# task for processing inbound requests from ems
|
||||
tn.start_soon(
|
||||
tn.start_soon(partial(
|
||||
handle_order_requests,
|
||||
ws,
|
||||
client,
|
||||
ems_stream,
|
||||
apiflows,
|
||||
ids,
|
||||
reqids2txids,
|
||||
)
|
||||
ws=ws,
|
||||
client=client,
|
||||
ems_stream=ems_stream,
|
||||
apiflows=apiflows,
|
||||
ids=ids,
|
||||
reqids2txids=reqids2txids,
|
||||
toofastedit=toofastedit,
|
||||
))
|
||||
|
||||
# enter relay loop
|
||||
await handle_order_updates(
|
||||
|
|
@ -699,6 +714,7 @@ async def open_trade_dialog(
|
|||
apiflows=apiflows,
|
||||
ids=ids,
|
||||
reqids2txids=reqids2txids,
|
||||
toofastedit=toofastedit,
|
||||
acnt=acnt,
|
||||
ledger=ledger,
|
||||
acctid=acctid,
|
||||
|
|
@ -714,6 +730,7 @@ async def handle_order_updates(
|
|||
apiflows: OrderDialogs,
|
||||
ids: bidict[str, int],
|
||||
reqids2txids: bidict[int, str],
|
||||
toofastedit: set[int],
|
||||
acnt: Account,
|
||||
|
||||
# transaction records which will be updated
|
||||
|
|
@ -1021,10 +1038,12 @@ async def handle_order_updates(
|
|||
# <-> ems dialog.
|
||||
if (
|
||||
status == 'open'
|
||||
and isinstance(
|
||||
reqids2txids.get(reqid),
|
||||
TooFastEdit
|
||||
)
|
||||
and
|
||||
reqid in toofastedit
|
||||
# isinstance(
|
||||
# reqids2txids.get(reqid),
|
||||
# TooFastEdit
|
||||
# )
|
||||
):
|
||||
# TODO: don't even allow this case
|
||||
# by not moving the client side line
|
||||
|
|
@ -1186,7 +1205,9 @@ async def handle_order_updates(
|
|||
txid
|
||||
|
||||
# we throttle too-fast-requests on the ems side
|
||||
and not isinstance(txid, TooFastEdit)
|
||||
and
|
||||
reqid in toofastedit
|
||||
# not isinstance(txid, TooFastEdit)
|
||||
):
|
||||
# client was editting too quickly
|
||||
# so we instead cancel this order
|
||||
|
|
|
|||
Loading…
Reference in New Issue