diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 2281fa25..355ab362 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -20,18 +20,22 @@ """ from __future__ import annotations -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, +) from contextlib import AsyncExitStack from dataclasses import asdict, astuple from datetime import datetime from functools import ( partial, - lru_cache, + # lru_cache, ) import itertools from math import isnan from typing import ( Any, + Callable, Optional, Union, ) @@ -47,6 +51,7 @@ import trio import tractor from tractor import to_asyncio import pendulum +from eventkit import Event import ib_insync as ibis from ib_insync.contract import ( Contract, @@ -131,11 +136,13 @@ class NonShittyWrapper(Wrapper): class NonShittyIB(ibis.IB): - """The beginning of overriding quite a few decisions in this lib. + ''' + The beginning of overriding quite a few decisions in this lib. - Don't use datetimes - Don't use named tuples - """ + + ''' def __init__(self): # override `ib_insync` internal loggers so we can see wtf @@ -312,6 +319,22 @@ _samplings: dict[int, tuple[str, str]] = { } +@cm +def remove_handler_on_err( + event: Event, + handler: Callable, +) -> None: + try: + yield + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + log.exception(f'Disconnected from {event} updates') + event.disconnect(handler) + + class Client: ''' IB wrapped for our broker backend API. @@ -1015,6 +1038,21 @@ class Client: self.ib.errorEvent.connect(push_err) + api_err = self.ib.client.apiError + + def report_api_err(msg: str) -> None: + with remove_handler_on_err( + api_err, + report_api_err, + ): + breakpoint() + to_trio.send_nowait(( + 'error', + msg, + )) + + api_err.connect(report_api_err) + def positions( self, account: str = '', @@ -1144,7 +1182,7 @@ async def load_aio_clients( # the API TCP in `ib_insync` connection can be flaky af so instead # retry a few times to get the client going.. connect_retries: int = 3, - connect_timeout: float = 0.5, + connect_timeout: float = 1, disconnect_on_exit: bool = True, ) -> dict[str, Client]: @@ -1216,9 +1254,9 @@ async def load_aio_clients( await ib.connectAsync( host, port, - clientId=client_id, + clientId=client_id + i, - # this timeout is sensative on windows and will + # this timeout is sensitive on windows and will # fail without a good "timeout error" so be # careful. timeout=connect_timeout, @@ -1242,15 +1280,10 @@ async def load_aio_clients( OSError, ) as ce: _err = ce - - if i > 8: - # cache logic to avoid rescanning if we already have all - # clients loaded. - _scan_ignore.add(sockaddr) - raise - log.warning( - f'Failed to connect on {port} for {i} time, retrying...') + f'Failed to connect on {port} for {i} time with,\n' + f'{ib.client.apiError.value()}\n' + 'retrying with a new client id..') # Pre-collect all accounts available for this # connection and map account names to this client @@ -1457,6 +1490,7 @@ async def open_aio_client_method_relay( ) -> None: + # sync with `open_client_proxy()` caller to_trio.send_nowait(client) # TODO: separate channel for error handling? @@ -1466,25 +1500,34 @@ async def open_aio_client_method_relay( # back results while not to_trio._closed: msg = await from_trio.get() - if msg is None: - print('asyncio PROXY-RELAY SHUTDOWN') - break - meth_name, kwargs = msg - meth = getattr(client, meth_name) + match msg: + case None: # termination sentinel + print('asyncio PROXY-RELAY SHUTDOWN') + break - try: - resp = await meth(**kwargs) - # echo the msg back - to_trio.send_nowait({'result': resp}) + case (meth_name, kwargs): + meth_name, kwargs = msg + meth = getattr(client, meth_name) - except ( - RequestError, + try: + resp = await meth(**kwargs) + # echo the msg back + to_trio.send_nowait({'result': resp}) - # TODO: relay all errors to trio? - # BaseException, - ) as err: - to_trio.send_nowait({'exception': err}) + except ( + RequestError, + + # TODO: relay all errors to trio? + # BaseException, + ) as err: + to_trio.send_nowait({'exception': err}) + + case {'error': content}: + to_trio.send_nowait({'exception': content}) + + case _: + raise ValueError(f'Unhandled msg {msg}') @acm