diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 5e64ab0b..4adb32ea 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -21,13 +21,20 @@ runnable script-programs. ''' from __future__ import annotations from functools import partial -from typing import Literal +from typing import ( + Literal, + TYPE_CHECKING, +) import subprocess import tractor from .._util import get_logger +if TYPE_CHECKING: + from .api import Client + from ib_insync import IB + log = get_logger('piker.brokers.ib') _reset_tech: Literal[ @@ -42,7 +49,8 @@ _reset_tech: Literal[ async def data_reset_hack( - vnc_host: str, + # vnc_host: str, + client: Client, reset_type: Literal['data', 'connection'], ) -> None: diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index a5c68c36..d6c36133 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -512,9 +512,9 @@ async def open_trade_event_stream( async with tractor.to_asyncio.open_channel_from( recv_trade_updates, client=client, - ) as (ibclient, trade_event_stream): + ) as (_, trade_event_stream): - assert ibclient is client.ib + # assert ibclient is client.ib task_status.started(trade_event_stream) await trio.sleep_forever() diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index d855539a..f2a00825 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -37,6 +37,7 @@ from typing import ( from async_generator import aclosing from fuzzywuzzy import process as fuzzy +import ib_insync as ibis import numpy as np import pendulum import tractor @@ -50,10 +51,10 @@ from .._util import ( ) from .api import ( # _adhoc_futes_set, + Client, con2fqme, log, load_aio_clients, - ibis, MethodProxy, open_client_proxies, get_preferred_data_client, @@ -276,7 +277,8 @@ async def wait_on_data_reset( # ) # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). - client = proxy._aio_ns.ib.client + client: Client = proxy._aio_ns + ib_client: ibis.IB = client.ib done = trio.Event() with trio.move_on_after(timeout) as cs: @@ -285,10 +287,11 @@ async def wait_on_data_reset( log.warning( 'Sending DATA RESET request:\n' - f'{client}' + f'{ib_client.client}' ) res = await data_reset_hack( - vnc_host=client.host, + # vnc_host=client.host, + ib_client=ib_client, reset_type=reset_type, )