Normalize kraken quotes for latency tracking
							parent
							
								
									ea8205968c
								
							
						
					
					
						commit
						0b42ac1420
					
				|  | @ -120,6 +120,9 @@ async def get_client() -> Client: | |||
| @dataclass | ||||
| class OHLC: | ||||
|     """Description of the flattened OHLC quote format. | ||||
| 
 | ||||
|     For schema details see: | ||||
|         https://docs.kraken.com/websockets/#message-ohlc | ||||
|     """ | ||||
|     chan_id: int  # internal kraken id | ||||
|     chan_name: str  # eg. ohlc-1  (name-interval) | ||||
|  | @ -144,6 +147,56 @@ class OHLC: | |||
|             setattr(self, f, val.type(getattr(self, f))) | ||||
| 
 | ||||
| 
 | ||||
| async def recv_ohlc(recv): | ||||
|     too_slow_count = last_hb = 0 | ||||
|     while True: | ||||
|         with trio.move_on_after(1.5) as cs: | ||||
|             msg = await recv() | ||||
| 
 | ||||
|         # trigger reconnection logic if too slow | ||||
|         if cs.cancelled_caught: | ||||
|             too_slow_count += 1 | ||||
|             if too_slow_count > 2: | ||||
|                 log.warning( | ||||
|                     "Heartbeat is to slow, " | ||||
|                     "resetting ws connection") | ||||
|                 raise trio_websocket._impl.ConnectionClosed( | ||||
|                     "Reset Connection") | ||||
| 
 | ||||
|         if isinstance(msg, dict): | ||||
|             if msg.get('event') == 'heartbeat': | ||||
|                 now = time.time() | ||||
|                 delay = now - last_hb | ||||
|                 last_hb = now | ||||
|                 log.trace(f"Heartbeat after {delay}") | ||||
|                 # TODO: hmm i guess we should use this | ||||
|                 # for determining when to do connection | ||||
|                 # resets eh? | ||||
|                 continue | ||||
|             err = msg.get('errorMessage') | ||||
|             if err: | ||||
|                 raise BrokerError(err) | ||||
|         else: | ||||
|             chan_id, ohlc_array, chan_name, pair = msg | ||||
|             yield OHLC(chan_id, chan_name, pair, *ohlc_array) | ||||
| 
 | ||||
| 
 | ||||
| def normalize( | ||||
|     ohlc: OHLC, | ||||
| ) -> dict: | ||||
|     quote = asdict(ohlc) | ||||
|     quote['broker_ts'] = quote['time'] | ||||
|     quote['brokerd_ts'] = time.time() | ||||
|     quote['pair'] = quote['pair'].replace('/', '') | ||||
| 
 | ||||
|     # seriously eh? what's with this non-symmetry everywhere | ||||
|     # in subscription systems... | ||||
|     topic = quote['pair'].replace('/', '') | ||||
| 
 | ||||
|     print(quote) | ||||
|     return topic, quote | ||||
| 
 | ||||
| 
 | ||||
| @tractor.msg.pub | ||||
| async def stream_quotes( | ||||
|     get_topics: Callable, | ||||
|  | @ -192,47 +245,11 @@ async def stream_quotes( | |||
|                 async def recv(): | ||||
|                     return json.loads(await ws.get_message()) | ||||
| 
 | ||||
|                 async def recv_ohlc(): | ||||
|                     too_slow_count = last_hb = 0 | ||||
|                     while True: | ||||
|                         with trio.move_on_after(1.5) as cs: | ||||
|                             msg = await recv() | ||||
| 
 | ||||
|                         # trigger reconnection logic if too slow | ||||
|                         if cs.cancelled_caught: | ||||
|                             too_slow_count += 1 | ||||
|                             if too_slow_count > 2: | ||||
|                                 log.warning( | ||||
|                                     "Heartbeat is to slow, " | ||||
|                                     "resetting ws connection") | ||||
|                                 raise trio_websocket._impl.ConnectionClosed( | ||||
|                                     "Reset Connection") | ||||
| 
 | ||||
|                         if isinstance(msg, dict): | ||||
|                             if msg.get('event') == 'heartbeat': | ||||
|                                 now = time.time() | ||||
|                                 delay = now - last_hb | ||||
|                                 last_hb = now | ||||
|                                 log.trace(f"Heartbeat after {delay}") | ||||
|                                 # TODO: hmm i guess we should use this | ||||
|                                 # for determining when to do connection | ||||
|                                 # resets eh? | ||||
|                                 continue | ||||
|                             err = msg.get('errorMessage') | ||||
|                             if err: | ||||
|                                 raise BrokerError(err) | ||||
|                         else: | ||||
|                             chan_id, ohlc_array, chan_name, pair = msg | ||||
|                             yield OHLC(chan_id, chan_name, pair, *ohlc_array) | ||||
| 
 | ||||
|                 # pull a first quote and deliver | ||||
|                 ohlc_gen = recv_ohlc() | ||||
|                 ohlc_gen = recv_ohlc(recv) | ||||
|                 ohlc_last = await ohlc_gen.__anext__() | ||||
| 
 | ||||
|                 # seriously eh? what's with this non-symmetry everywhere | ||||
|                 # in subscription systems... | ||||
|                 quote = asdict(ohlc_last) | ||||
|                 topic = quote['pair'].replace('/', '') | ||||
|                 topic, quote = normalize(ohlc_last) | ||||
| 
 | ||||
|                 # packetize as {topic: quote} | ||||
|                 yield {topic: quote} | ||||
|  | @ -260,11 +277,10 @@ async def stream_quotes( | |||
|                             'size': tick_volume, | ||||
|                         }) | ||||
| 
 | ||||
|                     topic, quote = normalize(ohlc) | ||||
| 
 | ||||
|                     # XXX: format required by ``tractor.msg.pub`` | ||||
|                     # requires a ``Dict[topic: str, quote: dict]`` | ||||
|                     quote = asdict(ohlc) | ||||
|                     print(quote) | ||||
|                     topic = quote['pair'].replace('/', '') | ||||
|                     yield {topic: quote} | ||||
| 
 | ||||
|                     ohlc_last = ohlc | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue