diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index c4cab60d..a2166f07 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -219,6 +219,8 @@ class Client: Note: this client requires running inside an ``asyncio`` loop. """ + _contracts: dict[str, Contract] = {} + def __init__( self, @@ -229,7 +231,6 @@ class Client: self.ib.RaiseRequestErrors = True # contract cache - self._contracts: dict[str, Contract] = {} self._feeds: dict[str, trio.abc.SendChannel] = {} # NOTE: the ib.client here is "throttled" to 45 rps by default @@ -505,7 +506,7 @@ class Client: return contract, ticker, details # async to be consistent for the client proxy, and cuz why not. - async def submit_limit( + def submit_limit( self, # ignored since ib doesn't support defining your # own order id @@ -554,7 +555,7 @@ class Client: # their own weird client int counting ids.. return trade.order.orderId - async def submit_cancel( + def submit_cancel( self, reqid: str, ) -> None: @@ -571,6 +572,7 @@ class Client: async def recv_trade_updates( self, to_trio: trio.abc.SendChannel, + ) -> None: """Stream a ticker using the std L1 api. """ @@ -720,50 +722,32 @@ async def load_aio_clients( global _client_cache conf = get_config() + ib = None client = None - # first check cache for existing client - if port: - log.info(f'Loading requested client on port: {port}') - client = _client_cache.get((host, port)) + # attempt to get connection info from config; if no .toml entry + # exists, we try to load from a default localhost connection. + host = conf.get('host', '127.0.0.1') + ports = conf.get( + 'ports', - if client and client.ib.isConnected(): - yield client, _client_cache, _accounts2clients - return + # default order is to check for gw first + { + 'gw': 4002, + 'tws': 7497, + 'order': ['gw', 'tws'] + } + ) + order = ports['order'] + accounts_def = config.load_accounts('ib') + + try_ports = [ports[key] for key in order] + ports = try_ports if port is None else [port] + + we_connected = [] # allocate new and/or reload disconnected but cached clients try: - - # TODO: in case the arbiter has no record - # of existing brokerd we need to broadcast for one. - - if client_id is None: - # if this is a persistent brokerd, try to allocate a new id for - # each client - client_id = next(_client_ids) - - ib = NonShittyIB() - - # attempt to get connection info from config; if no .toml entry - # exists, we try to load from a default localhost connection. - host = conf.get('host', '127.0.0.1') - ports = conf.get( - 'ports', - - # default order is to check for gw first - { - 'gw': 4002, - 'tws': 7497, - 'order': ['gw', 'tws'] - } - ) - order = ports['order'] - - accounts_def = config.load_accounts('ib') - - try_ports = [ports[key] for key in order] - ports = try_ports if port is None else [port] - # TODO: support multiple clients allowing for execution on # multiple accounts (including a paper instance running on the # same machine) and switching between accounts in the EMs @@ -774,9 +758,15 @@ async def load_aio_clients( # from connection details in ``brokers.toml``. for port in ports: client = _client_cache.get((host, port)) - + accounts_found: dict[str, Client] = {} if not client or not client.ib.isConnected(): try: + ib = NonShittyIB() + + # if this is a persistent brokerd, try to allocate + # a new id for each client + client_id = next(_client_ids) + log.info(f"Connecting to the EYEBEE on port {port}!") await ib.connectAsync(host, port, clientId=client_id) @@ -789,7 +779,7 @@ async def load_aio_clients( pps = ib.positions() if pps: for pp in pps: - _accounts2clients[ + accounts_found[ accounts_def.inverse[pp.account] ] = client @@ -798,18 +788,21 @@ async def load_aio_clients( # them for this client for value in ib.accountValues(): acct = value.account - if acct not in _accounts2clients: - _accounts2clients[ + if acct not in accounts_found: + accounts_found[ accounts_def.inverse[acct] ] = client log.info( - f'Loaded accounts: {_accounts2clients} for {client} ' - f'@ {host}:{port}' + f'Loaded accounts for client @ {host}:{port}\n' + f'{pformat(accounts_found)}' ) + # update all actor-global caches log.info(f"Caching client for {(host, port)}") _client_cache[(host, port)] = client + we_connected.append(client) + _accounts2clients.update(accounts_found) except ConnectionRefusedError as ce: _err = ce @@ -826,7 +819,8 @@ async def load_aio_clients( yield client, _client_cache, _accounts2clients except BaseException: - ib.disconnect() + for client in we_connected: + client.ib.disconnect() raise @@ -834,14 +828,16 @@ async def _aio_run_client_method( meth: str, to_trio=None, from_trio=None, + client=None, **kwargs, ) -> None: async with load_aio_clients() as ( - client, + _client, clients, accts2clients, ): + client = client or _client async_meth = getattr(client, meth) # handle streaming methods @@ -855,7 +851,9 @@ async def _aio_run_client_method( async def _trio_run_client_method( method: str, + client: Optional[Client] = None, **kwargs, + ) -> None: """Asyncio entry point to run tasks against the ``ib_insync`` api. @@ -875,12 +873,12 @@ async def _trio_run_client_method( ): kwargs['_treat_as_stream'] = True - result = await tractor.to_asyncio.run_task( + return await tractor.to_asyncio.run_task( _aio_run_client_method, meth=method, + client=client, **kwargs ) - return result class _MethodProxy: @@ -1371,11 +1369,11 @@ def pack_position(pos: Position) -> dict[str, Any]: async def handle_order_requests( ems_order_stream: tractor.MsgStream, + accounts_def: dict[str, str], ) -> None: global _accounts2clients - accounts_def = config.load_accounts('ib') # request_msg: dict async for request_msg in ems_order_stream: @@ -1415,15 +1413,13 @@ async def handle_order_requests( order = BrokerdOrder(**request_msg) # call our client api to submit the order - reqid = await _trio_run_client_method( - - method='submit_limit', + reqid = client.submit_limit( oid=order.oid, symbol=order.symbol, price=order.price, action=order.action, size=order.size, - account=order.account, + account=acct_number, # XXX: by default 0 tells ``ib_insync`` methods that # there is no existing order so ask the client to create @@ -1446,11 +1442,7 @@ async def handle_order_requests( elif action == 'cancel': msg = BrokerdCancel(**request_msg) - - await _trio_run_client_method( - method='submit_cancel', - reqid=msg.reqid - ) + client.submit_cancel(reqid=msg.reqid) else: log.error(f'Unknown order command: {request_msg}') @@ -1467,169 +1459,202 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - ib_trade_events_stream = await _trio_run_client_method( - method='recv_trade_updates', - ) + accounts_def = config.load_accounts('ib') + global _accounts2clients global _client_cache # deliver positions to subscriber before anything else all_positions = {} - for client in _client_cache.values(): - for pos in client.positions(): - msg = pack_position(pos) - all_positions[msg.symbol] = msg.dict() + clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] + for account, client in _accounts2clients.items(): + + # each client to an api endpoint will have it's own event stream + trade_event_stream = await _trio_run_client_method( + method='recv_trade_updates', + client=client, + ) + clients.append((client, trade_event_stream)) + + for client in _client_cache.values(): + for pos in client.positions(): + msg = pack_position(pos) + all_positions[msg.symbol] = msg.dict() await ctx.started(all_positions) - action_map = {'BOT': 'buy', 'SLD': 'sell'} - async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): # start order request handler **before** local trades event loop - n.start_soon(handle_order_requests, ems_stream) + n.start_soon(handle_order_requests, ems_stream, accounts_def) - # TODO: for some reason we can receive a ``None`` here when the - # ib-gw goes down? Not sure exactly how that's happening looking - # at the eventkit code above but we should probably handle it... - async for event_name, item in ib_trade_events_stream: + # allocate event relay tasks for each client connection + for client, stream in clients: + n.start_soon( + deliver_trade_events, + stream, + ems_stream, + accounts_def + ) - log.info(f'ib sending {event_name}:\n{pformat(item)}') + # block until cancelled + await trio.sleep_forever() - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - # XXX: here's some other sucky cases from the api - # - short-sale but securities haven't been located, in this - # case we should probably keep the order in some kind of - # weird state or cancel it outright? +async def deliver_trade_events( - # status='PendingSubmit', message=''), - # status='Cancelled', message='Error 404, - # reqId 1550: Order held while securities are located.'), - # status='PreSubmitted', message='')], + trade_event_stream: trio.MemoryReceiveChannel, + ems_stream: tractor.MsgStream, + accounts_def: dict[str, str], - if event_name == 'status': +) -> None: + '''Format and relay all trade events for a given client to the EMS. - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... + ''' + action_map = {'BOT': 'buy', 'SLD': 'sell'} - # unwrap needed data from ib_insync internal types - trade: Trade = item - status: OrderStatus = trade.orderStatus + # TODO: for some reason we can receive a ``None`` here when the + # ib-gw goes down? Not sure exactly how that's happening looking + # at the eventkit code above but we should probably handle it... + async for event_name, item in trade_event_stream: - # skip duplicate filled updates - we get the deats - # from the execution details event - msg = BrokerdStatus( + log.info(f'ib sending {event_name}:\n{pformat(item)}') - reqid=trade.order.orderId, - time_ns=time.time_ns(), # cuz why not - # account=client. + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) - # everyone doin camel case.. - status=status.status.lower(), # force lower case + # XXX: here's some other sucky cases from the api + # - short-sale but securities haven't been located, in this + # case we should probably keep the order in some kind of + # weird state or cancel it outright? - filled=status.filled, - reason=status.whyHeld, + # status='PendingSubmit', message=''), + # status='Cancelled', message='Error 404, + # reqId 1550: Order held while securities are located.'), + # status='PreSubmitted', message='')], - # this seems to not be necessarily up to date in the - # execDetails event.. so we have to send it here I guess? - remaining=status.remaining, + if event_name == 'status': - broker_details={'name': 'ib'}, - ) + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... - elif event_name == 'fill': + # unwrap needed data from ib_insync internal types + trade: Trade = item + status: OrderStatus = trade.orderStatus - # for wtv reason this is a separate event type - # from IB, not sure why it's needed other then for extra - # complexity and over-engineering :eyeroll:. - # we may just end up dropping these events (or - # translating them to ``Status`` msgs) if we can - # show the equivalent status events are no more latent. + # skip duplicate filled updates - we get the deats + # from the execution details event + msg = BrokerdStatus( - # unpack ib_insync types - # pep-0526 style: - # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations - trade: Trade - fill: Fill - trade, fill = item - execu: Execution = fill.execution + reqid=trade.order.orderId, + time_ns=time.time_ns(), # cuz why not + account=accounts_def.inverse[trade.order.account], - # TODO: normalize out commissions details? - details = { - 'contract': asdict(fill.contract), - 'execution': asdict(fill.execution), - 'commissions': asdict(fill.commissionReport), - 'broker_time': execu.time, # supposedly server fill time - 'name': 'ib', - } + # everyone doin camel case.. + status=status.status.lower(), # force lower case - msg = BrokerdFill( - # should match the value returned from `.submit_limit()` - reqid=execu.orderId, - time_ns=time.time_ns(), # cuz why not + filled=status.filled, + reason=status.whyHeld, - action=action_map[execu.side], - size=execu.shares, - price=execu.price, + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? + remaining=status.remaining, - broker_details=details, - # XXX: required by order mode currently - broker_time=details['broker_time'], + broker_details={'name': 'ib'}, + ) - ) + elif event_name == 'fill': - elif event_name == 'error': + # for wtv reason this is a separate event type + # from IB, not sure why it's needed other then for extra + # complexity and over-engineering :eyeroll:. + # we may just end up dropping these events (or + # translating them to ``Status`` msgs) if we can + # show the equivalent status events are no more latent. - err: dict = item + # unpack ib_insync types + # pep-0526 style: + # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations + trade: Trade + fill: Fill + trade, fill = item + execu: Execution = fill.execution - # f$#$% gawd dammit insync.. - con = err['contract'] - if isinstance(con, Contract): - err['contract'] = asdict(con) + # TODO: normalize out commissions details? + details = { + 'contract': asdict(fill.contract), + 'execution': asdict(fill.execution), + 'commissions': asdict(fill.commissionReport), + 'broker_time': execu.time, # supposedly server fill time + 'name': 'ib', + } - if err['reqid'] == -1: - log.error(f'TWS external order error:\n{pformat(err)}') + msg = BrokerdFill( + # should match the value returned from `.submit_limit()` + reqid=execu.orderId, + time_ns=time.time_ns(), # cuz why not - # don't forward for now, it's unecessary.. but if we wanted to, - # msg = BrokerdError(**err) - continue + action=action_map[execu.side], + size=execu.shares, + price=execu.price, - elif event_name == 'position': - msg = pack_position(item) + broker_details=details, + # XXX: required by order mode currently + broker_time=details['broker_time'], - if getattr(msg, 'reqid', 0) < -1: + ) - # it's a trade event generated by TWS usage. - log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + elif event_name == 'error': - msg.reqid = 'tws-' + str(-1 * msg.reqid) + err: dict = item - # mark msg as from "external system" - # TODO: probably something better then this.. and start - # considering multiplayer/group trades tracking - msg.broker_details['external_src'] = 'tws' - continue + # f$#$% gawd dammit insync.. + con = err['contract'] + if isinstance(con, Contract): + err['contract'] = asdict(con) - # XXX: we always serialize to a dict for msgpack - # translations, ideally we can move to an msgspec (or other) - # encoder # that can be enabled in ``tractor`` ahead of - # time so we can pass through the message types directly. - await ems_stream.send(msg.dict()) + if err['reqid'] == -1: + log.error(f'TWS external order error:\n{pformat(err)}') + + # TODO: what schema for this msg if we're going to make it + # portable across all backends? + # msg = BrokerdError(**err) + continue + + elif event_name == 'position': + msg = pack_position(item) + + if getattr(msg, 'reqid', 0) < -1: + + # it's a trade event generated by TWS usage. + log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + + msg.reqid = 'tws-' + str(-1 * msg.reqid) + + # mark msg as from "external system" + # TODO: probably something better then this.. and start + # considering multiplayer/group trades tracking + msg.broker_details['external_src'] = 'tws' + continue + + # XXX: we always serialize to a dict for msgpack + # translations, ideally we can move to an msgspec (or other) + # encoder # that can be enabled in ``tractor`` ahead of + # time so we can pass through the message types directly. + await ems_stream.send(msg.dict()) @tractor.context