Normalize kraken quotes for latency tracking

its_happening
Tyler Goodlet 2020-08-26 21:44:03 -04:00
parent 778e3c7b06
commit 38df68935d
1 changed files with 57 additions and 41 deletions

View File

@ -120,6 +120,9 @@ async def get_client() -> Client:
@dataclass @dataclass
class OHLC: class OHLC:
"""Description of the flattened OHLC quote format. """Description of the flattened OHLC quote format.
For schema details see:
https://docs.kraken.com/websockets/#message-ohlc
""" """
chan_id: int # internal kraken id chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval) chan_name: str # eg. ohlc-1 (name-interval)
@ -144,6 +147,56 @@ class OHLC:
setattr(self, f, val.type(getattr(self, f))) setattr(self, f, val.type(getattr(self, f)))
async def recv_ohlc(recv):
too_slow_count = last_hb = 0
while True:
with trio.move_on_after(1.5) as cs:
msg = await recv()
# trigger reconnection logic if too slow
if cs.cancelled_caught:
too_slow_count += 1
if too_slow_count > 2:
log.warning(
"Heartbeat is to slow, "
"resetting ws connection")
raise trio_websocket._impl.ConnectionClosed(
"Reset Connection")
if isinstance(msg, dict):
if msg.get('event') == 'heartbeat':
now = time.time()
delay = now - last_hb
last_hb = now
log.trace(f"Heartbeat after {delay}")
# TODO: hmm i guess we should use this
# for determining when to do connection
# resets eh?
continue
err = msg.get('errorMessage')
if err:
raise BrokerError(err)
else:
chan_id, ohlc_array, chan_name, pair = msg
yield OHLC(chan_id, chan_name, pair, *ohlc_array)
def normalize(
ohlc: OHLC,
) -> dict:
quote = asdict(ohlc)
quote['broker_ts'] = quote['time']
quote['brokerd_ts'] = time.time()
quote['pair'] = quote['pair'].replace('/', '')
# seriously eh? what's with this non-symmetry everywhere
# in subscription systems...
topic = quote['pair'].replace('/', '')
print(quote)
return topic, quote
@tractor.msg.pub @tractor.msg.pub
async def stream_quotes( async def stream_quotes(
get_topics: Callable, get_topics: Callable,
@ -192,47 +245,11 @@ async def stream_quotes(
async def recv(): async def recv():
return json.loads(await ws.get_message()) return json.loads(await ws.get_message())
async def recv_ohlc():
too_slow_count = last_hb = 0
while True:
with trio.move_on_after(1.5) as cs:
msg = await recv()
# trigger reconnection logic if too slow
if cs.cancelled_caught:
too_slow_count += 1
if too_slow_count > 2:
log.warning(
"Heartbeat is to slow, "
"resetting ws connection")
raise trio_websocket._impl.ConnectionClosed(
"Reset Connection")
if isinstance(msg, dict):
if msg.get('event') == 'heartbeat':
now = time.time()
delay = now - last_hb
last_hb = now
log.trace(f"Heartbeat after {delay}")
# TODO: hmm i guess we should use this
# for determining when to do connection
# resets eh?
continue
err = msg.get('errorMessage')
if err:
raise BrokerError(err)
else:
chan_id, ohlc_array, chan_name, pair = msg
yield OHLC(chan_id, chan_name, pair, *ohlc_array)
# pull a first quote and deliver # pull a first quote and deliver
ohlc_gen = recv_ohlc() ohlc_gen = recv_ohlc(recv)
ohlc_last = await ohlc_gen.__anext__() ohlc_last = await ohlc_gen.__anext__()
# seriously eh? what's with this non-symmetry everywhere topic, quote = normalize(ohlc_last)
# in subscription systems...
quote = asdict(ohlc_last)
topic = quote['pair'].replace('/', '')
# packetize as {topic: quote} # packetize as {topic: quote}
yield {topic: quote} yield {topic: quote}
@ -260,11 +277,10 @@ async def stream_quotes(
'size': tick_volume, 'size': tick_volume,
}) })
topic, quote = normalize(ohlc)
# XXX: format required by ``tractor.msg.pub`` # XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]`` # requires a ``Dict[topic: str, quote: dict]``
quote = asdict(ohlc)
print(quote)
topic = quote['pair'].replace('/', '')
yield {topic: quote} yield {topic: quote}
ohlc_last = ohlc ohlc_last = ohlc