diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 5c329ecc..48024dc8 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -20,15 +20,10 @@ Interactive Brokers API backend. Sub-modules within break into the core functionalities: - ``broker.py`` part for orders / trading endpoints -- ``data.py`` for real-time data feed endpoints - -- ``client.py`` for the core API machinery which is ``trio``-ized +- ``feed.py`` for real-time data feed endpoints +- ``api.py`` for the core API machinery which is ``trio``-ized wrapping around ``ib_insync``. -- ``report.py`` for the hackery to build manual pp calcs - to avoid ib's absolute bullshit FIFO style position - tracking.. - """ from .api import ( get_client, diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 4cc20b63..911d399d 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -296,8 +296,7 @@ async def update_ledger_from_api_trades( client: Union[Client, MethodProxy], ) -> dict[str, pp.Transaction]: - # construct piker pps from trade ledger, underneath using - # LIFO style breakeven pricing calcs. + conf = get_config() # XXX; ERRGGG.. diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 17f9be1a..e0917c85 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -289,7 +289,11 @@ class TradesRelay: brokerd_dialogue: tractor.MsgStream # map of symbols to dicts of accounts to pp msgs - positions: dict[str, dict[str, BrokerdPosition]] + positions: dict[ + # brokername, acctid + tuple[str, str], + list[BrokerdPosition], + ] # allowed account names accounts: tuple[str] @@ -461,18 +465,24 @@ async def open_brokerd_trades_dialogue( # normalizing them to EMS messages and relaying back to # the piker order client set. - # locally cache and track positions per account. + # locally cache and track positions per account with + # a table of (brokername, acctid) -> `BrokerdPosition` + # msgs. pps = {} for msg in positions: log.info(f'loading pp: {msg}') account = msg['account'] + + # TODO: better value error for this which + # dumps the account and message and states the + # mismatch.. assert account in accounts pps.setdefault( - f'{msg["symbol"]}.{broker}', - {} - )[account] = msg + (broker, account), + [], + ).append(msg) relay = TradesRelay( brokerd_dialogue=brokerd_trades_stream, @@ -578,11 +588,9 @@ async def translate_and_relay_brokerd_events( relay.positions.setdefault( # NOTE: translate to a FQSN! - f'{sym}.{broker}', - {} - ).setdefault( - pos_msg['account'], {} - ).update(pos_msg) + (broker, sym), + [] + ).append(pos_msg) # fan-out-relay position msgs immediately by # broadcasting updates on all client streams @@ -635,8 +643,8 @@ async def translate_and_relay_brokerd_events( # something is out of order, we don't have an oid for # this broker-side message. log.error( - 'Unknown oid:{oid} for msg:\n' - f'{pformat(brokerd_msg)}' + f'Unknown oid: {oid} for msg:\n' + f'{pformat(brokerd_msg)}\n' 'Unable to relay message to client side!?' ) @@ -1088,15 +1096,12 @@ async def _emsd_main( brokerd_stream = relay.brokerd_dialogue # .clone() - # flatten out collected pps from brokerd for delivery - pp_msgs = { - fqsn: list(pps.values()) - for fqsn, pps in relay.positions.items() - } - # signal to client that we're started and deliver # all known pps and accounts for this ``brokerd``. - await ems_ctx.started((pp_msgs, list(relay.accounts))) + await ems_ctx.started(( + relay.positions, + list(relay.accounts), + )) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates diff --git a/piker/pp.py b/piker/pp.py index 0a67d04f..0769982b 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -285,15 +285,14 @@ class Position(Struct): all transactions before the last net zero size to avoid unecessary history irrelevant to the current pp state. - ''' - size: float = 0 + size: float = self.size clears_since_zero: deque[tuple(str, dict)] = deque() # scan for the last "net zero" position by # iterating clears in reverse. for tid, clear in reversed(self.clears.items()): - size += clear['size'] + size -= clear['size'] clears_since_zero.appendleft((tid, clear)) if size == 0: @@ -358,8 +357,6 @@ def update_pps( # track clearing data pp.update(r) - assert len(set(pp.clears)) == len(pp.clears) - return pps @@ -632,7 +629,12 @@ def load_pps_from_toml( # index clears entries in "object" form by tid in a top # level dict instead of a list (as is presented in our # ``pps.toml``). - clears = {} + pp = pp_objs.get(bsuid) + if pp: + clears = pp.clears + else: + clears = {} + for clears_table in clears_list: tid = clears_table.pop('tid') clears[tid] = clears_table @@ -716,6 +718,11 @@ def update_pps_conf( for bsuid in list(pp_objs): pp = pp_objs[bsuid] + + # XXX: debug hook for size mismatches + # if bsuid == 447767096: + # breakpoint() + pp.minimize_clears() if ( diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index f5a85d64..e9ed3499 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -579,9 +579,9 @@ async def open_order_mode( providers=symbol.brokers ) - # XXX: ``brokerd`` delivers a set of account names that it allows - # use of but the user also can define the accounts they'd like - # to use, in order, in their `brokers.toml` file. + # XXX: ``brokerd`` delivers a set of account names that it + # allows use of but the user also can define the accounts they'd + # like to use, in order, in their `brokers.toml` file. accounts = {} for name in brokerd_accounts: # ensure name is in ``brokers.toml`` @@ -594,10 +594,21 @@ async def open_order_mode( iter(accounts.keys()) ) if accounts else 'paper' + # Pack position messages by account, should only be one-to-one. # NOTE: requires the backend exactly specifies # the expected symbol key in its positions msg. - pp_msgs = position_msgs.get(symkey, ()) - pps_by_account = {msg['account']: msg for msg in pp_msgs} + pps_by_account = {} + for (broker, acctid), msgs in position_msgs.items(): + for msg in msgs: + + sym = msg['symbol'] + if ( + sym == symkey or + # mega-UGH, i think we need to fix the FQSN stuff sooner + # then later.. + sym == symkey.removesuffix(f'.{broker}') + ): + pps_by_account[acctid] = msg # update pp trackers with data relayed from ``brokerd``. for account_name in accounts: