From 56cd15fa51df126ebe576478d0fef48ec7fba559 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 2 Apr 2023 17:59:42 -0400 Subject: [PATCH] ib: maybe incr client id; can't catch api errors.. Turns out we don't hookup our eventkit handler until after the `load_aio_clients()` is complete, which means we can't get `ib_insync.Client.apiError` events unless inside the asyncio side task. So I guess try to report any such errors during API scan (note the duplicate client id case is a special one from ibis itself) even though we're not going to catch them trio side. The hack to work around this is to just increment the client id value with the `connect_retries` led `i` value even though that will break on more then 3 clients attached to an API endpoint lul .. Further adjustments that were to the end of trying to fix this proper: - add `remove_handler_on_err()` cm to disconnect a handler when the trio side of the channel closes. - actually connect to client api erros in our `Client.inline_errors()` - increase connect timeout to a sec. - change the trio-asyncio proxy response-msg loop over to `match:` syntax and raise on unhandled msgs from eventkit handlers. --- piker/brokers/ib/api.py | 103 ++++++++++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 30 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 2281fa25..355ab362 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -20,18 +20,22 @@ """ from __future__ import annotations -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, +) from contextlib import AsyncExitStack from dataclasses import asdict, astuple from datetime import datetime from functools import ( partial, - lru_cache, + # lru_cache, ) import itertools from math import isnan from typing import ( Any, + Callable, Optional, Union, ) @@ -47,6 +51,7 @@ import trio import tractor from tractor import to_asyncio import pendulum +from eventkit import Event import ib_insync as ibis from ib_insync.contract import ( Contract, @@ -131,11 +136,13 @@ class NonShittyWrapper(Wrapper): class NonShittyIB(ibis.IB): - """The beginning of overriding quite a few decisions in this lib. + ''' + The beginning of overriding quite a few decisions in this lib. - Don't use datetimes - Don't use named tuples - """ + + ''' def __init__(self): # override `ib_insync` internal loggers so we can see wtf @@ -312,6 +319,22 @@ _samplings: dict[int, tuple[str, str]] = { } +@cm +def remove_handler_on_err( + event: Event, + handler: Callable, +) -> None: + try: + yield + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + log.exception(f'Disconnected from {event} updates') + event.disconnect(handler) + + class Client: ''' IB wrapped for our broker backend API. @@ -1015,6 +1038,21 @@ class Client: self.ib.errorEvent.connect(push_err) + api_err = self.ib.client.apiError + + def report_api_err(msg: str) -> None: + with remove_handler_on_err( + api_err, + report_api_err, + ): + breakpoint() + to_trio.send_nowait(( + 'error', + msg, + )) + + api_err.connect(report_api_err) + def positions( self, account: str = '', @@ -1144,7 +1182,7 @@ async def load_aio_clients( # the API TCP in `ib_insync` connection can be flaky af so instead # retry a few times to get the client going.. connect_retries: int = 3, - connect_timeout: float = 0.5, + connect_timeout: float = 1, disconnect_on_exit: bool = True, ) -> dict[str, Client]: @@ -1216,9 +1254,9 @@ async def load_aio_clients( await ib.connectAsync( host, port, - clientId=client_id, + clientId=client_id + i, - # this timeout is sensative on windows and will + # this timeout is sensitive on windows and will # fail without a good "timeout error" so be # careful. timeout=connect_timeout, @@ -1242,15 +1280,10 @@ async def load_aio_clients( OSError, ) as ce: _err = ce - - if i > 8: - # cache logic to avoid rescanning if we already have all - # clients loaded. - _scan_ignore.add(sockaddr) - raise - log.warning( - f'Failed to connect on {port} for {i} time, retrying...') + f'Failed to connect on {port} for {i} time with,\n' + f'{ib.client.apiError.value()}\n' + 'retrying with a new client id..') # Pre-collect all accounts available for this # connection and map account names to this client @@ -1457,6 +1490,7 @@ async def open_aio_client_method_relay( ) -> None: + # sync with `open_client_proxy()` caller to_trio.send_nowait(client) # TODO: separate channel for error handling? @@ -1466,25 +1500,34 @@ async def open_aio_client_method_relay( # back results while not to_trio._closed: msg = await from_trio.get() - if msg is None: - print('asyncio PROXY-RELAY SHUTDOWN') - break - meth_name, kwargs = msg - meth = getattr(client, meth_name) + match msg: + case None: # termination sentinel + print('asyncio PROXY-RELAY SHUTDOWN') + break - try: - resp = await meth(**kwargs) - # echo the msg back - to_trio.send_nowait({'result': resp}) + case (meth_name, kwargs): + meth_name, kwargs = msg + meth = getattr(client, meth_name) - except ( - RequestError, + try: + resp = await meth(**kwargs) + # echo the msg back + to_trio.send_nowait({'result': resp}) - # TODO: relay all errors to trio? - # BaseException, - ) as err: - to_trio.send_nowait({'exception': err}) + except ( + RequestError, + + # TODO: relay all errors to trio? + # BaseException, + ) as err: + to_trio.send_nowait({'exception': err}) + + case {'error': content}: + to_trio.send_nowait({'exception': content}) + + case _: + raise ValueError(f'Unhandled msg {msg}') @acm