kraken: port to new `NoBsWs`, passing timeout (counts) during setup

rekt_pps
Tyler Goodlet 2023-04-21 13:56:42 -04:00
parent b03564da2c
commit 34ff5ff249
2 changed files with 12 additions and 26 deletions

View File

@ -21,6 +21,7 @@ Order api and machinery
from collections import ChainMap, defaultdict from collections import ChainMap, defaultdict
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
aclosing,
) )
from functools import partial from functools import partial
from itertools import count from itertools import count
@ -679,7 +680,7 @@ async def trades_dialogue(
token=token, token=token,
), ),
) as ws, ) as ws,
stream_messages(ws) as stream, aclosing(stream_messages(ws)) as stream,
trio.open_nursery() as nurse, trio.open_nursery() as nurse,
): ):
# task for processing inbound requests from ems # task for processing inbound requests from ems

View File

@ -18,7 +18,10 @@
Real-time and historical data feed endpoints. Real-time and historical data feed endpoints.
''' '''
from contextlib import asynccontextmanager as acm from contextlib import (
asynccontextmanager as acm,
aclosing,
)
from datetime import datetime from datetime import datetime
from typing import ( from typing import (
Any, Any,
@ -31,7 +34,6 @@ from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import pendulum import pendulum
from trio_typing import TaskStatus from trio_typing import TaskStatus
from trio_util import trio_async_generator
import tractor import tractor
import trio import trio
@ -82,7 +84,6 @@ class OHLC(Struct):
ticks: list[Any] = [] ticks: list[Any] = []
@trio_async_generator
async def stream_messages( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
): ):
@ -93,26 +94,9 @@ async def stream_messages(
though a single async generator. though a single async generator.
''' '''
too_slow_count = last_hb = 0 last_hb: float = 0
while True:
with trio.move_on_after(5) as cs:
msg = await ws.recv_msg()
# trigger reconnection if heartbeat is laggy
if cs.cancelled_caught:
too_slow_count += 1
if too_slow_count > 20:
log.warning(
"Heartbeat is too slow, resetting ws connection")
await ws._connect()
too_slow_count = 0
continue
async for msg in ws:
match msg: match msg:
case {'event': 'heartbeat'}: case {'event': 'heartbeat'}:
now = time.time() now = time.time()
@ -130,7 +114,6 @@ async def stream_messages(
yield msg yield msg
@trio_async_generator
async def process_data_feed_msgs( async def process_data_feed_msgs(
ws: NoBsWs, ws: NoBsWs,
): ):
@ -138,7 +121,7 @@ async def process_data_feed_msgs(
Parse and pack data feed messages. Parse and pack data feed messages.
''' '''
async with stream_messages(ws) as ws_stream: async with aclosing(stream_messages(ws)) as ws_stream:
async for msg in ws_stream: async for msg in ws_stream:
match msg: match msg:
case { case {
@ -416,13 +399,15 @@ async def stream_quotes(
open_autorecon_ws( open_autorecon_ws(
'wss://ws.kraken.com/', 'wss://ws.kraken.com/',
fixture=subscribe, fixture=subscribe,
msg_recv_timeout=5,
reset_after=20,
) as ws, ) as ws,
# avoid stream-gen closure from breaking trio.. # avoid stream-gen closure from breaking trio..
# NOTE: not sure this actually works XD particularly # NOTE: not sure this actually works XD particularly
# if we call `ws._connect()` manally in the streaming # if we call `ws._connect()` manally in the streaming
# async gen.. # async gen..
process_data_feed_msgs(ws) as msg_gen, aclosing(process_data_feed_msgs(ws)) as msg_gen,
): ):
# pull a first quote and deliver # pull a first quote and deliver
typ, ohlc_last = await anext(msg_gen) typ, ohlc_last = await anext(msg_gen)