diff --git a/config/brokers.toml b/config/brokers.toml index 20216bde..1c6d9c29 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -14,14 +14,26 @@ secret = "" [ib] host = "127.0.0.1" +# when clients are being scanned this determines +# which clients are preferred to be used for data +# feeds based on the order of account names, if +# detected as active on an API client. +prefer_data_account = [ + 'paper', + 'margin', + 'ira', +] + +# the order in which ports will be scanned +# (by the `brokerd` daemon-actor) +# is determined # by the line order here. ports.gw = 4002 ports.tws = 7497 ports.order = ["gw", "tws",] +# the order in which accounts will be selectable +# in the order mode UI (if found via clients during +# API-app scanning)when a new symbol is loaded. +accounts.paper = "XX0000000" accounts.margin = "X0000000" accounts.ira = "X0000000" -accounts.paper = "XX0000000" - -# the order in which accounts will be selected (if found through -# `brokerd`) when a new symbol is loaded -accounts_order = ['paper', 'margin', 'ira'] diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index b861d895..bb131fec 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -22,7 +22,9 @@ built on it) and thus actor aware API calls must be spawned with ``infected_aio==True``. """ +from __future__ import annotations from contextlib import asynccontextmanager as acm +from contextlib import AsyncExitStack from dataclasses import asdict, astuple from datetime import datetime from functools import partial @@ -39,6 +41,7 @@ import inspect import logging from random import randint import time +from types import SimpleNamespace import trio @@ -276,6 +279,27 @@ class Client: # NOTE: the ib.client here is "throttled" to 45 rps by default + async def trades( + self, + # api_only: bool = False, + + ) -> dict[str, Any]: + + # orders = await self.ib.reqCompletedOrdersAsync( + # apiOnly=api_only + # ) + fills = await self.ib.reqExecutionsAsync() + norm_fills = [] + for fill in fills: + fill = fill._asdict() # namedtuple + for key, val in fill.copy().items(): + if isinstance(val, Contract): + fill[key] = asdict(val) + + norm_fills.append(fill) + + return norm_fills + async def bars( self, fqsn: str, @@ -894,7 +918,7 @@ async def load_aio_clients( client_id: Optional[int] = None, -) -> Client: +) -> dict[str, Client]: ''' Return an ``ib_insync.IB`` instance wrapped in our client API. @@ -1063,12 +1087,7 @@ async def load_aio_clients( 'Check your `brokers.toml` and/or network' ) from _err - # retreive first loaded client - clients = list(_client_cache.values()) - if clients: - client = clients[0] - - yield client, _client_cache, _accounts2clients + yield _accounts2clients # TODO: this in a way that works xD # finally: @@ -1080,6 +1099,86 @@ async def load_aio_clients( # raise +async def load_clients_for_trio( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, + +) -> None: + ''' + Pure async mngr proxy to ``load_aio_clients()``. + + ''' + async with load_aio_clients() as accts2clients: + to_trio.send_nowait(accts2clients) + + # TODO: maybe a sync event to wait on instead? + await asyncio.sleep(float('inf')) + + +@acm +async def open_client_proxies() -> tuple[ + dict[str, MethodProxy], + dict[str, Client], +]: + + proxies: dict[str, MethodProxy] = {} + + async with ( + tractor.to_asyncio.open_channel_from( + load_clients_for_trio, + ) as (clients, from_aio), + + AsyncExitStack() as stack + ): + for acct_name, client in clients.items(): + proxy = await stack.enter_async_context( + open_client_proxy(client), + ) + proxies[acct_name] = proxy + + yield proxies, clients + + +def get_preferred_data_client( + clients: dict[str, Client], + +) -> tuple[str, Client]: + conf = get_config() + data_accounts = conf['prefer_data_account'] + + for name in data_accounts: + client = clients.get(f'ib.{name}') + if client: + return name, client + else: + raise ValueError( + 'No preferred data client could be found:\n' + f'{data_accounts}' + ) + + +@acm +async def open_data_client() -> MethodProxy: + ''' + Open the first found preferred "data client" as defined in the + user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable + and deliver that client wrapped in a ``MethodProxy``. + + ''' + + async with ( + open_client_proxies() as (proxies, clients), + ): + account_name, client = get_preferred_data_client(clients) + proxy = proxies.get(f'ib.{account_name}') + if not proxy: + raise ValueError( + f'No preferred data client could be found for {account_name}!' + ) + + yield proxy + + async def _aio_run_client_method( meth: str, to_trio=None, @@ -1088,12 +1187,8 @@ async def _aio_run_client_method( **kwargs, ) -> None: - async with load_aio_clients() as ( - _client, - clients, - accts2clients, - ): - client = client or _client + async with load_aio_clients() as accts2clients: + client = list(accts2clients.values())[0] async_meth = getattr(client, meth) # handle streaming methods @@ -1144,10 +1239,12 @@ class MethodProxy: self, chan: to_asyncio.LinkedTaskChannel, event_table: dict[str, trio.Event], + asyncio_ns: SimpleNamespace, ) -> None: self.chan = chan self.event_table = event_table + self._aio_ns = asyncio_ns async def _run_method( self, @@ -1213,61 +1310,64 @@ class MethodProxy: async def open_aio_client_method_relay( from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, + client: Client, event_consumers: dict[str, trio.Event], ) -> None: - async with load_aio_clients() as ( - client, - clients, - accts2clients, - ): - to_trio.send_nowait(client) + to_trio.send_nowait(client) - # TODO: separate channel for error handling? - client.inline_errors(to_trio) + # TODO: separate channel for error handling? + client.inline_errors(to_trio) - # relay all method requests to ``asyncio``-side client and - # deliver back results - while not to_trio._closed: - msg = await from_trio.get() - if msg is None: - print('asyncio PROXY-RELAY SHUTDOWN') - break + # relay all method requests to ``asyncio``-side client and deliver + # back results + while not to_trio._closed: + msg = await from_trio.get() + if msg is None: + print('asyncio PROXY-RELAY SHUTDOWN') + break - meth_name, kwargs = msg - meth = getattr(client, meth_name) + meth_name, kwargs = msg + meth = getattr(client, meth_name) - try: - resp = await meth(**kwargs) - # echo the msg back - to_trio.send_nowait({'result': resp}) + try: + resp = await meth(**kwargs) + # echo the msg back + to_trio.send_nowait({'result': resp}) - except ( - RequestError, + except ( + RequestError, - # TODO: relay all errors to trio? - # BaseException, - ) as err: - to_trio.send_nowait({'exception': err}) + # TODO: relay all errors to trio? + # BaseException, + ) as err: + to_trio.send_nowait({'exception': err}) @acm -async def open_client_proxy() -> MethodProxy: +async def open_client_proxy( + client: Client, + +) -> MethodProxy: - # try: event_table = {} async with ( to_asyncio.open_channel_from( open_aio_client_method_relay, + client=client, event_consumers=event_table, ) as (first, chan), trio.open_nursery() as relay_n, ): assert isinstance(first, Client) - proxy = MethodProxy(chan, event_table) + proxy = MethodProxy( + chan, + event_table, + asyncio_ns=first, + ) # mock all remote methods on ib ``Client``. for name, method in inspect.getmembers( @@ -1318,7 +1418,7 @@ async def get_client( ''' # TODO: the IPC via portal relay layer for when this current # actor isn't in aio mode. - async with open_client_proxy() as proxy: + async with open_data_client() as proxy: yield proxy @@ -1535,6 +1635,7 @@ async def get_bars( # ) # TODO: some kinda resp here that indicates success # otherwise retry? + # port = proxy._aio_ns.ib.client.port await data_reset_hack() # TODO: a while loop here if we timeout? @@ -1542,14 +1643,9 @@ async def get_bars( ('history', hist_ev), # ('live', live_ev), ]: - # with trio.move_on_after(22) as cs: await ev.wait() log.info(f"{name} DATA RESET") - # if cs.cancelled_caught: - # log.warning("reset hack failed on first try?") - # await tractor.breakpoint() - fails += 1 continue @@ -1566,8 +1662,12 @@ async def open_history_client( symbol: str, ) -> tuple[Callable, int]: + ''' + History retreival endpoint - delivers a historical frame callble + that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. - async with open_client_proxy() as proxy: + ''' + async with open_data_client() as proxy: async def get_hist( end_dt: Optional[datetime] = None, @@ -1632,7 +1732,8 @@ async def backfill_bars( with trio.CancelScope() as cs: # async with open_history_client(fqsn) as proxy: - async with open_client_proxy() as proxy: + # async with open_client_proxy() as proxy: + async with open_data_client() as proxy: out, fails = await get_bars(proxy, fqsn) @@ -1734,11 +1835,8 @@ async def _setup_quote_stream( to_trio.send_nowait(None) - async with load_aio_clients() as ( - client, - clients, - accts2clients, - ): + async with load_aio_clients() as accts2clients: + client = get_preferred_data_client(accts2clients) contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) @@ -2123,11 +2221,16 @@ async def trades_dialogue( # deliver positions to subscriber before anything else all_positions = [] accounts = set() - clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] - async with trio.open_nursery() as nurse: - for account, client in _accounts2clients.items(): + async with ( + trio.open_nursery() as nurse, + open_client_proxies() as (proxies, aioclients), + ): + # for account, client in _accounts2clients.items(): + for account, proxy in proxies.items(): + + client = aioclients[account] async def open_stream( task_status: TaskStatus[ @@ -2149,7 +2252,8 @@ async def trades_dialogue( assert account in accounts_def accounts.add(account) - for client in _client_cache.values(): + # for client in _client_cache.values(): + for client in aioclients.values(): for pos in client.positions(): msg = pack_position(pos) @@ -2160,6 +2264,16 @@ async def trades_dialogue( all_positions.append(msg.dict()) + trades: list[dict] = [] + for proxy in proxies.values(): + trades.append(await proxy.trades()) + + log.info(f'Loaded {len(trades)} from this session') + # TODO: write trades to local ``trades.toml`` + # - use above per-session trades data and write to local file + # - get the "flex reports" working and pull historical data and + # also save locally. + await ctx.started(( all_positions, tuple(name for name in accounts_def if name in accounts), @@ -2462,6 +2576,10 @@ async def data_reset_hack( - integration with ``ib-gw`` run in docker + Xorg? ''' + # TODO: see if we can find which window is mapped to which process? + # for eg. if we can launch a paper account with docker and then find + # the pid of it, can we send keycommands to that container somehow? + # TODO: try out this lib instead, seems to be the most modern # and usess the underlying lib: # https://github.com/rshk/python-libxdo @@ -2523,7 +2641,7 @@ async def data_reset_hack( # move mouse to bottom left of window (where there should # be nothing to click). - 'mousemove_relative', '--sync', str(w-4), str(h-4), + 'mousemove_relative', '--sync', str(w - 4), str(h - 4), # NOTE: we may need to stick a `--retry 3` in here.. 'click', '--window', win_id,