From 58da3ceecf9ff93d2182eeb8ea1fbe263a86776f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 15 May 2022 11:27:38 -0400 Subject: [PATCH] Proxy heaven, choose one "preferred data client" In order to expose more `asyncio` powered `Client` methods to endpoint task-code this adds a more extensive and layered set of `MethodProxy` loading routines, in dependency order these are: - `load_clients_for_trio()` a `tractor.to_asyncio.open_channel_from()` entry-point factory for loading all scanned clients on the `asyncio` side and delivering them over the inter-task channel to a `trio`-side task. - `get_preferred_data_client()` a simple client instance loading routine which reads from the users `brokers.toml -> `prefer_data_account: list[str]` which must list account names, in priority order, that are acceptable to be used as the main "data connection client" such that only one of the detected clients is used for data (whereas the rest are used only for order entry). - `open_client_proxies()` which delivers the detected `Client` set wrapped each in a `MethodProxy`. - `open_data_client()` which directly delivers the preferred data client as a proxy for `trio` tasks. - update `open_client_method_proxy()` and `open_client_proxy` to require an input `Client` instance. Further impl details: - add `MethodProxy._aio_ns` to ref the original `asyncio` side proxied instance - add `Client.trades()` to pull executions from the last day/session - load proxies inside `trades_dialogue` and use the new `.trades()` method to try and pull a fill ledger for eventual correct pp price calcs (pertains to #307).. --- config/brokers.toml | 22 +++- piker/brokers/ib.py | 240 +++++++++++++++++++++++++++++++++----------- 2 files changed, 196 insertions(+), 66 deletions(-) diff --git a/config/brokers.toml b/config/brokers.toml index 20216bde..1c6d9c29 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -14,14 +14,26 @@ secret = "" [ib] host = "127.0.0.1" +# when clients are being scanned this determines +# which clients are preferred to be used for data +# feeds based on the order of account names, if +# detected as active on an API client. +prefer_data_account = [ + 'paper', + 'margin', + 'ira', +] + +# the order in which ports will be scanned +# (by the `brokerd` daemon-actor) +# is determined # by the line order here. ports.gw = 4002 ports.tws = 7497 ports.order = ["gw", "tws",] +# the order in which accounts will be selectable +# in the order mode UI (if found via clients during +# API-app scanning)when a new symbol is loaded. +accounts.paper = "XX0000000" accounts.margin = "X0000000" accounts.ira = "X0000000" -accounts.paper = "XX0000000" - -# the order in which accounts will be selected (if found through -# `brokerd`) when a new symbol is loaded -accounts_order = ['paper', 'margin', 'ira'] diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index b861d895..bb131fec 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -22,7 +22,9 @@ built on it) and thus actor aware API calls must be spawned with ``infected_aio==True``. """ +from __future__ import annotations from contextlib import asynccontextmanager as acm +from contextlib import AsyncExitStack from dataclasses import asdict, astuple from datetime import datetime from functools import partial @@ -39,6 +41,7 @@ import inspect import logging from random import randint import time +from types import SimpleNamespace import trio @@ -276,6 +279,27 @@ class Client: # NOTE: the ib.client here is "throttled" to 45 rps by default + async def trades( + self, + # api_only: bool = False, + + ) -> dict[str, Any]: + + # orders = await self.ib.reqCompletedOrdersAsync( + # apiOnly=api_only + # ) + fills = await self.ib.reqExecutionsAsync() + norm_fills = [] + for fill in fills: + fill = fill._asdict() # namedtuple + for key, val in fill.copy().items(): + if isinstance(val, Contract): + fill[key] = asdict(val) + + norm_fills.append(fill) + + return norm_fills + async def bars( self, fqsn: str, @@ -894,7 +918,7 @@ async def load_aio_clients( client_id: Optional[int] = None, -) -> Client: +) -> dict[str, Client]: ''' Return an ``ib_insync.IB`` instance wrapped in our client API. @@ -1063,12 +1087,7 @@ async def load_aio_clients( 'Check your `brokers.toml` and/or network' ) from _err - # retreive first loaded client - clients = list(_client_cache.values()) - if clients: - client = clients[0] - - yield client, _client_cache, _accounts2clients + yield _accounts2clients # TODO: this in a way that works xD # finally: @@ -1080,6 +1099,86 @@ async def load_aio_clients( # raise +async def load_clients_for_trio( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, + +) -> None: + ''' + Pure async mngr proxy to ``load_aio_clients()``. + + ''' + async with load_aio_clients() as accts2clients: + to_trio.send_nowait(accts2clients) + + # TODO: maybe a sync event to wait on instead? + await asyncio.sleep(float('inf')) + + +@acm +async def open_client_proxies() -> tuple[ + dict[str, MethodProxy], + dict[str, Client], +]: + + proxies: dict[str, MethodProxy] = {} + + async with ( + tractor.to_asyncio.open_channel_from( + load_clients_for_trio, + ) as (clients, from_aio), + + AsyncExitStack() as stack + ): + for acct_name, client in clients.items(): + proxy = await stack.enter_async_context( + open_client_proxy(client), + ) + proxies[acct_name] = proxy + + yield proxies, clients + + +def get_preferred_data_client( + clients: dict[str, Client], + +) -> tuple[str, Client]: + conf = get_config() + data_accounts = conf['prefer_data_account'] + + for name in data_accounts: + client = clients.get(f'ib.{name}') + if client: + return name, client + else: + raise ValueError( + 'No preferred data client could be found:\n' + f'{data_accounts}' + ) + + +@acm +async def open_data_client() -> MethodProxy: + ''' + Open the first found preferred "data client" as defined in the + user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable + and deliver that client wrapped in a ``MethodProxy``. + + ''' + + async with ( + open_client_proxies() as (proxies, clients), + ): + account_name, client = get_preferred_data_client(clients) + proxy = proxies.get(f'ib.{account_name}') + if not proxy: + raise ValueError( + f'No preferred data client could be found for {account_name}!' + ) + + yield proxy + + async def _aio_run_client_method( meth: str, to_trio=None, @@ -1088,12 +1187,8 @@ async def _aio_run_client_method( **kwargs, ) -> None: - async with load_aio_clients() as ( - _client, - clients, - accts2clients, - ): - client = client or _client + async with load_aio_clients() as accts2clients: + client = list(accts2clients.values())[0] async_meth = getattr(client, meth) # handle streaming methods @@ -1144,10 +1239,12 @@ class MethodProxy: self, chan: to_asyncio.LinkedTaskChannel, event_table: dict[str, trio.Event], + asyncio_ns: SimpleNamespace, ) -> None: self.chan = chan self.event_table = event_table + self._aio_ns = asyncio_ns async def _run_method( self, @@ -1213,61 +1310,64 @@ class MethodProxy: async def open_aio_client_method_relay( from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, + client: Client, event_consumers: dict[str, trio.Event], ) -> None: - async with load_aio_clients() as ( - client, - clients, - accts2clients, - ): - to_trio.send_nowait(client) + to_trio.send_nowait(client) - # TODO: separate channel for error handling? - client.inline_errors(to_trio) + # TODO: separate channel for error handling? + client.inline_errors(to_trio) - # relay all method requests to ``asyncio``-side client and - # deliver back results - while not to_trio._closed: - msg = await from_trio.get() - if msg is None: - print('asyncio PROXY-RELAY SHUTDOWN') - break + # relay all method requests to ``asyncio``-side client and deliver + # back results + while not to_trio._closed: + msg = await from_trio.get() + if msg is None: + print('asyncio PROXY-RELAY SHUTDOWN') + break - meth_name, kwargs = msg - meth = getattr(client, meth_name) + meth_name, kwargs = msg + meth = getattr(client, meth_name) - try: - resp = await meth(**kwargs) - # echo the msg back - to_trio.send_nowait({'result': resp}) + try: + resp = await meth(**kwargs) + # echo the msg back + to_trio.send_nowait({'result': resp}) - except ( - RequestError, + except ( + RequestError, - # TODO: relay all errors to trio? - # BaseException, - ) as err: - to_trio.send_nowait({'exception': err}) + # TODO: relay all errors to trio? + # BaseException, + ) as err: + to_trio.send_nowait({'exception': err}) @acm -async def open_client_proxy() -> MethodProxy: +async def open_client_proxy( + client: Client, + +) -> MethodProxy: - # try: event_table = {} async with ( to_asyncio.open_channel_from( open_aio_client_method_relay, + client=client, event_consumers=event_table, ) as (first, chan), trio.open_nursery() as relay_n, ): assert isinstance(first, Client) - proxy = MethodProxy(chan, event_table) + proxy = MethodProxy( + chan, + event_table, + asyncio_ns=first, + ) # mock all remote methods on ib ``Client``. for name, method in inspect.getmembers( @@ -1318,7 +1418,7 @@ async def get_client( ''' # TODO: the IPC via portal relay layer for when this current # actor isn't in aio mode. - async with open_client_proxy() as proxy: + async with open_data_client() as proxy: yield proxy @@ -1535,6 +1635,7 @@ async def get_bars( # ) # TODO: some kinda resp here that indicates success # otherwise retry? + # port = proxy._aio_ns.ib.client.port await data_reset_hack() # TODO: a while loop here if we timeout? @@ -1542,14 +1643,9 @@ async def get_bars( ('history', hist_ev), # ('live', live_ev), ]: - # with trio.move_on_after(22) as cs: await ev.wait() log.info(f"{name} DATA RESET") - # if cs.cancelled_caught: - # log.warning("reset hack failed on first try?") - # await tractor.breakpoint() - fails += 1 continue @@ -1566,8 +1662,12 @@ async def open_history_client( symbol: str, ) -> tuple[Callable, int]: + ''' + History retreival endpoint - delivers a historical frame callble + that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. - async with open_client_proxy() as proxy: + ''' + async with open_data_client() as proxy: async def get_hist( end_dt: Optional[datetime] = None, @@ -1632,7 +1732,8 @@ async def backfill_bars( with trio.CancelScope() as cs: # async with open_history_client(fqsn) as proxy: - async with open_client_proxy() as proxy: + # async with open_client_proxy() as proxy: + async with open_data_client() as proxy: out, fails = await get_bars(proxy, fqsn) @@ -1734,11 +1835,8 @@ async def _setup_quote_stream( to_trio.send_nowait(None) - async with load_aio_clients() as ( - client, - clients, - accts2clients, - ): + async with load_aio_clients() as accts2clients: + client = get_preferred_data_client(accts2clients) contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) @@ -2123,11 +2221,16 @@ async def trades_dialogue( # deliver positions to subscriber before anything else all_positions = [] accounts = set() - clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] - async with trio.open_nursery() as nurse: - for account, client in _accounts2clients.items(): + async with ( + trio.open_nursery() as nurse, + open_client_proxies() as (proxies, aioclients), + ): + # for account, client in _accounts2clients.items(): + for account, proxy in proxies.items(): + + client = aioclients[account] async def open_stream( task_status: TaskStatus[ @@ -2149,7 +2252,8 @@ async def trades_dialogue( assert account in accounts_def accounts.add(account) - for client in _client_cache.values(): + # for client in _client_cache.values(): + for client in aioclients.values(): for pos in client.positions(): msg = pack_position(pos) @@ -2160,6 +2264,16 @@ async def trades_dialogue( all_positions.append(msg.dict()) + trades: list[dict] = [] + for proxy in proxies.values(): + trades.append(await proxy.trades()) + + log.info(f'Loaded {len(trades)} from this session') + # TODO: write trades to local ``trades.toml`` + # - use above per-session trades data and write to local file + # - get the "flex reports" working and pull historical data and + # also save locally. + await ctx.started(( all_positions, tuple(name for name in accounts_def if name in accounts), @@ -2462,6 +2576,10 @@ async def data_reset_hack( - integration with ``ib-gw`` run in docker + Xorg? ''' + # TODO: see if we can find which window is mapped to which process? + # for eg. if we can launch a paper account with docker and then find + # the pid of it, can we send keycommands to that container somehow? + # TODO: try out this lib instead, seems to be the most modern # and usess the underlying lib: # https://github.com/rshk/python-libxdo @@ -2523,7 +2641,7 @@ async def data_reset_hack( # move mouse to bottom left of window (where there should # be nothing to click). - 'mousemove_relative', '--sync', str(w-4), str(h-4), + 'mousemove_relative', '--sync', str(w - 4), str(h - 4), # NOTE: we may need to stick a `--retry 3` in here.. 'click', '--window', win_id,