`ib`: only process ledger-txs once per client

Previous we were re-processing all ledgers for every position msg
received from the API, per client.. Instead do that once in a first pass
and drop all key-miss lookups for `bs_mktid`s; it should never happen.

Better typing for in-routine vars, convert pos msg/objects to `dict`
prior to logging so it's sane to read on console. Skip processing
specifically option contracts for now.
pre_overruns_ctxcancelled
Tyler Goodlet 2023-03-27 14:14:39 -04:00
parent f0d181e3f7
commit c3686185c1
1 changed files with 34 additions and 39 deletions

View File

@ -38,6 +38,7 @@ from trio_typing import TaskStatus
import tractor import tractor
from ib_insync.contract import ( from ib_insync.contract import (
Contract, Contract,
Option,
) )
from ib_insync.order import ( from ib_insync.order import (
Trade, Trade,
@ -88,14 +89,17 @@ from .api import (
def pack_position( def pack_position(
pos: IbPosition pos: IbPosition
) -> dict[str, Any]: ) -> tuple[
str,
dict[str, Any]
]:
con = pos.contract con = pos.contract
fqsn, calc_price = con2fqsn(con) fqsn, calc_price = con2fqsn(con)
# TODO: options contracts into a sane format.. # TODO: options contracts into a sane format..
return ( return (
con.conId, str(con.conId),
BrokerdPosition( BrokerdPosition(
broker='ib', broker='ib',
account=pos.account, account=pos.account,
@ -383,20 +387,19 @@ async def update_and_audit_msgs(
# raise ValueError( # raise ValueError(
log.error( log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n' f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
f'reverse_split_ratio: {reverse_split_ratio}\n'
f'split_ratio: {split_ratio}\n\n'
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n' 'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
'If you are expecting a (reverse) split in this ' 'If you are expecting a (reverse) split in this '
'instrument you should probably put the following ' 'instrument you should probably put the following\n\n'
f'in the `pps.toml` section:\n{entry}' f'in the `pps.toml` section:\n{entry}'
f'IB:\nm{ibppmsg.to_dict()}\n\n'
f'PIKER:\n{msg.to_dict()}\n\n'
# f'reverse_split_ratio: {reverse_split_ratio}\n'
# f'split_ratio: {split_ratio}\n\n'
) )
msg.size = ibsize msg.size = ibsize
if ibppmsg.avg_price != msg.avg_price: if ibppmsg.avg_price != msg.avg_price:
# TODO: make this a "propaganda" log level?
# TODO: make this a "propoganda" log level?
log.warning( log.warning(
'The mega-cucks at IB want you to believe with their ' 'The mega-cucks at IB want you to believe with their '
f'"FIFO" positioning for {msg.symbol}:\n' f'"FIFO" positioning for {msg.symbol}:\n'
@ -425,10 +428,10 @@ async def update_and_audit_msgs(
if validate and p.size: if validate and p.size:
# raise ValueError( # raise ValueError(
log.error( log.error(
f'UNEXPECTED POSITION says ib:\n' f'UNEXPECTED POSITION says IB:\n'
f'piker: {msg}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n' 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n'
'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?' 'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?\n'
f'PIKER:\n{msg.to_dict()}\n'
) )
msgs.append(msg) msgs.append(msg)
@ -611,6 +614,8 @@ async def trades_dialogue(
# api clients which report trades for **this session**. # api clients which report trades for **this session**.
trades = await proxy.trades() trades = await proxy.trades()
if trades: if trades:
trans_by_acct: dict[str, Transaction]
api_to_ledger_entries: dict[str, dict]
( (
trans_by_acct, trans_by_acct,
api_to_ledger_entries, api_to_ledger_entries,
@ -637,17 +642,30 @@ async def trades_dialogue(
if trans: if trans:
table.update_from_trans(trans) table.update_from_trans(trans)
trans = norm_trade_records(ledger)
table.update_from_trans(trans)
# process pp value reported from ib's system. we only # process pp value reported from ib's system. we only
# use these to cross-check sizing since average pricing # use these to cross-check sizing since average pricing
# on their end uses the so called (bs) "FIFO" style # on their end uses the so called (bs) "FIFO" style
# which more or less results in a price that's not # which more or less results in a price that's not
# useful for traders who want to not lose money.. xb # useful for traders who want to not lose money.. xb
for pos in client.positions(): # -> collect all ib-pp reported positions so that we can be
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if # sure know which positions to update from the ledger if
# any are missing from the ``pps.toml`` # any are missing from the ``pps.toml``
bs_mktid, msg = pack_position(pos) pos: IbPosition # named tuple actually
for pos in client.positions():
# NOTE XXX: we skip options for now since we don't
# yet support the symbology nor the live feeds.
if isinstance(pos.contract, Option):
log.warning(
f'Option contracts not supported for now:\n'
f'{pos._asdict()}'
)
continue
bs_mktid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account] acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.') acctid = acctid.strip('ib.')
cids2pps[(acctid, bs_mktid)] = msg cids2pps[(acctid, bs_mktid)] = msg
@ -663,29 +681,6 @@ async def trades_dialogue(
not pp not pp
or pp.size != msg.size or pp.size != msg.size
): ):
trans = norm_trade_records(ledger)
table.update_from_trans(trans)
# XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but
# if you create a pos from TWS and then load it
# from the api trades it seems we get a key
# error from ``update[bs_mktid]`` ?
pp = table.pps.get(bs_mktid)
if not pp:
log.error(
f'The contract id for {msg} may have '
f'changed to {bs_mktid}\nYou may need to '
'adjust your ledger for this, skipping '
'for now.'
)
continue
# XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but
# if you create a pos from TWS and then load it
# from the api trades it seems we get a key
# error from ``update[bs_mktid]`` ?
pp = table.pps[bs_mktid] pp = table.pps[bs_mktid]
pairinfo = pp.symbol pairinfo = pp.symbol
if msg.size != pp.size: if msg.size != pp.size: