Update some typing and add latency checks for binance
							parent
							
								
									c677ff47a4
								
							
						
					
					
						commit
						eb05c78381
					
				| 
						 | 
				
			
			@ -19,7 +19,7 @@ Binance backend
 | 
			
		|||
 | 
			
		||||
"""
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from typing import List, Dict, Any, Tuple, Union, Optional
 | 
			
		||||
from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
| 
						 | 
				
			
			@ -37,7 +37,7 @@ from .._cacheables import open_cached_client
 | 
			
		|||
from ._util import resproc, SymbolNotFound
 | 
			
		||||
from ..log import get_logger, get_console_log
 | 
			
		||||
from ..data import ShmArray
 | 
			
		||||
from ..data._web_bs import open_autorecon_ws
 | 
			
		||||
from ..data._web_bs import open_autorecon_ws, NoBsWs
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -213,7 +213,7 @@ class Client:
 | 
			
		|||
        )
 | 
			
		||||
        # repack in dict form
 | 
			
		||||
        return {item[0]['symbol']: item[0]
 | 
			
		||||
         for item in matches}
 | 
			
		||||
                for item in matches}
 | 
			
		||||
 | 
			
		||||
    async def bars(
 | 
			
		||||
        self,
 | 
			
		||||
| 
						 | 
				
			
			@ -295,7 +295,7 @@ class AggTrade(BaseModel):
 | 
			
		|||
    M: bool  # Ignore
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_messages(ws):
 | 
			
		||||
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
 | 
			
		||||
 | 
			
		||||
    timeouts = 0
 | 
			
		||||
    while True:
 | 
			
		||||
| 
						 | 
				
			
			@ -487,11 +487,20 @@ async def stream_quotes(
 | 
			
		|||
            # signal to caller feed is ready for consumption
 | 
			
		||||
            feed_is_live.set()
 | 
			
		||||
 | 
			
		||||
            # import time
 | 
			
		||||
            # last = time.time()
 | 
			
		||||
 | 
			
		||||
            # start streaming
 | 
			
		||||
            async for typ, msg in msg_gen:
 | 
			
		||||
 | 
			
		||||
                # period = time.time() - last
 | 
			
		||||
                # hz = 1/period if period else float('inf')
 | 
			
		||||
                # if hz > 60:
 | 
			
		||||
                #     log.info(f'Binance quotez : {hz}')
 | 
			
		||||
 | 
			
		||||
                topic = msg['symbol'].lower()
 | 
			
		||||
                await send_chan.send({topic: msg})
 | 
			
		||||
                # last = time.time()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue