ib: expose trade EP as `open_trade_dialog()`

Should be the final production backend to switch this over B)

Also tidy up the `update_and_audit_msgs()` validator to log vs. raise
when `validate: bool` is set; turn it off by default to avoid raises
until we figure out wtf is up with ib ledger processing or wtv..
basic_buy_bot
Tyler Goodlet 2023-06-24 17:12:43 -04:00
parent b1ef549276
commit 3be1d610e0
4 changed files with 56 additions and 43 deletions

View File

@ -34,12 +34,12 @@ from .feed import (
stream_quotes, stream_quotes,
) )
from .broker import ( from .broker import (
trades_dialogue, open_trade_dialog,
) )
__all__ = [ __all__ = [
'get_client', 'get_client',
'trades_dialogue', 'open_trade_dialog',
'open_history_client', 'open_history_client',
'open_symbol_search', 'open_symbol_search',
'stream_quotes', 'stream_quotes',

View File

@ -29,7 +29,7 @@ import subprocess
import tractor import tractor
from .._util import get_logger from piker.brokers._util import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from .api import Client from .api import Client

View File

@ -85,8 +85,9 @@ import numpy as np
# non-relative for backends so that non-builting backends # non-relative for backends so that non-builting backends
# can be easily modelled after this style B) # can be easily modelled after this style B)
from piker import config from piker import config
from piker.brokers._util import ( from ._util import (
log, log,
# only for the ib_sync internal logging
get_logger, get_logger,
) )

View File

@ -60,6 +60,7 @@ from piker.accounting import (
Position, Position,
Transaction, Transaction,
open_trade_ledger, open_trade_ledger,
TransactionLedger,
iter_by_dt, iter_by_dt,
open_pps, open_pps,
PpTable, PpTable,
@ -78,10 +79,10 @@ from piker.clearing._messages import (
from piker.accounting import ( from piker.accounting import (
MktPair, MktPair,
) )
from ._util import log
from .api import ( from .api import (
_accounts2clients, _accounts2clients,
con2fqme, con2fqme,
log,
get_config, get_config,
open_client_proxies, open_client_proxies,
Client, Client,
@ -90,6 +91,7 @@ from .api import (
from ._flex_reports import parse_flex_dt from ._flex_reports import parse_flex_dt
def pack_position( def pack_position(
pos: IbPosition pos: IbPosition
@ -339,7 +341,7 @@ async def update_and_audit_msgs(
acctid: str, # no `ib.` prefix is required! acctid: str, # no `ib.` prefix is required!
pps: list[Position], pps: list[Position],
cids2pps: dict[tuple[str, int], BrokerdPosition], cids2pps: dict[tuple[str, int], BrokerdPosition],
validate: bool = False, validate: bool = True,
) -> list[BrokerdPosition]: ) -> list[BrokerdPosition]:
@ -352,9 +354,9 @@ async def update_and_audit_msgs(
# for comparison/audit versus the piker equivalent # for comparison/audit versus the piker equivalent
# breakeven pp calcs. # breakeven pp calcs.
ibppmsg = cids2pps.get((acctid, bs_mktid)) ibppmsg = cids2pps.get((acctid, bs_mktid))
if ibppmsg: if ibppmsg:
symbol = ibppmsg.symbol
symbol: str = ibppmsg.symbol
msg = BrokerdPosition( msg = BrokerdPosition(
broker='ib', broker='ib',
@ -375,7 +377,6 @@ async def update_and_audit_msgs(
ibfmtmsg = pformat(ibppmsg.to_dict()) ibfmtmsg = pformat(ibppmsg.to_dict())
pikerfmtmsg = pformat(msg.to_dict()) pikerfmtmsg = pformat(msg.to_dict())
if validate:
ibsize = ibppmsg.size ibsize = ibppmsg.size
pikersize = msg.size pikersize = msg.size
diff = pikersize - ibsize diff = pikersize - ibsize
@ -392,8 +393,9 @@ async def update_and_audit_msgs(
else: else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}' entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
# raise ValueError( msg.size = ibsize
log.error(
logmsg: str = (
f'Pos mismatch in ib vs. the piker ledger!\n' f'Pos mismatch in ib vs. the piker ledger!\n'
f'IB:\n{ibfmtmsg}\n\n' f'IB:\n{ibfmtmsg}\n\n'
f'PIKER:\n{pikerfmtmsg}\n\n' f'PIKER:\n{pikerfmtmsg}\n\n'
@ -404,7 +406,12 @@ async def update_and_audit_msgs(
# f'reverse_split_ratio: {reverse_split_ratio}\n' # f'reverse_split_ratio: {reverse_split_ratio}\n'
# f'split_ratio: {split_ratio}\n\n' # f'split_ratio: {split_ratio}\n\n'
) )
msg.size = ibsize
if validate:
raise ValueError(logmsg)
else:
# await tractor.pause()
log.error(logmsg)
if ibppmsg.avg_price != msg.avg_price: if ibppmsg.avg_price != msg.avg_price:
# TODO: make this a "propaganda" log level? # TODO: make this a "propaganda" log level?
@ -432,12 +439,16 @@ async def update_and_audit_msgs(
size=p.size, size=p.size,
avg_price=p.ppu, avg_price=p.ppu,
) )
if validate and p.size: if p.size:
# raise ValueError( logmsg: str = (
log.error(
f'UNEXPECTED POSITION says IB => {msg.symbol}\n' f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or are missing ledger entries?\n' 'Maybe they LIQUIDATED YOU or are missing ledger entries?\n'
) )
log.error(logmsg)
# if validate:
# raise ValueError(logmsg)
msgs.append(msg) msgs.append(msg)
return msgs return msgs
@ -520,10 +531,8 @@ async def open_trade_event_stream(
@tractor.context @tractor.context
async def trades_dialogue( async def open_trade_dialog(
ctx: tractor.Context, ctx: tractor.Context,
# loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]: ) -> AsyncIterator[dict[str, Any]]:
@ -575,6 +584,7 @@ async def trades_dialogue(
# open ledger and pptable wrapper for each # open ledger and pptable wrapper for each
# detected account. # detected account.
ledger: TransactionLedger
ledger = ledgers[acctid] = lstack.enter_context( ledger = ledgers[acctid] = lstack.enter_context(
open_trade_ledger( open_trade_ledger(
'ib', 'ib',
@ -643,13 +653,14 @@ async def trades_dialogue(
# TODO: fix this `tractor` BUG! # TODO: fix this `tractor` BUG!
# https://github.com/goodboy/tractor/issues/354 # https://github.com/goodboy/tractor/issues/354
# await tractor.breakpoint() # await tractor.pp()
if trade_entries: if trade_entries:
# write ledger with all new api_trades # write ledger with all new api_trades
# **AFTER** we've updated the `pps.toml` # **AFTER** we've updated the `pps.toml`
# from the original ledger state! (i.e. this # from the original ledger state! (i.e. this
# is currently done on exit) # is currently done on exit)
for tid, entry in trade_entries.items(): for tid, entry in trade_entries.items():
ledger.setdefault(tid, {}).update(entry) ledger.setdefault(tid, {}).update(entry)
@ -670,6 +681,7 @@ async def trades_dialogue(
# -> collect all ib-pp reported positions so that we can be # -> collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if # sure know which positions to update from the ledger if
# any are missing from the ``pps.toml`` # any are missing from the ``pps.toml``
# await tractor.pp()
pos: IbPosition # named tuple subtype pos: IbPosition # named tuple subtype
for pos in client.positions(): for pos in client.positions():
@ -702,7 +714,7 @@ async def trades_dialogue(
acctid, acctid,
pps.values(), pps.values(),
cids2pps, cids2pps,
validate=True, validate=False,
) )
all_positions.extend(msg for msg in msgs) all_positions.extend(msg for msg in msgs)