diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 0c177db8..4bd16f22 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -26,6 +26,7 @@ from typing import ( ) import time +from trio_util import trio_async_generator import trio from trio_typing import TaskStatus import pendulum @@ -317,7 +318,10 @@ class AggTrade(Struct): M: bool # Ignore -async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: +@trio_async_generator +async def stream_messages( + ws: NoBsWs, +) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 while True: @@ -529,19 +533,23 @@ async def stream_quotes( # XXX: do we need to ack the unsub? # await ws.recv_msg() - async with open_autorecon_ws( - 'wss://stream.binance.com/ws', - fixture=subscribe, - ) as ws: + async with ( + open_autorecon_ws( + # XXX: see api docs which show diff addr? + # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information + # 'wss://ws-api.binance.com:443/ws-api/v3', + 'wss://stream.binance.com/ws', + fixture=subscribe, + ) as ws, + + # avoid stream-gen closure from breaking trio.. + stream_messages(ws) as msg_gen, + ): + typ, quote = await anext(msg_gen) # pull a first quote and deliver - msg_gen = stream_messages(ws) - - typ, quote = await msg_gen.__anext__() - while typ != 'trade': - # TODO: use ``anext()`` when it lands in 3.10! - typ, quote = await msg_gen.__anext__() + typ, quote = await anext(msg_gen) task_status.started((init_msgs, quote)) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index d6491ee7..14fd4d0b 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -24,6 +24,10 @@ import subprocess import tractor +from piker.log import get_logger + +log = get_logger(__name__) + _reset_tech: Literal[ 'vnc', @@ -134,54 +138,54 @@ def i3ipc_xdotool_manual_click_hack() -> None: # 'IB', # gw running in i3 (newer version?) ] - for name in win_names: - results = t.find_titled(name) - print(f'results for {name}: {results}') - if results: - con = results[0] - print(f'Resetting data feed for {name}') - win_id = str(con.window) - w, h = con.rect.width, con.rect.height + try: + for name in win_names: + results = t.find_titled(name) + print(f'results for {name}: {results}') + if results: + con = results[0] + print(f'Resetting data feed for {name}') + win_id = str(con.window) + w, h = con.rect.width, con.rect.height - # TODO: seems to be a few libs for python but not sure - # if they support all the sub commands we need, order of - # most recent commit history: - # https://github.com/rr-/pyxdotool - # https://github.com/ShaneHutter/pyxdotool - # https://github.com/cphyc/pyxdotool + # TODO: seems to be a few libs for python but not sure + # if they support all the sub commands we need, order of + # most recent commit history: + # https://github.com/rr-/pyxdotool + # https://github.com/ShaneHutter/pyxdotool + # https://github.com/cphyc/pyxdotool - # TODO: only run the reconnect (2nd) kc on a detected - # disconnect? - for key_combo, timeout in [ - # only required if we need a connection reset. - # ('ctrl+alt+r', 12), - # data feed reset. - ('ctrl+alt+f', 6) - ]: - subprocess.call([ - 'xdotool', - 'windowactivate', '--sync', win_id, + # TODO: only run the reconnect (2nd) kc on a detected + # disconnect? + for key_combo, timeout in [ + # only required if we need a connection reset. + # ('ctrl+alt+r', 12), + # data feed reset. + ('ctrl+alt+f', 6) + ]: + subprocess.call([ + 'xdotool', + 'windowactivate', '--sync', win_id, - # move mouse to bottom left of window (where there should - # be nothing to click). - 'mousemove_relative', '--sync', str(w-4), str(h-4), + # move mouse to bottom left of window (where + # there should be nothing to click). + 'mousemove_relative', '--sync', str(w-4), str(h-4), - # NOTE: we may need to stick a `--retry 3` in here.. - 'click', '--window', win_id, - '--repeat', '3', '1', + # NOTE: we may need to stick a `--retry 3` in here.. + 'click', '--window', win_id, + '--repeat', '3', '1', - # hackzorzes - 'key', key_combo, - ], - timeout=timeout, - ) + # hackzorzes + 'key', key_combo, + ], + timeout=timeout, + ) # re-activate and focus original window - try: subprocess.call([ 'xdotool', 'windowactivate', '--sync', str(orig_win_id), 'click', '--window', str(orig_win_id), '1', ]) except subprocess.TimeoutExpired: - log.exception(f'xdotool timed out?') + log.exception('xdotool timed out?') diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index a737aaad..b8228a55 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -27,11 +27,11 @@ from typing import ( ) import time -from async_generator import aclosing from fuzzywuzzy import process as fuzzy import numpy as np import pendulum from trio_typing import TaskStatus +from trio_util import trio_async_generator import tractor import trio @@ -122,6 +122,7 @@ async def stream_messages( yield msg +@trio_async_generator async def process_data_feed_msgs( ws: NoBsWs, ): @@ -378,7 +379,12 @@ async def stream_quotes( 'wss://ws.kraken.com/', fixture=subscribe, ) as ws, - aclosing(process_data_feed_msgs(ws)) as msg_gen, + + # avoid stream-gen closure from breaking trio.. + # NOTE: not sure this actually works XD particularly + # if we call `ws._connect()` manally in the streaming + # async gen.. + process_data_feed_msgs(ws) as msg_gen, ): # pull a first quote and deliver typ, ohlc_last = await anext(msg_gen) diff --git a/setup.py b/setup.py index 2a686cc5..19729481 100755 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ setup( # async 'trio', 'trio-websocket', + 'trio-util', 'async_generator', # from github currently (see requirements.txt)