''' `piker.accounting` mgmt calculations for - positioning - ledger updates - config file IO ''' from pathlib import Path import pytest from piker import config from piker.accounting import ( Account, calc, open_account, load_account, load_account_from_ledger, open_trade_ledger, Position, TransactionLedger, ) import tractor def test_root_conf_networking_section( root_conf: dict, ): conf, path = config.load( 'conf', touch_if_dne=True, ) assert conf['network']['tsdb'] def test_account_file_default_empty( tmpconfdir: Path, ): conf, path = load_account( 'kraken', 'paper', ) # ensure the account file empty but created # and in the correct place in the filesystem! assert not conf assert path.parent.is_dir() assert path.parent.name == 'accounting' @pytest.mark.parametrize( 'fq_acnt', [ ('binance', 'paper'), ], ) def test_paper_ledger_position_calcs( fq_acnt: tuple[str, str], debug_mode: bool, ): broker: str acnt_name: str broker, acnt_name = fq_acnt accounts_path: Path = ( config.repodir() / 'tests' / '_inputs' # tests-local-subdir ) ldr: TransactionLedger with ( open_trade_ledger( broker, acnt_name, allow_from_sync_code=True, _fp=accounts_path, ) as ldr, # open `polars` acnt dfs Bo calc.open_ledger_dfs( broker, acnt_name, ledger=ldr, _fp=accounts_path, debug_mode=debug_mode, ) as (dfs, ledger), ): acnt: Account = load_account_from_ledger( broker, acnt_name, ledger=ldr, _fp=accounts_path, ) # do manual checks on expected pos calcs based on input # ledger B) # xrpusdt should have a net-zero size xrp: str = 'xrpusdt.spot.binance' pos: Position = acnt.pps[xrp] # XXX: turns out our old dict-style-processing # get's this wrong i think due to dt-sorting.. # lcum: float = pos.cumsize df = dfs[xrp] assert df['cumsize'][-1] == 0 assert pos.cumsize == 0 @pytest.mark.parametrize( 'fq_acnt', [ ('ib', 'algopaper'), ], ) def test_ib_account_with_duplicated_mktids( fq_acnt: tuple[str, str], debug_mode: bool, ): # ?TODO, once we start symcache-incremental-update-support? # from piker.data import ( # open_symcache, # ) # # async def main(): # async with ( # # TODO: do this as part of `open_account()`!? # open_symcache( # 'ib', # only_from_memcache=True, # ) as symcache, # ): from piker.brokers.ib.ledger import ( tx_sort, # ?TODO, once we want to pull lowlevel txns and process them? # norm_trade_records, # update_ledger_from_api_trades, ) broker: str acnt_id: str = 'algopaper' broker, acnt_id = fq_acnt accounts_def = config.load_accounts([broker]) assert accounts_def[f'{broker}.{acnt_id}'] ledger: TransactionLedger acnt: Account with ( tractor.devx.maybe_open_crash_handler(pdb=debug_mode), open_trade_ledger( 'ib', acnt_id, tx_sort=tx_sort, # TODO, eventually incrementally updated for IB.. # symcache=symcache, symcache=None, allow_from_sync_code=True, ) as ledger, open_account( 'ib', acnt_id, write_on_exit=True, ) as acnt, ): # per input params symcache = ledger.symcache assert not ( symcache.pairs or symcache.pairs or symcache.mktmaps ) # re-compute all positions that have changed state. # TODO: likely we should change the API to return the # position updates from `.update_from_ledger()`? active, closed = acnt.dump_active() # breakpoint() # TODO, (see above imports as well) incremental update from # (updated) ledger? # -[ ] pull some code from `.ib.broker` content.