From c7b84ab5004bbe4a428e12b4aada99100361b5ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Jul 2022 15:14:24 -0400 Subject: [PATCH] Port position calcs to new ctx mngr apis and drop multi-loop madness --- piker/brokers/ib/broker.py | 191 +++++++++++++++++++------------------ 1 file changed, 98 insertions(+), 93 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 12c7dd38..acf055dc 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -53,7 +53,6 @@ import pendulum from piker import config from piker.pp import ( - # update_pps_conf, Position, Transaction, open_trade_ledger, @@ -426,6 +425,8 @@ async def trades_dialogue( all_positions = [] accounts = set() clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] + acctids = set() + cids2pps: dict[str, BrokerdPosition] = {} # TODO: this causes a massive tractor bug when you run marketstored # with ``--tsdb``... you should get: @@ -435,37 +436,13 @@ async def trades_dialogue( # - hitting final control-c to kill daemon will lead to hang # assert 0 + # TODO: just write on teardown? + # we might also want to delegate a specific actor for + # ledger writing / reading for speed? async with ( trio.open_nursery() as nurse, open_client_proxies() as (proxies, aioclients), ): - for account, proxy in proxies.items(): - - client = aioclients[account] - - async def open_trade_event_stream( - task_status: TaskStatus[ - trio.abc.ReceiveChannel - ] = trio.TASK_STATUS_IGNORED, - ): - # each api client has a unique event stream - async with tractor.to_asyncio.open_channel_from( - recv_trade_updates, - client=client, - ) as (first, trade_event_stream): - - task_status.started(trade_event_stream) - await trio.sleep_forever() - - trade_event_stream = await nurse.start(open_trade_event_stream) - - clients.append((client, trade_event_stream)) - - assert account in accounts_def - accounts.add(account) - - cids2pps: dict[str, BrokerdPosition] = {} - # Open a trade ledgers stack for appending trade records over # multiple accounts. # TODO: we probably want to generalize this into a "ledgers" api.. @@ -474,73 +451,106 @@ async def trades_dialogue( with ( ExitStack() as lstack, ): + for account, proxy in proxies.items(): - # process pp value reported from ib's system. we only use these - # to cross-check sizing since average pricing on their end uses - # the so called (bs) "FIFO" style which more or less results in - # a price that's not useful for traders who want to not lose - # money.. xb - acctids = set() - for client in aioclients.values(): - for pos in client.positions(): + acctid = account.strip('ib.') + acctids.add(acctid) - # collect all ib-pp reported positions so that we can be - # sure know which positions to update from the ledger if - # any are missing from the ``pps.toml`` - cid, msg = pack_position(pos) - acctid = msg.account = accounts_def.inverse[msg.account] - acctid = acctid.strip('ib.') - acctids.add(acctid) - - cids2pps[(acctid, cid)] = msg - assert msg.account in accounts, ( - f'Position for unknown account: {msg.account}') - - for acctid in acctids: # open ledger and pptable wrapper for each # detected account. - ledgers[acctid] = lstack.enter_context( + ledger = ledgers[acctid] = lstack.enter_context( open_trade_ledger('ib', acctid) ) - tables[acctid] = lstack.enter_context( + table = tables[acctid] = lstack.enter_context( open_pps('ib', acctid) ) - # update trades ledgers for all accounts from - # connected api clients which report trades for **this session**. - for account, proxy in proxies.items(): + client = aioclients[account] + async def open_trade_event_stream( + task_status: TaskStatus[ + trio.abc.ReceiveChannel + ] = trio.TASK_STATUS_IGNORED, + ): + # each api client has a unique event stream + async with tractor.to_asyncio.open_channel_from( + recv_trade_updates, + client=client, + ) as (first, trade_event_stream): + + task_status.started(trade_event_stream) + await trio.sleep_forever() + + trade_event_stream = await nurse.start(open_trade_event_stream) + clients.append((client, trade_event_stream)) + + assert account in accounts_def + accounts.add(account) + + # update trades ledgers for all accounts from connected + # api clients which report trades for **this session**. trades = await proxy.trades() ( trans_by_acct, - ready_for_ledger_entries, + api_ready_for_ledger_entries, ) = await update_ledger_from_api_trades( trades, proxy, ) - acctid = account.strip('ib.') - ledger = ledgers[acctid] - ledger.update(ready_for_ledger_entries[acctid]) + # if new trades are detected from the API, prepare + # them for the ledger file and update the pptable. + if api_ready_for_ledger_entries: + trade_entries = api_ready_for_ledger_entries[acctid] + ledger.update(trade_entries) + trans = trans_by_acct.get(acctid) + if trans: + table.update_from_trans(trans) - trans = trans_by_acct.get(acctid) - if trans: - tables[acctid].update_from_trans(trans) + # process pp value reported from ib's system. we only use these + # to cross-check sizing since average pricing on their end uses + # the so called (bs) "FIFO" style which more or less results in + # a price that's not useful for traders who want to not lose + # money.. xb + # for client in aioclients.values(): + for pos in client.positions(): - # load all positions from `pps.toml`, cross check with ib's - # positions data, and relay re-formatted pps as msgs to the ems. - # __2 cases__: - # - new trades have taken place this session that we want to - # always reprocess indempotently, - # - no new trades yet but we want to reload and audit any - # positions reported by ib's sys that may not yet be in - # piker's ``pps.toml`` state-file. - for acctid in acctids: + # collect all ib-pp reported positions so that we can be + # sure know which positions to update from the ledger if + # any are missing from the ``pps.toml`` + bsuid, msg = pack_position(pos) + acctid = msg.account = accounts_def.inverse[msg.account] + acctid = acctid.strip('ib.') + cids2pps[(acctid, bsuid)] = msg + assert msg.account in accounts, ( + f'Position for unknown account: {msg.account}') + + table = tables[acctid] + pp = table.pps.get(bsuid) + if ( + not pp + or pp.size != msg.size + ): + trans = norm_trade_records(ledger) + updated = table.update_from_trans(trans) + pp = updated[bsuid] + assert msg.size == pp.size, 'WTF' + + # TODO: figure out why these don't match? + # assert pp.calc_be_price() == pp.be_price - table = tables[acctid] _, closed_pps = table.dump_active('ib') active_pps = table.pps + # load all positions from `pps.toml`, cross check with + # ib's positions data, and relay re-formatted pps as + # msgs to the ems. + # __2 cases__: + # - new trades have taken place this session that we want to + # always reprocess indempotently, + # - no new trades yet but we want to reload and audit any + # positions reported by ib's sys that may not yet be in + # piker's ``pps.toml`` state-file. for pps in [active_pps, closed_pps]: msgs = await update_and_audit_msgs( acctid, @@ -556,24 +566,14 @@ async def trades_dialogue( f'{pformat(cids2pps)}' ) - # log.info(f'Loaded {len(trades)} from this session') - # TODO: write trades to local ``trades.toml`` - # - use above per-session trades data and write to local file - # - get the "flex reports" working and pull historical data and - # also save locally. - await ctx.started(( all_positions, tuple(name for name in accounts_def if name in accounts), )) - # TODO: maybe just write on teardown? - # we might also want to delegate a specific actor for - # ledger writing / reading for speed? - # write ledger with all new trades **AFTER** we've updated the # `pps.toml` from the original ledger state! - for acctid, trades_by_id in ready_for_ledger_entries.items(): + for acctid, trades_by_id in api_ready_for_ledger_entries.items(): ledgers[acctid].update(trades_by_id) async with ( @@ -621,7 +621,7 @@ async def emit_pp_update( acctid = acctid.strip('ib.') ( records_by_acct, - ready_for_ledger_entries, + api_ready_for_ledger_entries, ) = await update_ledger_from_api_trades( [trade_entry], proxy, @@ -635,7 +635,7 @@ async def emit_pp_update( active = table.pps # NOTE: update ledger with all new trades - for acctid, trades_by_id in ready_for_ledger_entries.items(): + for acctid, trades_by_id in api_ready_for_ledger_entries.items(): ledger = ledgers[acctid] ledger.update(trades_by_id) @@ -1128,8 +1128,7 @@ def load_flex_trades( trade_entries = report.extract('Trade') ln = len(trade_entries) - # log.info(f'Loaded {ln} trades from flex query') - print(f'Loaded {ln} trades from flex query') + log.info(f'Loaded {ln} trades from flex query') trades_by_account = trades_to_ledger_entries( # get reverse map to user account names @@ -1138,14 +1137,20 @@ def load_flex_trades( source_type='flex', ) - ledgers = {} - for acctid, trades_by_id in trades_by_account.items(): - with open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) + for acctid in trades_by_account: + trades_by_id = trades_by_account[acctid] + with open_trade_ledger('ib', acctid) as ledger_dict: + tid_delta = set(trades_by_id) - set(ledger_dict) + log.info( + 'New trades detected\n' + f'{pformat(tid_delta)}' + ) + if tid_delta: + ledger_dict.update( + {tid: trades_by_id[tid] for tid in tid_delta} + ) - ledgers[acctid] = ledger - - return ledgers + return ledger_dict if __name__ == '__main__':