From b03564da2cc77cb9a2dfe1711b35c2e25b6fb48b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Apr 2023 13:48:18 -0400 Subject: [PATCH] binance: port to new `NoBsWs` api and drop `trio_util` usage --- piker/brokers/binance.py | 35 ++++++++--------------------------- 1 file changed, 8 insertions(+), 27 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 915902fb..cde20d3f 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -21,7 +21,10 @@ Binance backend """ -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + aclosing, +) from datetime import datetime # from functools import lru_cache from decimal import Decimal @@ -31,7 +34,6 @@ from typing import ( ) import time -from trio_util import trio_async_generator import trio from trio_typing import TaskStatus import pendulum @@ -39,7 +41,6 @@ import asks from fuzzywuzzy import process as fuzzy import numpy as np import tractor -import wsproto from .._cacheables import async_lifo_cache from ..accounting._mktinfo import ( @@ -357,36 +358,16 @@ class AggTrade(Struct): M: bool # Ignore -@trio_async_generator async def stream_messages( ws: NoBsWs, ) -> AsyncGenerator[NoBsWs, dict]: - timeouts = 0 - while True: - - with trio.move_on_after(3) as cs: - msg = await ws.recv_msg() - - if cs.cancelled_caught: - - timeouts += 1 - if timeouts > 2: - log.error("binance feed seems down and slow af? rebooting...") - try: - await ws._connect() - except BaseException as err: - assert err - # Wut in the f#@$% is going on here. - with trio.CancelScope(shield=True): - await tractor.breakpoint() - - continue + # TODO: match syntax here! + async for msg in ws: # for l1 streams binance doesn't add an event type field so # identify those messages by matching keys # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams - if msg.get('u'): sym = msg['s'] bid = float(msg['b']) @@ -545,7 +526,7 @@ async def stream_quotes( ) @acm - async def subscribe(ws: wsproto.WSConnection): + async def subscribe(ws: NoBsWs): # setup subs # trade data (aka L1) @@ -591,7 +572,7 @@ async def stream_quotes( ) as ws, # avoid stream-gen closure from breaking trio.. - stream_messages(ws) as msg_gen, + aclosing(stream_messages(ws)) as msg_gen, ): typ, quote = await anext(msg_gen)