From 9197e6decb35c08f5ff6515f816bd45087716cbb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Feb 2023 12:46:29 -0500 Subject: [PATCH 1/4] `binance`: use built-in `anext()` add note about new ws ep URL --- piker/brokers/binance.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 0c177db8..46da1f37 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -531,17 +531,19 @@ async def stream_quotes( async with open_autorecon_ws( 'wss://stream.binance.com/ws', + # XXX: see api docs which show diff addr? + # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information + # 'wss://ws-api.binance.com:443/ws-api/v3', fixture=subscribe, ) as ws: # pull a first quote and deliver msg_gen = stream_messages(ws) - typ, quote = await msg_gen.__anext__() + typ, quote = await anext(msg_gen) while typ != 'trade': - # TODO: use ``anext()`` when it lands in 3.10! - typ, quote = await msg_gen.__anext__() + typ, quote = await anext(msg_gen) task_status.started((init_msgs, quote)) From 973e4b5f445399348654d4729bb723e34b5352ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 13 Mar 2023 12:29:17 -0400 Subject: [PATCH 2/4] `binance`: wrap streamer async-gen in `aclosing()` --- piker/brokers/binance.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 46da1f37..9bb35b00 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -26,6 +26,7 @@ from typing import ( ) import time +from async_generator import aclosing import trio from trio_typing import TaskStatus import pendulum @@ -529,19 +530,21 @@ async def stream_quotes( # XXX: do we need to ack the unsub? # await ws.recv_msg() - async with open_autorecon_ws( - 'wss://stream.binance.com/ws', - # XXX: see api docs which show diff addr? - # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information - # 'wss://ws-api.binance.com:443/ws-api/v3', - fixture=subscribe, - ) as ws: - - # pull a first quote and deliver - msg_gen = stream_messages(ws) + async with ( + open_autorecon_ws( + # XXX: see api docs which show diff addr? + # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information + # 'wss://ws-api.binance.com:443/ws-api/v3', + 'wss://stream.binance.com/ws', + fixture=subscribe, + ) as ws, + # avoid stream-gen closure from breaking trio.. + aclosing(stream_messages(ws)) as msg_gen, + ): typ, quote = await anext(msg_gen) + # pull a first quote and deliver while typ != 'trade': typ, quote = await anext(msg_gen) From 78eb784091508b4075b0cda54ee30792f27e606e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Mar 2023 09:53:05 -0500 Subject: [PATCH 3/4] Stick `try:` outside all `xdotool` subproc calls --- piker/brokers/ib/_util.py | 80 ++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index d6491ee7..14fd4d0b 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -24,6 +24,10 @@ import subprocess import tractor +from piker.log import get_logger + +log = get_logger(__name__) + _reset_tech: Literal[ 'vnc', @@ -134,54 +138,54 @@ def i3ipc_xdotool_manual_click_hack() -> None: # 'IB', # gw running in i3 (newer version?) ] - for name in win_names: - results = t.find_titled(name) - print(f'results for {name}: {results}') - if results: - con = results[0] - print(f'Resetting data feed for {name}') - win_id = str(con.window) - w, h = con.rect.width, con.rect.height + try: + for name in win_names: + results = t.find_titled(name) + print(f'results for {name}: {results}') + if results: + con = results[0] + print(f'Resetting data feed for {name}') + win_id = str(con.window) + w, h = con.rect.width, con.rect.height - # TODO: seems to be a few libs for python but not sure - # if they support all the sub commands we need, order of - # most recent commit history: - # https://github.com/rr-/pyxdotool - # https://github.com/ShaneHutter/pyxdotool - # https://github.com/cphyc/pyxdotool + # TODO: seems to be a few libs for python but not sure + # if they support all the sub commands we need, order of + # most recent commit history: + # https://github.com/rr-/pyxdotool + # https://github.com/ShaneHutter/pyxdotool + # https://github.com/cphyc/pyxdotool - # TODO: only run the reconnect (2nd) kc on a detected - # disconnect? - for key_combo, timeout in [ - # only required if we need a connection reset. - # ('ctrl+alt+r', 12), - # data feed reset. - ('ctrl+alt+f', 6) - ]: - subprocess.call([ - 'xdotool', - 'windowactivate', '--sync', win_id, + # TODO: only run the reconnect (2nd) kc on a detected + # disconnect? + for key_combo, timeout in [ + # only required if we need a connection reset. + # ('ctrl+alt+r', 12), + # data feed reset. + ('ctrl+alt+f', 6) + ]: + subprocess.call([ + 'xdotool', + 'windowactivate', '--sync', win_id, - # move mouse to bottom left of window (where there should - # be nothing to click). - 'mousemove_relative', '--sync', str(w-4), str(h-4), + # move mouse to bottom left of window (where + # there should be nothing to click). + 'mousemove_relative', '--sync', str(w-4), str(h-4), - # NOTE: we may need to stick a `--retry 3` in here.. - 'click', '--window', win_id, - '--repeat', '3', '1', + # NOTE: we may need to stick a `--retry 3` in here.. + 'click', '--window', win_id, + '--repeat', '3', '1', - # hackzorzes - 'key', key_combo, - ], - timeout=timeout, - ) + # hackzorzes + 'key', key_combo, + ], + timeout=timeout, + ) # re-activate and focus original window - try: subprocess.call([ 'xdotool', 'windowactivate', '--sync', str(orig_win_id), 'click', '--window', str(orig_win_id), '1', ]) except subprocess.TimeoutExpired: - log.exception(f'xdotool timed out?') + log.exception('xdotool timed out?') From 609b91e848740ab92a41a2666d1ca2c2f3c9ef9a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 17 Mar 2023 20:21:19 -0400 Subject: [PATCH 4/4] Try out `@trio_util.async_generator` for streaming Apparently it will likely fix our `trio`-cancel-scopes-corrupted crash when we try to let our `._web_bs.NoBsWs` do reconnect logic around the asyn-generator implemented data-feed streaming routines in `binance` and `kraken`. See the project docs for deatz; obvs we add the lib as a dep. --- piker/brokers/binance.py | 9 ++++++--- piker/brokers/kraken/feed.py | 10 ++++++++-- setup.py | 1 + 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 9bb35b00..4bd16f22 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -26,7 +26,7 @@ from typing import ( ) import time -from async_generator import aclosing +from trio_util import trio_async_generator import trio from trio_typing import TaskStatus import pendulum @@ -318,7 +318,10 @@ class AggTrade(Struct): M: bool # Ignore -async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: +@trio_async_generator +async def stream_messages( + ws: NoBsWs, +) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 while True: @@ -540,7 +543,7 @@ async def stream_quotes( ) as ws, # avoid stream-gen closure from breaking trio.. - aclosing(stream_messages(ws)) as msg_gen, + stream_messages(ws) as msg_gen, ): typ, quote = await anext(msg_gen) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index a737aaad..b8228a55 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -27,11 +27,11 @@ from typing import ( ) import time -from async_generator import aclosing from fuzzywuzzy import process as fuzzy import numpy as np import pendulum from trio_typing import TaskStatus +from trio_util import trio_async_generator import tractor import trio @@ -122,6 +122,7 @@ async def stream_messages( yield msg +@trio_async_generator async def process_data_feed_msgs( ws: NoBsWs, ): @@ -378,7 +379,12 @@ async def stream_quotes( 'wss://ws.kraken.com/', fixture=subscribe, ) as ws, - aclosing(process_data_feed_msgs(ws)) as msg_gen, + + # avoid stream-gen closure from breaking trio.. + # NOTE: not sure this actually works XD particularly + # if we call `ws._connect()` manally in the streaming + # async gen.. + process_data_feed_msgs(ws) as msg_gen, ): # pull a first quote and deliver typ, ohlc_last = await anext(msg_gen) diff --git a/setup.py b/setup.py index 2a686cc5..19729481 100755 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ setup( # async 'trio', 'trio-websocket', + 'trio-util', 'async_generator', # from github currently (see requirements.txt)