Compare commits
	
		
			1 Commits 
		
	
	
		
			gitea_feat
			...
			kraken_use
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 9d0589f8d5 | 
|  | @ -114,16 +114,32 @@ async def handle_order_requests( | |||
|             }: | ||||
|                 cancel = BrokerdCancel(**msg) | ||||
|                 reqid = ids[cancel.oid] | ||||
|                 txid = reqids2txids[reqid] | ||||
| 
 | ||||
|                 # call ws api to cancel: | ||||
|                 # https://docs.kraken.com/websockets/#message-cancelOrder | ||||
|                 await ws.send_msg({ | ||||
|                     'event': 'cancelOrder', | ||||
|                     'token': token, | ||||
|                     'reqid': reqid, | ||||
|                     'txid': [txid],  # should be txid from submission | ||||
|                 }) | ||||
|                 try: | ||||
|                     txid = reqids2txids[reqid] | ||||
|                 except KeyError: | ||||
|                     # XXX: not sure if this block ever gets hit now? | ||||
|                     log.error('TOO FAST CANCEL/EDIT') | ||||
|                     reqids2txids[reqid] = TooFastEdit(reqid) | ||||
|                     await ems_order_stream.send( | ||||
|                         BrokerdError( | ||||
|                             oid=msg['oid'], | ||||
|                             symbol=msg['symbol'], | ||||
|                             reason=( | ||||
|                                 f'TooFastEdit reqid:{reqid}, could not cancelling..' | ||||
|                             ), | ||||
| 
 | ||||
|                         ) | ||||
|                     ) | ||||
|                 else: | ||||
|                     # call ws api to cancel: | ||||
|                     # https://docs.kraken.com/websockets/#message-cancelOrder | ||||
|                     await ws.send_msg({ | ||||
|                         'event': 'cancelOrder', | ||||
|                         'token': token, | ||||
|                         'reqid': reqid, | ||||
|                         'txid': [txid],  # should be txid from submission | ||||
|                     }) | ||||
| 
 | ||||
