Make `ib` backend multi-client capable

This adds full support for a single `brokerd` managing multiple API
endpoint clients in tandem. Get the client scan loop correct and load
accounts from all discovered clients as specified in a user's
`broker.toml`. We now just always re-scan for all clients and if there's
a cache hit just skip a creation/connection logic.

Route orders with an account name to the correct client in the
`handle_order_requests()` endpoint and spawn an event relay task per
client for transmitting trade events back to `emsd`.
fsp_feeds
Tyler Goodlet 2021-09-09 07:57:11 -04:00
parent dedfb27a3a
commit c53b8ec43c
1 changed files with 202 additions and 177 deletions

View File

@ -219,6 +219,8 @@ class Client:
Note: this client requires running inside an ``asyncio`` loop. Note: this client requires running inside an ``asyncio`` loop.
""" """
_contracts: dict[str, Contract] = {}
def __init__( def __init__(
self, self,
@ -229,7 +231,6 @@ class Client:
self.ib.RaiseRequestErrors = True self.ib.RaiseRequestErrors = True
# contract cache # contract cache
self._contracts: dict[str, Contract] = {}
self._feeds: dict[str, trio.abc.SendChannel] = {} self._feeds: dict[str, trio.abc.SendChannel] = {}
# NOTE: the ib.client here is "throttled" to 45 rps by default # NOTE: the ib.client here is "throttled" to 45 rps by default
@ -505,7 +506,7 @@ class Client:
return contract, ticker, details return contract, ticker, details
# async to be consistent for the client proxy, and cuz why not. # async to be consistent for the client proxy, and cuz why not.
async def submit_limit( def submit_limit(
self, self,
# ignored since ib doesn't support defining your # ignored since ib doesn't support defining your
# own order id # own order id
@ -554,7 +555,7 @@ class Client:
# their own weird client int counting ids.. # their own weird client int counting ids..
return trade.order.orderId return trade.order.orderId
async def submit_cancel( def submit_cancel(
self, self,
reqid: str, reqid: str,
) -> None: ) -> None:
@ -571,6 +572,7 @@ class Client:
async def recv_trade_updates( async def recv_trade_updates(
self, self,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
"""Stream a ticker using the std L1 api. """Stream a ticker using the std L1 api.
""" """
@ -720,50 +722,32 @@ async def load_aio_clients(
global _client_cache global _client_cache
conf = get_config() conf = get_config()
ib = None
client = None client = None
# first check cache for existing client # attempt to get connection info from config; if no .toml entry
if port: # exists, we try to load from a default localhost connection.
log.info(f'Loading requested client on port: {port}') host = conf.get('host', '127.0.0.1')
client = _client_cache.get((host, port)) ports = conf.get(
'ports',
if client and client.ib.isConnected(): # default order is to check for gw first
yield client, _client_cache, _accounts2clients {
return 'gw': 4002,
'tws': 7497,
'order': ['gw', 'tws']
}
)
order = ports['order']
accounts_def = config.load_accounts('ib')
try_ports = [ports[key] for key in order]
ports = try_ports if port is None else [port]
we_connected = []
# allocate new and/or reload disconnected but cached clients # allocate new and/or reload disconnected but cached clients
try: try:
# TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one.
if client_id is None:
# if this is a persistent brokerd, try to allocate a new id for
# each client
client_id = next(_client_ids)
ib = NonShittyIB()
# attempt to get connection info from config; if no .toml entry
# exists, we try to load from a default localhost connection.
host = conf.get('host', '127.0.0.1')
ports = conf.get(
'ports',
# default order is to check for gw first
{
'gw': 4002,
'tws': 7497,
'order': ['gw', 'tws']
}
)
order = ports['order']
accounts_def = config.load_accounts('ib')
try_ports = [ports[key] for key in order]
ports = try_ports if port is None else [port]
# TODO: support multiple clients allowing for execution on # TODO: support multiple clients allowing for execution on
# multiple accounts (including a paper instance running on the # multiple accounts (including a paper instance running on the
# same machine) and switching between accounts in the EMs # same machine) and switching between accounts in the EMs
@ -774,9 +758,15 @@ async def load_aio_clients(
# from connection details in ``brokers.toml``. # from connection details in ``brokers.toml``.
for port in ports: for port in ports:
client = _client_cache.get((host, port)) client = _client_cache.get((host, port))
accounts_found: dict[str, Client] = {}
if not client or not client.ib.isConnected(): if not client or not client.ib.isConnected():
try: try:
ib = NonShittyIB()
# if this is a persistent brokerd, try to allocate
# a new id for each client
client_id = next(_client_ids)
log.info(f"Connecting to the EYEBEE on port {port}!") log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id) await ib.connectAsync(host, port, clientId=client_id)
@ -789,7 +779,7 @@ async def load_aio_clients(
pps = ib.positions() pps = ib.positions()
if pps: if pps:
for pp in pps: for pp in pps:
_accounts2clients[ accounts_found[
accounts_def.inverse[pp.account] accounts_def.inverse[pp.account]
] = client ] = client
@ -798,18 +788,21 @@ async def load_aio_clients(
# them for this client # them for this client
for value in ib.accountValues(): for value in ib.accountValues():
acct = value.account acct = value.account
if acct not in _accounts2clients: if acct not in accounts_found:
_accounts2clients[ accounts_found[
accounts_def.inverse[acct] accounts_def.inverse[acct]
] = client ] = client
log.info( log.info(
f'Loaded accounts: {_accounts2clients} for {client} ' f'Loaded accounts for client @ {host}:{port}\n'
f'@ {host}:{port}' f'{pformat(accounts_found)}'
) )
# update all actor-global caches
log.info(f"Caching client for {(host, port)}") log.info(f"Caching client for {(host, port)}")
_client_cache[(host, port)] = client _client_cache[(host, port)] = client
we_connected.append(client)
_accounts2clients.update(accounts_found)
except ConnectionRefusedError as ce: except ConnectionRefusedError as ce:
_err = ce _err = ce
@ -826,7 +819,8 @@ async def load_aio_clients(
yield client, _client_cache, _accounts2clients yield client, _client_cache, _accounts2clients
except BaseException: except BaseException:
ib.disconnect() for client in we_connected:
client.ib.disconnect()
raise raise
@ -834,14 +828,16 @@ async def _aio_run_client_method(
meth: str, meth: str,
to_trio=None, to_trio=None,
from_trio=None, from_trio=None,
client=None,
**kwargs, **kwargs,
) -> None: ) -> None:
async with load_aio_clients() as ( async with load_aio_clients() as (
client, _client,
clients, clients,
accts2clients, accts2clients,
): ):
client = client or _client
async_meth = getattr(client, meth) async_meth = getattr(client, meth)
# handle streaming methods # handle streaming methods
@ -855,7 +851,9 @@ async def _aio_run_client_method(
async def _trio_run_client_method( async def _trio_run_client_method(
method: str, method: str,
client: Optional[Client] = None,
**kwargs, **kwargs,
) -> None: ) -> None:
"""Asyncio entry point to run tasks against the ``ib_insync`` api. """Asyncio entry point to run tasks against the ``ib_insync`` api.
@ -875,12 +873,12 @@ async def _trio_run_client_method(
): ):
kwargs['_treat_as_stream'] = True kwargs['_treat_as_stream'] = True
result = await tractor.to_asyncio.run_task( return await tractor.to_asyncio.run_task(
_aio_run_client_method, _aio_run_client_method,
meth=method, meth=method,
client=client,
**kwargs **kwargs
) )
return result
class _MethodProxy: class _MethodProxy:
@ -1371,11 +1369,11 @@ def pack_position(pos: Position) -> dict[str, Any]:
async def handle_order_requests( async def handle_order_requests(
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
accounts_def: dict[str, str],
) -> None: ) -> None:
global _accounts2clients global _accounts2clients
accounts_def = config.load_accounts('ib')
# request_msg: dict # request_msg: dict
async for request_msg in ems_order_stream: async for request_msg in ems_order_stream:
@ -1415,15 +1413,13 @@ async def handle_order_requests(
order = BrokerdOrder(**request_msg) order = BrokerdOrder(**request_msg)
# call our client api to submit the order # call our client api to submit the order
reqid = await _trio_run_client_method( reqid = client.submit_limit(
method='submit_limit',
oid=order.oid, oid=order.oid,
symbol=order.symbol, symbol=order.symbol,
price=order.price, price=order.price,
action=order.action, action=order.action,
size=order.size, size=order.size,
account=order.account, account=acct_number,
# XXX: by default 0 tells ``ib_insync`` methods that # XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create # there is no existing order so ask the client to create
@ -1446,11 +1442,7 @@ async def handle_order_requests(
elif action == 'cancel': elif action == 'cancel':
msg = BrokerdCancel(**request_msg) msg = BrokerdCancel(**request_msg)
client.submit_cancel(reqid=msg.reqid)
await _trio_run_client_method(
method='submit_cancel',
reqid=msg.reqid
)
else: else:
log.error(f'Unknown order command: {request_msg}') log.error(f'Unknown order command: {request_msg}')
@ -1467,169 +1459,202 @@ async def trades_dialogue(
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
ib_trade_events_stream = await _trio_run_client_method( accounts_def = config.load_accounts('ib')
method='recv_trade_updates',
)
global _accounts2clients global _accounts2clients
global _client_cache global _client_cache
# deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
all_positions = {} all_positions = {}
for client in _client_cache.values(): clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
for pos in client.positions(): for account, client in _accounts2clients.items():
msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict() # each client to an api endpoint will have it's own event stream
trade_event_stream = await _trio_run_client_method(
method='recv_trade_updates',
client=client,
)
clients.append((client, trade_event_stream))
for client in _client_cache.values():
for pos in client.positions():
msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict()
await ctx.started(all_positions) await ctx.started(all_positions)
action_map = {'BOT': 'buy', 'SLD': 'sell'}
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
# start order request handler **before** local trades event loop # start order request handler **before** local trades event loop
n.start_soon(handle_order_requests, ems_stream) n.start_soon(handle_order_requests, ems_stream, accounts_def)
# TODO: for some reason we can receive a ``None`` here when the # allocate event relay tasks for each client connection
# ib-gw goes down? Not sure exactly how that's happening looking for client, stream in clients:
# at the eventkit code above but we should probably handle it... n.start_soon(
async for event_name, item in ib_trade_events_stream: deliver_trade_events,
stream,
ems_stream,
accounts_def
)
log.info(f'ib sending {event_name}:\n{pformat(item)}') # block until cancelled
await trio.sleep_forever()
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# XXX: here's some other sucky cases from the api async def deliver_trade_events(
# - short-sale but securities haven't been located, in this
# case we should probably keep the order in some kind of
# weird state or cancel it outright?
# status='PendingSubmit', message=''), trade_event_stream: trio.MemoryReceiveChannel,
# status='Cancelled', message='Error 404, ems_stream: tractor.MsgStream,
# reqId 1550: Order held while securities are located.'), accounts_def: dict[str, str],
# status='PreSubmitted', message='')],
if event_name == 'status': ) -> None:
'''Format and relay all trade events for a given client to the EMS.
# XXX: begin normalization of nonsense ib_insync internal '''
# object-state tracking representations... action_map = {'BOT': 'buy', 'SLD': 'sell'}
# unwrap needed data from ib_insync internal types # TODO: for some reason we can receive a ``None`` here when the
trade: Trade = item # ib-gw goes down? Not sure exactly how that's happening looking
status: OrderStatus = trade.orderStatus # at the eventkit code above but we should probably handle it...
async for event_name, item in trade_event_stream:
# skip duplicate filled updates - we get the deats log.info(f'ib sending {event_name}:\n{pformat(item)}')
# from the execution details event
msg = BrokerdStatus(
reqid=trade.order.orderId, # TODO: templating the ib statuses in comparison with other
time_ns=time.time_ns(), # cuz why not # brokers is likely the way to go:
# account=client. # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# everyone doin camel case.. # XXX: here's some other sucky cases from the api
status=status.status.lower(), # force lower case # - short-sale but securities haven't been located, in this
# case we should probably keep the order in some kind of
# weird state or cancel it outright?
filled=status.filled, # status='PendingSubmit', message=''),
reason=status.whyHeld, # status='Cancelled', message='Error 404,
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
# this seems to not be necessarily up to date in the if event_name == 'status':
# execDetails event.. so we have to send it here I guess?
remaining=status.remaining,
broker_details={'name': 'ib'}, # XXX: begin normalization of nonsense ib_insync internal
) # object-state tracking representations...
elif event_name == 'fill': # unwrap needed data from ib_insync internal types
trade: Trade = item
status: OrderStatus = trade.orderStatus
# for wtv reason this is a separate event type # skip duplicate filled updates - we get the deats
# from IB, not sure why it's needed other then for extra # from the execution details event
# complexity and over-engineering :eyeroll:. msg = BrokerdStatus(
# we may just end up dropping these events (or
# translating them to ``Status`` msgs) if we can
# show the equivalent status events are no more latent.
# unpack ib_insync types reqid=trade.order.orderId,
# pep-0526 style: time_ns=time.time_ns(), # cuz why not
# https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations account=accounts_def.inverse[trade.order.account],
trade: Trade
fill: Fill
trade, fill = item
execu: Execution = fill.execution
# TODO: normalize out commissions details? # everyone doin camel case..
details = { status=status.status.lower(), # force lower case
'contract': asdict(fill.contract),
'execution': asdict(fill.execution),
'commissions': asdict(fill.commissionReport),
'broker_time': execu.time, # supposedly server fill time
'name': 'ib',
}
msg = BrokerdFill( filled=status.filled,
# should match the value returned from `.submit_limit()` reason=status.whyHeld,
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
action=action_map[execu.side], # this seems to not be necessarily up to date in the
size=execu.shares, # execDetails event.. so we have to send it here I guess?
price=execu.price, remaining=status.remaining,
broker_details=details, broker_details={'name': 'ib'},
# XXX: required by order mode currently )
broker_time=details['broker_time'],
) elif event_name == 'fill':
elif event_name == 'error': # for wtv reason this is a separate event type
# from IB, not sure why it's needed other then for extra
# complexity and over-engineering :eyeroll:.
# we may just end up dropping these events (or
# translating them to ``Status`` msgs) if we can
# show the equivalent status events are no more latent.
err: dict = item # unpack ib_insync types
# pep-0526 style:
# https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
trade: Trade
fill: Fill
trade, fill = item
execu: Execution = fill.execution
# f$#$% gawd dammit insync.. # TODO: normalize out commissions details?
con = err['contract'] details = {
if isinstance(con, Contract): 'contract': asdict(fill.contract),
err['contract'] = asdict(con) 'execution': asdict(fill.execution),
'commissions': asdict(fill.commissionReport),
'broker_time': execu.time, # supposedly server fill time
'name': 'ib',
}
if err['reqid'] == -1: msg = BrokerdFill(
log.error(f'TWS external order error:\n{pformat(err)}') # should match the value returned from `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
# don't forward for now, it's unecessary.. but if we wanted to, action=action_map[execu.side],
# msg = BrokerdError(**err) size=execu.shares,
continue price=execu.price,
elif event_name == 'position': broker_details=details,
msg = pack_position(item) # XXX: required by order mode currently
broker_time=details['broker_time'],
if getattr(msg, 'reqid', 0) < -1: )
# it's a trade event generated by TWS usage. elif event_name == 'error':
log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
msg.reqid = 'tws-' + str(-1 * msg.reqid) err: dict = item
# mark msg as from "external system" # f$#$% gawd dammit insync..
# TODO: probably something better then this.. and start con = err['contract']
# considering multiplayer/group trades tracking if isinstance(con, Contract):
msg.broker_details['external_src'] = 'tws' err['contract'] = asdict(con)
continue
# XXX: we always serialize to a dict for msgpack if err['reqid'] == -1:
# translations, ideally we can move to an msgspec (or other) log.error(f'TWS external order error:\n{pformat(err)}')
# encoder # that can be enabled in ``tractor`` ahead of
# time so we can pass through the message types directly. # TODO: what schema for this msg if we're going to make it
await ems_stream.send(msg.dict()) # portable across all backends?
# msg = BrokerdError(**err)
continue
elif event_name == 'position':
msg = pack_position(item)
if getattr(msg, 'reqid', 0) < -1:
# it's a trade event generated by TWS usage.
log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
msg.reqid = 'tws-' + str(-1 * msg.reqid)
# mark msg as from "external system"
# TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking
msg.broker_details['external_src'] = 'tws'
continue
# XXX: we always serialize to a dict for msgpack
# translations, ideally we can move to an msgspec (or other)
# encoder # that can be enabled in ``tractor`` ahead of
# time so we can pass through the message types directly.
await ems_stream.send(msg.dict())
@tractor.context @tractor.context