Update some typing and add latency checks for binance

misc_backend_fixes
Tyler Goodlet 2021-10-22 12:59:00 -04:00
parent ff8c33cf7e
commit d27214621d
1 changed files with 13 additions and 4 deletions

View File

@ -19,7 +19,7 @@ Binance backend
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import List, Dict, Any, Tuple, Union, Optional from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator
import time import time
import trio import trio
@ -37,7 +37,7 @@ from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__) log = get_logger(__name__)
@ -295,7 +295,7 @@ class AggTrade(BaseModel):
M: bool # Ignore M: bool # Ignore
async def stream_messages(ws): async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 timeouts = 0
while True: while True:
@ -487,11 +487,20 @@ async def stream_quotes(
# signal to caller feed is ready for consumption # signal to caller feed is ready for consumption
feed_is_live.set() feed_is_live.set()
# import time
# last = time.time()
# start streaming # start streaming
async for typ, msg in msg_gen: async for typ, msg in msg_gen:
# period = time.time() - last
# hz = 1/period if period else float('inf')
# if hz > 60:
# log.info(f'Binance quotez : {hz}')
topic = msg['symbol'].lower() topic = msg['symbol'].lower()
await send_chan.send({topic: msg}) await send_chan.send({topic: msg})
# last = time.time()
@tractor.context @tractor.context