Trigger connection reset on slowed heartbeat
							parent
							
								
									f779af02f1
								
							
						
					
					
						commit
						699fffd964
					
				| 
						 | 
					@ -6,11 +6,13 @@ from dataclasses import dataclass, asdict, field
 | 
				
			||||||
from itertools import starmap
 | 
					from itertools import starmap
 | 
				
			||||||
from typing import List, Dict, Any
 | 
					from typing import List, Dict, Any
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
 | 
					import time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import trio_websocket
 | 
					import trio_websocket
 | 
				
			||||||
import arrow
 | 
					import arrow
 | 
				
			||||||
import asks
 | 
					import asks
 | 
				
			||||||
import numpy as np
 | 
					import numpy as np
 | 
				
			||||||
 | 
					import trio
 | 
				
			||||||
import tractor
 | 
					import tractor
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from ._util import resproc, SymbolNotFound, BrokerError
 | 
					from ._util import resproc, SymbolNotFound, BrokerError
 | 
				
			||||||
| 
						 | 
					@ -116,6 +118,8 @@ async def get_client() -> Client:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@dataclass
 | 
					@dataclass
 | 
				
			||||||
class OHLC:
 | 
					class OHLC:
 | 
				
			||||||
 | 
					    """Description of the flattened OHLC quote format.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
    chan_id: int  # internal kraken id
 | 
					    chan_id: int  # internal kraken id
 | 
				
			||||||
    chan_name: str  # eg. ohlc-1  (name-interval)
 | 
					    chan_name: str  # eg. ohlc-1  (name-interval)
 | 
				
			||||||
    pair: str  # fx pair
 | 
					    pair: str  # fx pair
 | 
				
			||||||
| 
						 | 
					@ -181,17 +185,28 @@ async def stream_quotes(
 | 
				
			||||||
                async def recv():
 | 
					                async def recv():
 | 
				
			||||||
                    return json.loads(await ws.get_message())
 | 
					                    return json.loads(await ws.get_message())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                import time
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                async def recv_ohlc():
 | 
					                async def recv_ohlc():
 | 
				
			||||||
                    last_hb = 0
 | 
					                    too_slow_count = last_hb = 0
 | 
				
			||||||
                    while True:
 | 
					                    while True:
 | 
				
			||||||
                        msg = await recv()
 | 
					                        with trio.move_on_after(1.5) as cs:
 | 
				
			||||||
 | 
					                            msg = await recv()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        # trigger reconnection logic if too slow
 | 
				
			||||||
 | 
					                        if cs.cancelled_caught:
 | 
				
			||||||
 | 
					                            too_slow_count += 1
 | 
				
			||||||
 | 
					                            if too_slow_count > 2:
 | 
				
			||||||
 | 
					                                log.warning(
 | 
				
			||||||
 | 
					                                    "Heartbeat is to slow, "
 | 
				
			||||||
 | 
					                                    "resetting ws connection")
 | 
				
			||||||
 | 
					                                raise trio_websocket._impl.ConnectionClosed(
 | 
				
			||||||
 | 
					                                    "Reset Connection")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        if isinstance(msg, dict):
 | 
					                        if isinstance(msg, dict):
 | 
				
			||||||
                            if msg.get('event') == 'heartbeat':
 | 
					                            if msg.get('event') == 'heartbeat':
 | 
				
			||||||
                                log.trace(
 | 
					                                now = time.time()
 | 
				
			||||||
                                    f"Heartbeat after {time.time() - last_hb}")
 | 
					                                delay = now - last_hb
 | 
				
			||||||
                                last_hb = time.time()
 | 
					                                last_hb = now
 | 
				
			||||||
 | 
					                                log.trace(f"Heartbeat after {delay}")
 | 
				
			||||||
                                # TODO: hmm i guess we should use this
 | 
					                                # TODO: hmm i guess we should use this
 | 
				
			||||||
                                # for determining when to do connection
 | 
					                                # for determining when to do connection
 | 
				
			||||||
                                # resets eh?
 | 
					                                # resets eh?
 | 
				
			||||||
| 
						 | 
					@ -203,6 +218,7 @@ async def stream_quotes(
 | 
				
			||||||
                            chan_id, ohlc_array, chan_name, pair = msg
 | 
					                            chan_id, ohlc_array, chan_name, pair = msg
 | 
				
			||||||
                            yield OHLC(chan_id, chan_name, pair, *ohlc_array)
 | 
					                            yield OHLC(chan_id, chan_name, pair, *ohlc_array)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # pull a first quote and deliver
 | 
				
			||||||
                ohlc_gen = recv_ohlc()
 | 
					                ohlc_gen = recv_ohlc()
 | 
				
			||||||
                ohlc_last = await ohlc_gen.__anext__()
 | 
					                ohlc_last = await ohlc_gen.__anext__()
 | 
				
			||||||
                yield asdict(ohlc_last)
 | 
					                yield asdict(ohlc_last)
 | 
				
			||||||
| 
						 | 
					@ -210,6 +226,7 @@ async def stream_quotes(
 | 
				
			||||||
                # keep start of last interval for volume tracking
 | 
					                # keep start of last interval for volume tracking
 | 
				
			||||||
                last_interval_start = ohlc_last.etime
 | 
					                last_interval_start = ohlc_last.etime
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # start streaming
 | 
				
			||||||
                async for ohlc in ohlc_gen:
 | 
					                async for ohlc in ohlc_gen:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # generate tick values to match time & sales pane:
 | 
					                    # generate tick values to match time & sales pane:
 | 
				
			||||||
| 
						 | 
					@ -231,7 +248,7 @@ async def stream_quotes(
 | 
				
			||||||
                    yield asdict(ohlc)
 | 
					                    yield asdict(ohlc)
 | 
				
			||||||
                    ohlc_last = ohlc
 | 
					                    ohlc_last = ohlc
 | 
				
			||||||
        except trio_websocket._impl.ConnectionClosed:
 | 
					        except trio_websocket._impl.ConnectionClosed:
 | 
				
			||||||
            log.error("Good job kraken...")
 | 
					            log.exception("Good job kraken...reconnecting")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == '__main__':
 | 
					if __name__ == '__main__':
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue