Port binance and kraken to "reliable" ws API

web_utils
Tyler Goodlet 2021-05-27 17:14:04 -04:00
parent 89dc3dde61
commit d0e3f5a51c
2 changed files with 54 additions and 213 deletions

View File

@ -18,36 +18,26 @@
Binance backend Binance backend
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager
from types import ModuleType
from typing import List, Dict, Any, Tuple, Union, Optional from typing import List, Dict, Any, Tuple, Union, Optional
import json
import time import time
import trio_websocket import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from trio_websocket._impl import (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
import arrow import arrow
import asks import asks
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import trio
import tractor import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto
from .api import open_cached_client from .api import open_cached_client
from ._util import resproc, SymbolNotFound from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
log = get_logger(__name__) log = get_logger(__name__)
@ -378,93 +368,6 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
} }
class AutoReconWs:
"""Make ``trio_websocketw` sockets stay up no matter the bs.
"""
recon_errors = (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
def __init__(
self,
url: str,
stack: AsyncExitStack,
serializer: ModuleType = json,
):
self.url = url
self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa
async def _connect(
self,
tries: int = 10000,
) -> None:
try:
await self._stack.aclose()
except (DisconnectionTimeout, RuntimeError):
await trio.sleep(1)
last_err = None
for i in range(tries):
try:
self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url)
)
log.info(f'Connection success: {self.url}')
return
except self.recon_errors as err:
last_err = err
log.error(
f'{self} connection bail with '
f'{type(err)}...retry attempt {i}'
)
await trio.sleep(1)
continue
else:
log.exception('ws connection fail...')
raise last_err
async def send_msg(
self,
data: Any,
) -> None:
while True:
try:
return await self._ws.send_message(json.dumps(data))
except self.recon_errors:
await self._connect()
async def recv_msg(
self,
) -> Any:
while True:
try:
return json.loads(await self._ws.get_message())
except self.recon_errors:
await self._connect()
@asynccontextmanager
async def open_autorecon_ws(url):
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""
async with AsyncExitStack() as stack:
ws = AutoReconWs(url, stack)
await ws._connect()
try:
yield ws
finally:
await stack.aclose()
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
@ -527,8 +430,8 @@ async def stream_quotes(
}, },
} }
async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: @asynccontextmanager
async def subscribe(ws: wsproto.WSConnection):
# setup subs # setup subs
# trade data (aka L1) # trade data (aka L1)
@ -546,6 +449,28 @@ async def stream_quotes(
res = await ws.recv_msg() res = await ws.recv_msg()
assert res['id'] == uid assert res['id'] == uid
yield
subs = []
for sym in symbols:
subs.append("{sym}@aggTrade")
subs.append("{sym}@bookTicker")
# unsub from all pairs on teardown
await ws.send_msg({
"method": "UNSUBSCRIBE",
"params": subs,
"id": uid,
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
async with open_autorecon_ws(
'wss://stream.binance.com/ws',
fixture=subscribe,
) as ws:
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = stream_messages(ws) msg_gen = stream_messages(ws)

View File

@ -18,37 +18,27 @@
Kraken backend. Kraken backend.
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager
from dataclasses import asdict, field from dataclasses import asdict, field
from types import ModuleType
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple, Optional
import json
import time import time
import trio_websocket
from trio_typing import TaskStatus from trio_typing import TaskStatus
from trio_websocket._impl import ( import trio
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
import arrow import arrow
import asks import asks
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import trio
import tractor import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto
from .api import open_cached_client from .api import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
log = get_logger(__name__) log = get_logger(__name__)
@ -399,100 +389,6 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
} }
class AutoReconWs:
"""Make ``trio_websocketw` sockets stay up no matter the bs.
TODO:
apply any more insights from this:
https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
"""
recon_errors = (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
def __init__(
self,
url: str,
stack: AsyncExitStack,
serializer: ModuleType = json,
):
self.url = url
self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa
async def _connect(
self,
tries: int = 10000,
) -> None:
while True:
try:
await self._stack.aclose()
except (DisconnectionTimeout, RuntimeError):
await trio.sleep(1)
else:
break
last_err = None
for i in range(tries):
try:
self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url)
)
log.info(f'Connection success: {self.url}')
return
except self.recon_errors as err:
last_err = err
log.error(
f'{self} connection bail with '
f'{type(err)}...retry attempt {i}'
)
await trio.sleep(1)
continue
else:
log.exception('ws connection fail...')
raise last_err
async def send_msg(
self,
data: Any,
) -> None:
while True:
try:
return await self._ws.send_message(json.dumps(data))
except self.recon_errors:
await self._connect()
async def recv_msg(
self,
) -> Any:
while True:
try:
return json.loads(await self._ws.get_message())
except self.recon_errors:
await self._connect()
@asynccontextmanager
async def open_autorecon_ws(url):
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""
async with AsyncExitStack() as stack:
ws = AutoReconWs(url, stack)
await ws._connect()
try:
yield ws
finally:
await stack.aclose()
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
@ -561,8 +457,8 @@ async def stream_quotes(
}, },
} }
async with open_autorecon_ws('wss://ws.kraken.com/') as ws: @asynccontextmanager
async def subscribe(ws: wsproto.WSConnection):
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client: # specific logic for this in kraken's shitty sync client:
@ -584,8 +480,28 @@ async def stream_quotes(
{'name': 'spread'} # 'depth': 10} {'name': 'spread'} # 'depth': 10}
) )
# pull a first quote and deliver
await ws.send_msg(l1_sub) await ws.send_msg(l1_sub)
yield
# unsub from all pairs on teardown
await ws.send_msg({
'pair': ws_pairs.values(),
'event': 'unsubscribe',
'subscription': ['ohlc', 'spread'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# see the tips on reonnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
async with open_autorecon_ws(
'wss://ws.kraken.com/',
fixture=subscribe,
) as ws:
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = stream_messages(ws) msg_gen = stream_messages(ws)