From 89dc3dde617f7555dc1945e88c2f627198d2d513 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 May 2021 17:13:20 -0400 Subject: [PATCH 1/8] Move no bs websocket api into its own data module --- piker/data/_web_bs.py | 142 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 piker/data/_web_bs.py diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py new file mode 100644 index 00000000..543ac19f --- /dev/null +++ b/piker/data/_web_bs.py @@ -0,0 +1,142 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +ToOlS fOr CoPInG wITh "tHE wEB" protocols. + +""" +from contextlib import asynccontextmanager, AsyncExitStack +from types import ModuleType +from typing import Any, Callable +import json + +import trio +import trio_websocket +from trio_websocket._impl import ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, +) + +from ..log import get_logger + +log = get_logger(__name__) + + +class NoBsWs: + """Make ``trio_websocket`` sockets stay up no matter the bs. + + """ + recon_errors = ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, + ) + + def __init__( + self, + url: str, + stack: AsyncExitStack, + fixture: Callable, + serializer: ModuleType = json, + ): + self.url = url + self.fixture = fixture + self._stack = stack + self._ws: 'WebSocketConnection' = None # noqa + + async def _connect( + self, + tries: int = 10000, + ) -> None: + while True: + try: + await self._stack.aclose() + except (DisconnectionTimeout, RuntimeError): + await trio.sleep(1) + else: + break + + last_err = None + for i in range(tries): + try: + self._ws = await self._stack.enter_async_context( + trio_websocket.open_websocket_url(self.url) + ) + # rerun user code fixture + ret = await self._stack.enter_async_context( + self.fixture(self) + ) + assert ret is None + + log.info(f'Connection success: {self.url}') + return self._ws + + except self.recon_errors as err: + last_err = err + log.error( + f'{self} connection bail with ' + f'{type(err)}...retry attempt {i}' + ) + await trio.sleep(1) + continue + else: + log.exception('ws connection fail...') + raise last_err + + async def send_msg( + self, + data: Any, + ) -> None: + while True: + try: + return await self._ws.send_message(json.dumps(data)) + except self.recon_errors: + await self._connect() + + async def recv_msg( + self, + ) -> Any: + while True: + try: + return json.loads(await self._ws.get_message()) + except self.recon_errors: + await self._connect() + + +@asynccontextmanager +async def open_autorecon_ws( + url: str, + + # TODO: proper type annot smh + fixture: Callable, +): + """Apparently we can QoS for all sorts of reasons..so catch em. + + """ + async with AsyncExitStack() as stack: + ws = NoBsWs(url, stack, fixture=fixture) + await ws._connect() + + try: + yield ws + + finally: + await stack.aclose() From d0e3f5a51c43733c3a5d962da8065b10185585dc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 May 2021 17:14:04 -0400 Subject: [PATCH 2/8] Port binance and kraken to "reliable" ws API --- piker/brokers/binance.py | 131 ++++++++----------------------------- piker/brokers/kraken.py | 136 ++++++++------------------------------- 2 files changed, 54 insertions(+), 213 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index c2de7ded..655800dc 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -18,36 +18,26 @@ Binance backend """ -from contextlib import asynccontextmanager, AsyncExitStack -from types import ModuleType +from contextlib import asynccontextmanager from typing import List, Dict, Any, Tuple, Union, Optional -import json import time -import trio_websocket +import trio from trio_typing import TaskStatus -from trio_websocket._impl import ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, -) - import arrow import asks from fuzzywuzzy import process as fuzzy import numpy as np -import trio import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel - +import wsproto from .api import open_cached_client from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray +from ..data._web_bs import open_autorecon_ws log = get_logger(__name__) @@ -378,93 +368,6 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: } -class AutoReconWs: - """Make ``trio_websocketw` sockets stay up no matter the bs. - - """ - recon_errors = ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, - ) - - def __init__( - self, - url: str, - stack: AsyncExitStack, - serializer: ModuleType = json, - ): - self.url = url - self._stack = stack - self._ws: 'WebSocketConnection' = None # noqa - - async def _connect( - self, - tries: int = 10000, - ) -> None: - try: - await self._stack.aclose() - except (DisconnectionTimeout, RuntimeError): - await trio.sleep(1) - - last_err = None - for i in range(tries): - try: - self._ws = await self._stack.enter_async_context( - trio_websocket.open_websocket_url(self.url) - ) - log.info(f'Connection success: {self.url}') - return - except self.recon_errors as err: - last_err = err - log.error( - f'{self} connection bail with ' - f'{type(err)}...retry attempt {i}' - ) - await trio.sleep(1) - continue - else: - log.exception('ws connection fail...') - raise last_err - - async def send_msg( - self, - data: Any, - ) -> None: - while True: - try: - return await self._ws.send_message(json.dumps(data)) - except self.recon_errors: - await self._connect() - - async def recv_msg( - self, - ) -> Any: - while True: - try: - return json.loads(await self._ws.get_message()) - except self.recon_errors: - await self._connect() - - -@asynccontextmanager -async def open_autorecon_ws(url): - """Apparently we can QoS for all sorts of reasons..so catch em. - - """ - async with AsyncExitStack() as stack: - ws = AutoReconWs(url, stack) - - await ws._connect() - try: - yield ws - - finally: - await stack.aclose() - - async def backfill_bars( sym: str, shm: ShmArray, # type: ignore # noqa @@ -527,8 +430,8 @@ async def stream_quotes( }, } - async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: - + @asynccontextmanager + async def subscribe(ws: wsproto.WSConnection): # setup subs # trade data (aka L1) @@ -546,6 +449,28 @@ async def stream_quotes( res = await ws.recv_msg() assert res['id'] == uid + yield + + subs = [] + for sym in symbols: + subs.append("{sym}@aggTrade") + subs.append("{sym}@bookTicker") + + # unsub from all pairs on teardown + await ws.send_msg({ + "method": "UNSUBSCRIBE", + "params": subs, + "id": uid, + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + async with open_autorecon_ws( + 'wss://stream.binance.com/ws', + fixture=subscribe, + ) as ws: + # pull a first quote and deliver msg_gen = stream_messages(ws) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 7e9afd5c..bb543936 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -18,37 +18,27 @@ Kraken backend. """ -from contextlib import asynccontextmanager, AsyncExitStack +from contextlib import asynccontextmanager from dataclasses import asdict, field -from types import ModuleType from typing import List, Dict, Any, Tuple, Optional -import json import time -import trio_websocket from trio_typing import TaskStatus -from trio_websocket._impl import ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, -) - +import trio import arrow import asks from fuzzywuzzy import process as fuzzy import numpy as np -import trio import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel - +import wsproto from .api import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray +from ..data._web_bs import open_autorecon_ws log = get_logger(__name__) @@ -399,100 +389,6 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: } -class AutoReconWs: - """Make ``trio_websocketw` sockets stay up no matter the bs. - - TODO: - apply any more insights from this: - https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds - - """ - recon_errors = ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, - ) - - def __init__( - self, - url: str, - stack: AsyncExitStack, - serializer: ModuleType = json, - ): - self.url = url - self._stack = stack - self._ws: 'WebSocketConnection' = None # noqa - - async def _connect( - self, - tries: int = 10000, - ) -> None: - while True: - try: - await self._stack.aclose() - except (DisconnectionTimeout, RuntimeError): - await trio.sleep(1) - else: - break - - last_err = None - for i in range(tries): - try: - self._ws = await self._stack.enter_async_context( - trio_websocket.open_websocket_url(self.url) - ) - log.info(f'Connection success: {self.url}') - return - except self.recon_errors as err: - last_err = err - log.error( - f'{self} connection bail with ' - f'{type(err)}...retry attempt {i}' - ) - await trio.sleep(1) - continue - else: - log.exception('ws connection fail...') - raise last_err - - async def send_msg( - self, - data: Any, - ) -> None: - while True: - try: - return await self._ws.send_message(json.dumps(data)) - except self.recon_errors: - await self._connect() - - async def recv_msg( - self, - ) -> Any: - while True: - try: - return json.loads(await self._ws.get_message()) - except self.recon_errors: - await self._connect() - - -@asynccontextmanager -async def open_autorecon_ws(url): - """Apparently we can QoS for all sorts of reasons..so catch em. - - """ - async with AsyncExitStack() as stack: - ws = AutoReconWs(url, stack) - - await ws._connect() - try: - yield ws - - finally: - await stack.aclose() - - async def backfill_bars( sym: str, @@ -561,8 +457,8 @@ async def stream_quotes( }, } - async with open_autorecon_ws('wss://ws.kraken.com/') as ws: - + @asynccontextmanager + async def subscribe(ws: wsproto.WSConnection): # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe # specific logic for this in kraken's shitty sync client: @@ -584,8 +480,28 @@ async def stream_quotes( {'name': 'spread'} # 'depth': 10} ) + # pull a first quote and deliver await ws.send_msg(l1_sub) + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'pair': ws_pairs.values(), + 'event': 'unsubscribe', + 'subscription': ['ohlc', 'spread'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # see the tips on reonnection logic: + # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + async with open_autorecon_ws( + 'wss://ws.kraken.com/', + fixture=subscribe, + ) as ws: + # pull a first quote and deliver msg_gen = stream_messages(ws) From 1a7b06c147aab9bf783f34037f5faa6d357b387b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 May 2021 17:20:26 -0400 Subject: [PATCH 3/8] Limit l1 graphics updates to half the refresh rate --- piker/ui/_chart.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index b92e4aa5..b309c4a5 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -192,7 +192,6 @@ class ChartSpace(QtGui.QWidget): loglevel, ) - self.set_chart_symbol(fqsn, linkedcharts) self.vbox.addWidget(linkedcharts) @@ -1079,13 +1078,8 @@ async def chart_from_quotes( last = time.time() async for quotes in stream: - now = time.time() period = now - last - if period <= 1/_quote_throttle_rate - 0.001: - # faster then display refresh rate - # print(f'quote too fast: {1/period}') - continue for sym, quote in quotes.items(): for tick in quote.get('ticks', ()): @@ -1101,6 +1095,12 @@ async def chart_from_quotes( if ticktype in ('trade', 'utrade', 'last'): + # throttle clearing price updates to ~ 60 FPS + if period <= 1/_quote_throttle_rate - 0.001: + # faster then display refresh rate + # print(f'quote too fast: {1/period}') + continue + array = ohlcv.array # update price sticky(s) @@ -1121,6 +1121,19 @@ async def chart_from_quotes( chart.update_curve_from_array( 'bar_wap', ohlcv.array) + else: # non-clearing tick + + now = time.time() + period = now - last + + # throttle the book graphics updates at a lower rate + # since they aren't as critical for a manual user + # viewing the chart + + if period <= 1/_quote_throttle_rate/2: + # print(f'skipping\n{tick}') + continue + # compute max and min trade values to display in view # TODO: we need a streaming minmax algorithm here, see # def above. @@ -1133,6 +1146,7 @@ async def chart_from_quotes( # XXX: prettty sure this is correct? # if ticktype in ('trade', 'last'): if ticktype in ('last',): # 'size'): + label = { l1.ask_label.fields['level']: l1.ask_label, l1.bid_label.fields['level']: l1.bid_label, From 19711bf024139db1214cbbda8ad99334e6aca4c2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 May 2021 17:30:30 -0400 Subject: [PATCH 4/8] Reconnect slow binance conns instead of error --- piker/brokers/binance.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 655800dc..a3731b9b 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -308,7 +308,8 @@ async def stream_messages(ws): timeouts += 1 if timeouts > 10: - raise trio.TooSlowError("binance feed seems down?") + log.error("binance feed seems down and slow af? rebooting...") + await ws._connect() continue From ff856a6ee35ced8d0c8c6dda6bd0b23dd98515e0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 May 2021 18:02:27 -0400 Subject: [PATCH 5/8] Woops, make unsub pairs a list --- piker/brokers/kraken.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index bb543936..e6e54ee7 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -487,7 +487,7 @@ async def stream_quotes( # unsub from all pairs on teardown await ws.send_msg({ - 'pair': ws_pairs.values(), + 'pair': list(ws_pairs.values()), 'event': 'unsubscribe', 'subscription': ['ohlc', 'spread'], }) From b689adaeb0ca28951dc249658bf1411f4c060030 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 May 2021 22:07:55 -0400 Subject: [PATCH 6/8] Throttle L1 label updates at a lower rate --- piker/ui/_chart.py | 60 +++++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index b309c4a5..a55202f3 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -999,7 +999,8 @@ async def test_bed( i += 1 -_quote_throttle_rate: int = 60 # Hz +_clear_throttle_rate: int = 60 # Hz +_book_throttle_rate: int = 20 # Hz async def chart_from_quotes( @@ -1076,12 +1077,12 @@ async def chart_from_quotes( tick_size = chart._lc.symbol.tick_size tick_margin = 2 * tick_size - last = time.time() + last_ask = last_bid = last_clear = time.time() async for quotes in stream: - now = time.time() - period = now - last - for sym, quote in quotes.items(): + + now = time.time() + for tick in quote.get('ticks', ()): # print(f"CHART: {quote['symbol']}: {tick}") @@ -1093,14 +1094,19 @@ async def chart_from_quotes( # okkk.. continue + # clearing price event if ticktype in ('trade', 'utrade', 'last'): - # throttle clearing price updates to ~ 60 FPS - if period <= 1/_quote_throttle_rate - 0.001: + # throttle clearing price updates to ~ max 60 FPS + period = now - last_clear + if period <= 1/_clear_throttle_rate: # faster then display refresh rate - # print(f'quote too fast: {1/period}') continue + # print(f'passthrough {tick}\n{1/(now-last_clear)}') + # set time of last graphics update + last_clear = now + array = ohlcv.array # update price sticky(s) @@ -1121,19 +1127,26 @@ async def chart_from_quotes( chart.update_curve_from_array( 'bar_wap', ohlcv.array) - else: # non-clearing tick + # l1 book events + # throttle the book graphics updates at a lower rate + # since they aren't as critical for a manual user + # viewing the chart - now = time.time() - period = now - last - - # throttle the book graphics updates at a lower rate - # since they aren't as critical for a manual user - # viewing the chart - - if period <= 1/_quote_throttle_rate/2: + elif ticktype in ('ask', 'asize'): + if (now - last_ask) <= 1/_book_throttle_rate: # print(f'skipping\n{tick}') continue + # print(f'passthrough {tick}\n{1/(now-last_ask)}') + last_ask = now + + elif ticktype in ('bid', 'bsize'): + if (now - last_bid) <= 1/_book_throttle_rate: + continue + + # print(f'passthrough {tick}\n{1/(now-last_bid)}') + last_bid = now + # compute max and min trade values to display in view # TODO: we need a streaming minmax algorithm here, see # def above. @@ -1187,9 +1200,6 @@ async def chart_from_quotes( last_mx, last_mn = mx, mn - # set time of last graphics update - last = now - async def spawn_fsps( linked_charts: LinkedSplitCharts, @@ -1380,7 +1390,7 @@ async def run_fsp( now = time.time() period = now - last # if period <= 1/30: - if period <= 1/_quote_throttle_rate - 0.001: + if period <= 1/_clear_throttle_rate - 0.001: # faster then display refresh rate # print(f'quote too fast: {1/period}') continue @@ -1655,12 +1665,12 @@ async def _async_main( screen = current_screen() # configure graphics update throttling based on display refresh rate - global _quote_throttle_rate - _quote_throttle_rate = min( + global _clear_throttle_rate + _clear_throttle_rate = min( round(screen.refreshRate()), - _quote_throttle_rate, + _clear_throttle_rate, ) - log.info(f'Set graphics update rate to {_quote_throttle_rate} Hz') + log.info(f'Set graphics update rate to {_clear_throttle_rate} Hz') # configure global DPI aware font size _font.configure_to_dpi(screen) From 51a8308105bdb9ba1e35f8b9b5df365e4aacd851 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 May 2021 09:38:57 -0400 Subject: [PATCH 7/8] Lower connection timeout duration --- piker/brokers/binance.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index a3731b9b..e8309cdb 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -72,7 +72,6 @@ _ohlc_dtype = [ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = False -_search_conf = {'pause_period': 0.0616} # https://binance-docs.github.io/apidocs/spot/en/#exchange-information @@ -301,13 +300,13 @@ async def stream_messages(ws): timeouts = 0 while True: - with trio.move_on_after(5) as cs: + with trio.move_on_after(3) as cs: msg = await ws.recv_msg() if cs.cancelled_caught: timeouts += 1 - if timeouts > 10: + if timeouts > 2: log.error("binance feed seems down and slow af? rebooting...") await ws._connect() From 9792b9aa7d020277aa1c107269ac62f5d56ac4b5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 May 2021 09:39:17 -0400 Subject: [PATCH 8/8] Drop search pause config; use default --- piker/brokers/kraken.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index e6e54ee7..dde578e5 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -47,11 +47,6 @@ log = get_logger(__name__) _url = 'https://api.kraken.com/0' -_search_conf = { - 'pause_period': 0.0616 -} - - # Broker specific ohlc schema which includes a vwap field _ohlc_dtype = [ ('index', int),