diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 20865e30..b861d895 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -996,15 +996,22 @@ async def load_aio_clients( accounts_def.inverse[pp.account] ] = client - # if there are no positions or accounts - # without positions we should still register - # them for this client + # if there are accounts without positions we should still + # register them for this client for value in ib.accountValues(): - acct = value.account - if acct not in accounts_found: - accounts_found[ - accounts_def.inverse[acct] - ] = client + acct_number = value.account + + entry = accounts_def.inverse.get(acct_number) + if not entry: + raise ValueError( + 'No section in brokers.toml for account:' + f' {acct_number}\n' + f'Please add entry to continue using this API client' + ) + + # surjection of account names to operating clients. + if acct_number not in accounts_found: + accounts_found[entry] = client log.info( f'Loaded accounts for client @ {host}:{port}\n' @@ -1028,6 +1035,8 @@ async def load_aio_clients( # torn down correctly ... tractor._actor._lifetime_stack.callback(pop_and_discon) + # XXX: why aren't we just updating this directy above + # instead of using the intermediary `accounts_found`? _accounts2clients.update(accounts_found) except ( @@ -2117,6 +2126,7 @@ async def trades_dialogue( clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] async with trio.open_nursery() as nurse: + for account, client in _accounts2clients.items(): async def open_stream( @@ -2136,11 +2146,18 @@ async def trades_dialogue( clients.append((client, trade_event_stream)) + assert account in accounts_def + accounts.add(account) + for client in _client_cache.values(): for pos in client.positions(): + msg = pack_position(pos) msg.account = accounts_def.inverse[msg.account] - accounts.add(msg.account) + + assert msg.account in accounts, ( + f'Position for unknown account: {msg.account}') + all_positions.append(msg.dict()) await ctx.started((