From eb05c78381e658ece96f8c535fbfe33e4aab36fc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Oct 2021 12:59:00 -0400 Subject: [PATCH] Update some typing and add latency checks for binance --- piker/brokers/binance.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 8a3f42e9..4d82474b 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -19,7 +19,7 @@ Binance backend """ from contextlib import asynccontextmanager -from typing import List, Dict, Any, Tuple, Union, Optional +from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator import time import trio @@ -37,7 +37,7 @@ from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray -from ..data._web_bs import open_autorecon_ws +from ..data._web_bs import open_autorecon_ws, NoBsWs log = get_logger(__name__) @@ -213,7 +213,7 @@ class Client: ) # repack in dict form return {item[0]['symbol']: item[0] - for item in matches} + for item in matches} async def bars( self, @@ -295,7 +295,7 @@ class AggTrade(BaseModel): M: bool # Ignore -async def stream_messages(ws): +async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 while True: @@ -487,11 +487,20 @@ async def stream_quotes( # signal to caller feed is ready for consumption feed_is_live.set() + # import time + # last = time.time() + # start streaming async for typ, msg in msg_gen: + # period = time.time() - last + # hz = 1/period if period else float('inf') + # if hz > 60: + # log.info(f'Binance quotez : {hz}') + topic = msg['symbol'].lower() await send_chan.send({topic: msg}) + # last = time.time() @tractor.context