From d06bbec24f5cbe7a16ec34c886d658d46d96a49a Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 15 Apr 2026 19:04:17 -0400 Subject: [PATCH] Replace `TooFastEdit` sentinel with `set` tracker Drop the pattern of storing a `TooFastEdit` exc instance in `reqids2txids` as a sentinel value; instead track affected reqids in a dedicated `toofastedit: set[int]` and check membership via `reqid in toofastedit`. Deats, - Comment out `TooFastEdit` class and its `reg_err_types()` call. - Add `toofastedit` param to both `handle_order_requests()` and `handle_order_updates()`, threaded from `open_trade_dialog()`. Also, - Use `partial()` with kwargs for the `tn.start_soon()` call to the order handler. - Add `await tractor.pause()` on the too-fast edit path for runtime debugging; will remove once confident this all works. - Expand comments explaining the cancel/edit race condition. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/brokers/kraken/broker.py | 65 ++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 2b34d5cd..c91155bc 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -97,11 +97,11 @@ MsgUnion = Union[ ] -class TooFastEdit(Exception): - 'Edit requests faster then api submissions' +# class TooFastEdit(Exception): +# 'Edit requests faster then api submissions' -reg_err_types([TooFastEdit]) +# reg_err_types([TooFastEdit]) # TODO: make this wrap the `api.Client` and `ws` instances @@ -138,15 +138,16 @@ async def handle_order_requests( apiflows: OrderDialogs, ids: bidict[str, int], reqids2txids: dict[int, str], + toofastedit: set[int], ) -> None: ''' - Process new order submission requests from the EMS - and deliver acks or errors. + `trio.Task` which handles order ctl requests from the EMS and + deliver acks or errors back on that IPC dialog. ''' # XXX: UGH, let's unify this.. with ``msgspec``!!! - msg: dict | Order + msg: dict|Order async for msg in ems_order_stream: log.info(f'Rx order msg:\n{pformat(msg)}') match msg: @@ -160,8 +161,14 @@ async def handle_order_requests( txid = reqids2txids[reqid] except KeyError: # XXX: not sure if this block ever gets hit now? + # SEEMS TO on the race case with the update task? + # - update dark order quickly after + # triggered-submitted and then we have inavlid + # value in `reqids2txids` sent over ws.send()?? log.error('TOO FAST CANCEL/EDIT') - reqids2txids[reqid] = TooFastEdit(reqid) + # reqids2txids[reqid] = TooFastEdit(reqid) + toofastedit.add(reqid) + reqids2txids[reqid] = reqid await ems_order_stream.send( BrokerdError( oid=msg['oid'], @@ -199,13 +206,16 @@ async def handle_order_requests( # XXX: not sure if this block ever gets hit now? log.error('TOO FAST EDIT') - reqids2txids[reqid] = TooFastEdit(reqid) + # reqids2txids[reqid] = TooFastEdit(reqid) + reqids2txids[reqid] = reqid + toofastedit.add(reqid) + await tractor.pause() await ems_order_stream.send( BrokerdError( oid=msg['oid'], symbol=msg['symbol'], reason=( - f'TooFastEdit reqid:{reqid}, cancelling..' + f'TooFastEdit reqid: {reqid}, cancelling..' ), ) @@ -665,6 +675,10 @@ async def open_trade_dialog( token: str = await client.get_ws_token() + # XXX tracks EMS orders which are updated too quickly + # on the emds side with sync-issues on the kraken side. + toofastedit: set[int] = set() + ws: NoBsWs async with ( ctx.open_stream() as ems_stream, @@ -680,15 +694,16 @@ async def open_trade_dialog( trio.open_nursery() as tn, ): # task for processing inbound requests from ems - tn.start_soon( + tn.start_soon(partial( handle_order_requests, - ws, - client, - ems_stream, - apiflows, - ids, - reqids2txids, - ) + ws=ws, + client=client, + ems_stream=ems_stream, + apiflows=apiflows, + ids=ids, + reqids2txids=reqids2txids, + toofastedit=toofastedit, + )) # enter relay loop await handle_order_updates( @@ -699,6 +714,7 @@ async def open_trade_dialog( apiflows=apiflows, ids=ids, reqids2txids=reqids2txids, + toofastedit=toofastedit, acnt=acnt, ledger=ledger, acctid=acctid, @@ -714,6 +730,7 @@ async def handle_order_updates( apiflows: OrderDialogs, ids: bidict[str, int], reqids2txids: bidict[int, str], + toofastedit: set[int], acnt: Account, # transaction records which will be updated @@ -1021,10 +1038,12 @@ async def handle_order_updates( # <-> ems dialog. if ( status == 'open' - and isinstance( - reqids2txids.get(reqid), - TooFastEdit - ) + and + reqid in toofastedit + # isinstance( + # reqids2txids.get(reqid), + # TooFastEdit + # ) ): # TODO: don't even allow this case # by not moving the client side line @@ -1186,7 +1205,9 @@ async def handle_order_updates( txid # we throttle too-fast-requests on the ems side - and not isinstance(txid, TooFastEdit) + and + reqid in toofastedit + # not isinstance(txid, TooFastEdit) ): # client was editting too quickly # so we instead cancel this order