diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 10a55e3d..776d33cb 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -21,6 +21,7 @@ Order api and machinery from collections import ChainMap, defaultdict from contextlib import ( asynccontextmanager as acm, + aclosing, ) from functools import partial from itertools import count @@ -679,7 +680,7 @@ async def trades_dialogue( token=token, ), ) as ws, - stream_messages(ws) as stream, + aclosing(stream_messages(ws)) as stream, trio.open_nursery() as nurse, ): # task for processing inbound requests from ems diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 5b654970..e92c8021 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -18,7 +18,10 @@ Real-time and historical data feed endpoints. ''' -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + aclosing, +) from datetime import datetime from typing import ( Any, @@ -31,7 +34,6 @@ from fuzzywuzzy import process as fuzzy import numpy as np import pendulum from trio_typing import TaskStatus -from trio_util import trio_async_generator import tractor import trio @@ -82,7 +84,6 @@ class OHLC(Struct): ticks: list[Any] = [] -@trio_async_generator async def stream_messages( ws: NoBsWs, ): @@ -93,26 +94,9 @@ async def stream_messages( though a single async generator. ''' - too_slow_count = last_hb = 0 - - while True: - - with trio.move_on_after(5) as cs: - msg = await ws.recv_msg() - - # trigger reconnection if heartbeat is laggy - if cs.cancelled_caught: - - too_slow_count += 1 - - if too_slow_count > 20: - log.warning( - "Heartbeat is too slow, resetting ws connection") - - await ws._connect() - too_slow_count = 0 - continue + last_hb: float = 0 + async for msg in ws: match msg: case {'event': 'heartbeat'}: now = time.time() @@ -130,7 +114,6 @@ async def stream_messages( yield msg -@trio_async_generator async def process_data_feed_msgs( ws: NoBsWs, ): @@ -138,7 +121,7 @@ async def process_data_feed_msgs( Parse and pack data feed messages. ''' - async with stream_messages(ws) as ws_stream: + async with aclosing(stream_messages(ws)) as ws_stream: async for msg in ws_stream: match msg: case { @@ -416,13 +399,15 @@ async def stream_quotes( open_autorecon_ws( 'wss://ws.kraken.com/', fixture=subscribe, + msg_recv_timeout=5, + reset_after=20, ) as ws, # avoid stream-gen closure from breaking trio.. # NOTE: not sure this actually works XD particularly # if we call `ws._connect()` manally in the streaming # async gen.. - process_data_feed_msgs(ws) as msg_gen, + aclosing(process_data_feed_msgs(ws)) as msg_gen, ): # pull a first quote and deliver typ, ohlc_last = await anext(msg_gen)