ib: maybe incr client id; can't catch api errors..

Turns out we don't hookup our eventkit handler until after the
`load_aio_clients()` is complete, which means we can't get
`ib_insync.Client.apiError` events unless inside the asyncio side task.
So I guess try to report any such errors during API scan (note the
duplicate client id case is a special one from ibis itself) even though
we're not going to catch them trio side. The hack to work around this is
to just increment the client id value with the `connect_retries` led `i`
value even though that will break on more then 3 clients attached to an
API endpoint lul ..

Further adjustments that were to the end of trying to fix this proper:
- add `remove_handler_on_err()` cm to disconnect a handler when the trio
  side of the channel closes.
- actually connect to client api erros in our `Client.inline_errors()`
- increase connect timeout to a sec.
- change the trio-asyncio proxy response-msg loop over to `match:`
  syntax and raise on unhandled msgs from eventkit handlers.
rekt_pps
Tyler Goodlet 2023-04-02 17:59:42 -04:00
parent 879657cc75
commit 56cd15fa51
1 changed files with 73 additions and 30 deletions

View File

@ -20,18 +20,22 @@
"""
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
from contextlib import AsyncExitStack
from dataclasses import asdict, astuple
from datetime import datetime
from functools import (
partial,
lru_cache,
# lru_cache,
)
import itertools
from math import isnan
from typing import (
Any,
Callable,
Optional,
Union,
)
@ -47,6 +51,7 @@ import trio
import tractor
from tractor import to_asyncio
import pendulum
from eventkit import Event
import ib_insync as ibis
from ib_insync.contract import (
Contract,
@ -131,11 +136,13 @@ class NonShittyWrapper(Wrapper):
class NonShittyIB(ibis.IB):
"""The beginning of overriding quite a few decisions in this lib.
'''
The beginning of overriding quite a few decisions in this lib.
- Don't use datetimes
- Don't use named tuples
"""
'''
def __init__(self):
# override `ib_insync` internal loggers so we can see wtf
@ -312,6 +319,22 @@ _samplings: dict[int, tuple[str, str]] = {
}
@cm
def remove_handler_on_err(
event: Event,
handler: Callable,
) -> None:
try:
yield
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
log.exception(f'Disconnected from {event} updates')
event.disconnect(handler)
class Client:
'''
IB wrapped for our broker backend API.
@ -1015,6 +1038,21 @@ class Client:
self.ib.errorEvent.connect(push_err)
api_err = self.ib.client.apiError
def report_api_err(msg: str) -> None:
with remove_handler_on_err(
api_err,
report_api_err,
):
breakpoint()
to_trio.send_nowait((
'error',
msg,
))
api_err.connect(report_api_err)
def positions(
self,
account: str = '',
@ -1144,7 +1182,7 @@ async def load_aio_clients(
# the API TCP in `ib_insync` connection can be flaky af so instead
# retry a few times to get the client going..
connect_retries: int = 3,
connect_timeout: float = 0.5,
connect_timeout: float = 1,
disconnect_on_exit: bool = True,
) -> dict[str, Client]:
@ -1216,9 +1254,9 @@ async def load_aio_clients(
await ib.connectAsync(
host,
port,
clientId=client_id,
clientId=client_id + i,
# this timeout is sensative on windows and will
# this timeout is sensitive on windows and will
# fail without a good "timeout error" so be
# careful.
timeout=connect_timeout,
@ -1242,15 +1280,10 @@ async def load_aio_clients(
OSError,
) as ce:
_err = ce
if i > 8:
# cache logic to avoid rescanning if we already have all
# clients loaded.
_scan_ignore.add(sockaddr)
raise
log.warning(
f'Failed to connect on {port} for {i} time, retrying...')
f'Failed to connect on {port} for {i} time with,\n'
f'{ib.client.apiError.value()}\n'
'retrying with a new client id..')
# Pre-collect all accounts available for this
# connection and map account names to this client
@ -1457,6 +1490,7 @@ async def open_aio_client_method_relay(
) -> None:
# sync with `open_client_proxy()` caller
to_trio.send_nowait(client)
# TODO: separate channel for error handling?
@ -1466,25 +1500,34 @@ async def open_aio_client_method_relay(
# back results
while not to_trio._closed:
msg = await from_trio.get()
if msg is None:
print('asyncio PROXY-RELAY SHUTDOWN')
break
meth_name, kwargs = msg
meth = getattr(client, meth_name)
match msg:
case None: # termination sentinel
print('asyncio PROXY-RELAY SHUTDOWN')
break
try:
resp = await meth(**kwargs)
# echo the msg back
to_trio.send_nowait({'result': resp})
case (meth_name, kwargs):
meth_name, kwargs = msg
meth = getattr(client, meth_name)
except (
RequestError,
try:
resp = await meth(**kwargs)
# echo the msg back
to_trio.send_nowait({'result': resp})
# TODO: relay all errors to trio?
# BaseException,
) as err:
to_trio.send_nowait({'exception': err})
except (
RequestError,
# TODO: relay all errors to trio?
# BaseException,
) as err:
to_trio.send_nowait({'exception': err})
case {'error': content}:
to_trio.send_nowait({'exception': content})
case _:
raise ValueError(f'Unhandled msg {msg}')
@acm