From cc15d0248850892cd7a62d8a8742a6f53df16b38 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Jul 2022 15:18:23 -0400 Subject: [PATCH 1/5] Fix `.minimize_clears()` to include clears since zero This was just implemented totally wrong but somehow worked XD The idea was to include all trades that contribute to ongoing position size since the last time the position was "net zero", i.e. no position in the asset. Adjust arithmetic to *subtract* from the current size until a zero size condition is met and then keep all those clears as part of the "current state" clears table. Additionally this fixes another bug where the positions freshly loaded from a ledger *were not* being merged with the current `pps.toml` state. --- piker/pp.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 0a67d04f..5a953e0d 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -285,15 +285,14 @@ class Position(Struct): all transactions before the last net zero size to avoid unecessary history irrelevant to the current pp state. - ''' - size: float = 0 + size: float = self.size clears_since_zero: deque[tuple(str, dict)] = deque() # scan for the last "net zero" position by # iterating clears in reverse. for tid, clear in reversed(self.clears.items()): - size += clear['size'] + size -= clear['size'] clears_since_zero.appendleft((tid, clear)) if size == 0: @@ -358,8 +357,6 @@ def update_pps( # track clearing data pp.update(r) - assert len(set(pp.clears)) == len(pp.clears) - return pps @@ -632,7 +629,7 @@ def load_pps_from_toml( # index clears entries in "object" form by tid in a top # level dict instead of a list (as is presented in our # ``pps.toml``). - clears = {} + clears = pp_objs[bsuid].clears for clears_table in clears_list: tid = clears_table.pop('tid') clears[tid] = clears_table @@ -716,6 +713,11 @@ def update_pps_conf( for bsuid in list(pp_objs): pp = pp_objs[bsuid] + + # XXX: debug hook for size mismatches + # if bsuid == 447767096: + # breakpoint() + pp.minimize_clears() if ( From ec6a28a8b1e2035d6b281af018b2a530385239ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Jul 2022 15:23:13 -0400 Subject: [PATCH 2/5] Drop stale comment --- piker/brokers/ib/broker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 4cc20b63..911d399d 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -296,8 +296,7 @@ async def update_ledger_from_api_trades( client: Union[Client, MethodProxy], ) -> dict[str, pp.Transaction]: - # construct piker pps from trade ledger, underneath using - # LIFO style breakeven pricing calcs. + conf = get_config() # XXX; ERRGGG.. From 64d8cd448f667458df9cdbec55c175ecb7a48d0b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Jul 2022 16:12:09 -0400 Subject: [PATCH 3/5] Right, handle brand-new pp case.. --- piker/pp.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/piker/pp.py b/piker/pp.py index 5a953e0d..0769982b 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -629,7 +629,12 @@ def load_pps_from_toml( # index clears entries in "object" form by tid in a top # level dict instead of a list (as is presented in our # ``pps.toml``). - clears = pp_objs[bsuid].clears + pp = pp_objs.get(bsuid) + if pp: + clears = pp.clears + else: + clears = {} + for clears_table in clears_list: tid = clears_table.pop('tid') clears[tid] = clears_table From 076c167d6e0732c950bebd17b47deebf53db79ad Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jun 2022 13:26:07 -0400 Subject: [PATCH 4/5] Fix ib pkg mod doc string --- piker/brokers/ib/__init__.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 5c329ecc..48024dc8 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -20,15 +20,10 @@ Interactive Brokers API backend. Sub-modules within break into the core functionalities: - ``broker.py`` part for orders / trading endpoints -- ``data.py`` for real-time data feed endpoints - -- ``client.py`` for the core API machinery which is ``trio``-ized +- ``feed.py`` for real-time data feed endpoints +- ``api.py`` for the core API machinery which is ``trio``-ized wrapping around ``ib_insync``. -- ``report.py`` for the hackery to build manual pp calcs - to avoid ib's absolute bullshit FIFO style position - tracking.. - """ from .api import ( get_client, From 7442d68ecf2cf38d6168076d59bb56d6658fd3d0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 2 Jul 2022 15:53:12 -0400 Subject: [PATCH 5/5] Drop nesting level from emsd's pp cacheing, adjust order mode --- piker/clearing/_ems.py | 43 +++++++++++++++++++++++------------------- piker/ui/order_mode.py | 21 ++++++++++++++++----- 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 17f9be1a..e0917c85 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -289,7 +289,11 @@ class TradesRelay: brokerd_dialogue: tractor.MsgStream # map of symbols to dicts of accounts to pp msgs - positions: dict[str, dict[str, BrokerdPosition]] + positions: dict[ + # brokername, acctid + tuple[str, str], + list[BrokerdPosition], + ] # allowed account names accounts: tuple[str] @@ -461,18 +465,24 @@ async def open_brokerd_trades_dialogue( # normalizing them to EMS messages and relaying back to # the piker order client set. - # locally cache and track positions per account. + # locally cache and track positions per account with + # a table of (brokername, acctid) -> `BrokerdPosition` + # msgs. pps = {} for msg in positions: log.info(f'loading pp: {msg}') account = msg['account'] + + # TODO: better value error for this which + # dumps the account and message and states the + # mismatch.. assert account in accounts pps.setdefault( - f'{msg["symbol"]}.{broker}', - {} - )[account] = msg + (broker, account), + [], + ).append(msg) relay = TradesRelay( brokerd_dialogue=brokerd_trades_stream, @@ -578,11 +588,9 @@ async def translate_and_relay_brokerd_events( relay.positions.setdefault( # NOTE: translate to a FQSN! - f'{sym}.{broker}', - {} - ).setdefault( - pos_msg['account'], {} - ).update(pos_msg) + (broker, sym), + [] + ).append(pos_msg) # fan-out-relay position msgs immediately by # broadcasting updates on all client streams @@ -635,8 +643,8 @@ async def translate_and_relay_brokerd_events( # something is out of order, we don't have an oid for # this broker-side message. log.error( - 'Unknown oid:{oid} for msg:\n' - f'{pformat(brokerd_msg)}' + f'Unknown oid: {oid} for msg:\n' + f'{pformat(brokerd_msg)}\n' 'Unable to relay message to client side!?' ) @@ -1088,15 +1096,12 @@ async def _emsd_main( brokerd_stream = relay.brokerd_dialogue # .clone() - # flatten out collected pps from brokerd for delivery - pp_msgs = { - fqsn: list(pps.values()) - for fqsn, pps in relay.positions.items() - } - # signal to client that we're started and deliver # all known pps and accounts for this ``brokerd``. - await ems_ctx.started((pp_msgs, list(relay.accounts))) + await ems_ctx.started(( + relay.positions, + list(relay.accounts), + )) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index f5a85d64..e9ed3499 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -579,9 +579,9 @@ async def open_order_mode( providers=symbol.brokers ) - # XXX: ``brokerd`` delivers a set of account names that it allows - # use of but the user also can define the accounts they'd like - # to use, in order, in their `brokers.toml` file. + # XXX: ``brokerd`` delivers a set of account names that it + # allows use of but the user also can define the accounts they'd + # like to use, in order, in their `brokers.toml` file. accounts = {} for name in brokerd_accounts: # ensure name is in ``brokers.toml`` @@ -594,10 +594,21 @@ async def open_order_mode( iter(accounts.keys()) ) if accounts else 'paper' + # Pack position messages by account, should only be one-to-one. # NOTE: requires the backend exactly specifies # the expected symbol key in its positions msg. - pp_msgs = position_msgs.get(symkey, ()) - pps_by_account = {msg['account']: msg for msg in pp_msgs} + pps_by_account = {} + for (broker, acctid), msgs in position_msgs.items(): + for msg in msgs: + + sym = msg['symbol'] + if ( + sym == symkey or + # mega-UGH, i think we need to fix the FQSN stuff sooner + # then later.. + sym == symkey.removesuffix(f'.{broker}') + ): + pps_by_account[acctid] = msg # update pp trackers with data relayed from ``brokerd``. for account_name in accounts: