Add per-account order entry for ib

Make the `handle_order_requests()` tasks now lookup the appropriate API
client for a given account (or error if it can't be found) and use it
for submission. Account names are loaded from the
`brokers.toml::accounts.ib` section both UI side and in the `brokerd`.
Change `_aio_get_client()` to a `load_aio_client()` which now tries to
scan and load api clients for all connections defined in the config as
well as deliver the client cache and account lookup tables.
fsp_feeds
Tyler Goodlet 2021-09-08 15:55:45 -04:00
parent b01538f183
commit dedfb27a3a
1 changed files with 132 additions and 47 deletions

View File

@ -62,8 +62,7 @@ from ._util import SymbolNotFound, NoData
from ..clearing._messages import ( from ..clearing._messages import (
BrokerdOrder, BrokerdOrderAck, BrokerdStatus, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdPosition, BrokerdCancel, BrokerdPosition, BrokerdCancel,
BrokerdFill, BrokerdFill, BrokerdError,
# BrokerdError,
) )
@ -196,8 +195,8 @@ _adhoc_futes_set = {
'mgc.nymex', 'mgc.nymex',
'xagusd.cmdty', # silver spot 'xagusd.cmdty', # silver spot
'ni.nymex', # silver futes 'ni.nymex', # silver futes
'qi.comex', # mini-silver futes 'qi.comex', # mini-silver futes
} }
# exchanges we don't support at the moment due to not knowing # exchanges we don't support at the moment due to not knowing
@ -222,7 +221,9 @@ class Client:
""" """
def __init__( def __init__(
self, self,
ib: ibis.IB, ib: ibis.IB,
) -> None: ) -> None:
self.ib = ib self.ib = ib
self.ib.RaiseRequestErrors = True self.ib.RaiseRequestErrors = True
@ -513,7 +514,7 @@ class Client:
price: float, price: float,
action: str, action: str,
size: int, size: int,
account: str = '', # if blank the "default" tws account is used account: str, # if blank the "default" tws account is used
# XXX: by default 0 tells ``ib_insync`` methods that there is no # XXX: by default 0 tells ``ib_insync`` methods that there is no
# existing order so ask the client to create a new one (which it # existing order so ask the client to create a new one (which it
@ -536,6 +537,7 @@ class Client:
Order( Order(
orderId=reqid or 0, # stupid api devs.. orderId=reqid or 0, # stupid api devs..
action=action.upper(), # BUY/SELL action=action.upper(), # BUY/SELL
# lookup the literal account number by name here.
account=account, account=account,
orderType='LMT', orderType='LMT',
lmtPrice=price, lmtPrice=price,
@ -659,9 +661,10 @@ class Client:
self.ib.errorEvent.connect(push_err) self.ib.errorEvent.connect(push_err)
async def positions( def positions(
self, self,
account: str = '', account: str = '',
) -> list[Position]: ) -> list[Position]:
""" """
Retrieve position info for ``account``. Retrieve position info for ``account``.
@ -695,8 +698,11 @@ def get_config() -> dict[str, Any]:
return section return section
_accounts2clients: dict[str, Client] = {}
@asynccontextmanager @asynccontextmanager
async def _aio_get_client( async def load_aio_clients(
host: str = '127.0.0.1', host: str = '127.0.0.1',
port: int = None, port: int = None,
@ -710,23 +716,23 @@ async def _aio_get_client(
TODO: consider doing this with a ctx mngr eventually? TODO: consider doing this with a ctx mngr eventually?
''' '''
global _accounts2clients
global _client_cache
conf = get_config() conf = get_config()
client = None
# first check cache for existing client # first check cache for existing client
if port:
log.info(f'Loading requested client on port: {port}')
client = _client_cache.get((host, port))
if client and client.ib.isConnected():
yield client, _client_cache, _accounts2clients
return
# allocate new and/or reload disconnected but cached clients
try: try:
if port:
client = _client_cache[(host, port)]
else:
# grab first cached client
client = list(_client_cache.values())[0]
if not client.ib.isConnected():
# we have a stale client to re-allocate
raise KeyError
yield client
except (KeyError, IndexError):
# TODO: in case the arbiter has no record # TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one. # of existing brokerd we need to broadcast for one.
@ -753,6 +759,8 @@ async def _aio_get_client(
) )
order = ports['order'] order = ports['order']
accounts_def = config.load_accounts('ib')
try_ports = [ports[key] for key in order] try_ports = [ports[key] for key in order]
ports = try_ports if port is None else [port] ports = try_ports if port is None else [port]
@ -762,29 +770,64 @@ async def _aio_get_client(
_err = None _err = None
# (re)load any and all clients that can be found
# from connection details in ``brokers.toml``.
for port in ports: for port in ports:
try: client = _client_cache.get((host, port))
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id) if not client or not client.ib.isConnected():
break try:
except ConnectionRefusedError as ce: log.info(f"Connecting to the EYEBEE on port {port}!")
_err = ce await ib.connectAsync(host, port, clientId=client_id)
log.warning(f'Failed to connect on {port}')
# create and cache client
client = Client(ib)
# Pre-collect all accounts available for this
# connection and map account names to this client
# instance.
pps = ib.positions()
if pps:
for pp in pps:
_accounts2clients[
accounts_def.inverse[pp.account]
] = client
# if there are no positions or accounts
# without positions we should still register
# them for this client
for value in ib.accountValues():
acct = value.account
if acct not in _accounts2clients:
_accounts2clients[
accounts_def.inverse[acct]
] = client
log.info(
f'Loaded accounts: {_accounts2clients} for {client} '
f'@ {host}:{port}'
)
log.info(f"Caching client for {(host, port)}")
_client_cache[(host, port)] = client
except ConnectionRefusedError as ce:
_err = ce
log.warning(f'Failed to connect on {port}')
else: else:
raise ConnectionRefusedError(_err) if not _client_cache:
raise ConnectionRefusedError(_err)
# create and cache # retreive first loaded client
try: clients = list(_client_cache.values())
client = Client(ib) if clients:
client = clients[0]
_client_cache[(host, port)] = client yield client, _client_cache, _accounts2clients
log.debug(f"Caching client for {(host, port)}")
yield client except BaseException:
ib.disconnect()
except BaseException: raise
ib.disconnect()
raise
async def _aio_run_client_method( async def _aio_run_client_method(
@ -793,8 +836,12 @@ async def _aio_run_client_method(
from_trio=None, from_trio=None,
**kwargs, **kwargs,
) -> None: ) -> None:
async with _aio_get_client() as client:
async with load_aio_clients() as (
client,
clients,
accts2clients,
):
async_meth = getattr(client, meth) async_meth = getattr(client, meth)
# handle streaming methods # handle streaming methods
@ -1081,8 +1128,11 @@ async def _setup_quote_stream(
""" """
global _quote_streams global _quote_streams
async with _aio_get_client() as client: async with load_aio_clients() as (
client,
clients,
accts2clients,
):
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -1324,11 +1374,41 @@ async def handle_order_requests(
) -> None: ) -> None:
global _accounts2clients
accounts_def = config.load_accounts('ib')
# request_msg: dict # request_msg: dict
async for request_msg in ems_order_stream: async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}') log.info(f'Received order request {request_msg}')
action = request_msg['action'] action = request_msg['action']
account = request_msg['account']
acct_number = accounts_def.get(account)
if not acct_number:
log.error(
f'An IB account number for name {account} is not found?\n'
'Make sure you have all TWS and GW instances running.'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
).dict())
continue
client = _accounts2clients.get(account)
if not client:
log.error(
f'An IB client for account name {account} is not found.\n'
'Make sure you have all TWS and GW instances running.'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
).dict())
continue
if action in {'buy', 'sell'}: if action in {'buy', 'sell'}:
# validate # validate
@ -1343,6 +1423,7 @@ async def handle_order_requests(
price=order.price, price=order.price,
action=order.action, action=order.action,
size=order.size, size=order.size,
account=order.account,
# XXX: by default 0 tells ``ib_insync`` methods that # XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create # there is no existing order so ask the client to create
@ -1359,6 +1440,7 @@ async def handle_order_requests(
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
account=account,
).dict() ).dict()
) )
@ -1388,15 +1470,16 @@ async def trades_dialogue(
ib_trade_events_stream = await _trio_run_client_method( ib_trade_events_stream = await _trio_run_client_method(
method='recv_trade_updates', method='recv_trade_updates',
) )
global _accounts2clients
global _client_cache
# deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
positions = await _trio_run_client_method(method='positions')
all_positions = {} all_positions = {}
for pos in positions: for client in _client_cache.values():
msg = pack_position(pos) for pos in client.positions():
all_positions[msg.symbol] = msg.dict() msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict()
await ctx.started(all_positions) await ctx.started(all_positions)
@ -1413,7 +1496,8 @@ async def trades_dialogue(
# ib-gw goes down? Not sure exactly how that's happening looking # ib-gw goes down? Not sure exactly how that's happening looking
# at the eventkit code above but we should probably handle it... # at the eventkit code above but we should probably handle it...
async for event_name, item in ib_trade_events_stream: async for event_name, item in ib_trade_events_stream:
print(f' ib sending {item}')
log.info(f'ib sending {event_name}:\n{pformat(item)}')
# TODO: templating the ib statuses in comparison with other # TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go: # brokers is likely the way to go:
@ -1453,6 +1537,7 @@ async def trades_dialogue(
reqid=trade.order.orderId, reqid=trade.order.orderId,
time_ns=time.time_ns(), # cuz why not time_ns=time.time_ns(), # cuz why not
# account=client.
# everyone doin camel case.. # everyone doin camel case..
status=status.status.lower(), # force lower case status=status.status.lower(), # force lower case