From a96f1dec3a16e23a4f8d4fc1c31c612e4071b6e5 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 15 May 2022 11:27:38 -0400
Subject: [PATCH 01/18] Proxy heaven, choose one "preferred data client"

In order to expose more `asyncio` powered `Client` methods to endpoint
task-code this adds a more extensive and layered set of `MethodProxy`
loading routines; in dependency order these are:

- `load_clients_for_trio()`, a `tractor.to_asyncio.open_channel_from()`
  entry-point factory for loading all scanned clients on the `asyncio`
  side and delivering them over the inter-task channel to a
  `trio`-side task.

- `get_preferred_data_client()`, a simple client instance loading
  routine which reads from the user's `brokers.toml` ->
  `prefer_data_account: list[str]` entry, which must list account
  names, in priority order, that are acceptable to be used as the main
  "data connection client" such that only one of the detected clients
  is used for data (whereas the rest are used only for order entry).

- `open_client_proxies()`, which delivers the detected `Client` set,
  each wrapped in a `MethodProxy`.

- `open_data_client()`, which directly delivers the preferred data
  client as a proxy for `trio` tasks.

- update `open_client_method_proxy()` and `open_client_proxy()` to
  require an input `Client` instance.

Further impl details:

- add `MethodProxy._aio_ns` to ref the original `asyncio` side proxied instance
- add `Client.trades()` to pull executions from the last day/session
- load proxies inside `trades_dialogue` and use the new `.trades()`
  method to try and pull a fill ledger for eventual correct pp price
  calcs (pertains to #307)..
---
 config/brokers.toml |  22 +++-
 piker/brokers/ib.py | 240 +++++++++++++++++++++++++++++++++----------
 2 files changed, 196 insertions(+), 66 deletions(-)

diff --git a/config/brokers.toml b/config/brokers.toml
index 20216bde..1c6d9c29 100644
--- a/config/brokers.toml
+++ b/config/brokers.toml
@@ -14,14 +14,26 @@ secret = ""
 [ib]
 host = "127.0.0.1"
 
+# when clients are being scanned this determines
+# which clients are preferred to be used for data
+# feeds based on the order of account names, if
+# detected as active on an API client.
+prefer_data_account = [
+  'paper',
+  'margin',
+  'ira',
+]
+
+# the order in which ports will be scanned
+# (by the `brokerd` daemon-actor)
+# is determined by the line order here.
 ports.gw = 4002
 ports.tws = 7497
 ports.order = ["gw", "tws",]
 
+# the order in which accounts will be selectable
+# in the order mode UI (if found via clients during
+# API-app scanning) when a new symbol is loaded.
+accounts.paper = "XX0000000"
 accounts.margin = "X0000000"
 accounts.ira = "X0000000"
-accounts.paper = "XX0000000"
-
-# the order in which accounts will be selected (if found through
-# `brokerd`) when a new symbol is loaded
-accounts_order = ['paper', 'margin', 'ira']
diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index b861d895..bb131fec 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -22,7 +22,9 @@ built on it) and thus actor aware API calls must be spawned with
 ``infected_aio==True``.
""" +from __future__ import annotations from contextlib import asynccontextmanager as acm +from contextlib import AsyncExitStack from dataclasses import asdict, astuple from datetime import datetime from functools import partial @@ -39,6 +41,7 @@ import inspect import logging from random import randint import time +from types import SimpleNamespace import trio @@ -276,6 +279,27 @@ class Client: # NOTE: the ib.client here is "throttled" to 45 rps by default + async def trades( + self, + # api_only: bool = False, + + ) -> dict[str, Any]: + + # orders = await self.ib.reqCompletedOrdersAsync( + # apiOnly=api_only + # ) + fills = await self.ib.reqExecutionsAsync() + norm_fills = [] + for fill in fills: + fill = fill._asdict() # namedtuple + for key, val in fill.copy().items(): + if isinstance(val, Contract): + fill[key] = asdict(val) + + norm_fills.append(fill) + + return norm_fills + async def bars( self, fqsn: str, @@ -894,7 +918,7 @@ async def load_aio_clients( client_id: Optional[int] = None, -) -> Client: +) -> dict[str, Client]: ''' Return an ``ib_insync.IB`` instance wrapped in our client API. @@ -1063,12 +1087,7 @@ async def load_aio_clients( 'Check your `brokers.toml` and/or network' ) from _err - # retreive first loaded client - clients = list(_client_cache.values()) - if clients: - client = clients[0] - - yield client, _client_cache, _accounts2clients + yield _accounts2clients # TODO: this in a way that works xD # finally: @@ -1080,6 +1099,86 @@ async def load_aio_clients( # raise +async def load_clients_for_trio( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, + +) -> None: + ''' + Pure async mngr proxy to ``load_aio_clients()``. + + ''' + async with load_aio_clients() as accts2clients: + to_trio.send_nowait(accts2clients) + + # TODO: maybe a sync event to wait on instead? + await asyncio.sleep(float('inf')) + + +@acm +async def open_client_proxies() -> tuple[ + dict[str, MethodProxy], + dict[str, Client], +]: + + proxies: dict[str, MethodProxy] = {} + + async with ( + tractor.to_asyncio.open_channel_from( + load_clients_for_trio, + ) as (clients, from_aio), + + AsyncExitStack() as stack + ): + for acct_name, client in clients.items(): + proxy = await stack.enter_async_context( + open_client_proxy(client), + ) + proxies[acct_name] = proxy + + yield proxies, clients + + +def get_preferred_data_client( + clients: dict[str, Client], + +) -> tuple[str, Client]: + conf = get_config() + data_accounts = conf['prefer_data_account'] + + for name in data_accounts: + client = clients.get(f'ib.{name}') + if client: + return name, client + else: + raise ValueError( + 'No preferred data client could be found:\n' + f'{data_accounts}' + ) + + +@acm +async def open_data_client() -> MethodProxy: + ''' + Open the first found preferred "data client" as defined in the + user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable + and deliver that client wrapped in a ``MethodProxy``. + + ''' + + async with ( + open_client_proxies() as (proxies, clients), + ): + account_name, client = get_preferred_data_client(clients) + proxy = proxies.get(f'ib.{account_name}') + if not proxy: + raise ValueError( + f'No preferred data client could be found for {account_name}!' 
+ ) + + yield proxy + + async def _aio_run_client_method( meth: str, to_trio=None, @@ -1088,12 +1187,8 @@ async def _aio_run_client_method( **kwargs, ) -> None: - async with load_aio_clients() as ( - _client, - clients, - accts2clients, - ): - client = client or _client + async with load_aio_clients() as accts2clients: + client = list(accts2clients.values())[0] async_meth = getattr(client, meth) # handle streaming methods @@ -1144,10 +1239,12 @@ class MethodProxy: self, chan: to_asyncio.LinkedTaskChannel, event_table: dict[str, trio.Event], + asyncio_ns: SimpleNamespace, ) -> None: self.chan = chan self.event_table = event_table + self._aio_ns = asyncio_ns async def _run_method( self, @@ -1213,61 +1310,64 @@ class MethodProxy: async def open_aio_client_method_relay( from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, + client: Client, event_consumers: dict[str, trio.Event], ) -> None: - async with load_aio_clients() as ( - client, - clients, - accts2clients, - ): - to_trio.send_nowait(client) + to_trio.send_nowait(client) - # TODO: separate channel for error handling? - client.inline_errors(to_trio) + # TODO: separate channel for error handling? + client.inline_errors(to_trio) - # relay all method requests to ``asyncio``-side client and - # deliver back results - while not to_trio._closed: - msg = await from_trio.get() - if msg is None: - print('asyncio PROXY-RELAY SHUTDOWN') - break + # relay all method requests to ``asyncio``-side client and deliver + # back results + while not to_trio._closed: + msg = await from_trio.get() + if msg is None: + print('asyncio PROXY-RELAY SHUTDOWN') + break - meth_name, kwargs = msg - meth = getattr(client, meth_name) + meth_name, kwargs = msg + meth = getattr(client, meth_name) - try: - resp = await meth(**kwargs) - # echo the msg back - to_trio.send_nowait({'result': resp}) + try: + resp = await meth(**kwargs) + # echo the msg back + to_trio.send_nowait({'result': resp}) - except ( - RequestError, + except ( + RequestError, - # TODO: relay all errors to trio? - # BaseException, - ) as err: - to_trio.send_nowait({'exception': err}) + # TODO: relay all errors to trio? + # BaseException, + ) as err: + to_trio.send_nowait({'exception': err}) @acm -async def open_client_proxy() -> MethodProxy: +async def open_client_proxy( + client: Client, + +) -> MethodProxy: - # try: event_table = {} async with ( to_asyncio.open_channel_from( open_aio_client_method_relay, + client=client, event_consumers=event_table, ) as (first, chan), trio.open_nursery() as relay_n, ): assert isinstance(first, Client) - proxy = MethodProxy(chan, event_table) + proxy = MethodProxy( + chan, + event_table, + asyncio_ns=first, + ) # mock all remote methods on ib ``Client``. for name, method in inspect.getmembers( @@ -1318,7 +1418,7 @@ async def get_client( ''' # TODO: the IPC via portal relay layer for when this current # actor isn't in aio mode. - async with open_client_proxy() as proxy: + async with open_data_client() as proxy: yield proxy @@ -1535,6 +1635,7 @@ async def get_bars( # ) # TODO: some kinda resp here that indicates success # otherwise retry? + # port = proxy._aio_ns.ib.client.port await data_reset_hack() # TODO: a while loop here if we timeout? 
@@ -1542,14 +1643,9 @@
             ('history', hist_ev),
             # ('live', live_ev),
         ]:
-            # with trio.move_on_after(22) as cs:
             await ev.wait()
             log.info(f"{name} DATA RESET")
 
-            # if cs.cancelled_caught:
-            #     log.warning("reset hack failed on first try?")
-            #     await tractor.breakpoint()
-
         fails += 1
         continue
 
@@ -1566,8 +1662,12 @@ async def open_history_client(
     symbol: str,
 
 ) -> tuple[Callable, int]:
+    '''
+    History retrieval endpoint - delivers a historical frame callable
+    that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
 
-    async with open_client_proxy() as proxy:
+    '''
+    async with open_data_client() as proxy:
 
         async def get_hist(
             end_dt: Optional[datetime] = None,
@@ -1632,7 +1732,8 @@ async def backfill_bars(
     with trio.CancelScope() as cs:
 
         # async with open_history_client(fqsn) as proxy:
-        async with open_client_proxy() as proxy:
+        # async with open_client_proxy() as proxy:
+        async with open_data_client() as proxy:
 
             out, fails = await get_bars(proxy, fqsn)
 
@@ -1734,11 +1835,8 @@ async def _setup_quote_stream(
 
     to_trio.send_nowait(None)
 
-    async with load_aio_clients() as (
-        client,
-        clients,
-        accts2clients,
-    ):
+    async with load_aio_clients() as accts2clients:
+        client = get_preferred_data_client(accts2clients)
         contract = contract or (await client.find_contract(symbol))
         ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
 
@@ -2123,11 +2221,16 @@ async def trades_dialogue(
     # deliver positions to subscriber before anything else
     all_positions = []
     accounts = set()
-    clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
 
-    async with trio.open_nursery() as nurse:
-        for account, client in _accounts2clients.items():
+    async with (
+        trio.open_nursery() as nurse,
+        open_client_proxies() as (proxies, aioclients),
+    ):
+        # for account, client in _accounts2clients.items():
+        for account, proxy in proxies.items():
+
+            client = aioclients[account]
 
             async def open_stream(
                 task_status: TaskStatus[
@@ -2149,7 +2252,8 @@ async def trades_dialogue(
             assert account in accounts_def
             accounts.add(account)
 
-    for client in _client_cache.values():
+    # for client in _client_cache.values():
+    for client in aioclients.values():
         for pos in client.positions():
 
             msg = pack_position(pos)
@@ -2160,6 +2264,16 @@ async def trades_dialogue(
 
             all_positions.append(msg.dict())
 
+    trades: list[dict] = []
+    for proxy in proxies.values():
+        trades.append(await proxy.trades())
+
+    log.info(f'Loaded {len(trades)} from this session')
+    # TODO: write trades to local ``trades.toml``
+    # - use above per-session trades data and write to local file
+    # - get the "flex reports" working and pull historical data and
+    #   also save locally.
+
    await ctx.started((
        all_positions,
        tuple(name for name in accounts_def if name in accounts),
@@ -2462,6 +2576,10 @@ async def data_reset_hack(
     - integration with ``ib-gw`` run in docker + Xorg?
 
     '''
+    # TODO: see if we can find which window is mapped to which process?
+    # for eg. if we can launch a paper account with docker and then find
+    # the pid of it, can we send keycommands to that container somehow?
+
     # TODO: try out this lib instead, seems to be the most modern
     # and usess the underlying lib:
     # https://github.com/rshk/python-libxdo
@@ -2523,7 +2641,7 @@ async def data_reset_hack(
 
             # move mouse to bottom left of window (where there should
             # be nothing to click).
-            'mousemove_relative', '--sync', str(w-4), str(h-4),
+            'mousemove_relative', '--sync', str(w - 4), str(h - 4),
 
             # NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id, From 6f172479ebab9e6d91be46a143bf9d429d0ed90c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 15 May 2022 13:32:39 -0400 Subject: [PATCH 02/18] Drop task-per-method `trio`-`asyncio` proxying Use method proxies through the remaining endpoints and drop the old spawn-a-task-per-method-call style helpers from module. --- piker/brokers/ib.py | 415 ++++++++++++++++++++------------------------ 1 file changed, 185 insertions(+), 230 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index bb131fec..bf66485b 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1143,6 +1143,13 @@ def get_preferred_data_client( clients: dict[str, Client], ) -> tuple[str, Client]: + ''' + Load and return the (first found) `Client` instance that is + preferred and should be used for data by iterating, in priority + order, the ``ib.prefer_data_account: list[str]`` account names in + the users ``brokers.toml`` file. + + ''' conf = get_config() data_accounts = conf['prefer_data_account'] @@ -1165,7 +1172,6 @@ async def open_data_client() -> MethodProxy: and deliver that client wrapped in a ``MethodProxy``. ''' - async with ( open_client_proxies() as (proxies, clients), ): @@ -1179,60 +1185,6 @@ async def open_data_client() -> MethodProxy: yield proxy -async def _aio_run_client_method( - meth: str, - to_trio=None, - from_trio=None, - client=None, - **kwargs, -) -> None: - - async with load_aio_clients() as accts2clients: - client = list(accts2clients.values())[0] - async_meth = getattr(client, meth) - - # handle streaming methods - args = tuple(inspect.getfullargspec(async_meth).args) - if to_trio and 'to_trio' in args: - kwargs['to_trio'] = to_trio - - log.runtime(f'Running {meth}({kwargs})') - return await async_meth(**kwargs) - - -async def _trio_run_client_method( - method: str, - client: Optional[Client] = None, - **kwargs, - -) -> None: - ''' - Asyncio entry point to run tasks against the ``ib_insync`` api. - - ''' - ca = tractor.current_actor() - assert ca.is_infected_aio() - - # if the method is an *async gen* stream for it - # meth = getattr(Client, method) - - # args = tuple(inspect.getfullargspec(meth).args) - - # if inspect.isasyncgenfunction(meth) or ( - # # if the method is an *async func* but manually - # # streams back results, make sure to also stream it - # 'to_trio' in args - # ): - # kwargs['_treat_as_stream'] = True - - return await to_asyncio.run_task( - _aio_run_client_method, - meth=method, - client=client, - **kwargs - ) - - class MethodProxy: def __init__( @@ -1830,6 +1782,9 @@ async def _setup_quote_stream( ''' Stream a ticker using the std L1 api. + This task is ``asyncio``-side and must be called from + ``tractor.to_asyncio.open_channel_from()``. + ''' global _quote_streams @@ -1926,6 +1881,7 @@ async def open_aio_quote_stream( _setup_quote_stream, symbol=symbol, contract=contract, + ) as (first, from_aio): # cache feed for later consumers @@ -1956,122 +1912,120 @@ async def stream_quotes( sym = symbols[0] log.info(f'request for real-time quotes: {sym}') - con, first_ticker, details = await _trio_run_client_method( - method='get_sym_details', - symbol=sym, - ) - first_quote = normalize(first_ticker) - # print(f'first quote: {first_quote}') + async with open_data_client() as proxy: - def mk_init_msgs() -> dict[str, dict]: - # pass back some symbol info like min_tick, trading_hours, etc. 
- syminfo = asdict(details) - syminfo.update(syminfo['contract']) + con, first_ticker, details = await proxy.get_sym_details(symbol=sym) + first_quote = normalize(first_ticker) + # print(f'first quote: {first_quote}') - # nested dataclass we probably don't need and that won't IPC serialize - syminfo.pop('secIdList') + def mk_init_msgs() -> dict[str, dict]: + # pass back some symbol info like min_tick, trading_hours, etc. + syminfo = asdict(details) + syminfo.update(syminfo['contract']) - # TODO: more consistent field translation - atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] + # nested dataclass we probably don't need and that won't IPC + # serialize + syminfo.pop('secIdList') - # for stocks it seems TWS reports too small a tick size - # such that you can't submit orders with that granularity? - min_tick = 0.01 if atype == 'stock' else 0 + # TODO: more consistent field translation + atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) + # for stocks it seems TWS reports too small a tick size + # such that you can't submit orders with that granularity? + min_tick = 0.01 if atype == 'stock' else 0 - # for "traditional" assets, volume is normally discreet, not a float - syminfo['lot_tick_size'] = 0.0 + syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) - # TODO: for loop through all symbols passed in - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'symbol_info': syminfo, - 'fqsn': first_quote['fqsn'], + # for "traditional" assets, volume is normally discreet, not + # a float + syminfo['lot_tick_size'] = 0.0 + + # TODO: for loop through all symbols passed in + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': syminfo, + 'fqsn': first_quote['fqsn'], + } } - } - return init_msgs + return init_msgs - init_msgs = mk_init_msgs() + init_msgs = mk_init_msgs() - # TODO: we should instead spawn a task that waits on a feed to start - # and let it wait indefinitely..instead of this hard coded stuff. - with trio.move_on_after(1): - contract, first_ticker, details = await _trio_run_client_method( - method='get_quote', - symbol=sym, - ) + # TODO: we should instead spawn a task that waits on a feed to start + # and let it wait indefinitely..instead of this hard coded stuff. + with trio.move_on_after(1): + contract, first_ticker, details = await proxy.get_quote(symbol=sym) - # it might be outside regular trading hours so see if we can at - # least grab history. - if isnan(first_ticker.last): - task_status.started((init_msgs, first_quote)) + # it might be outside regular trading hours so see if we can at + # least grab history. + if isnan(first_ticker.last): + task_status.started((init_msgs, first_quote)) - # it's not really live but this will unblock - # the brokerd feed task to tell the ui to update? - feed_is_live.set() - - # block and let data history backfill code run. - await trio.sleep_forever() - return # we never expect feed to come up? 
- - async with open_aio_quote_stream( - symbol=sym, - contract=con, - ) as stream: - - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] - - task_status.started((init_msgs, first_quote)) - - async with aclosing(stream): - if type(first_ticker.contract) not in ( - ibis.Commodity, - ibis.Forex - ): - # wait for real volume on feed (trading might be closed) - while True: - ticker = await stream.receive() - - # for a real volume contract we rait for the first - # "real" trade to take place - if ( - # not calc_price - # and not ticker.rtTime - not ticker.rtTime - ): - # spin consuming tickers until we get a real - # market datum - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] - - # XXX: this works because we don't use - # ``aclosing()`` above? - break - - quote = normalize(ticker) - log.debug(f"First ticker received {quote}") - - # tell caller quotes are now coming in live + # it's not really live but this will unblock + # the brokerd feed task to tell the ui to update? feed_is_live.set() - # last = time.time() - async for ticker in stream: - quote = normalize(ticker) - await send_chan.send({quote['fqsn']: quote}) + # block and let data history backfill code run. + await trio.sleep_forever() + return # we never expect feed to come up? + + async with open_aio_quote_stream( + symbol=sym, + contract=con, + ) as stream: + + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] + + task_status.started((init_msgs, first_quote)) + + async with aclosing(stream): + if type(first_ticker.contract) not in ( + ibis.Commodity, + ibis.Forex + ): + # wait for real volume on feed (trading might be closed) + while True: + ticker = await stream.receive() + + # for a real volume contract we rait for the first + # "real" trade to take place + if ( + # not calc_price + # and not ticker.rtTime + not ticker.rtTime + ): + # spin consuming tickers until we get a real + # market datum + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first real volume tick") + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is truly stateful trash) + ticker.ticks = [] + + # XXX: this works because we don't use + # ``aclosing()`` above? + break + + quote = normalize(ticker) + log.debug(f"First ticker received {quote}") + + # tell caller quotes are now coming in live + feed_is_live.set() - # ugh, clear ticks since we've consumed them - ticker.ticks = [] # last = time.time() + async for ticker in stream: + quote = normalize(ticker) + await send_chan.send({quote['fqsn']: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] + # last = time.time() def pack_position( @@ -2466,95 +2420,96 @@ async def open_symbol_search( ctx: tractor.Context, ) -> None: - # load all symbols locally for fast search + + # TODO: load user defined symbol set locally for fast search? 
await ctx.started({}) - async with ctx.open_stream() as stream: - - last = time.time() - - async for pattern in stream: - log.debug(f'received {pattern}') - now = time.time() - - assert pattern, 'IB can not accept blank search pattern' - - # throttle search requests to no faster then 1Hz - diff = now - last - if diff < 1.0: - log.debug('throttle sleeping') - await trio.sleep(diff) - try: - pattern = stream.receive_nowait() - except trio.WouldBlock: - pass - - if not pattern or pattern.isspace(): - log.warning('empty pattern received, skipping..') - - # TODO: *BUG* if nothing is returned here the client - # side will cache a null set result and not showing - # anything to the use on re-searches when this query - # timed out. We probably need a special "timeout" msg - # or something... - - # XXX: this unblocks the far end search task which may - # hold up a multi-search nursery block - await stream.send({}) - - continue - - log.debug(f'searching for {pattern}') + async with open_data_client() as proxy: + async with ctx.open_stream() as stream: last = time.time() - # async batch search using api stocks endpoint and module - # defined adhoc symbol set. - stock_results = [] + async for pattern in stream: + log.debug(f'received {pattern}') + now = time.time() - async def stash_results(target: Awaitable[list]): - stock_results.extend(await target) + assert pattern, 'IB can not accept blank search pattern' - async with trio.open_nursery() as sn: - sn.start_soon( - stash_results, - _trio_run_client_method( - method='search_symbols', - pattern=pattern, - upto=5, + # throttle search requests to no faster then 1Hz + diff = now - last + if diff < 1.0: + log.debug('throttle sleeping') + await trio.sleep(diff) + try: + pattern = stream.receive_nowait() + except trio.WouldBlock: + pass + + if not pattern or pattern.isspace(): + log.warning('empty pattern received, skipping..') + + # TODO: *BUG* if nothing is returned here the client + # side will cache a null set result and not showing + # anything to the use on re-searches when this query + # timed out. We probably need a special "timeout" msg + # or something... + + # XXX: this unblocks the far end search task which may + # hold up a multi-search nursery block + await stream.send({}) + + continue + + log.debug(f'searching for {pattern}') + + last = time.time() + + # async batch search using api stocks endpoint and module + # defined adhoc symbol set. + stock_results = [] + + async def stash_results(target: Awaitable[list]): + stock_results.extend(await target) + + async with trio.open_nursery() as sn: + sn.start_soon( + stash_results, + proxy.search_symbols( + pattern=pattern, + upto=5, + ), ) - ) - # trigger async request - await trio.sleep(0) + # trigger async request + await trio.sleep(0) - # match against our ad-hoc set immediately - adhoc_matches = fuzzy.extractBests( + # match against our ad-hoc set immediately + adhoc_matches = fuzzy.extractBests( + pattern, + list(_adhoc_futes_set), + score_cutoff=90, + ) + log.info(f'fuzzy matched adhocs: {adhoc_matches}') + adhoc_match_results = {} + if adhoc_matches: + # TODO: do we need to pull contract details? + adhoc_match_results = {i[0]: {} for i in adhoc_matches} + + log.debug(f'fuzzy matching stocks {stock_results}') + stock_matches = fuzzy.extractBests( pattern, - list(_adhoc_futes_set), - score_cutoff=90, + stock_results, + score_cutoff=50, ) - log.info(f'fuzzy matched adhocs: {adhoc_matches}') - adhoc_match_results = {} - if adhoc_matches: - # TODO: do we need to pull contract details? 
-            adhoc_match_results = {i[0]: {} for i in adhoc_matches}
-            log.debug(f'fuzzy matching stocks {stock_results}')
-            stock_matches = fuzzy.extractBests(
-                pattern,
-                stock_results,
-                score_cutoff=50,
-            )
+                matches = adhoc_match_results | {
+                    item[0]: {} for item in stock_matches
+                }
+                # TODO: we used to deliver contract details
+                # {item[2]: item[0] for item in stock_matches}
 
-        matches = adhoc_match_results | {
-            item[0]: {} for item in stock_matches
-        }
-        # TODO: we used to deliver contract details
-        # {item[2]: item[0] for item in stock_matches}
-
-        log.debug(f"sending matches: {matches.keys()}")
-        await stream.send(matches)
+                log.debug(f"sending matches: {matches.keys()}")
+                await stream.send(matches)
 
 
 async def data_reset_hack(

From bff625725e35c5369a002d505cbfa05558751300 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 24 May 2022 08:53:47 -0400
Subject: [PATCH 03/18] Implement reset hacks via our patched `asyncvnc` client

---
 piker/brokers/ib.py | 27 +++++++++++++++++++++++++++
 requirements.txt    |  4 ++++
 2 files changed, 31 insertions(+)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index bf66485b..b1d1cc42 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -2531,6 +2531,33 @@ async def data_reset_hack(
     - integration with ``ib-gw`` run in docker + Xorg?
 
     '''
+
+    async def vnc_click_hack(
+        reset_type: str = 'data'
+    ) -> None:
+        '''
+        Reset the data or network connection for the VNC attached
+        ib gateway using magic combos.
+
+        '''
+        key = {'data': 'f', 'connection': 'r'}[reset_type]
+
+        import asyncvnc
+
+        async with asyncvnc.connect(
+            'localhost',
+            port=5900,
+            force_video_mode='rgba',
+        ) as client:
+
+            client.mouse.click()
+            client.keyboard.press('Ctrl', 'Alt', key)  # keys are stacked
+
+    await tractor.to_asyncio.run_task(vnc_click_hack)
+
+    # we don't really need the ``xdotool`` approach any more B)
+    return
+
     # TODO: see if we can find which window is mapped to which process?
     # for eg. if we can launch a paper account with docker and then find
     # the pid of it, can we send keycommands to that container somehow?
diff --git a/requirements.txt b/requirements.txt
index 78255d32..64dd78c1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,3 +15,7 @@
 
 # ``trimeter`` for asysnc history fetching
 -e git+https://github.com/python-trio/trimeter.git@master#egg=trimeter
+
+
+# ``asyncvnc`` for sending interactions to ib-gw inside docker
+-e git+https://github.com/pikers/asyncvnc.git@vid_passthrough#egg=asyncvnc

From c3142aec81ca1ac4dbb8f06454bad0906b0fc014 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 24 May 2022 08:54:55 -0400
Subject: [PATCH 04/18] Drop `i3ipc` + `xdotool` approach for feed hacks

---
 piker/brokers/ib.py | 85 ---------------------------------------------
 1 file changed, 85 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index b1d1cc42..420087a3 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -2556,89 +2556,4 @@ async def data_reset_hack(
     await tractor.to_asyncio.run_task(vnc_click_hack)
 
     # we don't really need the ``xdotool`` approach any more B)
-    return
-
-    # TODO: see if we can find which window is mapped to which process?
-    # for eg. if we can launch a paper account with docker and then find
-    # the pid of it, can we send keycommands to that container somehow?
-
-    # TODO: try out this lib instead, seems to be the most modern
-    # and usess the underlying lib:
-    # https://github.com/rshk/python-libxdo
-
-    # TODO: seems to be a few libs for python but not sure
-    # if they support all the sub commands we need, order of
-    # most recent commit history:
-    # https://github.com/rr-/pyxdotool
-    # https://github.com/ShaneHutter/pyxdotool
-    # https://github.com/cphyc/pyxdotool
-
-    try:
-        import i3ipc
-    except ImportError:
-        return False
-        log.warning('IB data hack no-supported on ur platformz')
-
-    i3 = i3ipc.Connection()
-    t = i3.get_tree()
-
-    orig_win_id = t.find_focused().window
-
-    # for tws
-    win_names: list[str] = [
-        'Interactive Brokers',  # tws running in i3
-        'IB Gateway',  # gw running in i3
-        # 'IB',  # gw running in i3 (newer version?)
-    ]
-
-    combos: dict[str, str] = {
-        # only required if we need a connection reset.
-        'connection': ('ctrl+alt+r', 12),
-
-        # data feed reset.
-        'data': ('ctrl+alt+f', 6)
-    }
-
-    for name in win_names:
-        results = t.find_titled(name)
-        print(f'results for {name}: {results}')
-        if results:
-            con = results[0]
-            print(f'Resetting data feed for {name}')
-            win_id = str(con.window)
-            w, h = con.rect.width, con.rect.height
-
-            # TODO: only run the reconnect (2nd) kc on a detected
-            # disconnect?
-            key_combo, timeout = combos[reset_type]
-            # for key_combo, timeout in [
-            #     # only required if we need a connection reset.
-            #     # ('ctrl+alt+r', 12),
-            #     # data feed reset.
-            #     ('ctrl+alt+f', 6)
-            # ]:
-            await trio.run_process([
-                'xdotool',
-                'windowactivate', '--sync', win_id,
-
-                # move mouse to bottom left of window (where there should
-                # be nothing to click).
-                'mousemove_relative', '--sync', str(w - 4), str(h - 4),
-
-                # NOTE: we may need to stick a `--retry 3` in here..
-                'click', '--window', win_id,
-                '--repeat', '3', '1',
-
-                # hackzorzes
-                'key', key_combo,
-                # ],
-                # timeout=timeout,
-            ])
-
-            # re-activate and focus original window
-            await trio.run_process([
-                'xdotool',
-                'windowactivate', '--sync', str(orig_win_id),
-                'click', '--window', str(orig_win_id), '1',
-            ])
     return True

From aba8b05a334b802060b9eb73fa7b197e26745d19 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 24 Apr 2022 12:33:48 -0400
Subject: [PATCH 05/18] Fix null match

---
 piker/brokers/ib.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 420087a3..5991f766 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -1631,7 +1631,7 @@ async def open_history_client(
 
             # TODO: add logic here to handle tradable hours and only grab
             # valid bars in the range
-            if out == (None, None):
+            if out is None:
                 # could be trying to retreive bars over weekend
                 log.error(f"Can't grab bars starting at {end_dt}!?!?")
                 raise NoData(

From b357a120b96b69247fb06d39ab0a9151b65e578e Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 16 May 2022 08:12:07 -0400
Subject: [PATCH 06/18] Fix output unpack

---
 piker/brokers/ib.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 5991f766..cd3bd8ab 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -1791,7 +1791,7 @@ async def _setup_quote_stream(
 
     to_trio.send_nowait(None)
 
     async with load_aio_clients() as accts2clients:
-        client = get_preferred_data_client(accts2clients)
+        account_name, client = get_preferred_data_client(accts2clients)
         contract = contract or (await client.find_contract(symbol))
         ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
 
From 26f47227d2ff7d77df7d5b5615b7b9a74c6423e1 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 18 May 2022 08:45:10 -0400
Subject: [PATCH 07/18] Fix `.ib` pattern match

---
 piker/brokers/ib.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index cd3bd8ab..5f81a158 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -520,7 +520,7 @@ class Client:
 
         # XXX UPDATE: we can probably do the tick/trades scraping
         # inside our eventkit handler instead to bypass this entirely?
-        if 'ib' in pattern:
+        if '.ib' in pattern:
             from ..data._source import unpack_fqsn
             broker, symbol, expiry = unpack_fqsn(pattern)
         else:
@@ -536,11 +536,7 @@ class Client:
             symbol, _, expiry = symbol.rpartition('.')
 
         # use heuristics to figure out contract "type"
-        try:
-            sym, exch = symbol.upper().rsplit('.', maxsplit=1)
-        except ValueError:
-            # likely there's an embedded `.` for a forex pair
-            breakpoint()
+        sym, exch = symbol.upper().rsplit('.', maxsplit=1)
 
         qualify: bool = True

From a5389beccd19e623900f1d7a88b536a329b8bf88 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 21 May 2022 10:59:34 -0400
Subject: [PATCH 08/18] Rejig scan loop for flaky TCP connects, better caching

`ib-gw` seems particularly fragile to connections from clients with the
same id (can result in weird connect hangs and even crashes) and
`ib_insync` doesn't handle intermittent tcp disconnects that well..
(especially on dockerized IBC setups). This adds a bunch of changes to
our client caching and scan loop as well as a proper
task-locking-to-cache-proxies scheme so that:

- `asyncio`-side clients aren't double-loaded/connected even when
  explicitly trying to reconnect repeatedly with a given client to work
  around the unreliability of the `asyncio.Transport` design in
  `ib_insync`.
- we can use `tractor.trionics.maybe_open_context()` to lock the `trio`
  side from loading more than one `Client` on the `asyncio` side and
  instead, on cache hits, only make a new `MethodProxy` around the
  reused `asyncio`-side client (since each `trio` task needs its own
  inter-task msg channel).
- a `finally:` block teardown on all clients loaded in the scan loop
  avoids stale connections.
- the connect params are now exposed as named args to
  `load_aio_clients()` and can be easily controlled from caller code.

Oh, and we properly hooked up the internal `ib_insync` logging to our
own internal schema - makes it a lot easier to debug wtf is going on XD
---
 piker/brokers/ib.py | 253 +++++++++++++++++++++++---------------------
 1 file changed, 132 insertions(+), 121 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 5f81a158..00f8ca35 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -38,8 +38,6 @@ from typing import (
 import asyncio
 from pprint import pformat
 import inspect
-import logging
-from random import randint
 import time
 from types import SimpleNamespace
 
@@ -164,13 +162,23 @@ class NonShittyIB(ibis.IB):
     - Don't use named tuples
     """
     def __init__(self):
+
+        # override `ib_insync` internal loggers so we can see wtf
+        # it's doing..
+ self._logger = get_logger( + 'ib_insync.ib', + ) self._createEvents() + # XXX: just to override this wrapper self.wrapper = NonShittyWrapper(self) self.client = ib_Client(self.wrapper) + self.client._logger = get_logger( + 'ib_insync.client', + ) + # self.errorEvent += self._onError self.client.apiEnd += self.disconnectedEvent - self._logger = logging.getLogger('ib_insync.ib') # map of symbols to contract ids @@ -883,9 +891,6 @@ _try_ports = [ _gw_port, _tws_port ] -# TODO: remove the randint stuff and use proper error checking in client -# factor below.. -_client_ids = itertools.count(randint(1, 100)) _client_cache: dict[tuple[str, int], Client] = {} _scan_ignore: set[tuple[str, int]] = set() @@ -911,8 +916,12 @@ async def load_aio_clients( host: str = '127.0.0.1', port: int = None, + client_id: int = 6116, - client_id: Optional[int] = None, + # the API TCP in `ib_insync` connection can be flaky af so instead + # retry a few times to get the client going.. + connect_retries: int = 3, + connect_timeout: float = 0.5, ) -> dict[str, Client]: ''' @@ -949,133 +958,111 @@ async def load_aio_clients( { 'gw': 4002, 'tws': 7497, - # 'order': ['gw', 'tws'] } ) order = ports.pop('order', None) if order: log.warning('`ports.order` section in `brokers.toml` is deprecated') + _err = None accounts_def = config.load_accounts(['ib']) try_ports = list(ports.values()) ports = try_ports if port is None else [port] - # we_connected = [] - connect_timeout = 2 combos = list(itertools.product(hosts, ports)) - - # allocate new and/or reload disconnected but cached clients - # try: - # TODO: support multiple clients allowing for execution on - # multiple accounts (including a paper instance running on the - # same machine) and switching between accounts in the ems. - - _err = None + accounts_found: dict[str, Client] = {} # (re)load any and all clients that can be found # from connection details in ``brokers.toml``. for host, port in combos: sockaddr = (host, port) - client = _client_cache.get(sockaddr) - accounts_found: dict[str, Client] = {} - if ( - client and client.ib.isConnected() + sockaddr in _client_cache or sockaddr in _scan_ignore ): continue - try: - ib = NonShittyIB() + ib = NonShittyIB() - # XXX: not sure if we ever really need to increment the - # client id if teardown is sucessful. - client_id = 6116 + for i in range(connect_retries): + try: + await ib.connectAsync( + host, + port, + clientId=client_id, - await ib.connectAsync( - host, - port, - clientId=client_id, + # this timeout is sensative on windows and will + # fail without a good "timeout error" so be + # careful. + timeout=connect_timeout, + ) + break - # this timeout is sensative on windows and will - # fail without a good "timeout error" so be - # careful. - timeout=connect_timeout, - ) + except ( + ConnectionRefusedError, - # create and cache client - client = Client(ib) + # TODO: if trying to scan for remote api clients + # pretty sure we need to catch this, though it + # definitely needs a shorter timeout since it hangs + # for like 5s.. + asyncio.exceptions.TimeoutError, + OSError, + ) as ce: + _err = ce - # Pre-collect all accounts available for this - # connection and map account names to this client - # instance. - pps = ib.positions() - if pps: - for pp in pps: - accounts_found[ - accounts_def.inverse[pp.account] - ] = client + if i > 8: + # cache logic to avoid rescanning if we already have all + # clients loaded. 
+ _scan_ignore.add(sockaddr) + raise - # if there are accounts without positions we should still - # register them for this client - for value in ib.accountValues(): - acct_number = value.account + log.warning( + f'Failed to connect on {port} for {i} time, retrying...') - entry = accounts_def.inverse.get(acct_number) - if not entry: - raise ValueError( - 'No section in brokers.toml for account:' - f' {acct_number}\n' - f'Please add entry to continue using this API client' - ) + # create and cache client + client = Client(ib) - # surjection of account names to operating clients. - if acct_number not in accounts_found: - accounts_found[entry] = client + # Pre-collect all accounts available for this + # connection and map account names to this client + # instance. + pps = ib.positions() + if pps: + for pp in pps: + accounts_found[ + accounts_def.inverse[pp.account] + ] = client - log.info( - f'Loaded accounts for client @ {host}:{port}\n' - f'{pformat(accounts_found)}' - ) + # if there are accounts without positions we should still + # register them for this client + for value in ib.accountValues(): + acct_number = value.account - # update all actor-global caches - log.info(f"Caching client for {(host, port)}") - _client_cache[(host, port)] = client + entry = accounts_def.inverse.get(acct_number) + if not entry: + raise ValueError( + 'No section in brokers.toml for account:' + f' {acct_number}\n' + f'Please add entry to continue using this API client' + ) - # we_connected.append((host, port, client)) + # surjection of account names to operating clients. + if acct_number not in accounts_found: + accounts_found[entry] = client - # TODO: don't do it this way, get a gud to_asyncio - # context / .start() system goin.. - def pop_and_discon(): - log.info(f'Disconnecting client {client}') - client.ib.disconnect() - _client_cache.pop((host, port), None) + log.info( + f'Loaded accounts for client @ {host}:{port}\n' + f'{pformat(accounts_found)}' + ) - # NOTE: the above callback **CAN'T FAIL** or shm won't get - # torn down correctly ... - tractor._actor._lifetime_stack.callback(pop_and_discon) + # update all actor-global caches + log.info(f"Caching client for {sockaddr}") + _client_cache[sockaddr] = client - # XXX: why aren't we just updating this directy above - # instead of using the intermediary `accounts_found`? - _accounts2clients.update(accounts_found) - - except ( - ConnectionRefusedError, - - # TODO: if trying to scan for remote api clients - # pretty sure we need to catch this, though it - # definitely needs a shorter timeout since it hangs - # for like 5s.. - asyncio.exceptions.TimeoutError, - OSError, - ) as ce: - _err = ce - log.warning(f'Failed to connect on {port}') - - # cache logic to avoid rescanning if we already have all - # clients loaded. - _scan_ignore.add(sockaddr) + # XXX: why aren't we just updating this directy above + # instead of using the intermediary `accounts_found`? + _accounts2clients.update(accounts_found) + # if we have no clients after the scan loop then error out. 
if not _client_cache: raise ConnectionError( 'No ib APIs could be found scanning @:\n' @@ -1083,16 +1070,15 @@ async def load_aio_clients( 'Check your `brokers.toml` and/or network' ) from _err - yield _accounts2clients - - # TODO: this in a way that works xD - # finally: - # pass - # # async with trio.CancelScope(shield=True): - # for host, port, client in we_connected: - # client.ib.disconnect() - # _client_cache.pop((host, port)) - # raise + try: + yield _accounts2clients + finally: + # TODO: for re-scans we'll want to not teardown clients which + # are up and stable right? + for acct, client in _accounts2clients.items(): + log.info(f'Disconnecting {acct}@{client}') + client.ib.disconnect() + _client_cache.pop((host, port)) async def load_clients_for_trio( @@ -1103,36 +1089,56 @@ async def load_clients_for_trio( ''' Pure async mngr proxy to ``load_aio_clients()``. - ''' - async with load_aio_clients() as accts2clients: - to_trio.send_nowait(accts2clients) + This is a bootstrap entrypoing to call from + a ``tractor.to_asyncio.open_channel_from()``. - # TODO: maybe a sync event to wait on instead? + ''' + global _accounts2clients + + if _accounts2clients: + to_trio.send_nowait(_accounts2clients) await asyncio.sleep(float('inf')) + else: + async with load_aio_clients() as accts2clients: + to_trio.send_nowait(accts2clients) + + # TODO: maybe a sync event to wait on instead? + await asyncio.sleep(float('inf')) + + +_proxies: dict[str, MethodProxy] = {} + @acm async def open_client_proxies() -> tuple[ dict[str, MethodProxy], dict[str, Client], ]: - - proxies: dict[str, MethodProxy] = {} - async with ( - tractor.to_asyncio.open_channel_from( - load_clients_for_trio, - ) as (clients, from_aio), + tractor.trionics.maybe_open_context( + # acm_func=open_client_proxies, + acm_func=tractor.to_asyncio.open_channel_from, + kwargs={'target': load_clients_for_trio}, + + # lock around current actor task access + # TODO: maybe this should be the default in tractor? + key=tractor.current_actor().uid, + + ) as (cache_hit, (clients, from_aio)), AsyncExitStack() as stack ): + if cache_hit: + log.info(f'Re-using cached clients: {clients}') + for acct_name, client in clients.items(): proxy = await stack.enter_async_context( open_client_proxy(client), ) - proxies[acct_name] = proxy + _proxies[acct_name] = proxy - yield proxies, clients + yield _proxies, clients def get_preferred_data_client( @@ -1511,10 +1517,15 @@ async def get_bars( for _ in range(10): try: - bars, bars_array = await proxy.bars( + out = await proxy.bars( fqsn=fqsn, end_dt=end_dt, ) + if out: + bars, bars_array = out + + else: + await tractor.breakpoint() if bars_array is None: raise SymbolNotFound(fqsn) From 8d6c5b214ef86f0e820b561443f41fb57a289933 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Jun 2022 12:12:35 -0400 Subject: [PATCH 09/18] Add 6, 6s retries on feed resets --- piker/brokers/ib.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 00f8ca35..26198a59 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1592,21 +1592,27 @@ async def get_bars( # # 'Market data farm connection is OK:usfuture' # 'Market data farm connection is OK:usfarm' # ) - # TODO: some kinda resp here that indicates success - # otherwise retry? + # port = proxy._aio_ns.ib.client.port await data_reset_hack() - # TODO: a while loop here if we timeout? 
- for name, ev in [ - ('history', hist_ev), - # ('live', live_ev), - ]: - await ev.wait() - log.info(f"{name} DATA RESET") + # try to wait on the reset event(s) to arrive, a timeout + # will trigger a retry up to 6 times (for now). + for i in range(6): + with trio.move_on_after(6) as cs: + for name, ev in [ + ('history', hist_ev), + # ('live', live_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + break - fails += 1 - continue + fails += 1 + if cs.cancelled_caught: + log.warning(f'Data reset hack failed, retrying {i}...') + + continue else: raise @@ -2554,7 +2560,6 @@ async def data_reset_hack( async with asyncvnc.connect( 'localhost', port=5900, - force_video_mode='rgba', ) as client: client.mouse.click() From 06832b94d485248e43118d410837904695fd88e2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Jun 2022 12:58:21 -0400 Subject: [PATCH 10/18] Add vnc password auth, connection reset logic Now that we have working client auth thanks to: https://github.com/barneygale/asyncvnc/pull/4 and related issue, we can use a pw for the vnc server, though we should eventually auto-generate a random one from a docker super obviously. Add logic to the data reset hack loop to do a connection reset after 2 failed/timeout attempts at the regular data reset. We need to also add this logic around reconnectionn events that are due to the host network connection: aka roaming that's faster then timing logic builtin to the gateway. --- dockering/ib/run_x11_vnc.sh | 2 +- piker/brokers/ib.py | 56 +++++++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/dockering/ib/run_x11_vnc.sh b/dockering/ib/run_x11_vnc.sh index c87dc2bb..c7c088b6 100755 --- a/dockering/ib/run_x11_vnc.sh +++ b/dockering/ib/run_x11_vnc.sh @@ -13,4 +13,4 @@ x11vnc \ -autoport 3003 \ # can't use this because of ``asyncvnc`` issue: # https://github.com/barneygale/asyncvnc/issues/1 - # -passwd "$VNC_SERVER_PASSWORD" + -passwd 'ibcansmbz' diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 26198a59..a1de5c4c 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1588,29 +1588,55 @@ async def get_bars( hist_ev = proxy.status_event( 'HMDS data farm connection is OK:ushmds' ) - # live_ev = proxy.status_event( - # # 'Market data farm connection is OK:usfuture' - # 'Market data farm connection is OK:usfarm' - # ) - # port = proxy._aio_ns.ib.client.port - await data_reset_hack() + # XXX: other event messages we might want to try and + # wait for but i wasn't able to get any of this + # reliable.. + # reconnect_start = proxy.status_event( + # 'Market data farm is connecting:usfuture' + # ) + # live_ev = proxy.status_event( + # 'Market data farm connection is OK:usfuture' + # ) # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). - for i in range(6): - with trio.move_on_after(6) as cs: + tries: int = 6 + reset_type: str = 'data' + resends: int = 0 + + for i in range(1, tries): + + log.warning(f'Sending reset request {reset_type}') + await data_reset_hack(reset_type=reset_type) + reset_type = 'data' + + with trio.move_on_after(3) as cs: for name, ev in [ + # TODO: not sure if waiting on other events + # is all that useful here or not. in theory + # you could wait on one of the ones above + # first to verify the reset request was + # sent? 
('history', hist_ev), - # ('live', live_ev), ]: await ev.wait() log.info(f"{name} DATA RESET") break + if cs.cancelled_caught: fails += 1 - if cs.cancelled_caught: - log.warning(f'Data reset hack failed, retrying {i}...') + log.warning( + f'Data reset {name} timeout, retrying {i}.' + ) + + if resends > 1: + # on each 3rd timeout, do a full connection + # reset instead. + reset_type = 'connection' + resends = 0 + else: + resends += 1 continue @@ -2219,7 +2245,6 @@ async def trades_dialogue( assert account in accounts_def accounts.add(account) - # for client in _client_cache.values(): for client in aioclients.values(): for pos in client.positions(): @@ -2560,8 +2585,15 @@ async def data_reset_hack( async with asyncvnc.connect( 'localhost', port=5900, + password='ibcansmbz', ) as client: + # move to middle of screen + # 640x1800 + client.mouse.move( + x=100, + y=100, + ) client.mouse.click() client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked From 5d53ecb433893b05af11b2a4be3f374888227902 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Jun 2022 10:17:50 -0400 Subject: [PATCH 11/18] Switch vnc server to port 3003 --- dockering/ib/run_x11_vnc.sh | 2 +- piker/brokers/ib.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dockering/ib/run_x11_vnc.sh b/dockering/ib/run_x11_vnc.sh index c7c088b6..69b6da85 100755 --- a/dockering/ib/run_x11_vnc.sh +++ b/dockering/ib/run_x11_vnc.sh @@ -13,4 +13,4 @@ x11vnc \ -autoport 3003 \ # can't use this because of ``asyncvnc`` issue: # https://github.com/barneygale/asyncvnc/issues/1 - -passwd 'ibcansmbz' + # -passwd 'ibcansmbz' diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index a1de5c4c..fb22a4fb 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -2584,8 +2584,8 @@ async def data_reset_hack( async with asyncvnc.connect( 'localhost', - port=5900, - password='ibcansmbz', + port=3003, + # password='ibcansmbz', ) as client: # move to middle of screen From d870a09a4bc90bc8d9e3049bbc7caf8a6b311779 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 4 Jun 2022 16:00:38 -0400 Subject: [PATCH 12/18] Increase timeouts, always connection reset after 3 tries --- piker/brokers/ib.py | 46 ++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index fb22a4fb..39174867 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1601,17 +1601,17 @@ async def get_bars( # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). - tries: int = 6 - reset_type: str = 'data' - resends: int = 0 + tries: int = 3 + timeout: float = 10 + # try 3 time with a data reset then fail over to + # a connection reset. for i in range(1, tries): - log.warning(f'Sending reset request {reset_type}') - await data_reset_hack(reset_type=reset_type) - reset_type = 'data' + log.warning('Sending DATA RESET request') + await data_reset_hack(reset_type='data') - with trio.move_on_after(3) as cs: + with trio.move_on_after(timeout) as cs: for name, ev in [ # TODO: not sure if waiting on other events # is all that useful here or not. in theory @@ -1630,15 +1630,27 @@ async def get_bars( f'Data reset {name} timeout, retrying {i}.' ) - if resends > 1: - # on each 3rd timeout, do a full connection - # reset instead. 
- reset_type = 'connection' - resends = 0 - else: - resends += 1 - continue + else: + + log.warning('Sending CONNECTION RESET') + await data_reset_hack(reset_type='connection') + + with trio.move_on_after(timeout) as cs: + for name, ev in [ + # TODO: not sure if waiting on other events + # is all that useful here or not. in theory + # you could wait on one of the ones above + # first to verify the reset request was + # sent? + ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + + if cs.cancelled_caught: + fails += 1 + log.warning('Data CONNECTION RESET timeout!?') else: raise @@ -2591,8 +2603,8 @@ async def data_reset_hack( # move to middle of screen # 640x1800 client.mouse.move( - x=100, - y=100, + x=500, + y=500, ) client.mouse.click() client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked From 7229a39f479e0e9d254a159f00457036868c22a7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 4 Jun 2022 20:44:43 -0400 Subject: [PATCH 13/18] Drop data reset tries to 2 before connection reset --- piker/brokers/ib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 39174867..ab22f4d6 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1601,7 +1601,7 @@ async def get_bars( # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). - tries: int = 3 + tries: int = 2 timeout: float = 10 # try 3 time with a data reset then fail over to From 78b9333bcd371789e2c83dd98a7f653c10cb720d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Jun 2022 16:49:30 -0400 Subject: [PATCH 14/18] Expect `list` of ports in `ib.ports` section Given that naming the port map is mostly pointless, since accounts can be detected once the client connects, just expect a `brokers.toml` to define a simple sequence of port numbers. Toss in a warning for using the old map/`dict` style. 
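For reference, a sketch of the expected `brokers.toml` layout after
this change (4002/7497 are just the gw/TWS defaults baked into the
code; your ports may differ):

    [ib]
    # plain list of ports, scanned in the order listed
    ports = [4002, 7497]

versus the old, now-deprecated map style which triggers the warning:

    [ib]
    ports.gw = 4002
    ports.tws = 7497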
---
 piker/brokers/ib.py | 25 ++++++-----------------
 1 file changed, 8 insertions(+), 17 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index ab22f4d6..bfb78697 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -883,14 +883,7 @@ async def recv_trade_updates(
     # let the engine run and stream
     await client.ib.disconnectedEvent
 
-
-# default config ports
-_tws_port: int = 7497
-_gw_port: int = 4002
-_try_ports = [
-    _gw_port,
-    _tws_port
-]
+# per-actor API ep caching
 _client_cache: dict[tuple[str, int], Client] = {}
 _scan_ignore: set[tuple[str, int]] = set()
 
@@ -951,22 +944,20 @@ async def load_aio_clients(
         raise ValueError(
             'Specify only one of `host` or `hosts` in `brokers.toml` config')
 
-    ports = conf.get(
+    try_ports = conf.get(
         'ports',
 
         # default order is to check for gw first
-        {
-            'gw': 4002,
-            'tws': 7497,
-        }
+        [4002, 7497,]
     )
-    order = ports.pop('order', None)
-    if order:
-        log.warning('`ports.order` section in `brokers.toml` is deprecated')
+    if isinstance(try_ports, dict):
+        log.warning(
+            '`ib.ports` in `brokers.toml` should be a `list` NOT a `dict`'
+        )
+        try_ports = list(try_ports.values())
 
     _err = None
     accounts_def = config.load_accounts(['ib'])
-    try_ports = list(ports.values())
     ports = try_ports if port is None else [port]
     combos = list(itertools.product(hosts, ports))
     accounts_found: dict[str, Client] = {}

From 488506d8b8789c9d7c27e66733fa8d298e9e715f Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 5 Jun 2022 17:01:41 -0400
Subject: [PATCH 15/18] Move feed status label generation into a new module

---
 piker/ui/_feedstatus.py | 91 +++++++++++++++++++++++++++++++++++++++++
 piker/ui/_forms.py      |  4 +-
 piker/ui/order_mode.py  | 72 +++++++++-----------------------
 3 files changed, 112 insertions(+), 55 deletions(-)
 create mode 100644 piker/ui/_feedstatus.py

diff --git a/piker/ui/_feedstatus.py b/piker/ui/_feedstatus.py
new file mode 100644
index 00000000..b4e2e930
--- /dev/null
+++ b/piker/ui/_feedstatus.py
@@ -0,0 +1,91 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for piker0)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""
+Feed status and controls widget(s) for embedding in a UI-pane.
+
+"""
+
+from __future__ import annotations
+from textwrap import dedent
+from typing import TYPE_CHECKING
+
+# from PyQt5.QtCore import Qt
+
+from ._style import _font, _font_small
+from ..calc import humanize
+from ._label import FormatLabel
+
+if TYPE_CHECKING:
+    from ._chart import ChartPlotWidget
+    from ..data.feed import Feed
+    from ._forms import FieldsForm
+
+
+def mk_feed_label(
+    form: FieldsForm,
+    feed: Feed,
+    chart: ChartPlotWidget,
+
+) -> FormatLabel:
+    '''
+    Generate a label from feed meta-data to be displayed
+    in a UI sidepane.
+
+    TODO: eventually buttons for changing settings over
+    a feed control protocol.
+ + ''' + msg = """ + actor: **{actor_name}**\n + |_ @**{host}:{port}**\n + |_ throttle_hz: **{throttle_rate}**\n + |_ shm: **{shm}**\n + """ + + feed_label = FormatLabel( + fmt_str=dedent(msg), + # |_ streams: **{symbols}**\n + font=_font.font, + font_size=_font_small.px_size, + font_color='default_lightest', + ) + + # form.vbox.setAlignment(feed_label, Qt.AlignBottom) + # form.vbox.setAlignment(Qt.AlignBottom) + _ = chart.height() - ( + form.height() + + form.fill_bar.height() + # feed_label.height() + ) + + # fill in brokerd feed info + host, port = feed.portal.channel.raddr + if host == '127.0.0.1': + host = 'localhost' + mpshm = feed.shm._shm + shmstr = f'{humanize(mpshm.size)}' + + feed_label.format( + actor_name=feed.portal.channel.uid[0], + host=host, + port=port, + # symbols=len(feed.symbols), + shm=shmstr, + throttle_rate=feed.throttle_rate, + ) + + return feed_label diff --git a/piker/ui/_forms.py b/piker/ui/_forms.py index 3b33e032..c6d09594 100644 --- a/piker/ui/_forms.py +++ b/piker/ui/_forms.py @@ -750,12 +750,12 @@ def mk_order_pane_layout( parent=parent, fields_schema={ 'account': { - 'label': '**account**:', + 'label': '**accnt**:', 'type': 'select', 'default_value': ['paper'], }, 'size_unit': { - 'label': '**allocate**:', + 'label': '**alloc**:', 'type': 'select', 'default_value': [ '$ size', diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 6316f116..3e230b71 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -30,6 +30,7 @@ import uuid from pydantic import BaseModel import tractor import trio +from PyQt5.QtCore import Qt from .. import config from ..clearing._client import open_ems, OrderBook @@ -37,6 +38,7 @@ from ..clearing._allocate import ( mk_allocator, Position, ) +from ._style import _font from ..data._source import Symbol from ..data.feed import Feed from ..log import get_logger @@ -46,7 +48,8 @@ from ._position import ( PositionTracker, SettingsPane, ) -from ._label import FormatLabel +from ._forms import FieldsForm +# from ._label import FormatLabel from ._window import MultiStatus from ..clearing._messages import Order, BrokerdPosition from ._forms import open_form_input_handling @@ -639,63 +642,21 @@ async def open_order_mode( pp_tracker.hide_info() # setup order mode sidepane widgets - form = chart.sidepane - vbox = form.vbox - - from textwrap import dedent - - from PyQt5.QtCore import Qt - - from ._style import _font, _font_small - from ..calc import humanize - - feed_label = FormatLabel( - fmt_str=dedent(""" - actor: **{actor_name}**\n - |_ @**{host}:{port}**\n - |_ throttle_hz: **{throttle_rate}**\n - |_ streams: **{symbols}**\n - |_ shm: **{shm}**\n - """), - font=_font.font, - font_size=_font_small.px_size, - font_color='default_lightest', - ) - - form.feed_label = feed_label - - # add feed info label to top - vbox.insertWidget( - 0, - feed_label, - alignment=Qt.AlignBottom, - ) - # vbox.setAlignment(feed_label, Qt.AlignBottom) - # vbox.setAlignment(Qt.AlignBottom) - _ = chart.height() - ( - form.height() + - form.fill_bar.height() - # feed_label.height() - ) - vbox.setSpacing( + form: FieldsForm = chart.sidepane + form.vbox.setSpacing( int((1 + 5/8)*_font.px_size) ) - # fill in brokerd feed info - host, port = feed.portal.channel.raddr - if host == '127.0.0.1': - host = 'localhost' - mpshm = feed.shm._shm - shmstr = f'{humanize(mpshm.size)}' - form.feed_label.format( - actor_name=feed.portal.channel.uid[0], - host=host, - port=port, - symbols=len(feed.symbols), - shm=shmstr, - throttle_rate=feed.throttle_rate, + from 
._feedstatus import mk_feed_label + + feed_label = mk_feed_label( + form, + feed, + chart, ) + # XXX: we set this because? + form.feed_label = feed_label order_pane = SettingsPane( form=form, # XXX: ugh, so hideous... @@ -706,6 +667,11 @@ async def open_order_mode( ) order_pane.set_accounts(list(trackers.keys())) + form.vbox.addWidget( + feed_label, + alignment=Qt.AlignBottom, + ) + # update pp icons for name, tracker in trackers.items(): order_pane.update_account_icons({name: tracker.live_pp}) From 88eccc1e15ff982890f8bf8ff831b9dee57625c2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Jun 2022 22:01:37 -0400 Subject: [PATCH 16/18] Fill in label with pairs from `status` value of backend init msg --- piker/data/feed.py | 20 ++++++++++++++++++-- piker/ui/_feedstatus.py | 34 +++++++++++++--------------------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index e77052bf..605349e9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -46,6 +46,7 @@ import numpy as np from ..brokers import get_brokermod from .._cacheables import maybe_open_context +from ..calc import humanize from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, @@ -1183,10 +1184,10 @@ class Feed: shm: ShmArray mod: ModuleType first_quotes: dict # symbol names to first quote dicts - _portal: tractor.Portal - stream: trio.abc.ReceiveChannel[dict[str, Any]] + status: dict[str, Any] + throttle_rate: Optional[int] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None @@ -1327,9 +1328,24 @@ async def open_feed( first_quotes=first_quotes, stream=stream, _portal=portal, + status={}, throttle_rate=tick_throttle, ) + # fill out "status info" that the UI can show + host, port = feed.portal.channel.raddr + if host == '127.0.0.1': + host = 'localhost' + + feed.status.update({ + 'actor_name': feed.portal.channel.uid[0], + 'host': host, + 'port': port, + 'shm': f'{humanize(feed.shm._shm.size)}', + 'throttle_rate': feed.throttle_rate, + }) + feed.status.update(init_msg.pop('status', {})) + for sym, data in init_msg.items(): si = data['symbol_info'] fqsn = data['fqsn'] + f'.{brokername}' diff --git a/piker/ui/_feedstatus.py b/piker/ui/_feedstatus.py index b4e2e930..1c9eb772 100644 --- a/piker/ui/_feedstatus.py +++ b/piker/ui/_feedstatus.py @@ -26,7 +26,7 @@ from typing import TYPE_CHECKING # from PyQt5.QtCore import Qt from ._style import _font, _font_small -from ..calc import humanize +# from ..calc import humanize from ._label import FormatLabel if TYPE_CHECKING: @@ -49,15 +49,21 @@ def mk_feed_label( a feed control protocol. 
     '''
-    msg = """
+    status = feed.status
+    assert status
+
+    msg = dedent("""
         actor: **{actor_name}**\n
         |_ @**{host}:{port}**\n
-        |_ throttle_hz: **{throttle_rate}**\n
-        |_ shm: **{shm}**\n
-    """
+        """)
+
+    for key, val in status.items():
+        if key in ('host', 'port', 'actor_name'):
+            continue
+        msg += f'\n|_ {key}: **{{{key}}}**\n'
 
     feed_label = FormatLabel(
-        fmt_str=dedent(msg),
+        fmt_str=msg,
         # |_ streams: **{symbols}**\n
         font=_font.font,
         font_size=_font_small.px_size,
@@ -72,20 +78,6 @@ def mk_feed_label(
         # feed_label.height()
     )
 
-    # fill in brokerd feed info
-    host, port = feed.portal.channel.raddr
-    if host == '127.0.0.1':
-        host = 'localhost'
-    mpshm = feed.shm._shm
-    shmstr = f'{humanize(mpshm.size)}'
-
-    feed_label.format(
-        actor_name=feed.portal.channel.uid[0],
-        host=host,
-        port=port,
-        # symbols=len(feed.symbols),
-        shm=shmstr,
-        throttle_rate=feed.throttle_rate,
-    )
+    feed_label.format(**feed.status)
 
     return feed_label

From 55a453a710a5c7926d72b4d7a518ba7f4282d44a Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 5 Jun 2022 22:05:00 -0400
Subject: [PATCH 17/18] Update `ib` section in brokers config template

---
 config/brokers.toml | 28 +++++++++++++++++-----------
 1 file changed, 17 insertions(+), 11 deletions(-)

diff --git a/config/brokers.toml b/config/brokers.toml
index 1c6d9c29..adee409e 100644
--- a/config/brokers.toml
+++ b/config/brokers.toml
@@ -12,7 +12,19 @@ api_key = ""
 secret = ""
 
 [ib]
-host = "127.0.0.1"
+hosts = [
+    "127.0.0.1",
+]
+# XXX: the order in which ports will be scanned
+# (by the `brokerd` daemon-actor)
+# is determined by the line order here.
+# TODO: when we eventually spawn gateways in our
+# container, we can just dynamically allocate these
+# using IBC.
+ports = [
+    4002, # gw
+    7497, # tws
+]
 
 # when clients are being scanned this determines
 # which clients are preferred to be used for data
 # feeds based on the order of account names, if
 # detected as active on an API client.
 prefer_data_account = [
     'paper',
     'margin',
     'ira',
 ]
 
-# the order in which ports will be scanned
-# (by the `brokerd` daemon-actor)
-# is determined # by the line order here.
-ports.gw = 4002
-ports.tws = 7497
-ports.order = ["gw", "tws",]
-
+[ib.accounts]
 # the order in which accounts will be selectable
 # in the order mode UI (if found via clients during
 # API-app scanning)when a new symbol is loaded.
-accounts.paper = "XX0000000"
-accounts.margin = "X0000000"
-accounts.ira = "X0000000"
+paper = "XX0000000"
+margin = "X0000000"
+ira = "X0000000"

From 051680e25946095e90c9ae2147d0d5ca29d3f691 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 5 Jun 2022 22:05:53 -0400
Subject: [PATCH 18/18] Fill data client sockaddr in feed status as `data_ep` field

---
 piker/brokers/ib.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index bfb78697..2fed7581 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -948,13 +948,13 @@ async def load_aio_clients(
         'ports',
 
         # default order is to check for gw first
-        [4002, 7497,]
+        [4002, 7497]
     )
     if isinstance(try_ports, dict):
         log.warning(
             '`ib.ports` in `brokers.toml` should be a `list` NOT a `dict`'
         )
-        try_ports = list(ports.values())
+        try_ports = list(try_ports.values())
 
     _err = None
     accounts_def = config.load_accounts(['ib'])
@@ -1725,8 +1725,6 @@ async def backfill_bars(
 
     with trio.CancelScope() as cs:
 
-        # async with open_history_client(fqsn) as proxy:
-        # async with open_client_proxy() as proxy:
         async with open_data_client() as proxy:
 
             out, fails = await get_bars(proxy, fqsn)
@@ -1961,6 +1959,11 @@ async def stream_quotes(
     # print(f'first quote: {first_quote}')
 
     def mk_init_msgs() -> dict[str, dict]:
+        '''
+        Collect a bunch of meta-data useful for feed startup and
+        pack it into a `dict`-msg.
+
+        '''
         # pass back some symbol info like min_tick, trading_hours, etc.
         syminfo = asdict(details)
         syminfo.update(syminfo['contract'])
@@ -1982,6 +1985,9 @@ async def stream_quotes(
             # a float
             syminfo['lot_tick_size'] = 0.0
 
+        ibclient = proxy._aio_ns.ib.client
+        host, port = ibclient.host, ibclient.port
+
         # TODO: for loop through all symbols passed in
         init_msgs = {
             # pass back token, and bool, signalling if we're the writer
             sym: {
                 'symbol_info': syminfo,
                 'fqsn': first_quote['fqsn'],
-            }
+            },
+            'status': {
+                'data_ep': f'{host}:{port}',
+            },
+
         }
         return init_msgs
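
Taken together with the label changes in patch 16, this means a backend
can surface arbitrary extra rows in the sidepane feed label: any pairs
packed under the init msg's `status` key are merged into `feed.status`
by `open_feed()` and rendered as `|_ key: **value**` rows by
`mk_feed_label()`. A minimal standalone sketch of that rendering step
(not piker code; the field values below are made up for illustration,
with `data_ep` mirroring the entry added in this patch):

    from textwrap import dedent

    def render_feed_label(status: dict) -> str:
        # header rows use the core fields which `open_feed()`
        # always fills in
        msg = dedent("""
            actor: **{actor_name}**\n
            |_ @**{host}:{port}**\n
            """)

        # append one row per remaining pair, e.g. a backend
        # provided `data_ep` entry
        for key in status:
            if key in ('actor_name', 'host', 'port'):
                continue
            msg += f'\n|_ {key}: **{{{key}}}**\n'

        return msg.format(**status)

    print(render_feed_label({
        'actor_name': 'brokerd.ib',   # made up
        'host': 'localhost',
        'port': 6000,                 # made up
        'shm': '64.0 MB',             # made up humanize() output
        'throttle_rate': 16,          # made up
        'data_ep': '127.0.0.1:4002',  # as added by this patch
    }))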