binance: port to new `NoBsWs` api and drop `trio_util` usage

rekt_pps
Tyler Goodlet 2023-04-21 13:48:18 -04:00
parent 59743b7b73
commit b03564da2c
1 changed files with 8 additions and 27 deletions

View File

@ -21,7 +21,10 @@
Binance backend Binance backend
""" """
from contextlib import asynccontextmanager as acm from contextlib import (
asynccontextmanager as acm,
aclosing,
)
from datetime import datetime from datetime import datetime
# from functools import lru_cache # from functools import lru_cache
from decimal import Decimal from decimal import Decimal
@ -31,7 +34,6 @@ from typing import (
) )
import time import time
from trio_util import trio_async_generator
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import pendulum import pendulum
@ -39,7 +41,6 @@ import asks
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
import wsproto
from .._cacheables import async_lifo_cache from .._cacheables import async_lifo_cache
from ..accounting._mktinfo import ( from ..accounting._mktinfo import (
@ -357,36 +358,16 @@ class AggTrade(Struct):
M: bool # Ignore M: bool # Ignore
@trio_async_generator
async def stream_messages( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]: ) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 # TODO: match syntax here!
while True: async for msg in ws:
with trio.move_on_after(3) as cs:
msg = await ws.recv_msg()
if cs.cancelled_caught:
timeouts += 1
if timeouts > 2:
log.error("binance feed seems down and slow af? rebooting...")
try:
await ws._connect()
except BaseException as err:
assert err
# Wut in the f#@$% is going on here.
with trio.CancelScope(shield=True):
await tractor.breakpoint()
continue
# for l1 streams binance doesn't add an event type field so # for l1 streams binance doesn't add an event type field so
# identify those messages by matching keys # identify those messages by matching keys
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
if msg.get('u'): if msg.get('u'):
sym = msg['s'] sym = msg['s']
bid = float(msg['b']) bid = float(msg['b'])
@ -545,7 +526,7 @@ async def stream_quotes(
) )
@acm @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: NoBsWs):
# setup subs # setup subs
# trade data (aka L1) # trade data (aka L1)
@ -591,7 +572,7 @@ async def stream_quotes(
) as ws, ) as ws,
# avoid stream-gen closure from breaking trio.. # avoid stream-gen closure from breaking trio..
stream_messages(ws) as msg_gen, aclosing(stream_messages(ws)) as msg_gen,
): ):
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)