ib: more fixes to try and get positioning correct..

Define and bind in the `tx_sort()` routine to be used by
`open_trade_ledger()` when datetime sorting trade records.

Further deats:
- always use the IB reported position size (since apparently our ledger
  based accounting is getting rekt on occasion..).
- better ib pos msg formatting when there's mismatches with the piker
  equivalent.
- never emit zero-size pos msgs (in terms of strict ib pos sizing) since
  when there's piker ledger sizing errors we'll send the wrong thing to
  the ems and its clients..
account_tests
Tyler Goodlet 2023-07-19 16:46:36 -04:00
parent 8a10cbf6ab
commit 5eb310cac9
3 changed files with 79 additions and 52 deletions

View File

@ -38,6 +38,7 @@ from .broker import (
from .ledger import (
norm_trade,
norm_trade_records,
tx_sort,
)
from .symbols import (
get_mkt_info,
@ -55,6 +56,7 @@ __all__ = [
'open_symbol_search',
'stream_quotes',
'_search_conf',
'tx_sort',
]
_brokerd_mods: list[str] = [

View File

@ -49,7 +49,6 @@ from ib_insync.objects import (
CommissionReport,
)
from ib_insync.objects import Position as IbPosition
import pendulum
from piker import config
from piker.accounting import (
@ -59,7 +58,6 @@ from piker.accounting import (
Transaction,
open_trade_ledger,
TransactionLedger,
iter_by_dt,
open_account,
Account,
)
@ -87,14 +85,13 @@ from .api import (
Client,
MethodProxy,
)
from ._flex_reports import parse_flex_dt
from .ledger import (
norm_trade_records,
api_trades_to_ledger_entries,
tx_sort,
)
def pack_position(
pos: IbPosition
@ -368,8 +365,6 @@ async def update_and_audit_msgs(
# breakeven pp calcs.
ibppmsg = cids2pps.get((acctid, bs_mktid))
if ibppmsg:
symbol: str = ibppmsg.symbol
msg = BrokerdPosition(
broker='ib',
@ -379,10 +374,18 @@ async def update_and_audit_msgs(
# need it and/or it's prefixed in the section
# table..
account=ibppmsg.account,
# XXX: the `.ib` is stripped..?
symbol=symbol,
currency=ibppmsg.currency,
size=p.size,
symbol=ibppmsg.symbol,
# remove..
# currency=ibppmsg.currency,
# NOTE: always take their size since it's usually the
# true gospel..
# size=p.size,
size=ibppmsg.size,
avg_price=p.ppu,
)
msgs.append(msg)
@ -404,8 +407,6 @@ async def update_and_audit_msgs(
or ibsize
)
):
# if 'mbt.cme' in msg.symbol:
# await tractor.pause()
# reverse_split_ratio = pikersize / ibsize
# split_ratio = 1/reverse_split_ratio
@ -434,17 +435,19 @@ async def update_and_audit_msgs(
# await tractor.pause()
log.error(logmsg)
# TODO: make this a "propaganda" log level?
if ibppmsg.avg_price != msg.avg_price:
# TODO: make this a "propaganda" log level?
log.warning(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {pformat(ibppmsg)}\n'
f'ib: {ibfmtmsg}\n'
'---------------------------\n'
f'piker: {msg.to_dict()}'
f'piker: {pformat(msg.to_dict())}'
)
else:
# make brand new message
# XXX: though it shouldn't be possible (means an error
# in our accounting subsys) create a new message for
# a supposed "missing position" that IB never reported.
msg = BrokerdPosition(
broker='ib',
@ -455,9 +458,13 @@ async def update_and_audit_msgs(
# table.. we should just strip this from the message
# right since `.broker` is already included?
account=f'ib.{acctid}',
# XXX: the `.ib` is stripped..?
symbol=p.mkt.fqme,
# TODO: we should remove from msg schema..
# currency=ibppmsg.currency,
size=p.size,
avg_price=p.ppu,
)
@ -467,11 +474,8 @@ async def update_and_audit_msgs(
'Maybe they LIQUIDATED YOU or are missing ledger entries?\n'
)
log.error(logmsg)
# if validate:
# raise ValueError(logmsg)
msgs.append(msg)
if validate:
raise ValueError(logmsg)
return msgs
@ -558,13 +562,8 @@ async def open_trade_dialog(
) -> AsyncIterator[dict[str, Any]]:
# from piker.brokers import (
# get_brokermod,
# )
accounts_def = config.load_accounts(['ib'])
global _client_cache
# deliver positions to subscriber before anything else
all_positions = []
accounts = set()
@ -606,16 +605,7 @@ async def open_trade_dialog(
open_trade_ledger(
'ib',
acctid,
tx_sort=partial(
iter_by_dt,
parsers={
'dateTime': parse_flex_dt,
'datetime': pendulum.parse,
# for some some fucking 2022 and
# back options records...fuck me.
'date': pendulum.parse,
},
),
tx_sort=tx_sort,
symcache=symcache,
)
)
@ -680,7 +670,6 @@ async def open_trade_dialog(
api_to_ledger_entries
and (trade_entries := api_to_ledger_entries.get(acctid))
):
# TODO: fix this `tractor` BUG!
# https://github.com/goodboy/tractor/issues/354
# await tractor.pp()
@ -724,8 +713,8 @@ async def open_trade_dialog(
continue
bs_mktid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
msg.account = accounts_def.inverse[msg.account]
acctid = msg.account.strip('ib.')
cids2pps[(acctid, bs_mktid)] = msg
assert msg.account in accounts, (
@ -744,7 +733,7 @@ async def open_trade_dialog(
cids2pps,
validate=False,
)
all_positions.extend(msg for msg in msgs)
all_positions.extend(msg for msg in msgs if msg.size != 0)
if not all_positions and cids2pps:
raise RuntimeError(
@ -782,7 +771,7 @@ async def open_trade_dialog(
# allocate event relay tasks for each client connection
n.start_soon(
deliver_trade_events,
n,
# n,
trade_event_stream,
ems_stream,
accounts_def,
@ -838,26 +827,33 @@ async def emit_pp_update(
acctid = fq_acctid.strip('ib.')
acnt = acnts[acctid]
ledger: dict = ledgers[acctid]
acnt.update_from_ledger(trans)
acnt.update_from_ledger(
trans,
symcache=ledger.symcache
)
active, closed = acnt.dump_active()
# NOTE: update ledger with all new trades
for fq_acctid, trades_by_id in api_to_ledger_entries.items():
acctid = fq_acctid.strip('ib.')
ledger = ledgers[acctid]
acctid: str = fq_acctid.strip('ib.')
ledger: dict = ledgers[acctid]
# NOTE: don't override flex/previous entries with new API
# ones, just update with new fields!
for tid, tdict in trades_by_id.items():
# NOTE: don't override flex/previous entries with new API
# ones, just update with new fields!
ledger.setdefault(tid, {}).update(tdict)
# generate pp msgs and cross check with ib's positions data, relay
# re-formatted pps as msgs to the ems.
for pos in filter(
bool,
[active.get(tx.bs_mktid), closed.get(tx.bs_mktid)]
[
active.get(tx.bs_mktid),
closed.get(tx.bs_mktid)
]
):
msgs = await update_and_audit_msgs(
acctid,
@ -869,7 +865,7 @@ async def emit_pp_update(
)
if msgs:
msg = msgs[0]
log.info('Emitting pp msg: {msg}')
log.info(f'Emitting pp msg: {msg}')
break
await ems_stream.send(msg)
@ -889,11 +885,19 @@ _statuses: dict[str, str] = {
# https://github.com/erdewit/ib_insync/issues/363
'inactive': 'pending',
}
_action_map = {
'BOT': 'buy',
'SLD': 'sell',
}
# TODO: maybe just make this a flat func without an interal loop
# and call it *from* the `trade_event_stream` loop? Might look
# a lot nicer doing that from open_trade_dialog() instead of
# starting a separate task?
async def deliver_trade_events(
nurse: trio.Nursery,
# nurse: trio.Nursery,
trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream,
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
@ -908,7 +912,6 @@ async def deliver_trade_events(
Format and relay all trade events for a given client to emsd.
'''
action_map = {'BOT': 'buy', 'SLD': 'sell'}
ids2fills: dict[str, dict] = {}
# TODO: for some reason we can receive a ``None`` here when the
@ -916,7 +919,6 @@ async def deliver_trade_events(
# at the eventkit code above but we should probably handle it...
async for event_name, item in trade_event_stream:
log.info(f'ib sending {event_name}:\n{pformat(item)}')
match event_name:
# NOTE: we remap statuses to the ems set via the
# ``_statuses: dict`` above.
@ -999,7 +1001,7 @@ async def deliver_trade_events(
# `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
action=action_map[execu.side],
action=_action_map[execu.side],
size=execu.shares,
price=execu.price,
# broker_details=execdict,
@ -1163,8 +1165,16 @@ async def deliver_trade_events(
case 'position':
cid, msg = pack_position(item)
pos: IbPosition = item
bs_mktid, msg = pack_position(pos)
log.info(f'New IB position msg: {msg}')
# always update with latest ib pos msg info since
# we generally audit against it for sanity and
# testing AND we require it to be updated to avoid
# error msgs emitted from `update_and_audit_msgs()`
cids2pps[(msg.account, bs_mktid)] = msg
# cuck ib and it's shitty fifo sys for pps!
continue

View File

@ -20,9 +20,11 @@ Trade transaction accounting and normalization.
'''
from bisect import insort
from decimal import Decimal
from functools import partial
from pprint import pformat
from typing import (
Any,
Callable,
)
from bidict import bidict
@ -38,11 +40,24 @@ from piker.accounting import (
digits_to_dec,
Transaction,
MktPair,
iter_by_dt,
)
from ._flex_reports import parse_flex_dt
from ._util import log
tx_sort: Callable = partial(
iter_by_dt,
parsers={
'dateTime': parse_flex_dt,
'datetime': pendulum.parse,
# for some some fucking 2022 and
# back options records...fuck me.
'date': pendulum.parse,
}
)
def norm_trade(
tid: str,
record: dict[str, Any],