Merge pull request #213 from pikers/brokers_config

Brokers config schema bump, ib patches
pause_feeds_on_sym_switch
goodboy 2021-08-24 10:37:44 -04:00 committed by GitHub
commit f03f051e7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 93 additions and 19 deletions

View File

@ -0,0 +1,25 @@
[questrade]
refresh_token = ""
access_token = ""
api_server = "https://api06.iq.questrade.com/"
expires_in = 1800
token_type = "Bearer"
expires_at = 1616095326.355846
[kraken]
key_descr = "api_0"
public_key = ""
private_key = ""
[ib]
host = "127.0.0.1"
[ib.accounts]
margin = ""
registered = ""
paper = ""
[ib.ports]
gw = 4002
tws = 7497
order = [ "gw", "tws",]

View File

@ -1,9 +0,0 @@
[binance]
[kraken]
# [ib]
# [questrade]

View File

@ -40,6 +40,16 @@ def _override_config_dir(
def get_broker_conf_path():
"""Return the default config path normally under
``~/.config/piker`` on linux.
Contains files such as:
- brokers.toml
- watchlists.toml
- signals.toml
- strats.toml
"""
return os.path.join(_config_dir, _file_name)

View File

@ -31,6 +31,7 @@ from pprint import pformat
import inspect
import itertools
import logging
from random import randint
import time
import trio
@ -49,6 +50,7 @@ from ib_insync.client import Client as ib_Client
from fuzzywuzzy import process as fuzzy
import numpy as np
from . import config
from ..log import get_logger, get_console_log
from .._daemon import maybe_spawn_brokerd
from ..data._source import from_df
@ -310,7 +312,8 @@ class Client:
unique_sym = f'{con.symbol}.{con.primaryExchange}'
as_dict = asdict(d)
# nested dataclass we probably don't need and that won't IPC serialize
# nested dataclass we probably don't need and that
# won't IPC serialize
as_dict.pop('secIdList')
details[unique_sym] = as_dict
@ -637,25 +640,47 @@ class Client:
# default config ports
_tws_port: int = 7497
_gw_port: int = 4002
_try_ports = [_gw_port, _tws_port]
_client_ids = itertools.count()
_try_ports = [
_gw_port,
_tws_port
]
# TODO: remove the randint stuff and use proper error checking in client
# factor below..
_client_ids = itertools.count(randint(1, 100))
_client_cache = {}
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('ib')
if section is None:
log.warning(f'No config section found for ib in {path}')
return {}
return section
@asynccontextmanager
async def _aio_get_client(
host: str = '127.0.0.1',
port: int = None,
client_id: Optional[int] = None,
) -> Client:
"""Return an ``ib_insync.IB`` instance wrapped in our client API.
'''Return an ``ib_insync.IB`` instance wrapped in our client API.
Client instances are cached for later use.
TODO: consider doing this with a ctx mngr eventually?
"""
# first check cache for existing client
'''
conf = get_config()
# first check cache for existing client
try:
if port:
client = _client_cache[(host, port)]
@ -666,6 +691,7 @@ async def _aio_get_client(
yield client
except (KeyError, IndexError):
# TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one.
@ -675,9 +701,31 @@ async def _aio_get_client(
client_id = next(_client_ids)
ib = NonShittyIB()
ports = _try_ports if port is None else [port]
# attempt to get connection info from config; if no .toml entry
# exists, we try to load from a default localhost connection.
host = conf.get('host', '127.0.0.1')
ports = conf.get(
'ports',
# default order is to check for gw first
{
'gw': 4002,
'tws': 7497,
'order': ['gw', 'tws']
}
)
order = ports['order']
try_ports = [ports[key] for key in order]
ports = try_ports if port is None else [port]
# TODO: support multiple clients allowing for execution on
# multiple accounts (including a paper instance running on the
# same machine) and switching between accounts in the EMs
_err = None
for port in ports:
try:
log.info(f"Connecting to the EYEBEE on port {port}!")
@ -1360,7 +1408,7 @@ async def trades_dialogue(
'contract': asdict(fill.contract),
'execution': asdict(fill.execution),
'commissions': asdict(fill.commissionReport),
'broker_time': execu.time, # supposedly IB server fill time
'broker_time': execu.time, # supposedly server fill time
'name': 'ib',
}
@ -1401,14 +1449,14 @@ async def trades_dialogue(
if getattr(msg, 'reqid', 0) < -1:
# it's a trade event generated by TWS usage.
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
msg.reqid = 'tws-' + str(-1 * msg.reqid)
# mark msg as from "external system"
# TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking
msg.external = True
msg.broker_details['external_src'] = 'tws'
continue
# XXX: we always serialize to a dict for msgpack