Merge pull request #321 from pikers/ib_dedicated_data_client

Ib dedicated data client
flexxin
goodboy 2022-06-05 22:12:46 -04:00 committed by GitHub
commit be7c4e70f0
8 changed files with 719 additions and 560 deletions

View File

@@ -12,16 +12,34 @@ api_key = ""
 secret = ""

 [ib]
-host = "127.0.0.1"
+hosts = [
+    "127.0.0.1",
+]
+# XXX: the order in which ports will be scanned
+# (by the `brokerd` daemon-actor) is determined
+# by the line order here.
+# TODO: when we eventually spawn gateways in our
+# container, we can just dynamically allocate these
+# using IBC.
+ports = [
+    4002,  # gw
+    7497,  # tws
+]

-ports.gw = 4002
-ports.tws = 7497
-ports.order = ["gw", "tws",]
+# when clients are being scanned this determines
+# which clients are preferred to be used for data
+# feeds based on the order of account names, if
+# detected as active on an API client.
+prefer_data_account = [
+    'paper',
+    'margin',
+    'ira',
+]

-accounts.margin = "X0000000"
-accounts.ira = "X0000000"
-accounts.paper = "XX0000000"
-
-# the order in which accounts will be selected (if found through
-# `brokerd`) when a new symbol is loaded
-accounts_order = ['paper', 'margin', 'ira']
+[ib.accounts]
+# the order in which accounts will be selectable
+# in the order mode UI (if found via clients during
+# API-app scanning) when a new symbol is loaded.
+paper = "XX0000000"
+margin = "X0000000"
+ira = "X0000000"
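
For reference, the backend consumes these new keys roughly as follows; a sketch assuming `get_config()` returns the parsed `[ib]` table as a plain `dict`, mirroring `load_aio_clients()` and `get_preferred_data_client()` further down:

    import itertools

    conf = get_config()  # the `[ib]` section from `brokers.toml`

    # scan every (host, port) combo; ports are tried in listed order
    hosts = conf.get('hosts', ['127.0.0.1'])
    ports = conf.get('ports', [4002, 7497])  # gw before tws by default
    combos = list(itertools.product(hosts, ports))

    # data feeds: the first preferred account name with a loaded
    # client wins
    for name in conf['prefer_data_account']:  # ['paper', 'margin', 'ira']
        ...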

View File

@@ -13,4 +13,4 @@ x11vnc \
     -autoport 3003 \
     # can't use this because of ``asyncvnc`` issue:
     # https://github.com/barneygale/asyncvnc/issues/1
-    # -passwd "$VNC_SERVER_PASSWORD"
+    # -passwd 'ibcansmbz'
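
Note: the `-autoport 3003` above is the socket that the new `vnc_click_hack()` (inside `data_reset_hack()` further down) dials via `asyncvnc.connect('localhost', port=3003)`; password auth stays commented out on both ends until the linked `asyncvnc` issue is resolved.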

View File

@@ -22,7 +22,9 @@ built on it) and thus actor aware API calls must be spawned with
 ``infected_aio==True``.

 """
+from __future__ import annotations
 from contextlib import asynccontextmanager as acm
+from contextlib import AsyncExitStack
 from dataclasses import asdict, astuple
 from datetime import datetime
 from functools import partial
@@ -36,9 +38,8 @@ from typing import (
 import asyncio
 from pprint import pformat
 import inspect
-import logging
-from random import randint
 import time
+from types import SimpleNamespace

 import trio
@@ -161,13 +162,23 @@ class NonShittyIB(ibis.IB):
     - Don't use named tuples
     """
     def __init__(self):
+
+        # override `ib_insync` internal loggers so we can see wtf
+        # it's doing..
+        self._logger = get_logger(
+            'ib_insync.ib',
+        )

         self._createEvents()

         # XXX: just to override this wrapper
         self.wrapper = NonShittyWrapper(self)
         self.client = ib_Client(self.wrapper)
+        self.client._logger = get_logger(
+            'ib_insync.client',
+        )
         # self.errorEvent += self._onError
         self.client.apiEnd += self.disconnectedEvent
-        self._logger = logging.getLogger('ib_insync.ib')

     # map of symbols to contract ids
@@ -276,6 +287,27 @@ class Client:

     # NOTE: the ib.client here is "throttled" to 45 rps by default

+    async def trades(
+        self,
+        # api_only: bool = False,
+
+    ) -> dict[str, Any]:
+
+        # orders = await self.ib.reqCompletedOrdersAsync(
+        #     apiOnly=api_only
+        # )
+        fills = await self.ib.reqExecutionsAsync()
+        norm_fills = []
+        for fill in fills:
+            fill = fill._asdict()  # namedtuple
+            for key, val in fill.copy().items():
+                if isinstance(val, Contract):
+                    fill[key] = asdict(val)
+
+            norm_fills.append(fill)
+
+        return norm_fills
+
     async def bars(
         self,
         fqsn: str,
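
The new `Client.trades()` flattens each `ib_insync` fill (a namedtuple) into a plain `dict`, converting any embedded `Contract` via `dataclasses.asdict()` so the result can be shipped over IPC. A hypothetical `asyncio`-side usage (field names assume `ib_insync`'s `Fill` layout):

    fills = await client.trades()
    for fill in fills:
        # 'contract' is now a nested dict, not a `Contract` instance
        print(fill['contract']['symbol'], fill['time'])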
@@ -496,7 +528,7 @@ class Client:
         # XXX UPDATE: we can probably do the tick/trades scraping
         # inside our eventkit handler instead to bypass this entirely?

-        if 'ib' in pattern:
+        if '.ib' in pattern:
             from ..data._source import unpack_fqsn
             broker, symbol, expiry = unpack_fqsn(pattern)
         else:
@@ -512,11 +544,7 @@ class Client:
             symbol, _, expiry = symbol.rpartition('.')

         # use heuristics to figure out contract "type"
-        try:
-            sym, exch = symbol.upper().rsplit('.', maxsplit=1)
-        except ValueError:
-            # likely there's an embedded `.` for a forex pair
-            breakpoint()
+        sym, exch = symbol.upper().rsplit('.', maxsplit=1)

         qualify: bool = True
@@ -855,17 +883,7 @@ async def recv_trade_updates(
     # let the engine run and stream
     await client.ib.disconnectedEvent

-# default config ports
-_tws_port: int = 7497
-_gw_port: int = 4002
-_try_ports = [
-    _gw_port,
-    _tws_port
-]
-# TODO: remove the randint stuff and use proper error checking in client
-# factor below..
-_client_ids = itertools.count(randint(1, 100))
+# per-actor API ep caching
 _client_cache: dict[tuple[str, int], Client] = {}
 _scan_ignore: set[tuple[str, int]] = set()
@@ -891,10 +909,14 @@ async def load_aio_clients(
     host: str = '127.0.0.1',
     port: int = None,
-    client_id: Optional[int] = None,
+    client_id: int = 6116,
+
+    # the API TCP in `ib_insync` connection can be flaky af so instead
+    # retry a few times to get the client going..
+    connect_retries: int = 3,
+    connect_timeout: float = 0.5,

-) -> Client:
+) -> dict[str, Client]:
     '''
     Return an ``ib_insync.IB`` instance wrapped in our client API.
@@ -922,56 +944,39 @@ async def load_aio_clients(
         raise ValueError(
             'Specify only one of `host` or `hosts` in `brokers.toml` config')

-    ports = conf.get(
+    try_ports = conf.get(
         'ports',
         # default order is to check for gw first
-        {
-            'gw': 4002,
-            'tws': 7497,
-            # 'order': ['gw', 'tws']
-        }
+        [4002, 7497]
     )
-    order = ports.pop('order', None)
-    if order:
-        log.warning('`ports.order` section in `brokers.toml` is deprecated')
-
-    accounts_def = config.load_accounts(['ib'])
-    try_ports = list(ports.values())
-    ports = try_ports if port is None else [port]
-    # we_connected = []
-    connect_timeout = 2
-    combos = list(itertools.product(hosts, ports))
-
-    # allocate new and/or reload disconnected but cached clients
-    # try:
-    # TODO: support multiple clients allowing for execution on
-    # multiple accounts (including a paper instance running on the
-    # same machine) and switching between accounts in the ems.
+    if isinstance(try_ports, dict):
+        log.warning(
+            '`ib.ports` in `brokers.toml` should be a `list` NOT a `dict`'
+        )
+        try_ports = list(try_ports.values())

     _err = None
+    accounts_def = config.load_accounts(['ib'])
+    ports = try_ports if port is None else [port]
+    combos = list(itertools.product(hosts, ports))
+    accounts_found: dict[str, Client] = {}

     # (re)load any and all clients that can be found
     # from connection details in ``brokers.toml``.
     for host, port in combos:
         sockaddr = (host, port)
-        client = _client_cache.get(sockaddr)
-        accounts_found: dict[str, Client] = {}
         if (
-            client and client.ib.isConnected()
+            sockaddr in _client_cache
             or sockaddr in _scan_ignore
         ):
             continue

-        try:
-            ib = NonShittyIB()
-            # XXX: not sure if we ever really need to increment the
-            # client id if teardown is successful.
-            client_id = 6116
+        ib = NonShittyIB()
+        for i in range(connect_retries):
+            try:
                 await ib.connectAsync(
                     host,
                     port,
@@ -982,6 +987,28 @@ async def load_aio_clients(
                     # careful.
                     timeout=connect_timeout,
                 )
+                break
+
+            except (
+                ConnectionRefusedError,
+
+                # TODO: if trying to scan for remote api clients
+                # pretty sure we need to catch this, though it
+                # definitely needs a shorter timeout since it hangs
+                # for like 5s..
+                asyncio.exceptions.TimeoutError,
+                OSError,
+            ) as ce:
+                _err = ce
+
+                if i > 8:
+                    # cache logic to avoid rescanning if we already have all
+                    # clients loaded.
+                    _scan_ignore.add(sockaddr)
+                    raise
+
+                log.warning(
+                    f'Failed to connect on {port} for {i} time, retrying...')

         # create and cache client
         client = Client(ib)
@@ -1019,43 +1046,14 @@ async def load_aio_clients(
         )

         # update all actor-global caches
-        log.info(f"Caching client for {(host, port)}")
-        _client_cache[(host, port)] = client
-        # we_connected.append((host, port, client))
-
-        # TODO: don't do it this way, get a gud to_asyncio
-        # context / .start() system goin..
-        def pop_and_discon():
-            log.info(f'Disconnecting client {client}')
-            client.ib.disconnect()
-            _client_cache.pop((host, port), None)
-
-        # NOTE: the above callback **CAN'T FAIL** or shm won't get
-        # torn down correctly ...
-        tractor._actor._lifetime_stack.callback(pop_and_discon)
+        log.info(f"Caching client for {sockaddr}")
+        _client_cache[sockaddr] = client

         # XXX: why aren't we just updating this directly above
         # instead of using the intermediary `accounts_found`?
         _accounts2clients.update(accounts_found)

-        except (
-            ConnectionRefusedError,
-            # TODO: if trying to scan for remote api clients
-            # pretty sure we need to catch this, though it
-            # definitely needs a shorter timeout since it hangs
-            # for like 5s..
-            asyncio.exceptions.TimeoutError,
-            OSError,
-        ) as ce:
-            _err = ce
-            log.warning(f'Failed to connect on {port}')
-            # cache logic to avoid rescanning if we already have all
-            # clients loaded.
-            _scan_ignore.add(sockaddr)
-
+    # if we have no clients after the scan loop then error out.
     if not _client_cache:
         raise ConnectionError(
             'No ib APIs could be found scanning @:\n'
@@ -1063,79 +1061,121 @@ async def load_aio_clients(
             'Check your `brokers.toml` and/or network'
         ) from _err

-    # retrieve first loaded client
-    clients = list(_client_cache.values())
-    if clients:
-        client = clients[0]
-
-    yield client, _client_cache, _accounts2clients
-
-    # TODO: this in a way that works xD
-    # finally:
-    #     pass
-    #     # async with trio.CancelScope(shield=True):
-    #     for host, port, client in we_connected:
-    #         client.ib.disconnect()
-    #         _client_cache.pop((host, port))
-    #     raise
+    try:
+        yield _accounts2clients
+    finally:
+        # TODO: for re-scans we'll want to not teardown clients which
+        # are up and stable right?
+        for acct, client in _accounts2clients.items():
+            log.info(f'Disconnecting {acct}@{client}')
+            client.ib.disconnect()
+            _client_cache.pop((host, port))
-async def _aio_run_client_method(
-    meth: str,
-    to_trio=None,
-    from_trio=None,
-    client=None,
-    **kwargs,
-) -> None:
-
-    async with load_aio_clients() as (
-        _client,
-        clients,
-        accts2clients,
-    ):
-        client = client or _client
-        async_meth = getattr(client, meth)
-
-        # handle streaming methods
-        args = tuple(inspect.getfullargspec(async_meth).args)
-        if to_trio and 'to_trio' in args:
-            kwargs['to_trio'] = to_trio
-
-        log.runtime(f'Running {meth}({kwargs})')
-        return await async_meth(**kwargs)
-
-
-async def _trio_run_client_method(
-    method: str,
-    client: Optional[Client] = None,
-    **kwargs,
-) -> None:
-    '''
-    Asyncio entry point to run tasks against the ``ib_insync`` api.
-
-    '''
-    ca = tractor.current_actor()
-    assert ca.is_infected_aio()
-
-    # if the method is an *async gen* stream for it
-    # meth = getattr(Client, method)
-    # args = tuple(inspect.getfullargspec(meth).args)
-    # if inspect.isasyncgenfunction(meth) or (
-    #     # if the method is an *async func* but manually
-    #     # streams back results, make sure to also stream it
-    #     'to_trio' in args
-    # ):
-    #     kwargs['_treat_as_stream'] = True
-
-    return await to_asyncio.run_task(
-        _aio_run_client_method,
-        meth=method,
-        client=client,
-        **kwargs
-    )
+async def load_clients_for_trio(
+    from_trio: asyncio.Queue,
+    to_trio: trio.abc.SendChannel,
+
+) -> None:
+    '''
+    Pure async mngr proxy to ``load_aio_clients()``.
+
+    This is a bootstrap entrypoint to call from
+    a ``tractor.to_asyncio.open_channel_from()``.
+
+    '''
+    global _accounts2clients
+    if _accounts2clients:
+        to_trio.send_nowait(_accounts2clients)
+        await asyncio.sleep(float('inf'))
+
+    else:
+        async with load_aio_clients() as accts2clients:
+            to_trio.send_nowait(accts2clients)
+
+            # TODO: maybe a sync event to wait on instead?
+            await asyncio.sleep(float('inf'))
+
+
+_proxies: dict[str, MethodProxy] = {}
+
+
+@acm
+async def open_client_proxies() -> tuple[
+    dict[str, MethodProxy],
+    dict[str, Client],
+]:
+    async with (
+        tractor.trionics.maybe_open_context(
+            # acm_func=open_client_proxies,
+            acm_func=tractor.to_asyncio.open_channel_from,
+            kwargs={'target': load_clients_for_trio},
+
+            # lock around current actor task access
+            # TODO: maybe this should be the default in tractor?
+            key=tractor.current_actor().uid,
+
+        ) as (cache_hit, (clients, from_aio)),
+
+        AsyncExitStack() as stack
+    ):
+        if cache_hit:
+            log.info(f'Re-using cached clients: {clients}')
+
+        for acct_name, client in clients.items():
+            proxy = await stack.enter_async_context(
+                open_client_proxy(client),
+            )
+            _proxies[acct_name] = proxy
+
+        yield _proxies, clients
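
`open_client_proxies()` is the per-actor multiplex point: `maybe_open_context()` ensures the `open_channel_from()` channel into `load_clients_for_trio()` is opened at most once per actor, so concurrent callers share one set of `asyncio`-side clients. A usage sketch (this is what `trades_dialogue()` does further down):

    async with open_client_proxies() as (proxies, aioclients):
        for account, proxy in proxies.items():
            # `proxy` relays method calls across the trio/asyncio
            # boundary to `aioclients[account]`
            ...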
+def get_preferred_data_client(
+    clients: dict[str, Client],
+
+) -> tuple[str, Client]:
+    '''
+    Load and return the (first found) `Client` instance that is
+    preferred and should be used for data by iterating, in priority
+    order, the ``ib.prefer_data_account: list[str]`` account names in
+    the user's ``brokers.toml`` file.
+
+    '''
+    conf = get_config()
+    data_accounts = conf['prefer_data_account']
+
+    for name in data_accounts:
+        client = clients.get(f'ib.{name}')
+        if client:
+            return name, client
+    else:
+        raise ValueError(
+            'No preferred data client could be found:\n'
+            f'{data_accounts}'
+        )
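
Note the lookup key is `f'ib.{name}'`, matching how clients are keyed by account name during the scan in `load_aio_clients()`. For example (hypothetical clients map):

    clients = {'ib.margin': margin_client, 'ib.paper': paper_client}
    name, client = get_preferred_data_client(clients)
    # -> ('paper', paper_client): 'paper' is listed first in the
    # default `prefer_data_account` priority list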
+@acm
+async def open_data_client() -> MethodProxy:
+    '''
+    Open the first found preferred "data client" as defined in the
+    user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable
+    and deliver that client wrapped in a ``MethodProxy``.
+
+    '''
+    async with (
+        open_client_proxies() as (proxies, clients),
+    ):
+        account_name, client = get_preferred_data_client(clients)
+        proxy = proxies.get(f'ib.{account_name}')
+        if not proxy:
+            raise ValueError(
+                f'No preferred data client could be found for {account_name}!'
+            )
+        yield proxy
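
This becomes the single data-access entrypoint used by `get_client()`, `open_history_client()`, `backfill_bars()`, `stream_quotes()` and `open_symbol_search()` below. A minimal sketch:

    async with open_data_client() as proxy:
        # relays to the preferred account's `ib_insync` client on
        # the `asyncio` side; the fqsn shown is hypothetical
        out = await proxy.bars(fqsn='mnq.globex.ib')
        if out:
            bars, bars_array = out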
 class MethodProxy:
@@ -1144,10 +1184,12 @@ class MethodProxy:
         self,
         chan: to_asyncio.LinkedTaskChannel,
         event_table: dict[str, trio.Event],
+        asyncio_ns: SimpleNamespace,

     ) -> None:
         self.chan = chan
         self.event_table = event_table
+        self._aio_ns = asyncio_ns

     async def _run_method(
         self,
@@ -1213,22 +1255,18 @@ class MethodProxy:
 async def open_aio_client_method_relay(
     from_trio: asyncio.Queue,
     to_trio: trio.abc.SendChannel,
+    client: Client,
     event_consumers: dict[str, trio.Event],

 ) -> None:

-    async with load_aio_clients() as (
-        client,
-        clients,
-        accts2clients,
-    ):
     to_trio.send_nowait(client)

     # TODO: separate channel for error handling?
     client.inline_errors(to_trio)

-    # relay all method requests to ``asyncio``-side client and
-    # deliver back results
+    # relay all method requests to ``asyncio``-side client and deliver
+    # back results
     while not to_trio._closed:
         msg = await from_trio.get()
         if msg is None:
@@ -1253,21 +1291,28 @@ async def open_aio_client_method_relay(

 @acm
-async def open_client_proxy() -> MethodProxy:
+async def open_client_proxy(
+    client: Client,
+
+) -> MethodProxy:

-    # try:
     event_table = {}

     async with (
         to_asyncio.open_channel_from(
             open_aio_client_method_relay,
+            client=client,
             event_consumers=event_table,
         ) as (first, chan),

         trio.open_nursery() as relay_n,
     ):
         assert isinstance(first, Client)
-        proxy = MethodProxy(chan, event_table)
+        proxy = MethodProxy(
+            chan,
+            event_table,
+            asyncio_ns=first,
+        )

         # mock all remote methods on ib ``Client``.
         for name, method in inspect.getmembers(
@@ -1318,7 +1363,7 @@ async def get_client(
     '''
     # TODO: the IPC via portal relay layer for when this current
     # actor isn't in aio mode.
-    async with open_client_proxy() as proxy:
+    async with open_data_client() as proxy:
         yield proxy
@@ -1463,10 +1508,15 @@ async def get_bars(
     for _ in range(10):
         try:
-            bars, bars_array = await proxy.bars(
+            out = await proxy.bars(
                 fqsn=fqsn,
                 end_dt=end_dt,
             )
+            if out:
+                bars, bars_array = out
+
+            else:
+                await tractor.breakpoint()

             if bars_array is None:
                 raise SymbolNotFound(fqsn)
@@ -1529,29 +1579,69 @@ async def get_bars(
             hist_ev = proxy.status_event(
                 'HMDS data farm connection is OK:ushmds'
             )
-            # live_ev = proxy.status_event(
-            #     # 'Market data farm connection is OK:usfuture'
-            #     'Market data farm connection is OK:usfarm'
-            # )
-            # TODO: some kinda resp here that indicates success
-            # otherwise retry?
-            await data_reset_hack()

-            # TODO: a while loop here if we timeout?
-            for name, ev in [
-                ('history', hist_ev),
-                # ('live', live_ev),
-            ]:
-                # with trio.move_on_after(22) as cs:
-                await ev.wait()
-                log.info(f"{name} DATA RESET")
-                # if cs.cancelled_caught:
-                #     log.warning("reset hack failed on first try?")
-                #     await tractor.breakpoint()
-            fails += 1
-            continue
+            # XXX: other event messages we might want to try and
+            # wait for but i wasn't able to get any of this
+            # reliable..
+            # reconnect_start = proxy.status_event(
+            #     'Market data farm is connecting:usfuture'
+            # )
+            # live_ev = proxy.status_event(
+            #     'Market data farm connection is OK:usfuture'
+            # )
+
+            # try to wait on the reset event(s) to arrive, a timeout
+            # will trigger a retry up to 6 times (for now).
+            tries: int = 2
+            timeout: float = 10
+
+            # try a few times with a data reset then fail over to
+            # a connection reset.
+            for i in range(1, tries):
+                log.warning('Sending DATA RESET request')
+                await data_reset_hack(reset_type='data')
+
+                with trio.move_on_after(timeout) as cs:
+                    for name, ev in [
+                        # TODO: not sure if waiting on other events
+                        # is all that useful here or not. in theory
+                        # you could wait on one of the ones above
+                        # first to verify the reset request was
+                        # sent?
+                        ('history', hist_ev),
+                    ]:
+                        await ev.wait()
+                        log.info(f"{name} DATA RESET")
+                        break
+
+                if cs.cancelled_caught:
+                    fails += 1
+                    log.warning(
+                        f'Data reset {name} timeout, retrying {i}.'
+                    )
+                    continue
+
+            else:
+                log.warning('Sending CONNECTION RESET')
+                await data_reset_hack(reset_type='connection')
+
+                with trio.move_on_after(timeout) as cs:
+                    for name, ev in [
+                        # TODO: not sure if waiting on other events
+                        # is all that useful here or not. in theory
+                        # you could wait on one of the ones above
+                        # first to verify the reset request was
+                        # sent?
+                        ('history', hist_ev),
+                    ]:
+                        await ev.wait()
+                        log.info(f"{name} DATA RESET")
+
+                if cs.cancelled_caught:
+                    fails += 1
+                    log.warning('Data CONNECTION RESET timeout!?')

         else:
             raise
@@ -1566,8 +1656,12 @@ async def open_history_client(
     symbol: str,

 ) -> tuple[Callable, int]:
+    '''
+    History retrieval endpoint - delivers a historical frame callable
+    that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.

-    async with open_client_proxy() as proxy:
+    '''
+    async with open_data_client() as proxy:

         async def get_hist(
             end_dt: Optional[datetime] = None,
@@ -1579,7 +1673,7 @@ async def open_history_client(
             # TODO: add logic here to handle tradable hours and only grab
             # valid bars in the range

-            if out == (None, None):
+            if out is None:
                 # could be trying to retrieve bars over weekend
                 log.error(f"Can't grab bars starting at {end_dt}!?!?")
                 raise NoData(
@@ -1631,8 +1725,7 @@ async def backfill_bars(
     with trio.CancelScope() as cs:

-        # async with open_history_client(fqsn) as proxy:
-        async with open_client_proxy() as proxy:
+        async with open_data_client() as proxy:

             out, fails = await get_bars(proxy, fqsn)
@@ -1729,16 +1822,16 @@ async def _setup_quote_stream(
     '''
     Stream a ticker using the std L1 api.
+
+    This task is ``asyncio``-side and must be called from
+    ``tractor.to_asyncio.open_channel_from()``.

     '''
     global _quote_streams

     to_trio.send_nowait(None)

-    async with load_aio_clients() as (
-        client,
-        clients,
-        accts2clients,
-    ):
+    async with load_aio_clients() as accts2clients:
+        caccount_name, client = get_preferred_data_client(accts2clients)
         contract = contract or (await client.find_contract(symbol))
         ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@@ -1828,6 +1921,7 @@ async def open_aio_quote_stream(
             _setup_quote_stream,
             symbol=symbol,
             contract=contract,
+
         ) as (first, from_aio):

             # cache feed for later consumers
@@ -1858,19 +1952,24 @@ async def stream_quotes(
     sym = symbols[0]
     log.info(f'request for real-time quotes: {sym}')

-    con, first_ticker, details = await _trio_run_client_method(
-        method='get_sym_details',
-        symbol=sym,
-    )
-    first_quote = normalize(first_ticker)
-    # print(f'first quote: {first_quote}')
-
-    def mk_init_msgs() -> dict[str, dict]:
-        # pass back some symbol info like min_tick, trading_hours, etc.
-        syminfo = asdict(details)
-        syminfo.update(syminfo['contract'])
-
-        # nested dataclass we probably don't need and that won't IPC serialize
-        syminfo.pop('secIdList')
+    async with open_data_client() as proxy:
+
+        con, first_ticker, details = await proxy.get_sym_details(symbol=sym)
+        first_quote = normalize(first_ticker)
+        # print(f'first quote: {first_quote}')
+
+        def mk_init_msgs() -> dict[str, dict]:
+            '''
+            Collect a bunch of meta-data useful for feed startup and
+            pack in a `dict`-msg.
+
+            '''
+            # pass back some symbol info like min_tick, trading_hours, etc.
+            syminfo = asdict(details)
+            syminfo.update(syminfo['contract'])
+
+            # nested dataclass we probably don't need and that won't IPC
+            # serialize
+            syminfo.pop('secIdList')

             # TODO: more consistent field translation
@@ -1882,9 +1981,13 @@ async def stream_quotes(
             syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)

-            # for "traditional" assets, volume is normally discrete, not a float
+            # for "traditional" assets, volume is normally discrete, not
+            # a float
             syminfo['lot_tick_size'] = 0.0

+            ibclient = proxy._aio_ns.ib.client
+            host, port = ibclient.host, ibclient.port
+
             # TODO: for loop through all symbols passed in
             init_msgs = {
                 # pass back token, and bool, signalling if we're the writer
@@ -1892,7 +1995,11 @@ async def stream_quotes(
                 sym: {
                     'symbol_info': syminfo,
                     'fqsn': first_quote['fqsn'],
-                }
+                },
+                'status': {
+                    'data_ep': f'{host}:{port}',
+                },
             }
             return init_msgs
@@ -1901,10 +2008,7 @@ async def stream_quotes(
         # TODO: we should instead spawn a task that waits on a feed to start
         # and let it wait indefinitely..instead of this hard coded stuff.
         with trio.move_on_after(1):
-            contract, first_ticker, details = await _trio_run_client_method(
-                method='get_quote',
-                symbol=sym,
-            )
+            contract, first_ticker, details = await proxy.get_quote(symbol=sym)

         # it might be outside regular trading hours so see if we can at
         # least grab history.
@@ -2123,11 +2227,16 @@ async def trades_dialogue(
     # deliver positions to subscriber before anything else
     all_positions = []
     accounts = set()

     clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
-    async with trio.open_nursery() as nurse:
-        for account, client in _accounts2clients.items():
+
+    async with (
+        trio.open_nursery() as nurse,
+        open_client_proxies() as (proxies, aioclients),
+    ):
+        # for account, client in _accounts2clients.items():
+        for account, proxy in proxies.items():
+            client = aioclients[account]

             async def open_stream(
                 task_status: TaskStatus[
@@ -2149,7 +2258,7 @@ async def trades_dialogue(
             assert account in accounts_def
             accounts.add(account)

-        for client in _client_cache.values():
+        for client in aioclients.values():
             for pos in client.positions():

                 msg = pack_position(pos)
@@ -2160,6 +2269,16 @@ async def trades_dialogue(

                 all_positions.append(msg.dict())

+        trades: list[dict] = []
+        for proxy in proxies.values():
+            trades.append(await proxy.trades())
+
+        log.info(f'Loaded {len(trades)} from this session')
+        # TODO: write trades to local ``trades.toml``
+        # - use above per-session trades data and write to local file
+        # - get the "flex reports" working and pull historical data and
+        # also save locally.
+
         await ctx.started((
             all_positions,
             tuple(name for name in accounts_def if name in accounts),
@@ -2352,9 +2471,11 @@ async def open_symbol_search(
     ctx: tractor.Context,

 ) -> None:
-    # load all symbols locally for fast search
+
+    # TODO: load user defined symbol set locally for fast search?
     await ctx.started({})

+    async with open_data_client() as proxy:
+
         async with ctx.open_stream() as stream:

             last = time.time()
@@ -2404,11 +2525,10 @@ async def open_symbol_search(
             async with trio.open_nursery() as sn:
                 sn.start_soon(
                     stash_results,
-                    _trio_run_client_method(
-                        method='search_symbols',
+                    proxy.search_symbols(
                         pattern=pattern,
                         upto=5,
-                    )
+                    ),
                 )

                 # trigger async request
# trigger async request # trigger async request
@@ -2462,83 +2582,35 @@ async def data_reset_hack(
     - integration with ``ib-gw`` run in docker + Xorg?

     '''
-    # TODO: try out this lib instead, seems to be the most modern
-    # and uses the underlying lib:
-    # https://github.com/rshk/python-libxdo
-
-    # TODO: seems to be a few libs for python but not sure
-    # if they support all the sub commands we need, order of
-    # most recent commit history:
-    # https://github.com/rr-/pyxdotool
-    # https://github.com/ShaneHutter/pyxdotool
-    # https://github.com/cphyc/pyxdotool
-    try:
-        import i3ipc
-    except ImportError:
-        return False
-        log.warning('IB data hack not-supported on ur platformz')
-
-    i3 = i3ipc.Connection()
-    t = i3.get_tree()
-
-    orig_win_id = t.find_focused().window
-
-    # for tws
-    win_names: list[str] = [
-        'Interactive Brokers',  # tws running in i3
-        'IB Gateway',  # gw running in i3
-        # 'IB',  # gw running in i3 (newer version?)
-    ]
-
-    combos: dict[str, str] = {
-        # only required if we need a connection reset.
-        'connection': ('ctrl+alt+r', 12),
-        # data feed reset.
-        'data': ('ctrl+alt+f', 6)
-    }
-
-    for name in win_names:
-        results = t.find_titled(name)
-        print(f'results for {name}: {results}')
-        if results:
-            con = results[0]
-            print(f'Resetting data feed for {name}')
-            win_id = str(con.window)
-            w, h = con.rect.width, con.rect.height
-
-            # TODO: only run the reconnect (2nd) kc on a detected
-            # disconnect?
-            key_combo, timeout = combos[reset_type]
-            # for key_combo, timeout in [
-            #     # only required if we need a connection reset.
-            #     # ('ctrl+alt+r', 12),
-            #     # data feed reset.
-            #     ('ctrl+alt+f', 6)
-            # ]:
-
-            await trio.run_process([
-                'xdotool',
-                'windowactivate', '--sync', win_id,
-
-                # move mouse to bottom left of window (where there should
-                # be nothing to click).
-                'mousemove_relative', '--sync', str(w-4), str(h-4),
-
-                # NOTE: we may need to stick a `--retry 3` in here..
-                'click', '--window', win_id,
-                '--repeat', '3', '1',
-
-                # hackzorzes
-                'key', key_combo,
-                # ],
-                # timeout=timeout,
-            ])
-
-            # re-activate and focus original window
-            await trio.run_process([
-                'xdotool',
-                'windowactivate', '--sync', str(orig_win_id),
-                'click', '--window', str(orig_win_id), '1',
-            ])
+    async def vnc_click_hack(
+        reset_type: str = 'data'
+    ) -> None:
+        '''
+        Reset the data or network connection for the VNC attached
+        ib gateway using magic combos.
+
+        '''
+        key = {'data': 'f', 'connection': 'r'}[reset_type]
+
+        import asyncvnc
+
+        async with asyncvnc.connect(
+            'localhost',
+            port=3003,
+            # password='ibcansmbz',
+        ) as client:
+
+            # move to middle of screen
+            # 640x1800
+            client.mouse.move(
+                x=500,
+                y=500,
+            )
+            client.mouse.click()
+            client.keyboard.press('Ctrl', 'Alt', key)  # keys are stacked
+
+    await tractor.to_asyncio.run_task(vnc_click_hack)
+
+    # we don't really need the ``xdotool`` approach any more B)
     return True
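
For context, `get_bars()` above drives this in two stages, assuming the gateway's VNC server is on the hard-coded `localhost:3003`:

    # data farm stalled? request a data (re)set first..
    await data_reset_hack(reset_type='data')        # Ctrl-Alt-f
    # ..then fail over to a full connection reset.
    await data_reset_hack(reset_type='connection')  # Ctrl-Alt-r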

View File

@@ -46,6 +46,7 @@ import numpy as np

 from ..brokers import get_brokermod
 from .._cacheables import maybe_open_context
+from ..calc import humanize
 from ..log import get_logger, get_console_log
 from .._daemon import (
     maybe_spawn_brokerd,
@@ -1183,10 +1184,10 @@ class Feed:
     shm: ShmArray
     mod: ModuleType
     first_quotes: dict  # symbol names to first quote dicts
-
     _portal: tractor.Portal
-
     stream: trio.abc.ReceiveChannel[dict[str, Any]]
+    status: dict[str, Any]
+
     throttle_rate: Optional[int] = None

     _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
@@ -1327,9 +1328,24 @@ async def open_feed(
             first_quotes=first_quotes,
             stream=stream,
             _portal=portal,
+            status={},
             throttle_rate=tick_throttle,
         )

+        # fill out "status info" that the UI can show
+        host, port = feed.portal.channel.raddr
+        if host == '127.0.0.1':
+            host = 'localhost'
+
+        feed.status.update({
+            'actor_name': feed.portal.channel.uid[0],
+            'host': host,
+            'port': port,
+            'shm': f'{humanize(feed.shm._shm.size)}',
+            'throttle_rate': feed.throttle_rate,
+        })
+        feed.status.update(init_msg.pop('status', {}))
+
         for sym, data in init_msg.items():
             si = data['symbol_info']
             fqsn = data['fqsn'] + f'.{brokername}'
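
With this in place a UI can render transport info without poking at the channel itself; for an ib feed the merged `feed.status` looks something like (values illustrative only):

    {
        'actor_name': 'brokerd.ib',   # portal.channel.uid[0]
        'host': 'localhost',
        'port': 61029,                # brokerd's IPC port
        'shm': '64 MB',               # humanize(feed.shm._shm.size)
        'throttle_rate': 22,
        'data_ep': '127.0.0.1:4002',  # from the backend's 'status' init msg
    }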

View File

@@ -0,0 +1,83 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
Feed status and controls widget(s) for embedding in a UI-pane.

"""
from __future__ import annotations
from textwrap import dedent
from typing import TYPE_CHECKING

# from PyQt5.QtCore import Qt

from ._style import _font, _font_small
# from ..calc import humanize
from ._label import FormatLabel

if TYPE_CHECKING:
    from ._chart import ChartPlotWidget
    from ..data.feed import Feed
    from ._forms import FieldsForm


def mk_feed_label(
    form: FieldsForm,
    feed: Feed,
    chart: ChartPlotWidget,

) -> FormatLabel:
    '''
    Generate a label from feed meta-data to be displayed
    in a UI sidepane.

    TODO: eventually buttons for changing settings over
    a feed control protocol.

    '''
    status = feed.status
    assert status

    msg = dedent("""
        actor: **{actor_name}**\n
        |_ @**{host}:{port}**\n
    """)

    for key, val in status.items():
        if key in ('host', 'port', 'actor_name'):
            continue
        msg += f'\n|_ {key}: **{{{key}}}**\n'

    feed_label = FormatLabel(
        fmt_str=msg,
        # |_ streams: **{symbols}**\n
        font=_font.font,
        font_size=_font_small.px_size,
        font_color='default_lightest',
    )

    # form.vbox.setAlignment(feed_label, Qt.AlignBottom)
    # form.vbox.setAlignment(Qt.AlignBottom)
    _ = chart.height() - (
        form.height() +
        form.fill_bar.height()
        # feed_label.height()
    )

    feed_label.format(**feed.status)

    return feed_label

View File

@@ -750,12 +750,12 @@ def mk_order_pane_layout(
         parent=parent,
         fields_schema={
             'account': {
-                'label': '**account**:',
+                'label': '**accnt**:',
                 'type': 'select',
                 'default_value': ['paper'],
             },
             'size_unit': {
-                'label': '**allocate**:',
+                'label': '**alloc**:',
                 'type': 'select',
                 'default_value': [
                     '$ size',

View File

@@ -30,6 +30,7 @@ import uuid
 from pydantic import BaseModel
 import tractor
 import trio
+from PyQt5.QtCore import Qt

 from .. import config
 from ..clearing._client import open_ems, OrderBook
@@ -37,6 +38,7 @@ from ..clearing._allocate import (
     mk_allocator,
     Position,
 )
+from ._style import _font
 from ..data._source import Symbol
 from ..data.feed import Feed
 from ..log import get_logger
@@ -46,7 +48,8 @@ from ._position import (
     PositionTracker,
     SettingsPane,
 )
-from ._label import FormatLabel
+from ._forms import FieldsForm
+# from ._label import FormatLabel
 from ._window import MultiStatus
 from ..clearing._messages import Order, BrokerdPosition
 from ._forms import open_form_input_handling
@@ -639,63 +642,21 @@ async def open_order_mode(
     pp_tracker.hide_info()

     # setup order mode sidepane widgets
-    form = chart.sidepane
-    vbox = form.vbox
-
-    from textwrap import dedent
-
-    from PyQt5.QtCore import Qt
-
-    from ._style import _font, _font_small
-    from ..calc import humanize
-
-    feed_label = FormatLabel(
-        fmt_str=dedent("""
-        actor: **{actor_name}**\n
-        |_ @**{host}:{port}**\n
-        |_ throttle_hz: **{throttle_rate}**\n
-        |_ streams: **{symbols}**\n
-        |_ shm: **{shm}**\n
-        """),
-        font=_font.font,
-        font_size=_font_small.px_size,
-        font_color='default_lightest',
-    )
-
-    form.feed_label = feed_label
-
-    # add feed info label to top
-    vbox.insertWidget(
-        0,
-        feed_label,
-        alignment=Qt.AlignBottom,
-    )
-    # vbox.setAlignment(feed_label, Qt.AlignBottom)
-    # vbox.setAlignment(Qt.AlignBottom)
-    _ = chart.height() - (
-        form.height() +
-        form.fill_bar.height()
-        # feed_label.height()
-    )
-
-    vbox.setSpacing(
+    form: FieldsForm = chart.sidepane
+    form.vbox.setSpacing(
         int((1 + 5/8)*_font.px_size)
     )

-    # fill in brokerd feed info
-    host, port = feed.portal.channel.raddr
-    if host == '127.0.0.1':
-        host = 'localhost'
-    mpshm = feed.shm._shm
-    shmstr = f'{humanize(mpshm.size)}'
-    form.feed_label.format(
-        actor_name=feed.portal.channel.uid[0],
-        host=host,
-        port=port,
-        symbols=len(feed.symbols),
-        shm=shmstr,
-        throttle_rate=feed.throttle_rate,
-    )
+    from ._feedstatus import mk_feed_label
+
+    feed_label = mk_feed_label(
+        form,
+        feed,
+        chart,
+    )
+
+    # XXX: we set this because?
+    form.feed_label = feed_label

     order_pane = SettingsPane(
         form=form,
         # XXX: ugh, so hideous...
@@ -706,6 +667,11 @@ async def open_order_mode(
     )
     order_pane.set_accounts(list(trackers.keys()))

+    form.vbox.addWidget(
+        feed_label,
+        alignment=Qt.AlignBottom,
+    )
+
     # update pp icons
     for name, tracker in trackers.items():
         order_pane.update_account_icons({name: tracker.live_pp})

View File

@@ -15,3 +15,7 @@

 # ``trimeter`` for async history fetching
 -e git+https://github.com/python-trio/trimeter.git@master#egg=trimeter
+
+# ``asyncvnc`` for sending interactions to ib-gw inside docker
+-e git+https://github.com/pikers/asyncvnc.git@vid_passthrough#egg=asyncvnc