ib: port to new `.accounting` APIs

Still kinda borked since i don't think there actually is a (per venue)
"get-all-symbologies" endpoint.. so we're likely gonna have to figure
out either how to hack it or provide a bypass in ledger processing?

Deatz:
- use new `Account` type name, rename endpoint vars to match and
  obviously use any new method name(s).
- mask out split ratio handling for now.
- async open the symcache prior to ledger processing (again, for now).
- drop passing `Transaction.sym`.
- fix parser set for dt-sorter since apparently 2022 and back had
  a `date` field instead?
account_tests
Tyler Goodlet 2023-07-10 18:02:40 -04:00
parent 8b9494281d
commit c30d8ac9ba
2 changed files with 48 additions and 25 deletions

View File

@ -61,7 +61,7 @@ from piker.accounting import (
TransactionLedger, TransactionLedger,
iter_by_dt, iter_by_dt,
open_pps, open_pps,
PpTable, Account,
) )
from piker.clearing._messages import ( from piker.clearing._messages import (
Order, Order,
@ -287,6 +287,9 @@ async def recv_trade_updates(
await client.ib.disconnectedEvent await client.ib.disconnectedEvent
# TODO: maybe we should allow the `trade_entries` input to be
# a list of the actual `Contract` types instead, though a couple
# other callers will need to be changed as well.
async def update_ledger_from_api_trades( async def update_ledger_from_api_trades(
trade_entries: list[dict[str, Any]], trade_entries: list[dict[str, Any]],
client: Union[Client, MethodProxy], client: Union[Client, MethodProxy],
@ -383,25 +386,33 @@ async def update_and_audit_msgs(
# if ib reports a lesser pp it's not as bad since we can # if ib reports a lesser pp it's not as bad since we can
# presume we're at least not more in the shit then we # presume we're at least not more in the shit then we
# thought. # thought.
if diff and pikersize: if (
reverse_split_ratio = pikersize / ibsize diff
split_ratio = 1/reverse_split_ratio and (
pikersize
or ibsize
)
):
# if 'mbt.cme' in msg.symbol:
# await tractor.pause()
if split_ratio >= reverse_split_ratio: # reverse_split_ratio = pikersize / ibsize
entry = f'split_ratio = {int(split_ratio)}' # split_ratio = 1/reverse_split_ratio
else: # if split_ratio >= reverse_split_ratio:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}' # entry = f'split_ratio = {int(split_ratio)}'
# else:
# entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
msg.size = ibsize msg.size = ibsize
logmsg: str = ( logmsg: str = (
f'Pos mismatch in ib vs. the piker ledger!\n' f'Pos mismatch in ib vs. the piker ledger!\n'
f'IB:\n{ibfmtmsg}\n\n' f'IB:\n{ibfmtmsg}\n\n'
f'PIKER:\n{pikerfmtmsg}\n\n' f'PIKER:\n{pikerfmtmsg}\n\n'
'If you are expecting a (reverse) split in this '
'instrument you should probably put the following' # 'If you are expecting a (reverse) split in this '
'in the `pps.toml` section:\n' # 'instrument you should probably put the following'
f'{entry}\n' # 'in the `pps.toml` section:\n'
# f'{entry}\n'
# f'reverse_split_ratio: {reverse_split_ratio}\n' # f'reverse_split_ratio: {reverse_split_ratio}\n'
# f'split_ratio: {split_ratio}\n\n' # f'split_ratio: {split_ratio}\n\n'
) )
@ -416,8 +427,9 @@ async def update_and_audit_msgs(
# TODO: make this a "propaganda" log level? # TODO: make this a "propaganda" log level?
log.warning( log.warning(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibppmsg.avg_price}\n' f'ib: {pformat(ibppmsg)}\n'
f'piker: {msg.avg_price}' '---------------------------\n'
f'piker: {msg.to_dict()}'
) )
else: else:
@ -537,6 +549,9 @@ async def open_trade_dialog(
accounts_def = config.load_accounts(['ib']) accounts_def = config.load_accounts(['ib'])
# TODO: do this as part of `open_account()`!?
from piker.data._symcache import open_symcache
global _client_cache global _client_cache
# deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
@ -550,12 +565,13 @@ async def open_trade_dialog(
proxies, proxies,
aioclients, aioclients,
), ),
open_symcache('ib', only_from_memcache=True) as symcache,
): ):
# Open a trade ledgers stack for appending trade records over # Open a trade ledgers stack for appending trade records over
# multiple accounts. # multiple accounts.
# TODO: we probably want to generalize this into a "ledgers" api.. # TODO: we probably want to generalize this into a "ledgers" api..
ledgers: dict[str, dict] = {} ledgers: dict[str, dict] = {}
tables: dict[str, PpTable] = {} tables: dict[str, Account] = {}
order_msgs: list[Status] = [] order_msgs: list[Status] = []
conf = get_config() conf = get_config()
accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse
@ -582,8 +598,12 @@ async def open_trade_dialog(
parsers={ parsers={
'dateTime': parse_flex_dt, 'dateTime': parse_flex_dt,
'datetime': pendulum.parse, 'datetime': pendulum.parse,
# for some some fucking 2022 and
# back options records...fuck me.
'date': pendulum.parse,
}, },
), ),
symcache=symcache,
) )
) )
@ -616,7 +636,7 @@ async def open_trade_dialog(
) )
acctid: str = account.strip('ib.') acctid: str = account.strip('ib.')
ledger: dict = ledgers[acctid] ledger: dict = ledgers[acctid]
table: PpTable = tables[acctid] table: Account = tables[acctid]
# update position table with latest ledger from all # update position table with latest ledger from all
# gathered transactions: ledger file + api records. # gathered transactions: ledger file + api records.
@ -624,9 +644,8 @@ async def open_trade_dialog(
# update trades ledgers for all accounts from connected # update trades ledgers for all accounts from connected
# api clients which report trades for **this session**. # api clients which report trades for **this session**.
api_trades = await proxy.trades() api_trades: list[dict] = await proxy.trades()
if api_trades: if api_trades:
api_trans_by_acct: dict[str, Transaction] api_trans_by_acct: dict[str, Transaction]
api_to_ledger_entries: dict[str, dict] api_to_ledger_entries: dict[str, dict]
( (
@ -660,7 +679,10 @@ async def open_trade_dialog(
trans.update(api_trans) trans.update(api_trans)
# update account (and thus pps) from all gathered transactions # update account (and thus pps) from all gathered transactions
table.update_from_trans(trans) table.update_from_ledger(
trans,
symcache=ledger.symcache,
)
# process pp value reported from ib's system. we only # process pp value reported from ib's system. we only
# use these to cross-check sizing since average pricing # use these to cross-check sizing since average pricing
@ -772,7 +794,7 @@ async def emit_pp_update(
cids2pps: dict, cids2pps: dict,
ledgers: dict[str, dict[str, Any]], ledgers: dict[str, dict[str, Any]],
tables: dict[str, PpTable], acnts: dict[str, Account],
) -> None: ) -> None:
@ -794,9 +816,11 @@ async def emit_pp_update(
tx: Transaction = list(trans.values())[0] tx: Transaction = list(trans.values())[0]
acctid = fq_acctid.strip('ib.') acctid = fq_acctid.strip('ib.')
table = tables[acctid] acnt = acnts[acctid]
table.update_from_trans(trans)
active, closed = table.dump_active() acnt.update_from_ledger(trans)
active, closed = acnt.dump_active()
# NOTE: update ledger with all new trades # NOTE: update ledger with all new trades
for fq_acctid, trades_by_id in api_to_ledger_entries.items(): for fq_acctid, trades_by_id in api_to_ledger_entries.items():

View File

@ -224,7 +224,6 @@ def norm_trade_records(
# `trades_dialogue()` above). # `trades_dialogue()` above).
trans = Transaction( trans = Transaction(
fqme=fqme, fqme=fqme,
sym=pair,
tid=tid, tid=tid,
size=size, size=size,
price=price, price=price,