diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index c2de7ded..e8309cdb 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -18,36 +18,26 @@ Binance backend """ -from contextlib import asynccontextmanager, AsyncExitStack -from types import ModuleType +from contextlib import asynccontextmanager from typing import List, Dict, Any, Tuple, Union, Optional -import json import time -import trio_websocket +import trio from trio_typing import TaskStatus -from trio_websocket._impl import ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, -) - import arrow import asks from fuzzywuzzy import process as fuzzy import numpy as np -import trio import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel - +import wsproto from .api import open_cached_client from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray +from ..data._web_bs import open_autorecon_ws log = get_logger(__name__) @@ -82,7 +72,6 @@ _ohlc_dtype = [ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = False -_search_conf = {'pause_period': 0.0616} # https://binance-docs.github.io/apidocs/spot/en/#exchange-information @@ -311,14 +300,15 @@ async def stream_messages(ws): timeouts = 0 while True: - with trio.move_on_after(5) as cs: + with trio.move_on_after(3) as cs: msg = await ws.recv_msg() if cs.cancelled_caught: timeouts += 1 - if timeouts > 10: - raise trio.TooSlowError("binance feed seems down?") + if timeouts > 2: + log.error("binance feed seems down and slow af? rebooting...") + await ws._connect() continue @@ -378,93 +368,6 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: } -class AutoReconWs: - """Make ``trio_websocketw` sockets stay up no matter the bs. - - """ - recon_errors = ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, - ) - - def __init__( - self, - url: str, - stack: AsyncExitStack, - serializer: ModuleType = json, - ): - self.url = url - self._stack = stack - self._ws: 'WebSocketConnection' = None # noqa - - async def _connect( - self, - tries: int = 10000, - ) -> None: - try: - await self._stack.aclose() - except (DisconnectionTimeout, RuntimeError): - await trio.sleep(1) - - last_err = None - for i in range(tries): - try: - self._ws = await self._stack.enter_async_context( - trio_websocket.open_websocket_url(self.url) - ) - log.info(f'Connection success: {self.url}') - return - except self.recon_errors as err: - last_err = err - log.error( - f'{self} connection bail with ' - f'{type(err)}...retry attempt {i}' - ) - await trio.sleep(1) - continue - else: - log.exception('ws connection fail...') - raise last_err - - async def send_msg( - self, - data: Any, - ) -> None: - while True: - try: - return await self._ws.send_message(json.dumps(data)) - except self.recon_errors: - await self._connect() - - async def recv_msg( - self, - ) -> Any: - while True: - try: - return json.loads(await self._ws.get_message()) - except self.recon_errors: - await self._connect() - - -@asynccontextmanager -async def open_autorecon_ws(url): - """Apparently we can QoS for all sorts of reasons..so catch em. - - """ - async with AsyncExitStack() as stack: - ws = AutoReconWs(url, stack) - - await ws._connect() - try: - yield ws - - finally: - await stack.aclose() - - async def backfill_bars( sym: str, shm: ShmArray, # type: ignore # noqa @@ -527,8 +430,8 @@ async def stream_quotes( }, } - async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: - + @asynccontextmanager + async def subscribe(ws: wsproto.WSConnection): # setup subs # trade data (aka L1) @@ -546,6 +449,28 @@ async def stream_quotes( res = await ws.recv_msg() assert res['id'] == uid + yield + + subs = [] + for sym in symbols: + subs.append("{sym}@aggTrade") + subs.append("{sym}@bookTicker") + + # unsub from all pairs on teardown + await ws.send_msg({ + "method": "UNSUBSCRIBE", + "params": subs, + "id": uid, + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + async with open_autorecon_ws( + 'wss://stream.binance.com/ws', + fixture=subscribe, + ) as ws: + # pull a first quote and deliver msg_gen = stream_messages(ws) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 7e9afd5c..dde578e5 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -18,37 +18,27 @@ Kraken backend. """ -from contextlib import asynccontextmanager, AsyncExitStack +from contextlib import asynccontextmanager from dataclasses import asdict, field -from types import ModuleType from typing import List, Dict, Any, Tuple, Optional -import json import time -import trio_websocket from trio_typing import TaskStatus -from trio_websocket._impl import ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, -) - +import trio import arrow import asks from fuzzywuzzy import process as fuzzy import numpy as np -import trio import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel - +import wsproto from .api import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray +from ..data._web_bs import open_autorecon_ws log = get_logger(__name__) @@ -57,11 +47,6 @@ log = get_logger(__name__) _url = 'https://api.kraken.com/0' -_search_conf = { - 'pause_period': 0.0616 -} - - # Broker specific ohlc schema which includes a vwap field _ohlc_dtype = [ ('index', int), @@ -399,100 +384,6 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: } -class AutoReconWs: - """Make ``trio_websocketw` sockets stay up no matter the bs. - - TODO: - apply any more insights from this: - https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds - - """ - recon_errors = ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, - ) - - def __init__( - self, - url: str, - stack: AsyncExitStack, - serializer: ModuleType = json, - ): - self.url = url - self._stack = stack - self._ws: 'WebSocketConnection' = None # noqa - - async def _connect( - self, - tries: int = 10000, - ) -> None: - while True: - try: - await self._stack.aclose() - except (DisconnectionTimeout, RuntimeError): - await trio.sleep(1) - else: - break - - last_err = None - for i in range(tries): - try: - self._ws = await self._stack.enter_async_context( - trio_websocket.open_websocket_url(self.url) - ) - log.info(f'Connection success: {self.url}') - return - except self.recon_errors as err: - last_err = err - log.error( - f'{self} connection bail with ' - f'{type(err)}...retry attempt {i}' - ) - await trio.sleep(1) - continue - else: - log.exception('ws connection fail...') - raise last_err - - async def send_msg( - self, - data: Any, - ) -> None: - while True: - try: - return await self._ws.send_message(json.dumps(data)) - except self.recon_errors: - await self._connect() - - async def recv_msg( - self, - ) -> Any: - while True: - try: - return json.loads(await self._ws.get_message()) - except self.recon_errors: - await self._connect() - - -@asynccontextmanager -async def open_autorecon_ws(url): - """Apparently we can QoS for all sorts of reasons..so catch em. - - """ - async with AsyncExitStack() as stack: - ws = AutoReconWs(url, stack) - - await ws._connect() - try: - yield ws - - finally: - await stack.aclose() - - async def backfill_bars( sym: str, @@ -561,8 +452,8 @@ async def stream_quotes( }, } - async with open_autorecon_ws('wss://ws.kraken.com/') as ws: - + @asynccontextmanager + async def subscribe(ws: wsproto.WSConnection): # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe # specific logic for this in kraken's shitty sync client: @@ -584,8 +475,28 @@ async def stream_quotes( {'name': 'spread'} # 'depth': 10} ) + # pull a first quote and deliver await ws.send_msg(l1_sub) + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'pair': list(ws_pairs.values()), + 'event': 'unsubscribe', + 'subscription': ['ohlc', 'spread'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # see the tips on reonnection logic: + # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + async with open_autorecon_ws( + 'wss://ws.kraken.com/', + fixture=subscribe, + ) as ws: + # pull a first quote and deliver msg_gen = stream_messages(ws) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py new file mode 100644 index 00000000..543ac19f --- /dev/null +++ b/piker/data/_web_bs.py @@ -0,0 +1,142 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +ToOlS fOr CoPInG wITh "tHE wEB" protocols. + +""" +from contextlib import asynccontextmanager, AsyncExitStack +from types import ModuleType +from typing import Any, Callable +import json + +import trio +import trio_websocket +from trio_websocket._impl import ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, +) + +from ..log import get_logger + +log = get_logger(__name__) + + +class NoBsWs: + """Make ``trio_websocket`` sockets stay up no matter the bs. + + """ + recon_errors = ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, + ) + + def __init__( + self, + url: str, + stack: AsyncExitStack, + fixture: Callable, + serializer: ModuleType = json, + ): + self.url = url + self.fixture = fixture + self._stack = stack + self._ws: 'WebSocketConnection' = None # noqa + + async def _connect( + self, + tries: int = 10000, + ) -> None: + while True: + try: + await self._stack.aclose() + except (DisconnectionTimeout, RuntimeError): + await trio.sleep(1) + else: + break + + last_err = None + for i in range(tries): + try: + self._ws = await self._stack.enter_async_context( + trio_websocket.open_websocket_url(self.url) + ) + # rerun user code fixture + ret = await self._stack.enter_async_context( + self.fixture(self) + ) + assert ret is None + + log.info(f'Connection success: {self.url}') + return self._ws + + except self.recon_errors as err: + last_err = err + log.error( + f'{self} connection bail with ' + f'{type(err)}...retry attempt {i}' + ) + await trio.sleep(1) + continue + else: + log.exception('ws connection fail...') + raise last_err + + async def send_msg( + self, + data: Any, + ) -> None: + while True: + try: + return await self._ws.send_message(json.dumps(data)) + except self.recon_errors: + await self._connect() + + async def recv_msg( + self, + ) -> Any: + while True: + try: + return json.loads(await self._ws.get_message()) + except self.recon_errors: + await self._connect() + + +@asynccontextmanager +async def open_autorecon_ws( + url: str, + + # TODO: proper type annot smh + fixture: Callable, +): + """Apparently we can QoS for all sorts of reasons..so catch em. + + """ + async with AsyncExitStack() as stack: + ws = NoBsWs(url, stack, fixture=fixture) + await ws._connect() + + try: + yield ws + + finally: + await stack.aclose() diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index b92e4aa5..a55202f3 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -192,7 +192,6 @@ class ChartSpace(QtGui.QWidget): loglevel, ) - self.set_chart_symbol(fqsn, linkedcharts) self.vbox.addWidget(linkedcharts) @@ -1000,7 +999,8 @@ async def test_bed( i += 1 -_quote_throttle_rate: int = 60 # Hz +_clear_throttle_rate: int = 60 # Hz +_book_throttle_rate: int = 20 # Hz async def chart_from_quotes( @@ -1077,17 +1077,12 @@ async def chart_from_quotes( tick_size = chart._lc.symbol.tick_size tick_margin = 2 * tick_size - last = time.time() + last_ask = last_bid = last_clear = time.time() async for quotes in stream: - - now = time.time() - period = now - last - if period <= 1/_quote_throttle_rate - 0.001: - # faster then display refresh rate - # print(f'quote too fast: {1/period}') - continue - for sym, quote in quotes.items(): + + now = time.time() + for tick in quote.get('ticks', ()): # print(f"CHART: {quote['symbol']}: {tick}") @@ -1099,8 +1094,19 @@ async def chart_from_quotes( # okkk.. continue + # clearing price event if ticktype in ('trade', 'utrade', 'last'): + # throttle clearing price updates to ~ max 60 FPS + period = now - last_clear + if period <= 1/_clear_throttle_rate: + # faster then display refresh rate + continue + + # print(f'passthrough {tick}\n{1/(now-last_clear)}') + # set time of last graphics update + last_clear = now + array = ohlcv.array # update price sticky(s) @@ -1121,6 +1127,26 @@ async def chart_from_quotes( chart.update_curve_from_array( 'bar_wap', ohlcv.array) + # l1 book events + # throttle the book graphics updates at a lower rate + # since they aren't as critical for a manual user + # viewing the chart + + elif ticktype in ('ask', 'asize'): + if (now - last_ask) <= 1/_book_throttle_rate: + # print(f'skipping\n{tick}') + continue + + # print(f'passthrough {tick}\n{1/(now-last_ask)}') + last_ask = now + + elif ticktype in ('bid', 'bsize'): + if (now - last_bid) <= 1/_book_throttle_rate: + continue + + # print(f'passthrough {tick}\n{1/(now-last_bid)}') + last_bid = now + # compute max and min trade values to display in view # TODO: we need a streaming minmax algorithm here, see # def above. @@ -1133,6 +1159,7 @@ async def chart_from_quotes( # XXX: prettty sure this is correct? # if ticktype in ('trade', 'last'): if ticktype in ('last',): # 'size'): + label = { l1.ask_label.fields['level']: l1.ask_label, l1.bid_label.fields['level']: l1.bid_label, @@ -1173,9 +1200,6 @@ async def chart_from_quotes( last_mx, last_mn = mx, mn - # set time of last graphics update - last = now - async def spawn_fsps( linked_charts: LinkedSplitCharts, @@ -1366,7 +1390,7 @@ async def run_fsp( now = time.time() period = now - last # if period <= 1/30: - if period <= 1/_quote_throttle_rate - 0.001: + if period <= 1/_clear_throttle_rate - 0.001: # faster then display refresh rate # print(f'quote too fast: {1/period}') continue @@ -1641,12 +1665,12 @@ async def _async_main( screen = current_screen() # configure graphics update throttling based on display refresh rate - global _quote_throttle_rate - _quote_throttle_rate = min( + global _clear_throttle_rate + _clear_throttle_rate = min( round(screen.refreshRate()), - _quote_throttle_rate, + _clear_throttle_rate, ) - log.info(f'Set graphics update rate to {_quote_throttle_rate} Hz') + log.info(f'Set graphics update rate to {_clear_throttle_rate} Hz') # configure global DPI aware font size _font.configure_to_dpi(screen)