diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 9bb35b00..4bd16f22 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -26,7 +26,7 @@ from typing import ( ) import time -from async_generator import aclosing +from trio_util import trio_async_generator import trio from trio_typing import TaskStatus import pendulum @@ -318,7 +318,10 @@ class AggTrade(Struct): M: bool # Ignore -async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: +@trio_async_generator +async def stream_messages( + ws: NoBsWs, +) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 while True: @@ -540,7 +543,7 @@ async def stream_quotes( ) as ws, # avoid stream-gen closure from breaking trio.. - aclosing(stream_messages(ws)) as msg_gen, + stream_messages(ws) as msg_gen, ): typ, quote = await anext(msg_gen) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index a737aaad..b8228a55 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -27,11 +27,11 @@ from typing import ( ) import time -from async_generator import aclosing from fuzzywuzzy import process as fuzzy import numpy as np import pendulum from trio_typing import TaskStatus +from trio_util import trio_async_generator import tractor import trio @@ -122,6 +122,7 @@ async def stream_messages( yield msg +@trio_async_generator async def process_data_feed_msgs( ws: NoBsWs, ): @@ -378,7 +379,12 @@ async def stream_quotes( 'wss://ws.kraken.com/', fixture=subscribe, ) as ws, - aclosing(process_data_feed_msgs(ws)) as msg_gen, + + # avoid stream-gen closure from breaking trio.. + # NOTE: not sure this actually works XD particularly + # if we call `ws._connect()` manally in the streaming + # async gen.. + process_data_feed_msgs(ws) as msg_gen, ): # pull a first quote and deliver typ, ohlc_last = await anext(msg_gen) diff --git a/setup.py b/setup.py index 2a686cc5..19729481 100755 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ setup( # async 'trio', 'trio-websocket', + 'trio-util', 'async_generator', # from github currently (see requirements.txt)