Merge pull request #188 from pikers/web_utils

Web utils: no more reconnect bs
ems_hotfixes
goodboy 2021-05-28 14:34:08 -04:00 committed by GitHub
commit 0da02aa260
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 243 additions and 241 deletions

View File

@ -18,36 +18,26 @@
Binance backend Binance backend
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager
from types import ModuleType
from typing import List, Dict, Any, Tuple, Union, Optional from typing import List, Dict, Any, Tuple, Union, Optional
import json
import time import time
import trio_websocket import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from trio_websocket._impl import (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
import arrow import arrow
import asks import asks
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import trio
import tractor import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto
from .api import open_cached_client from .api import open_cached_client
from ._util import resproc, SymbolNotFound from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
log = get_logger(__name__) log = get_logger(__name__)
@ -82,7 +72,6 @@ _ohlc_dtype = [
ohlc_dtype = np.dtype(_ohlc_dtype) ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = False _show_wap_in_history = False
_search_conf = {'pause_period': 0.0616}
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information # https://binance-docs.github.io/apidocs/spot/en/#exchange-information
@ -311,14 +300,15 @@ async def stream_messages(ws):
timeouts = 0 timeouts = 0
while True: while True:
with trio.move_on_after(5) as cs: with trio.move_on_after(3) as cs:
msg = await ws.recv_msg() msg = await ws.recv_msg()
if cs.cancelled_caught: if cs.cancelled_caught:
timeouts += 1 timeouts += 1
if timeouts > 10: if timeouts > 2:
raise trio.TooSlowError("binance feed seems down?") log.error("binance feed seems down and slow af? rebooting...")
await ws._connect()
continue continue
@ -378,93 +368,6 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
} }
class AutoReconWs:
"""Make ``trio_websocketw` sockets stay up no matter the bs.
"""
recon_errors = (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
def __init__(
self,
url: str,
stack: AsyncExitStack,
serializer: ModuleType = json,
):
self.url = url
self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa
async def _connect(
self,
tries: int = 10000,
) -> None:
try:
await self._stack.aclose()
except (DisconnectionTimeout, RuntimeError):
await trio.sleep(1)
last_err = None
for i in range(tries):
try:
self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url)
)
log.info(f'Connection success: {self.url}')
return
except self.recon_errors as err:
last_err = err
log.error(
f'{self} connection bail with '
f'{type(err)}...retry attempt {i}'
)
await trio.sleep(1)
continue
else:
log.exception('ws connection fail...')
raise last_err
async def send_msg(
self,
data: Any,
) -> None:
while True:
try:
return await self._ws.send_message(json.dumps(data))
except self.recon_errors:
await self._connect()
async def recv_msg(
self,
) -> Any:
while True:
try:
return json.loads(await self._ws.get_message())
except self.recon_errors:
await self._connect()
@asynccontextmanager
async def open_autorecon_ws(url):
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""
async with AsyncExitStack() as stack:
ws = AutoReconWs(url, stack)
await ws._connect()
try:
yield ws
finally:
await stack.aclose()
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
@ -527,8 +430,8 @@ async def stream_quotes(
}, },
} }
async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: @asynccontextmanager
async def subscribe(ws: wsproto.WSConnection):
# setup subs # setup subs
# trade data (aka L1) # trade data (aka L1)
@ -546,6 +449,28 @@ async def stream_quotes(
res = await ws.recv_msg() res = await ws.recv_msg()
assert res['id'] == uid assert res['id'] == uid
yield
subs = []
for sym in symbols:
subs.append("{sym}@aggTrade")
subs.append("{sym}@bookTicker")
# unsub from all pairs on teardown
await ws.send_msg({
"method": "UNSUBSCRIBE",
"params": subs,
"id": uid,
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
async with open_autorecon_ws(
'wss://stream.binance.com/ws',
fixture=subscribe,
) as ws:
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = stream_messages(ws) msg_gen = stream_messages(ws)

View File

@ -18,37 +18,27 @@
Kraken backend. Kraken backend.
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager
from dataclasses import asdict, field from dataclasses import asdict, field
from types import ModuleType
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple, Optional
import json
import time import time
import trio_websocket
from trio_typing import TaskStatus from trio_typing import TaskStatus
from trio_websocket._impl import ( import trio
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
import arrow import arrow
import asks import asks
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import trio
import tractor import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto
from .api import open_cached_client from .api import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
log = get_logger(__name__) log = get_logger(__name__)
@ -57,11 +47,6 @@ log = get_logger(__name__)
_url = 'https://api.kraken.com/0' _url = 'https://api.kraken.com/0'
_search_conf = {
'pause_period': 0.0616
}
# Broker specific ohlc schema which includes a vwap field # Broker specific ohlc schema which includes a vwap field
_ohlc_dtype = [ _ohlc_dtype = [
('index', int), ('index', int),
@ -399,100 +384,6 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
} }
class AutoReconWs:
"""Make ``trio_websocketw` sockets stay up no matter the bs.
TODO:
apply any more insights from this:
https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
"""
recon_errors = (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
def __init__(
self,
url: str,
stack: AsyncExitStack,
serializer: ModuleType = json,
):
self.url = url
self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa
async def _connect(
self,
tries: int = 10000,
) -> None:
while True:
try:
await self._stack.aclose()
except (DisconnectionTimeout, RuntimeError):
await trio.sleep(1)
else:
break
last_err = None
for i in range(tries):
try:
self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url)
)
log.info(f'Connection success: {self.url}')
return
except self.recon_errors as err:
last_err = err
log.error(
f'{self} connection bail with '
f'{type(err)}...retry attempt {i}'
)
await trio.sleep(1)
continue
else:
log.exception('ws connection fail...')
raise last_err
async def send_msg(
self,
data: Any,
) -> None:
while True:
try:
return await self._ws.send_message(json.dumps(data))
except self.recon_errors:
await self._connect()
async def recv_msg(
self,
) -> Any:
while True:
try:
return json.loads(await self._ws.get_message())
except self.recon_errors:
await self._connect()
@asynccontextmanager
async def open_autorecon_ws(url):
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""
async with AsyncExitStack() as stack:
ws = AutoReconWs(url, stack)
await ws._connect()
try:
yield ws
finally:
await stack.aclose()
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
@ -561,8 +452,8 @@ async def stream_quotes(
}, },
} }
async with open_autorecon_ws('wss://ws.kraken.com/') as ws: @asynccontextmanager
async def subscribe(ws: wsproto.WSConnection):
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client: # specific logic for this in kraken's shitty sync client:
@ -584,8 +475,28 @@ async def stream_quotes(
{'name': 'spread'} # 'depth': 10} {'name': 'spread'} # 'depth': 10}
) )
# pull a first quote and deliver
await ws.send_msg(l1_sub) await ws.send_msg(l1_sub)
yield
# unsub from all pairs on teardown
await ws.send_msg({
'pair': list(ws_pairs.values()),
'event': 'unsubscribe',
'subscription': ['ohlc', 'spread'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# see the tips on reonnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
async with open_autorecon_ws(
'wss://ws.kraken.com/',
fixture=subscribe,
) as ws:
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = stream_messages(ws) msg_gen = stream_messages(ws)

View File

@ -0,0 +1,142 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
ToOlS fOr CoPInG wITh "tHE wEB" protocols.
"""
from contextlib import asynccontextmanager, AsyncExitStack
from types import ModuleType
from typing import Any, Callable
import json
import trio
import trio_websocket
from trio_websocket._impl import (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
from ..log import get_logger
log = get_logger(__name__)
class NoBsWs:
"""Make ``trio_websocket`` sockets stay up no matter the bs.
"""
recon_errors = (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
def __init__(
self,
url: str,
stack: AsyncExitStack,
fixture: Callable,
serializer: ModuleType = json,
):
self.url = url
self.fixture = fixture
self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa
async def _connect(
self,
tries: int = 10000,
) -> None:
while True:
try:
await self._stack.aclose()
except (DisconnectionTimeout, RuntimeError):
await trio.sleep(1)
else:
break
last_err = None
for i in range(tries):
try:
self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url)
)
# rerun user code fixture
ret = await self._stack.enter_async_context(
self.fixture(self)
)
assert ret is None
log.info(f'Connection success: {self.url}')
return self._ws
except self.recon_errors as err:
last_err = err
log.error(
f'{self} connection bail with '
f'{type(err)}...retry attempt {i}'
)
await trio.sleep(1)
continue
else:
log.exception('ws connection fail...')
raise last_err
async def send_msg(
self,
data: Any,
) -> None:
while True:
try:
return await self._ws.send_message(json.dumps(data))
except self.recon_errors:
await self._connect()
async def recv_msg(
self,
) -> Any:
while True:
try:
return json.loads(await self._ws.get_message())
except self.recon_errors:
await self._connect()
@asynccontextmanager
async def open_autorecon_ws(
url: str,
# TODO: proper type annot smh
fixture: Callable,
):
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""
async with AsyncExitStack() as stack:
ws = NoBsWs(url, stack, fixture=fixture)
await ws._connect()
try:
yield ws
finally:
await stack.aclose()

View File

@ -192,7 +192,6 @@ class ChartSpace(QtGui.QWidget):
loglevel, loglevel,
) )
self.set_chart_symbol(fqsn, linkedcharts) self.set_chart_symbol(fqsn, linkedcharts)
self.vbox.addWidget(linkedcharts) self.vbox.addWidget(linkedcharts)
@ -1000,7 +999,8 @@ async def test_bed(
i += 1 i += 1
_quote_throttle_rate: int = 60 # Hz _clear_throttle_rate: int = 60 # Hz
_book_throttle_rate: int = 20 # Hz
async def chart_from_quotes( async def chart_from_quotes(
@ -1077,17 +1077,12 @@ async def chart_from_quotes(
tick_size = chart._lc.symbol.tick_size tick_size = chart._lc.symbol.tick_size
tick_margin = 2 * tick_size tick_margin = 2 * tick_size
last = time.time() last_ask = last_bid = last_clear = time.time()
async for quotes in stream: async for quotes in stream:
for sym, quote in quotes.items():
now = time.time() now = time.time()
period = now - last
if period <= 1/_quote_throttle_rate - 0.001:
# faster then display refresh rate
# print(f'quote too fast: {1/period}')
continue
for sym, quote in quotes.items():
for tick in quote.get('ticks', ()): for tick in quote.get('ticks', ()):
# print(f"CHART: {quote['symbol']}: {tick}") # print(f"CHART: {quote['symbol']}: {tick}")
@ -1099,8 +1094,19 @@ async def chart_from_quotes(
# okkk.. # okkk..
continue continue
# clearing price event
if ticktype in ('trade', 'utrade', 'last'): if ticktype in ('trade', 'utrade', 'last'):
# throttle clearing price updates to ~ max 60 FPS
period = now - last_clear
if period <= 1/_clear_throttle_rate:
# faster then display refresh rate
continue
# print(f'passthrough {tick}\n{1/(now-last_clear)}')
# set time of last graphics update
last_clear = now
array = ohlcv.array array = ohlcv.array
# update price sticky(s) # update price sticky(s)
@ -1121,6 +1127,26 @@ async def chart_from_quotes(
chart.update_curve_from_array( chart.update_curve_from_array(
'bar_wap', ohlcv.array) 'bar_wap', ohlcv.array)
# l1 book events
# throttle the book graphics updates at a lower rate
# since they aren't as critical for a manual user
# viewing the chart
elif ticktype in ('ask', 'asize'):
if (now - last_ask) <= 1/_book_throttle_rate:
# print(f'skipping\n{tick}')
continue
# print(f'passthrough {tick}\n{1/(now-last_ask)}')
last_ask = now
elif ticktype in ('bid', 'bsize'):
if (now - last_bid) <= 1/_book_throttle_rate:
continue
# print(f'passthrough {tick}\n{1/(now-last_bid)}')
last_bid = now
# compute max and min trade values to display in view # compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see # TODO: we need a streaming minmax algorithm here, see
# def above. # def above.
@ -1133,6 +1159,7 @@ async def chart_from_quotes(
# XXX: prettty sure this is correct? # XXX: prettty sure this is correct?
# if ticktype in ('trade', 'last'): # if ticktype in ('trade', 'last'):
if ticktype in ('last',): # 'size'): if ticktype in ('last',): # 'size'):
label = { label = {
l1.ask_label.fields['level']: l1.ask_label, l1.ask_label.fields['level']: l1.ask_label,
l1.bid_label.fields['level']: l1.bid_label, l1.bid_label.fields['level']: l1.bid_label,
@ -1173,9 +1200,6 @@ async def chart_from_quotes(
last_mx, last_mn = mx, mn last_mx, last_mn = mx, mn
# set time of last graphics update
last = now
async def spawn_fsps( async def spawn_fsps(
linked_charts: LinkedSplitCharts, linked_charts: LinkedSplitCharts,
@ -1366,7 +1390,7 @@ async def run_fsp(
now = time.time() now = time.time()
period = now - last period = now - last
# if period <= 1/30: # if period <= 1/30:
if period <= 1/_quote_throttle_rate - 0.001: if period <= 1/_clear_throttle_rate - 0.001:
# faster then display refresh rate # faster then display refresh rate
# print(f'quote too fast: {1/period}') # print(f'quote too fast: {1/period}')
continue continue
@ -1641,12 +1665,12 @@ async def _async_main(
screen = current_screen() screen = current_screen()
# configure graphics update throttling based on display refresh rate # configure graphics update throttling based on display refresh rate
global _quote_throttle_rate global _clear_throttle_rate
_quote_throttle_rate = min( _clear_throttle_rate = min(
round(screen.refreshRate()), round(screen.refreshRate()),
_quote_throttle_rate, _clear_throttle_rate,
) )
log.info(f'Set graphics update rate to {_quote_throttle_rate} Hz') log.info(f'Set graphics update rate to {_clear_throttle_rate} Hz')
# configure global DPI aware font size # configure global DPI aware font size
_font.configure_to_dpi(screen) _font.configure_to_dpi(screen)