We already collect account values/names in `load_io_clients()`

no_ib_pps
Tyler Goodlet 2022-05-12 13:21:20 -04:00
parent d0530c4e26
commit de55565f60
1 changed files with 26 additions and 22 deletions

View File

@ -996,15 +996,22 @@ async def load_aio_clients(
accounts_def.inverse[pp.account] accounts_def.inverse[pp.account]
] = client ] = client
# if there are no positions or accounts # if there are accounts without positions we should still
# without positions we should still register # register them for this client
# them for this client
for value in ib.accountValues(): for value in ib.accountValues():
acct = value.account acct_number = value.account
if acct not in accounts_found:
accounts_found[ entry = accounts_def.inverse.get(acct_number)
accounts_def.inverse[acct] if not entry:
] = client raise ValueError(
'No section in brokers.toml for account:'
f' {acct_number}\n'
f'Please add entry to continue using this API client'
)
# surjection of account names to operating clients.
if acct_number not in accounts_found:
accounts_found[entry] = client
log.info( log.info(
f'Loaded accounts for client @ {host}:{port}\n' f'Loaded accounts for client @ {host}:{port}\n'
@ -1028,6 +1035,8 @@ async def load_aio_clients(
# torn down correctly ... # torn down correctly ...
tractor._actor._lifetime_stack.callback(pop_and_discon) tractor._actor._lifetime_stack.callback(pop_and_discon)
# XXX: why aren't we just updating this directy above
# instead of using the intermediary `accounts_found`?
_accounts2clients.update(accounts_found) _accounts2clients.update(accounts_found)
except ( except (
@ -2117,6 +2126,7 @@ async def trades_dialogue(
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
for account, client in _accounts2clients.items(): for account, client in _accounts2clients.items():
async def open_stream( async def open_stream(
@ -2136,25 +2146,19 @@ async def trades_dialogue(
clients.append((client, trade_event_stream)) clients.append((client, trade_event_stream))
assert account in accounts_def
accounts.add(account)
for client in _client_cache.values(): for client in _client_cache.values():
for pos in client.positions(): for pos in client.positions():
msg = pack_position(pos) msg = pack_position(pos)
msg.account = accounts_def.inverse[msg.account] msg.account = accounts_def.inverse[msg.account]
accounts.add(msg.account)
all_positions.append(msg.dict())
# the account has no open positions (pps) so assert msg.account in accounts, (
# we just deliver the accounts def for now? f'Position for unknown account: {msg.account}')
for value in client.ib.accountValues():
account = value.account all_positions.append(msg.dict())
if account not in accounts_def.inverse:
log.warning(
f'Client {client} defines an unknown account '
'(not in brokers.toml):\n'
f'{account}'
)
else:
accounts.add(accounts_def.inverse[account])
await ctx.started(( await ctx.started((
all_positions, all_positions,