diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index e0ad96c8..e792eb25 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -38,6 +38,7 @@ from .broker import ( from .ledger import ( norm_trade, norm_trade_records, + tx_sort, ) from .symbols import ( get_mkt_info, @@ -55,6 +56,7 @@ __all__ = [ 'open_symbol_search', 'stream_quotes', '_search_conf', + 'tx_sort', ] _brokerd_mods: list[str] = [ diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 842fdbc3..56c3d82a 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -49,7 +49,6 @@ from ib_insync.objects import ( CommissionReport, ) from ib_insync.objects import Position as IbPosition -import pendulum from piker import config from piker.accounting import ( @@ -59,7 +58,6 @@ from piker.accounting import ( Transaction, open_trade_ledger, TransactionLedger, - iter_by_dt, open_account, Account, ) @@ -87,14 +85,13 @@ from .api import ( Client, MethodProxy, ) -from ._flex_reports import parse_flex_dt from .ledger import ( norm_trade_records, api_trades_to_ledger_entries, + tx_sort, ) - def pack_position( pos: IbPosition @@ -368,8 +365,6 @@ async def update_and_audit_msgs( # breakeven pp calcs. ibppmsg = cids2pps.get((acctid, bs_mktid)) if ibppmsg: - - symbol: str = ibppmsg.symbol msg = BrokerdPosition( broker='ib', @@ -379,10 +374,18 @@ async def update_and_audit_msgs( # need it and/or it's prefixed in the section # table.. account=ibppmsg.account, + # XXX: the `.ib` is stripped..? - symbol=symbol, - currency=ibppmsg.currency, - size=p.size, + symbol=ibppmsg.symbol, + + # remove.. + # currency=ibppmsg.currency, + + # NOTE: always take their size since it's usually the + # true gospel.. + # size=p.size, + size=ibppmsg.size, + avg_price=p.ppu, ) msgs.append(msg) @@ -404,8 +407,6 @@ async def update_and_audit_msgs( or ibsize ) ): - # if 'mbt.cme' in msg.symbol: - # await tractor.pause() # reverse_split_ratio = pikersize / ibsize # split_ratio = 1/reverse_split_ratio @@ -434,17 +435,19 @@ async def update_and_audit_msgs( # await tractor.pause() log.error(logmsg) + # TODO: make this a "propaganda" log level? if ibppmsg.avg_price != msg.avg_price: - # TODO: make this a "propaganda" log level? log.warning( f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' - f'ib: {pformat(ibppmsg)}\n' + f'ib: {ibfmtmsg}\n' '---------------------------\n' - f'piker: {msg.to_dict()}' + f'piker: {pformat(msg.to_dict())}' ) else: - # make brand new message + # XXX: though it shouldn't be possible (means an error + # in our accounting subsys) create a new message for + # a supposed "missing position" that IB never reported. msg = BrokerdPosition( broker='ib', @@ -455,9 +458,13 @@ async def update_and_audit_msgs( # table.. we should just strip this from the message # right since `.broker` is already included? account=f'ib.{acctid}', + # XXX: the `.ib` is stripped..? symbol=p.mkt.fqme, + + # TODO: we should remove from msg schema.. # currency=ibppmsg.currency, + size=p.size, avg_price=p.ppu, ) @@ -467,11 +474,8 @@ async def update_and_audit_msgs( 'Maybe they LIQUIDATED YOU or are missing ledger entries?\n' ) log.error(logmsg) - - # if validate: - # raise ValueError(logmsg) - - msgs.append(msg) + if validate: + raise ValueError(logmsg) return msgs @@ -558,13 +562,8 @@ async def open_trade_dialog( ) -> AsyncIterator[dict[str, Any]]: - # from piker.brokers import ( - # get_brokermod, - # ) accounts_def = config.load_accounts(['ib']) - global _client_cache - # deliver positions to subscriber before anything else all_positions = [] accounts = set() @@ -606,16 +605,7 @@ async def open_trade_dialog( open_trade_ledger( 'ib', acctid, - tx_sort=partial( - iter_by_dt, - parsers={ - 'dateTime': parse_flex_dt, - 'datetime': pendulum.parse, - # for some some fucking 2022 and - # back options records...fuck me. - 'date': pendulum.parse, - }, - ), + tx_sort=tx_sort, symcache=symcache, ) ) @@ -680,7 +670,6 @@ async def open_trade_dialog( api_to_ledger_entries and (trade_entries := api_to_ledger_entries.get(acctid)) ): - # TODO: fix this `tractor` BUG! # https://github.com/goodboy/tractor/issues/354 # await tractor.pp() @@ -724,8 +713,8 @@ async def open_trade_dialog( continue bs_mktid, msg = pack_position(pos) - acctid = msg.account = accounts_def.inverse[msg.account] - acctid = acctid.strip('ib.') + msg.account = accounts_def.inverse[msg.account] + acctid = msg.account.strip('ib.') cids2pps[(acctid, bs_mktid)] = msg assert msg.account in accounts, ( @@ -744,7 +733,7 @@ async def open_trade_dialog( cids2pps, validate=False, ) - all_positions.extend(msg for msg in msgs) + all_positions.extend(msg for msg in msgs if msg.size != 0) if not all_positions and cids2pps: raise RuntimeError( @@ -782,7 +771,7 @@ async def open_trade_dialog( # allocate event relay tasks for each client connection n.start_soon( deliver_trade_events, - n, + # n, trade_event_stream, ems_stream, accounts_def, @@ -838,26 +827,33 @@ async def emit_pp_update( acctid = fq_acctid.strip('ib.') acnt = acnts[acctid] + ledger: dict = ledgers[acctid] - acnt.update_from_ledger(trans) + acnt.update_from_ledger( + trans, + symcache=ledger.symcache + ) active, closed = acnt.dump_active() # NOTE: update ledger with all new trades for fq_acctid, trades_by_id in api_to_ledger_entries.items(): - acctid = fq_acctid.strip('ib.') - ledger = ledgers[acctid] + acctid: str = fq_acctid.strip('ib.') + ledger: dict = ledgers[acctid] + # NOTE: don't override flex/previous entries with new API + # ones, just update with new fields! for tid, tdict in trades_by_id.items(): - # NOTE: don't override flex/previous entries with new API - # ones, just update with new fields! ledger.setdefault(tid, {}).update(tdict) # generate pp msgs and cross check with ib's positions data, relay # re-formatted pps as msgs to the ems. for pos in filter( bool, - [active.get(tx.bs_mktid), closed.get(tx.bs_mktid)] + [ + active.get(tx.bs_mktid), + closed.get(tx.bs_mktid) + ] ): msgs = await update_and_audit_msgs( acctid, @@ -869,7 +865,7 @@ async def emit_pp_update( ) if msgs: msg = msgs[0] - log.info('Emitting pp msg: {msg}') + log.info(f'Emitting pp msg: {msg}') break await ems_stream.send(msg) @@ -889,11 +885,19 @@ _statuses: dict[str, str] = { # https://github.com/erdewit/ib_insync/issues/363 'inactive': 'pending', } +_action_map = { + 'BOT': 'buy', + 'SLD': 'sell', +} +# TODO: maybe just make this a flat func without an interal loop +# and call it *from* the `trade_event_stream` loop? Might look +# a lot nicer doing that from open_trade_dialog() instead of +# starting a separate task? async def deliver_trade_events( - nurse: trio.Nursery, + # nurse: trio.Nursery, trade_event_stream: trio.MemoryReceiveChannel, ems_stream: tractor.MsgStream, accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'` @@ -908,7 +912,6 @@ async def deliver_trade_events( Format and relay all trade events for a given client to emsd. ''' - action_map = {'BOT': 'buy', 'SLD': 'sell'} ids2fills: dict[str, dict] = {} # TODO: for some reason we can receive a ``None`` here when the @@ -916,7 +919,6 @@ async def deliver_trade_events( # at the eventkit code above but we should probably handle it... async for event_name, item in trade_event_stream: log.info(f'ib sending {event_name}:\n{pformat(item)}') - match event_name: # NOTE: we remap statuses to the ems set via the # ``_statuses: dict`` above. @@ -999,7 +1001,7 @@ async def deliver_trade_events( # `.submit_limit()` reqid=execu.orderId, time_ns=time.time_ns(), # cuz why not - action=action_map[execu.side], + action=_action_map[execu.side], size=execu.shares, price=execu.price, # broker_details=execdict, @@ -1163,8 +1165,16 @@ async def deliver_trade_events( case 'position': - cid, msg = pack_position(item) + pos: IbPosition = item + bs_mktid, msg = pack_position(pos) log.info(f'New IB position msg: {msg}') + + # always update with latest ib pos msg info since + # we generally audit against it for sanity and + # testing AND we require it to be updated to avoid + # error msgs emitted from `update_and_audit_msgs()` + cids2pps[(msg.account, bs_mktid)] = msg + # cuck ib and it's shitty fifo sys for pps! continue diff --git a/piker/brokers/ib/ledger.py b/piker/brokers/ib/ledger.py index aaeda153..cc79122c 100644 --- a/piker/brokers/ib/ledger.py +++ b/piker/brokers/ib/ledger.py @@ -20,9 +20,11 @@ Trade transaction accounting and normalization. ''' from bisect import insort from decimal import Decimal +from functools import partial from pprint import pformat from typing import ( Any, + Callable, ) from bidict import bidict @@ -38,11 +40,24 @@ from piker.accounting import ( digits_to_dec, Transaction, MktPair, + iter_by_dt, ) from ._flex_reports import parse_flex_dt from ._util import log +tx_sort: Callable = partial( + iter_by_dt, + parsers={ + 'dateTime': parse_flex_dt, + 'datetime': pendulum.parse, + # for some some fucking 2022 and + # back options records...fuck me. + 'date': pendulum.parse, + } +) + + def norm_trade( tid: str, record: dict[str, Any],