Compare commits
	
		
			1 Commits 
		
	
	
		
			gitea_feat
			...
			kraken_use
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 9d0589f8d5 | 
|  | @ -114,16 +114,32 @@ async def handle_order_requests( | ||||||
|             }: |             }: | ||||||
|                 cancel = BrokerdCancel(**msg) |                 cancel = BrokerdCancel(**msg) | ||||||
|                 reqid = ids[cancel.oid] |                 reqid = ids[cancel.oid] | ||||||
|                 txid = reqids2txids[reqid] |  | ||||||
| 
 | 
 | ||||||
|                 # call ws api to cancel: |                 try: | ||||||
|                 # https://docs.kraken.com/websockets/#message-cancelOrder |                     txid = reqids2txids[reqid] | ||||||
|                 await ws.send_msg({ |                 except KeyError: | ||||||
|                     'event': 'cancelOrder', |                     # XXX: not sure if this block ever gets hit now? | ||||||
|                     'token': token, |                     log.error('TOO FAST CANCEL/EDIT') | ||||||
|                     'reqid': reqid, |                     reqids2txids[reqid] = TooFastEdit(reqid) | ||||||
|                     'txid': [txid],  # should be txid from submission |                     await ems_order_stream.send( | ||||||
|                 }) |                         BrokerdError( | ||||||
|  |                             oid=msg['oid'], | ||||||
|  |                             symbol=msg['symbol'], | ||||||
|  |                             reason=( | ||||||
|  |                                 f'TooFastEdit reqid:{reqid}, could not cancelling..' | ||||||
|  |                             ), | ||||||
|  | 
 | ||||||
|  |                         ) | ||||||
|  |                     ) | ||||||
|  |                 else: | ||||||
|  |                     # call ws api to cancel: | ||||||
|  |                     # https://docs.kraken.com/websockets/#message-cancelOrder | ||||||
|  |                     await ws.send_msg({ | ||||||
|  |                         'event': 'cancelOrder', | ||||||
|  |                         'token': token, | ||||||
|  |                         'reqid': reqid, | ||||||
|  |                         'txid': [txid],  # should be txid from submission | ||||||
|  |                     }) | ||||||
| 
 | 
 | ||||||