|             case { | ||||
|                 'account': 'kraken.spot' as account, | ||||
|  | @ -235,9 +251,17 @@ async def handle_order_requests( | |||
| async def subscribe( | ||||
|     ws: wsproto.WSConnection, | ||||
|     token: str, | ||||
|     subs: list[str] = [ | ||||
|         'ownTrades', | ||||
|         'openOrders', | ||||
|     subs: list[tuple[str, dict]] = [ | ||||
|         ('ownTrades', { | ||||
|             # don't send first 50 trades on startup, | ||||
|             # we already pull this manually from the rest endpoint. | ||||
|             'snapshot': False, | ||||
|         },), | ||||
|         ('openOrders', { | ||||
|             # include rate limit counters | ||||
|             'ratecounter': True, | ||||
|         },), | ||||
| 
 | ||||
|     ], | ||||
| ): | ||||
|     ''' | ||||
|  | @ -250,12 +274,15 @@ async def subscribe( | |||
|     # more specific logic for this in kraken's sync client: | ||||
|     # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 | ||||
|     assert token | ||||
|     for sub in subs: | ||||
|     subnames: set[str] = set() | ||||
| 
 | ||||
|     for name, sub_opts in subs: | ||||
|         msg = { | ||||
|             'event': 'subscribe', | ||||
|             'subscription': { | ||||
|                 'name': sub, | ||||
|                 'name': name, | ||||
|                 'token': token, | ||||
|                 **sub_opts, | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|  | @ -264,7 +291,34 @@ async def subscribe( | |||
|         # since internally the ws methods appear to be FIFO | ||||
|         # locked. | ||||
|         await ws.send_msg(msg) | ||||
|         subnames.add(name) | ||||
| 
 | ||||
|     # wait on subscriptionn acks | ||||
|     with trio.move_on_after(5): | ||||
|         while True: | ||||
|             match (msg := await ws.recv_msg()): | ||||
|                 case { | ||||
|                     'event': 'subscriptionStatus', | ||||
|                     'status': 'subscribed', | ||||
|                     'subscription': sub_opts, | ||||
|                 } as msg: | ||||
|                     log.info( | ||||
|                         f'Sucessful subscribe for {sub_opts}:\n' | ||||
|                         f'{pformat(msg)}' | ||||
|                     ) | ||||
|                     subnames.remove(sub_opts['name']) | ||||
|                     if not subnames: | ||||
|                         break | ||||
| 
 | ||||
|                 case { | ||||
|                     'event': 'subscriptionStatus', | ||||
|                     'status': 'error', | ||||
|                     'errorMessage': errmsg, | ||||
|                 } as msg: | ||||
|                     raise RuntimeError( | ||||
|                         f'{errmsg}\n\n' | ||||
|                         f'{pformat(msg)}' | ||||
|                     ) | ||||
|     yield | ||||
| 
 | ||||
|     for sub in subs: | ||||
|  | @ -616,9 +670,10 @@ async def handle_order_updates( | |||
|                     for (tid, trade) in entry.items() | ||||
| 
 | ||||
|                     # don't re-process datums we've already seen | ||||
|                     if tid not in ledger_trans | ||||
|                     # if tid not in ledger_trans | ||||
|                 } | ||||
|                 for tid, trade in trades.items(): | ||||
|                     assert tid not in ledger_trans | ||||
|                     txid = trade['ordertxid'] | ||||
|                     reqid = trade.get('userref') | ||||
| 
 | ||||
|  | @ -636,22 +691,8 @@ async def handle_order_updates( | |||
|                     size = float(trade['vol']) | ||||
|                     broker_time = float(trade['time']) | ||||
| 
 | ||||
|                     # send a fill msg for gui update | ||||
|                     fill_msg = BrokerdFill( | ||||
|                         time_ns=time.time_ns(), | ||||
|                         reqid=reqid, | ||||
| 
 | ||||
|                         action=action, | ||||
|                         size=size, | ||||
|                         price=price, | ||||
| 
 | ||||
|                         # TODO: maybe capture more msg data | ||||
|                         # i.e fees? | ||||
|                         broker_details={'name': 'kraken'} | trade, | ||||
|                         broker_time=broker_time | ||||
|                     ) | ||||
|                     await ems_stream.send(fill_msg) | ||||
| 
 | ||||
|                     # TODO: we can emit this on the "closed" state in | ||||
|                     # the `openOrders` sub-block below. | ||||
|                     status_msg = BrokerdStatus( | ||||
|                         reqid=reqid, | ||||
|                         time_ns=time.time_ns(), | ||||
|  | @ -905,17 +946,33 @@ async def handle_order_updates( | |||
|                         # NOTE: there is no `status` field | ||||
|                         case { | ||||
|                             'vol_exec': vlm, | ||||
|                             'avg_price': price, | ||||
|                             'userref': reqid, | ||||
|                             **rest, | ||||
|                         }: | ||||
|                             # TODO: emit fill msg from here? | ||||
|                         } as msg: | ||||
| 
 | ||||
|                             ourreqid = reqids2txids.inverse[txid] | ||||
|                             assert reqid == ourreqid | ||||
|                             log.info( | ||||
|                                 f'openOrders vlm={vlm} Fill for {reqid}:\n' | ||||
|                                 f'{update_msg}' | ||||
|                             ) | ||||
|                             continue | ||||
| 
 | ||||
|                             fill_msg = BrokerdFill( | ||||
|                                 time_ns=time.time_ns(), | ||||
|                                 reqid=reqid, | ||||
| 
 | ||||
|                                 # action=action,  # just use size value | ||||
|                                 # for now? | ||||
|                                 size=vlm, | ||||
|                                 price=price, | ||||
| 
 | ||||
|                                 # TODO: maybe capture more msg data | ||||
|                                 # i.e fees? | ||||
|                                 broker_details={'name': 'kraken'} | trade, | ||||
|                                 broker_time=broker_time | ||||
|                             ) | ||||
|                             await ems_stream.send(fill_msg) | ||||
| 
 | ||||
|                         case _: | ||||
|                             log.warning( | ||||
|  | @ -923,17 +980,7 @@ async def handle_order_updates( | |||
|                                 f'{txid}:{order_msg}' | ||||
|                             ) | ||||
| 
 | ||||
|             # NOTE: The only reason for this seems to be remapping | ||||
|             # underlying `txid` values on order "edits" which the | ||||
|             # `openOrders` sub doesn't seem to have any knowledge of. | ||||
|             # UPDATE: seems like we don't need this any more thanks to | ||||
|             # passing through the dialog key / reqid in the `newuserref` | ||||
|             # field on edit requests. | ||||
| 
 | ||||
|             # I'd also like to ask them which event guarantees that the | ||||
|             # the live order is now in the book, since these status ones | ||||
|             # almost seem more like request-acks then state guarantees. | ||||
|             # ANSWER the `openOrders` is more indicative of "liveness". | ||||
|             # order request status updates | ||||
|             case { | ||||
|                 'event': etype, | ||||
|                 'status': status, | ||||
|  | @ -1003,8 +1050,8 @@ async def handle_order_updates( | |||
|                             'reqid': reqid or 0, | ||||
|                             'txid': [txid], | ||||
|                         }) | ||||
| 
 | ||||
|             case _: | ||||
| 
 | ||||
|                 log.warning(f'Unhandled trades update msg: {msg}') | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -222,10 +222,13 @@ class BrokerdFill(Struct): | |||
|     time_ns: int | ||||
| 
 | ||||
|     # order exeuction related | ||||
|     action: str | ||||
|     size: float | ||||
|     price: float | ||||
| 
 | ||||
|     # TODO: pretty sure we can just remove this and instaed use | ||||
|     # +/- size values right? | ||||
|     action: Optional[str] = None | ||||
| 
 | ||||
|     broker_details: dict = {}  # meta-data (eg. commisions etc.) | ||||
| 
 | ||||
|     # brokerd timestamp required for order mode arrow placement on x-axis | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue