Proxy heaven, choose one "preferred data client"

In order to expose more `asyncio` powered `Client` methods to endpoint
task-code this adds a more extensive and layered set of `MethodProxy`
loading routines, in dependency order these are:
- `load_clients_for_trio()` a `tractor.to_asyncio.open_channel_from()`
  entry-point factory for loading all scanned clients on the `asyncio` side
  and delivering them over the inter-task channel to a `trio`-side task.
- `get_preferred_data_client()` a simple client instance loading routine
  which reads from the users `brokers.toml -> `prefer_data_account:
  list[str]` which must list account names, in priority order, that are
  acceptable to be used as the main "data connection client" such that
  only one of the detected clients is used for data (whereas the rest
  are used only for order entry).
- `open_client_proxies()` which delivers the detected `Client` set
  wrapped each in a `MethodProxy`.
- `open_data_client()` which directly delivers the preferred data client
  as a proxy for `trio` tasks.
- update `open_client_method_proxy()` and `open_client_proxy` to require
  an input `Client` instance.

Further impl details:
- add `MethodProxy._aio_ns` to ref the original `asyncio` side proxied instance
- add `Client.trades()` to pull executions from the last day/session
- load proxies inside `trades_dialogue` and use the new `.trades()`
  method to try and pull a fill ledger for eventual correct pp price
  calcs (pertains to #307)..
ib_dedicated_data_client
Tyler Goodlet 2022-05-15 11:27:38 -04:00
parent 86caf5f6a3
commit a96f1dec3a
2 changed files with 196 additions and 66 deletions

View File

@ -14,14 +14,26 @@ secret = ""
[ib] [ib]
host = "127.0.0.1" host = "127.0.0.1"
# when clients are being scanned this determines
# which clients are preferred to be used for data
# feeds based on the order of account names, if
# detected as active on an API client.
prefer_data_account = [
'paper',
'margin',
'ira',
]
# the order in which ports will be scanned
# (by the `brokerd` daemon-actor)
# is determined # by the line order here.
ports.gw = 4002 ports.gw = 4002
ports.tws = 7497 ports.tws = 7497
ports.order = ["gw", "tws",] ports.order = ["gw", "tws",]
# the order in which accounts will be selectable
# in the order mode UI (if found via clients during
# API-app scanning)when a new symbol is loaded.
accounts.paper = "XX0000000"
accounts.margin = "X0000000" accounts.margin = "X0000000"
accounts.ira = "X0000000" accounts.ira = "X0000000"
accounts.paper = "XX0000000"
# the order in which accounts will be selected (if found through
# `brokerd`) when a new symbol is loaded
accounts_order = ['paper', 'margin', 'ira']

View File

@ -22,7 +22,9 @@ built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``. ``infected_aio==True``.
""" """
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from contextlib import AsyncExitStack
from dataclasses import asdict, astuple from dataclasses import asdict, astuple
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
@ -39,6 +41,7 @@ import inspect
import logging import logging
from random import randint from random import randint
import time import time
from types import SimpleNamespace
import trio import trio
@ -276,6 +279,27 @@ class Client:
# NOTE: the ib.client here is "throttled" to 45 rps by default # NOTE: the ib.client here is "throttled" to 45 rps by default
async def trades(
self,
# api_only: bool = False,
) -> dict[str, Any]:
# orders = await self.ib.reqCompletedOrdersAsync(
# apiOnly=api_only
# )
fills = await self.ib.reqExecutionsAsync()
norm_fills = []
for fill in fills:
fill = fill._asdict() # namedtuple
for key, val in fill.copy().items():
if isinstance(val, Contract):
fill[key] = asdict(val)
norm_fills.append(fill)
return norm_fills
async def bars( async def bars(
self, self,
fqsn: str, fqsn: str,
@ -894,7 +918,7 @@ async def load_aio_clients(
client_id: Optional[int] = None, client_id: Optional[int] = None,
) -> Client: ) -> dict[str, Client]:
''' '''
Return an ``ib_insync.IB`` instance wrapped in our client API. Return an ``ib_insync.IB`` instance wrapped in our client API.
@ -1063,12 +1087,7 @@ async def load_aio_clients(
'Check your `brokers.toml` and/or network' 'Check your `brokers.toml` and/or network'
) from _err ) from _err
# retreive first loaded client yield _accounts2clients
clients = list(_client_cache.values())
if clients:
client = clients[0]
yield client, _client_cache, _accounts2clients
# TODO: this in a way that works xD # TODO: this in a way that works xD
# finally: # finally:
@ -1080,6 +1099,86 @@ async def load_aio_clients(
# raise # raise
async def load_clients_for_trio(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
'''
Pure async mngr proxy to ``load_aio_clients()``.
'''
async with load_aio_clients() as accts2clients:
to_trio.send_nowait(accts2clients)
# TODO: maybe a sync event to wait on instead?
await asyncio.sleep(float('inf'))
@acm
async def open_client_proxies() -> tuple[
dict[str, MethodProxy],
dict[str, Client],
]:
proxies: dict[str, MethodProxy] = {}
async with (
tractor.to_asyncio.open_channel_from(
load_clients_for_trio,
) as (clients, from_aio),
AsyncExitStack() as stack
):
for acct_name, client in clients.items():
proxy = await stack.enter_async_context(
open_client_proxy(client),
)
proxies[acct_name] = proxy
yield proxies, clients
def get_preferred_data_client(
clients: dict[str, Client],
) -> tuple[str, Client]:
conf = get_config()
data_accounts = conf['prefer_data_account']
for name in data_accounts:
client = clients.get(f'ib.{name}')
if client:
return name, client
else:
raise ValueError(
'No preferred data client could be found:\n'
f'{data_accounts}'
)
@acm
async def open_data_client() -> MethodProxy:
'''
Open the first found preferred "data client" as defined in the
user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable
and deliver that client wrapped in a ``MethodProxy``.
'''
async with (
open_client_proxies() as (proxies, clients),
):
account_name, client = get_preferred_data_client(clients)
proxy = proxies.get(f'ib.{account_name}')
if not proxy:
raise ValueError(
f'No preferred data client could be found for {account_name}!'
)
yield proxy
async def _aio_run_client_method( async def _aio_run_client_method(
meth: str, meth: str,
to_trio=None, to_trio=None,
@ -1088,12 +1187,8 @@ async def _aio_run_client_method(
**kwargs, **kwargs,
) -> None: ) -> None:
async with load_aio_clients() as ( async with load_aio_clients() as accts2clients:
_client, client = list(accts2clients.values())[0]
clients,
accts2clients,
):
client = client or _client
async_meth = getattr(client, meth) async_meth = getattr(client, meth)
# handle streaming methods # handle streaming methods
@ -1144,10 +1239,12 @@ class MethodProxy:
self, self,
chan: to_asyncio.LinkedTaskChannel, chan: to_asyncio.LinkedTaskChannel,
event_table: dict[str, trio.Event], event_table: dict[str, trio.Event],
asyncio_ns: SimpleNamespace,
) -> None: ) -> None:
self.chan = chan self.chan = chan
self.event_table = event_table self.event_table = event_table
self._aio_ns = asyncio_ns
async def _run_method( async def _run_method(
self, self,
@ -1213,22 +1310,18 @@ class MethodProxy:
async def open_aio_client_method_relay( async def open_aio_client_method_relay(
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
client: Client,
event_consumers: dict[str, trio.Event], event_consumers: dict[str, trio.Event],
) -> None: ) -> None:
async with load_aio_clients() as (
client,
clients,
accts2clients,
):
to_trio.send_nowait(client) to_trio.send_nowait(client)
# TODO: separate channel for error handling? # TODO: separate channel for error handling?
client.inline_errors(to_trio) client.inline_errors(to_trio)
# relay all method requests to ``asyncio``-side client and # relay all method requests to ``asyncio``-side client and deliver
# deliver back results # back results
while not to_trio._closed: while not to_trio._closed:
msg = await from_trio.get() msg = await from_trio.get()
if msg is None: if msg is None:
@ -1253,21 +1346,28 @@ async def open_aio_client_method_relay(
@acm @acm
async def open_client_proxy() -> MethodProxy: async def open_client_proxy(
client: Client,
) -> MethodProxy:
# try:
event_table = {} event_table = {}
async with ( async with (
to_asyncio.open_channel_from( to_asyncio.open_channel_from(
open_aio_client_method_relay, open_aio_client_method_relay,
client=client,
event_consumers=event_table, event_consumers=event_table,
) as (first, chan), ) as (first, chan),
trio.open_nursery() as relay_n, trio.open_nursery() as relay_n,
): ):
assert isinstance(first, Client) assert isinstance(first, Client)
proxy = MethodProxy(chan, event_table) proxy = MethodProxy(
chan,
event_table,
asyncio_ns=first,
)
# mock all remote methods on ib ``Client``. # mock all remote methods on ib ``Client``.
for name, method in inspect.getmembers( for name, method in inspect.getmembers(
@ -1318,7 +1418,7 @@ async def get_client(
''' '''
# TODO: the IPC via portal relay layer for when this current # TODO: the IPC via portal relay layer for when this current
# actor isn't in aio mode. # actor isn't in aio mode.
async with open_client_proxy() as proxy: async with open_data_client() as proxy:
yield proxy yield proxy
@ -1535,6 +1635,7 @@ async def get_bars(
# ) # )
# TODO: some kinda resp here that indicates success # TODO: some kinda resp here that indicates success
# otherwise retry? # otherwise retry?
# port = proxy._aio_ns.ib.client.port
await data_reset_hack() await data_reset_hack()
# TODO: a while loop here if we timeout? # TODO: a while loop here if we timeout?
@ -1542,14 +1643,9 @@ async def get_bars(
('history', hist_ev), ('history', hist_ev),
# ('live', live_ev), # ('live', live_ev),
]: ]:
# with trio.move_on_after(22) as cs:
await ev.wait() await ev.wait()
log.info(f"{name} DATA RESET") log.info(f"{name} DATA RESET")
# if cs.cancelled_caught:
# log.warning("reset hack failed on first try?")
# await tractor.breakpoint()
fails += 1 fails += 1
continue continue
@ -1566,8 +1662,12 @@ async def open_history_client(
symbol: str, symbol: str,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
'''
History retreival endpoint - delivers a historical frame callble
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
async with open_client_proxy() as proxy: '''
async with open_data_client() as proxy:
async def get_hist( async def get_hist(
end_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None,
@ -1632,7 +1732,8 @@ async def backfill_bars(
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
# async with open_history_client(fqsn) as proxy: # async with open_history_client(fqsn) as proxy:
async with open_client_proxy() as proxy: # async with open_client_proxy() as proxy:
async with open_data_client() as proxy:
out, fails = await get_bars(proxy, fqsn) out, fails = await get_bars(proxy, fqsn)
@ -1734,11 +1835,8 @@ async def _setup_quote_stream(
to_trio.send_nowait(None) to_trio.send_nowait(None)
async with load_aio_clients() as ( async with load_aio_clients() as accts2clients:
client, client = get_preferred_data_client(accts2clients)
clients,
accts2clients,
):
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -2123,11 +2221,16 @@ async def trades_dialogue(
# deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
all_positions = [] all_positions = []
accounts = set() accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
async with trio.open_nursery() as nurse:
for account, client in _accounts2clients.items(): async with (
trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients),
):
# for account, client in _accounts2clients.items():
for account, proxy in proxies.items():
client = aioclients[account]
async def open_stream( async def open_stream(
task_status: TaskStatus[ task_status: TaskStatus[
@ -2149,7 +2252,8 @@ async def trades_dialogue(
assert account in accounts_def assert account in accounts_def
accounts.add(account) accounts.add(account)
for client in _client_cache.values(): # for client in _client_cache.values():
for client in aioclients.values():
for pos in client.positions(): for pos in client.positions():
msg = pack_position(pos) msg = pack_position(pos)
@ -2160,6 +2264,16 @@ async def trades_dialogue(
all_positions.append(msg.dict()) all_positions.append(msg.dict())
trades: list[dict] = []
for proxy in proxies.values():
trades.append(await proxy.trades())
log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml``
# - use above per-session trades data and write to local file
# - get the "flex reports" working and pull historical data and
# also save locally.
await ctx.started(( await ctx.started((
all_positions, all_positions,
tuple(name for name in accounts_def if name in accounts), tuple(name for name in accounts_def if name in accounts),
@ -2462,6 +2576,10 @@ async def data_reset_hack(
- integration with ``ib-gw`` run in docker + Xorg? - integration with ``ib-gw`` run in docker + Xorg?
''' '''
# TODO: see if we can find which window is mapped to which process?
# for eg. if we can launch a paper account with docker and then find
# the pid of it, can we send keycommands to that container somehow?
# TODO: try out this lib instead, seems to be the most modern # TODO: try out this lib instead, seems to be the most modern
# and usess the underlying lib: # and usess the underlying lib:
# https://github.com/rshk/python-libxdo # https://github.com/rshk/python-libxdo