|             case { |             case { | ||||||
|                 'account': 'kraken.spot' as account, |                 'account': 'kraken.spot' as account, | ||||||
|  | @ -235,9 +251,17 @@ async def handle_order_requests( | ||||||
| async def subscribe( | async def subscribe( | ||||||
|     ws: wsproto.WSConnection, |     ws: wsproto.WSConnection, | ||||||
|     token: str, |     token: str, | ||||||
|     subs: list[str] = [ |     subs: list[tuple[str, dict]] = [ | ||||||
|         'ownTrades', |         ('ownTrades', { | ||||||
|         'openOrders', |             # don't send first 50 trades on startup, | ||||||
|  |             # we already pull this manually from the rest endpoint. | ||||||
|  |             'snapshot': False, | ||||||
|  |         },), | ||||||
|  |         ('openOrders', { | ||||||
|  |             # include rate limit counters | ||||||
|  |             'ratecounter': True, | ||||||
|  |         },), | ||||||
|  | 
 | ||||||
|     ], |     ], | ||||||
| ): | ): | ||||||
|     ''' |     ''' | ||||||
|  | @ -250,12 +274,15 @@ async def subscribe( | ||||||
|     # more specific logic for this in kraken's sync client: |     # more specific logic for this in kraken's sync client: | ||||||
|     # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 |     # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 | ||||||
|     assert token |     assert token | ||||||
|     for sub in subs: |     subnames: set[str] = set() | ||||||
|  | 
 | ||||||
|  |     for name, sub_opts in subs: | ||||||
|         msg = { |         msg = { | ||||||
|             'event': 'subscribe', |             'event': 'subscribe', | ||||||
|             'subscription': { |             'subscription': { | ||||||
|                 'name': sub, |                 'name': name, | ||||||
|                 'token': token, |                 'token': token, | ||||||
|  |                 **sub_opts, | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  | @ -264,7 +291,34 @@ async def subscribe( | ||||||
|         # since internally the ws methods appear to be FIFO |         # since internally the ws methods appear to be FIFO | ||||||
|         # locked. |         # locked. | ||||||
|         await ws.send_msg(msg) |         await ws.send_msg(msg) | ||||||
|  |         subnames.add(name) | ||||||
| 
 | 
 | ||||||
|  |     # wait on subscriptionn acks | ||||||
|  |     with trio.move_on_after(5): | ||||||
|  |         while True: | ||||||
|  |             match (msg := await ws.recv_msg()): | ||||||
|  |                 case { | ||||||
|  |                     'event': 'subscriptionStatus', | ||||||
|  |                     'status': 'subscribed', | ||||||
|  |                     'subscription': sub_opts, | ||||||
|  |                 } as msg: | ||||||
|  |                     log.info( | ||||||
|  |                         f'Sucessful subscribe for {sub_opts}:\n' | ||||||
|  |                         f'{pformat(msg)}' | ||||||
|  |                     ) | ||||||
|  |                     subnames.remove(sub_opts['name']) | ||||||
|  |                     if not subnames: | ||||||
|  |                         break | ||||||
|  | 
 | ||||||
|  |                 case { | ||||||
|  |                     'event': 'subscriptionStatus', | ||||||
|  |                     'status': 'error', | ||||||
|  |                     'errorMessage': errmsg, | ||||||
|  |                 } as msg: | ||||||
|  |                     raise RuntimeError( | ||||||
|  |                         f'{errmsg}\n\n' | ||||||
|  |                         f'{pformat(msg)}' | ||||||
|  |                     ) | ||||||
|     yield |     yield | ||||||
| 
 | 
 | ||||||
|     for sub in subs: |     for sub in subs: | ||||||
|  | @ -616,9 +670,10 @@ async def handle_order_updates( | ||||||
|                     for (tid, trade) in entry.items() |                     for (tid, trade) in entry.items() | ||||||
| 
 | 
 | ||||||
|                     # don't re-process datums we've already seen |                     # don't re-process datums we've already seen | ||||||
|                     if tid not in ledger_trans |                     # if tid not in ledger_trans | ||||||
|                 } |                 } | ||||||
|                 for tid, trade in trades.items(): |                 for tid, trade in trades.items(): | ||||||
|  |                     assert tid not in ledger_trans | ||||||
|                     txid = trade['ordertxid'] |                     txid = trade['ordertxid'] | ||||||
|                     reqid = trade.get('userref') |                     reqid = trade.get('userref') | ||||||
| 
 | 
 | ||||||
|  | @ -636,22 +691,8 @@ async def handle_order_updates( | ||||||
|                     size = float(trade['vol']) |                     size = float(trade['vol']) | ||||||
|                     broker_time = float(trade['time']) |                     broker_time = float(trade['time']) | ||||||
| 
 | 
 | ||||||
|                     # send a fill msg for gui update |                     # TODO: we can emit this on the "closed" state in | ||||||
|                     fill_msg = BrokerdFill( |                     # the `openOrders` sub-block below. | ||||||
|                         time_ns=time.time_ns(), |  | ||||||
|                         reqid=reqid, |  | ||||||
| 
 |  | ||||||
|                         action=action, |  | ||||||
|                         size=size, |  | ||||||
|                         price=price, |  | ||||||
| 
 |  | ||||||
|                         # TODO: maybe capture more msg data |  | ||||||
|                         # i.e fees? |  | ||||||
|                         broker_details={'name': 'kraken'} | trade, |  | ||||||
|                         broker_time=broker_time |  | ||||||
|                     ) |  | ||||||
|                     await ems_stream.send(fill_msg) |  | ||||||
| 
 |  | ||||||
|                     status_msg = BrokerdStatus( |                     status_msg = BrokerdStatus( | ||||||
|                         reqid=reqid, |                         reqid=reqid, | ||||||
|                         time_ns=time.time_ns(), |                         time_ns=time.time_ns(), | ||||||
|  | @ -905,17 +946,33 @@ async def handle_order_updates( | ||||||
|                         # NOTE: there is no `status` field |                         # NOTE: there is no `status` field | ||||||
|                         case { |                         case { | ||||||
|                             'vol_exec': vlm, |                             'vol_exec': vlm, | ||||||
|  |                             'avg_price': price, | ||||||
|                             'userref': reqid, |                             'userref': reqid, | ||||||
|                             **rest, |                             **rest, | ||||||
|                         }: |                         } as msg: | ||||||
|                             # TODO: emit fill msg from here? | 
 | ||||||
|                             ourreqid = reqids2txids.inverse[txid] |                             ourreqid = reqids2txids.inverse[txid] | ||||||
|                             assert reqid == ourreqid |                             assert reqid == ourreqid | ||||||
|                             log.info( |                             log.info( | ||||||
|                                 f'openOrders vlm={vlm} Fill for {reqid}:\n' |                                 f'openOrders vlm={vlm} Fill for {reqid}:\n' | ||||||
|                                 f'{update_msg}' |                                 f'{update_msg}' | ||||||
|                             ) |                             ) | ||||||
|                             continue | 
 | ||||||
|  |                             fill_msg = BrokerdFill( | ||||||
|  |                                 time_ns=time.time_ns(), | ||||||
|  |                                 reqid=reqid, | ||||||
|  | 
 | ||||||
|  |                                 # action=action,  # just use size value | ||||||
|  |                                 # for now? | ||||||
|  |                                 size=vlm, | ||||||
|  |                                 price=price, | ||||||
|  | 
 | ||||||
|  |                                 # TODO: maybe capture more msg data | ||||||
|  |                                 # i.e fees? | ||||||
|  |                                 broker_details={'name': 'kraken'} | trade, | ||||||
|  |                                 broker_time=broker_time | ||||||
|  |                             ) | ||||||
|  |                             await ems_stream.send(fill_msg) | ||||||
| 
 | 
 | ||||||
|                         case _: |                         case _: | ||||||
|                             log.warning( |                             log.warning( | ||||||
|  | @ -923,17 +980,7 @@ async def handle_order_updates( | ||||||
|                                 f'{txid}:{order_msg}' |                                 f'{txid}:{order_msg}' | ||||||
|                             ) |                             ) | ||||||
| 
 | 
 | ||||||
|             # NOTE: The only reason for this seems to be remapping |             # order request status updates | ||||||
|             # underlying `txid` values on order "edits" which the |  | ||||||
|             # `openOrders` sub doesn't seem to have any knowledge of. |  | ||||||
|             # UPDATE: seems like we don't need this any more thanks to |  | ||||||
|             # passing through the dialog key / reqid in the `newuserref` |  | ||||||
|             # field on edit requests. |  | ||||||
| 
 |  | ||||||
|             # I'd also like to ask them which event guarantees that the |  | ||||||
|             # the live order is now in the book, since these status ones |  | ||||||
|             # almost seem more like request-acks then state guarantees. |  | ||||||
|             # ANSWER the `openOrders` is more indicative of "liveness". |  | ||||||
|             case { |             case { | ||||||
|                 'event': etype, |                 'event': etype, | ||||||
|                 'status': status, |                 'status': status, | ||||||
|  | @ -1003,8 +1050,8 @@ async def handle_order_updates( | ||||||
|                             'reqid': reqid or 0, |                             'reqid': reqid or 0, | ||||||
|                             'txid': [txid], |                             'txid': [txid], | ||||||
|                         }) |                         }) | ||||||
| 
 |  | ||||||
|             case _: |             case _: | ||||||
|  | 
 | ||||||
|                 log.warning(f'Unhandled trades update msg: {msg}') |                 log.warning(f'Unhandled trades update msg: {msg}') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -222,10 +222,13 @@ class BrokerdFill(Struct): | ||||||
|     time_ns: int |     time_ns: int | ||||||
| 
 | 
 | ||||||
|     # order exeuction related |     # order exeuction related | ||||||
|     action: str |  | ||||||
|     size: float |     size: float | ||||||
|     price: float |     price: float | ||||||
| 
 | 
 | ||||||
|  |     # TODO: pretty sure we can just remove this and instaed use | ||||||
|  |     # +/- size values right? | ||||||
|  |     action: Optional[str] = None | ||||||
|  | 
 | ||||||
|     broker_details: dict = {}  # meta-data (eg. commisions etc.) |     broker_details: dict = {}  # meta-data (eg. commisions etc.) | ||||||
| 
 | 
 | ||||||
|     # brokerd timestamp required for order mode arrow placement on x-axis |     # brokerd timestamp required for order mode arrow placement on x-axis | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue