Pass our manually mapped `reqid: int` to EMS
Since we seem to always be able to get back the `reqid`/`userref` value we send to kraken ws endpoints, we can use this as our brokerd side order id and avoid all race cases with getting the true `txid` value that `kraken` assigns (and which changes when you do "edits" :eyeroll:). This simplifies status updates by allowing our relay loop just to pass back our generated `.reqid` verbatim and allows responding with a `BrokerdOrderAck` immediately in the request handler task which should guarantee there are no further race conditions with the relay loop and mapping `txid`s from kraken.. and figuring out wtf to do when they change, etc.krakenwsbackup
							parent
							
								
									78b9d90202
								
							
						
					
					
						commit
						5100036e10
					
				| 
						 | 
				
			
			@ -98,7 +98,7 @@ async def handle_order_requests(
 | 
			
		|||
        log.info(f'Rx order msg:\n{pformat(msg)}')
 | 
			
		||||
        match msg:
 | 
			
		||||
            case {
 | 
			
		||||
                'account': 'kraken.spot',
 | 
			
		||||
                'account': 'kraken.spot' as account,
 | 
			
		||||
                'action': action,
 | 
			
		||||
            } if action in {'buy', 'sell'}:
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -120,7 +120,7 @@ async def handle_order_requests(
 | 
			
		|||
                    reqid = next(counter)
 | 
			
		||||
                    ids[order.oid] = reqid
 | 
			
		||||
                    log.debug(
 | 
			
		||||
                        f"GENERATED ORDER {reqid}\n"
 | 
			
		||||
                        f"Adding order {reqid}\n"
 | 
			
		||||
                        f'{ids}'
 | 
			
		||||
                    )
 | 
			
		||||
                    extra = {
 | 
			
		||||
| 
						 | 
				
			
			@ -149,11 +149,18 @@ async def handle_order_requests(
 | 
			
		|||
                log.info(f'Submitting WS order request:\n{pformat(req)}')
 | 
			
		||||
                await ws.send_msg(req)
 | 
			
		||||
 | 
			
		||||
                resp = BrokerdOrderAck(
 | 
			
		||||
                    oid=order.oid,  # ems order request id
 | 
			
		||||
                    reqid=reqid,  # our custom int mapping
 | 
			
		||||
                    account=account,  # piker account
 | 
			
		||||
                )
 | 
			
		||||
                await ems_order_stream.send(resp)
 | 
			
		||||
 | 
			
		||||
                # placehold for sanity checking in relay loop
 | 
			
		||||
                emsflow.setdefault(order.oid, []).append(order)
 | 
			
		||||
 | 
			
		||||
            case {
 | 
			
		||||
                'account': 'kraken.spot',
 | 
			
		||||
                'account': 'kraken.spot' as account,
 | 
			
		||||
                'action': 'cancel',
 | 
			
		||||
            }:
 | 
			
		||||
                cancel = BrokerdCancel(**msg)
 | 
			
		||||
| 
						 | 
				
			
			@ -369,6 +376,8 @@ async def handle_order_updates(
 | 
			
		|||
            case [
 | 
			
		||||
                trades_msgs,
 | 
			
		||||
                'ownTrades',
 | 
			
		||||
                # won't exist for historical values?
 | 
			
		||||
                # 'userref': reqid,
 | 
			
		||||
                {'sequence': seq},
 | 
			
		||||
            ]:
 | 
			
		||||
                # flatten msgs for processing
 | 
			
		||||
| 
						 | 
				
			
			@ -382,8 +391,12 @@ async def handle_order_updates(
 | 
			
		|||
                }
 | 
			
		||||
                for tid, trade in trades.items():
 | 
			
		||||
 | 
			
		||||
                    # parse-cast
 | 
			
		||||
                    reqid = trade['ordertxid']
 | 
			
		||||
                    # NOTE: try to get the requid sent in the order
 | 
			
		||||
                    # request message if posssible; it may not be
 | 
			
		||||
                    # provided since this sub also returns generic
 | 
			
		||||
                    # historical trade events.
 | 
			
		||||
                    reqid = trade.get('userref', trade['ordertxid'])
 | 
			
		||||
 | 
			
		||||
                    action = trade['type']
 | 
			
		||||
                    price = float(trade['price'])
 | 
			
		||||
                    size = float(trade['vol'])
 | 
			
		||||
| 
						 | 
				
			
			@ -392,6 +405,7 @@ async def handle_order_updates(
 | 
			
		|||
                    # send a fill msg for gui update
 | 
			
		||||
                    fill_msg = BrokerdFill(
 | 
			
		||||
                        reqid=reqid,
 | 
			
		||||
 | 
			
		||||
                        time_ns=time.time_ns(),
 | 
			
		||||
 | 
			
		||||
                        action=action,
 | 
			
		||||
| 
						 | 
				
			
			@ -540,7 +554,7 @@ async def handle_order_updates(
 | 
			
		|||
                            # order state updates
 | 
			
		||||
                            resp = BrokerdStatus(
 | 
			
		||||
 | 
			
		||||
                                reqid=txid,
 | 
			
		||||
                                reqid=reqid,
 | 
			
		||||
                                time_ns=time.time_ns(),  # cuz why not
 | 
			
		||||
                                account=f'kraken.{acctid}',
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -598,6 +612,7 @@ async def handle_order_updates(
 | 
			
		|||
                #         'txid': [last.reqid],  # txid from submission
 | 
			
		||||
                #     })
 | 
			
		||||
 | 
			
		||||
                if resps:
 | 
			
		||||
                    msgs.extend(resps)
 | 
			
		||||
                    for resp in resps:
 | 
			
		||||
                        await ems_stream.send(resp.dict())
 | 
			
		||||
| 
						 | 
				
			
			@ -635,7 +650,7 @@ def process_status(
 | 
			
		|||
            resp = BrokerdError(
 | 
			
		||||
                oid=oid,
 | 
			
		||||
                # XXX: use old reqid in case it changed?
 | 
			
		||||
                reqid=last.reqid,
 | 
			
		||||
                reqid=reqid,
 | 
			
		||||
                symbol=getattr(last, 'symbol', 'N/A'),
 | 
			
		||||
 | 
			
		||||
                reason=f'Failed {action}:\n{errmsg}',
 | 
			
		||||
| 
						 | 
				
			
			@ -657,12 +672,7 @@ def process_status(
 | 
			
		|||
                f're-mapped reqid: {reqid}\n'
 | 
			
		||||
                f'txid: {txid}\n'
 | 
			
		||||
            )
 | 
			
		||||
            resp = BrokerdOrderAck(
 | 
			
		||||
                oid=oid,  # ems order request id
 | 
			
		||||
                reqid=txid,  # kraken unique order id
 | 
			
		||||
                account=last.account,  # piker account
 | 
			
		||||
            )
 | 
			
		||||
            return [resp], False
 | 
			
		||||
            return [], False
 | 
			
		||||
 | 
			
		||||
        case {
 | 
			
		||||
            'event': 'editOrderStatus',
 | 
			
		||||
| 
						 | 
				
			
			@ -680,12 +690,7 @@ def process_status(
 | 
			
		|||
                f'{descr}'
 | 
			
		||||
            )
 | 
			
		||||
            # deliver another ack to update the ems-side `.reqid`.
 | 
			
		||||
            resp = BrokerdOrderAck(
 | 
			
		||||
                oid=oid,  # ems order request id
 | 
			
		||||
                reqid=txid,  # kraken unique order id
 | 
			
		||||
                account=last.account,  # piker account
 | 
			
		||||
            )
 | 
			
		||||
            return [resp], False
 | 
			
		||||
            return [], False
 | 
			
		||||
 | 
			
		||||
        case {
 | 
			
		||||
            "event": "cancelOrderStatus",
 | 
			
		||||
| 
						 | 
				
			
			@ -701,7 +706,7 @@ def process_status(
 | 
			
		|||
            resps: list[MsgUnion] = []
 | 
			
		||||
            for txid in rest.get('txid', [last.reqid]):
 | 
			
		||||
                resp = BrokerdStatus(
 | 
			
		||||
                    reqid=txid,
 | 
			
		||||
                    reqid=reqid,
 | 
			
		||||
                    account=last.account,
 | 
			
		||||
                    time_ns=time.time_ns(),
 | 
			
		||||
                    status='cancelled',
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue