`ib`: only process ledger-txs once per client
Previous we were re-processing all ledgers for every position msg received from the API, per client.. Instead do that once in a first pass and drop all key-miss lookups for `bs_mktid`s; it should never happen. Better typing for in-routine vars, convert pos msg/objects to `dict` prior to logging so it's sane to read on console. Skip processing specifically option contracts for now.rekt_pps
parent
978c59f5f0
commit
29ad20bc63
|
@ -38,6 +38,7 @@ from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
from ib_insync.contract import (
|
from ib_insync.contract import (
|
||||||
Contract,
|
Contract,
|
||||||
|
Option,
|
||||||
)
|
)
|
||||||
from ib_insync.order import (
|
from ib_insync.order import (
|
||||||
Trade,
|
Trade,
|
||||||
|
@ -88,14 +89,17 @@ from .api import (
|
||||||
def pack_position(
|
def pack_position(
|
||||||
pos: IbPosition
|
pos: IbPosition
|
||||||
|
|
||||||
) -> dict[str, Any]:
|
) -> tuple[
|
||||||
|
str,
|
||||||
|
dict[str, Any]
|
||||||
|
]:
|
||||||
|
|
||||||
con = pos.contract
|
con = pos.contract
|
||||||
fqsn, calc_price = con2fqsn(con)
|
fqsn, calc_price = con2fqsn(con)
|
||||||
|
|
||||||
# TODO: options contracts into a sane format..
|
# TODO: options contracts into a sane format..
|
||||||
return (
|
return (
|
||||||
con.conId,
|
str(con.conId),
|
||||||
BrokerdPosition(
|
BrokerdPosition(
|
||||||
broker='ib',
|
broker='ib',
|
||||||
account=pos.account,
|
account=pos.account,
|
||||||
|
@ -383,20 +387,19 @@ async def update_and_audit_msgs(
|
||||||
# raise ValueError(
|
# raise ValueError(
|
||||||
log.error(
|
log.error(
|
||||||
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
||||||
f'ib: {ibppmsg}\n'
|
|
||||||
f'piker: {msg}\n'
|
|
||||||
f'reverse_split_ratio: {reverse_split_ratio}\n'
|
|
||||||
f'split_ratio: {split_ratio}\n\n'
|
|
||||||
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
|
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
|
||||||
'If you are expecting a (reverse) split in this '
|
'If you are expecting a (reverse) split in this '
|
||||||
'instrument you should probably put the following '
|
'instrument you should probably put the following\n\n'
|
||||||
f'in the `pps.toml` section:\n{entry}'
|
f'in the `pps.toml` section:\n{entry}'
|
||||||
|
f'IB:\nm{ibppmsg.to_dict()}\n\n'
|
||||||
|
f'PIKER:\n{msg.to_dict()}\n\n'
|
||||||
|
# f'reverse_split_ratio: {reverse_split_ratio}\n'
|
||||||
|
# f'split_ratio: {split_ratio}\n\n'
|
||||||
)
|
)
|
||||||
msg.size = ibsize
|
msg.size = ibsize
|
||||||
|
|
||||||
if ibppmsg.avg_price != msg.avg_price:
|
if ibppmsg.avg_price != msg.avg_price:
|
||||||
|
# TODO: make this a "propaganda" log level?
|
||||||
# TODO: make this a "propoganda" log level?
|
|
||||||
log.warning(
|
log.warning(
|
||||||
'The mega-cucks at IB want you to believe with their '
|
'The mega-cucks at IB want you to believe with their '
|
||||||
f'"FIFO" positioning for {msg.symbol}:\n'
|
f'"FIFO" positioning for {msg.symbol}:\n'
|
||||||
|
@ -425,10 +428,10 @@ async def update_and_audit_msgs(
|
||||||
if validate and p.size:
|
if validate and p.size:
|
||||||
# raise ValueError(
|
# raise ValueError(
|
||||||
log.error(
|
log.error(
|
||||||
f'UNEXPECTED POSITION says ib:\n'
|
f'UNEXPECTED POSITION says IB:\n'
|
||||||
f'piker: {msg}\n'
|
|
||||||
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n'
|
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n'
|
||||||
'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?'
|
'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?\n'
|
||||||
|
f'PIKER:\n{msg.to_dict()}\n'
|
||||||
)
|
)
|
||||||
msgs.append(msg)
|
msgs.append(msg)
|
||||||
|
|
||||||
|
@ -611,6 +614,8 @@ async def trades_dialogue(
|
||||||
# api clients which report trades for **this session**.
|
# api clients which report trades for **this session**.
|
||||||
trades = await proxy.trades()
|
trades = await proxy.trades()
|
||||||
if trades:
|
if trades:
|
||||||
|
trans_by_acct: dict[str, Transaction]
|
||||||
|
api_to_ledger_entries: dict[str, dict]
|
||||||
(
|
(
|
||||||
trans_by_acct,
|
trans_by_acct,
|
||||||
api_to_ledger_entries,
|
api_to_ledger_entries,
|
||||||
|
@ -637,17 +642,30 @@ async def trades_dialogue(
|
||||||
if trans:
|
if trans:
|
||||||
table.update_from_trans(trans)
|
table.update_from_trans(trans)
|
||||||
|
|
||||||
|
trans = norm_trade_records(ledger)
|
||||||
|
table.update_from_trans(trans)
|
||||||
|
|
||||||
# process pp value reported from ib's system. we only
|
# process pp value reported from ib's system. we only
|
||||||
# use these to cross-check sizing since average pricing
|
# use these to cross-check sizing since average pricing
|
||||||
# on their end uses the so called (bs) "FIFO" style
|
# on their end uses the so called (bs) "FIFO" style
|
||||||
# which more or less results in a price that's not
|
# which more or less results in a price that's not
|
||||||
# useful for traders who want to not lose money.. xb
|
# useful for traders who want to not lose money.. xb
|
||||||
for pos in client.positions():
|
# -> collect all ib-pp reported positions so that we can be
|
||||||
# collect all ib-pp reported positions so that we can be
|
|
||||||
# sure know which positions to update from the ledger if
|
# sure know which positions to update from the ledger if
|
||||||
# any are missing from the ``pps.toml``
|
# any are missing from the ``pps.toml``
|
||||||
bs_mktid, msg = pack_position(pos)
|
pos: IbPosition # named tuple actually
|
||||||
|
for pos in client.positions():
|
||||||
|
|
||||||
|
# NOTE XXX: we skip options for now since we don't
|
||||||
|
# yet support the symbology nor the live feeds.
|
||||||
|
if isinstance(pos.contract, Option):
|
||||||
|
log.warning(
|
||||||
|
f'Option contracts not supported for now:\n'
|
||||||
|
f'{pos._asdict()}'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
bs_mktid, msg = pack_position(pos)
|
||||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||||
acctid = acctid.strip('ib.')
|
acctid = acctid.strip('ib.')
|
||||||
cids2pps[(acctid, bs_mktid)] = msg
|
cids2pps[(acctid, bs_mktid)] = msg
|
||||||
|
@ -663,29 +681,6 @@ async def trades_dialogue(
|
||||||
not pp
|
not pp
|
||||||
or pp.size != msg.size
|
or pp.size != msg.size
|
||||||
):
|
):
|
||||||
trans = norm_trade_records(ledger)
|
|
||||||
table.update_from_trans(trans)
|
|
||||||
|
|
||||||
# XXX: not sure exactly why it wouldn't be in
|
|
||||||
# the updated output (maybe this is a bug?) but
|
|
||||||
# if you create a pos from TWS and then load it
|
|
||||||
# from the api trades it seems we get a key
|
|
||||||
# error from ``update[bs_mktid]`` ?
|
|
||||||
pp = table.pps.get(bs_mktid)
|
|
||||||
if not pp:
|
|
||||||
log.error(
|
|
||||||
f'The contract id for {msg} may have '
|
|
||||||
f'changed to {bs_mktid}\nYou may need to '
|
|
||||||
'adjust your ledger for this, skipping '
|
|
||||||
'for now.'
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# XXX: not sure exactly why it wouldn't be in
|
|
||||||
# the updated output (maybe this is a bug?) but
|
|
||||||
# if you create a pos from TWS and then load it
|
|
||||||
# from the api trades it seems we get a key
|
|
||||||
# error from ``update[bs_mktid]`` ?
|
|
||||||
pp = table.pps[bs_mktid]
|
pp = table.pps[bs_mktid]
|
||||||
pairinfo = pp.symbol
|
pairinfo = pp.symbol
|
||||||
if msg.size != pp.size:
|
if msg.size != pp.size:
|
||||||
|
|
Loading…
Reference in New Issue