Trigger connection reset on slowed heartbeat
							parent
							
								
									e92abd376a
								
							
						
					
					
						commit
						03c5c7d2ba
					
				|  | @ -6,11 +6,13 @@ from dataclasses import dataclass, asdict, field | ||||||
| from itertools import starmap | from itertools import starmap | ||||||
| from typing import List, Dict, Any | from typing import List, Dict, Any | ||||||
| import json | import json | ||||||
|  | import time | ||||||
| 
 | 
 | ||||||
| import trio_websocket | import trio_websocket | ||||||
| import arrow | import arrow | ||||||
| import asks | import asks | ||||||
| import numpy as np | import numpy as np | ||||||
|  | import trio | ||||||
| import tractor | import tractor | ||||||
| 
 | 
 | ||||||
| from ._util import resproc, SymbolNotFound, BrokerError | from ._util import resproc, SymbolNotFound, BrokerError | ||||||
|  | @ -116,6 +118,8 @@ async def get_client() -> Client: | ||||||
| 
 | 
 | ||||||
| @dataclass | @dataclass | ||||||
| class OHLC: | class OHLC: | ||||||
|  |     """Description of the flattened OHLC quote format. | ||||||
|  |     """ | ||||||
|     chan_id: int  # internal kraken id |     chan_id: int  # internal kraken id | ||||||
|     chan_name: str  # eg. ohlc-1  (name-interval) |     chan_name: str  # eg. ohlc-1  (name-interval) | ||||||
|     pair: str  # fx pair |     pair: str  # fx pair | ||||||
|  | @ -181,17 +185,28 @@ async def stream_quotes( | ||||||
|                 async def recv(): |                 async def recv(): | ||||||
|                     return json.loads(await ws.get_message()) |                     return json.loads(await ws.get_message()) | ||||||
| 
 | 
 | ||||||
|                 import time |  | ||||||
| 
 |  | ||||||
|                 async def recv_ohlc(): |                 async def recv_ohlc(): | ||||||
|                     last_hb = 0 |                     too_slow_count = last_hb = 0 | ||||||
|                     while True: |                     while True: | ||||||
|                         msg = await recv() |                         with trio.move_on_after(1.5) as cs: | ||||||
|  |                             msg = await recv() | ||||||
|  | 
 | ||||||
|  |                         # trigger reconnection logic if too slow | ||||||
|  |                         if cs.cancelled_caught: | ||||||
|  |                             too_slow_count += 1 | ||||||
|  |                             if too_slow_count > 2: | ||||||
|  |                                 log.warning( | ||||||
|  |                                     "Heartbeat is to slow, " | ||||||
|  |                                     "resetting ws connection") | ||||||
|  |                                 raise trio_websocket._impl.ConnectionClosed( | ||||||
|  |                                     "Reset Connection") | ||||||
|  | 
 | ||||||
|                         if isinstance(msg, dict): |                         if isinstance(msg, dict): | ||||||
|                             if msg.get('event') == 'heartbeat': |                             if msg.get('event') == 'heartbeat': | ||||||
|                                 log.trace( |                                 now = time.time() | ||||||
|                                     f"Heartbeat after {time.time() - last_hb}") |                                 delay = now - last_hb | ||||||
|                                 last_hb = time.time() |                                 last_hb = now | ||||||
|  |                                 log.trace(f"Heartbeat after {delay}") | ||||||
|                                 # TODO: hmm i guess we should use this |                                 # TODO: hmm i guess we should use this | ||||||
|                                 # for determining when to do connection |                                 # for determining when to do connection | ||||||
|                                 # resets eh? |                                 # resets eh? | ||||||
|  | @ -203,6 +218,7 @@ async def stream_quotes( | ||||||
|                             chan_id, ohlc_array, chan_name, pair = msg |                             chan_id, ohlc_array, chan_name, pair = msg | ||||||
|                             yield OHLC(chan_id, chan_name, pair, *ohlc_array) |                             yield OHLC(chan_id, chan_name, pair, *ohlc_array) | ||||||
| 
 | 
 | ||||||
|  |                 # pull a first quote and deliver | ||||||
|                 ohlc_gen = recv_ohlc() |                 ohlc_gen = recv_ohlc() | ||||||
|                 ohlc_last = await ohlc_gen.__anext__() |                 ohlc_last = await ohlc_gen.__anext__() | ||||||
|                 yield asdict(ohlc_last) |                 yield asdict(ohlc_last) | ||||||
|  | @ -210,6 +226,7 @@ async def stream_quotes( | ||||||
|                 # keep start of last interval for volume tracking |                 # keep start of last interval for volume tracking | ||||||
|                 last_interval_start = ohlc_last.etime |                 last_interval_start = ohlc_last.etime | ||||||
| 
 | 
 | ||||||
|  |                 # start streaming | ||||||
|                 async for ohlc in ohlc_gen: |                 async for ohlc in ohlc_gen: | ||||||
| 
 | 
 | ||||||
|                     # generate tick values to match time & sales pane: |                     # generate tick values to match time & sales pane: | ||||||
|  | @ -231,7 +248,7 @@ async def stream_quotes( | ||||||
|                     yield asdict(ohlc) |                     yield asdict(ohlc) | ||||||
|                     ohlc_last = ohlc |                     ohlc_last = ohlc | ||||||
|         except trio_websocket._impl.ConnectionClosed: |         except trio_websocket._impl.ConnectionClosed: | ||||||
|             log.error("Good job kraken...") |             log.exception("Good job kraken...reconnecting") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue