From 87f301500de7456c4307950a6c0526e53645db17 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Jun 2022 14:59:47 -0400 Subject: [PATCH] Simplify updates to single-pass, fix clears minimizing Gah, was a remaining bug where if you tried to update the pps state with both new trades and from the ledger you'd do a double add of transactions that were cleared during a `update_pps()` loop. Instead now keep all clears in tact until ready to serialize to the `pps.toml` file in which cases we call a new method `Position.minimize_clears()` which does the work of only keep clears since the last net-zero size. Re-implement `update_pps_conf()` update logic as a single pass loop which does expiry and size checking for closed pps all in one pass thus allowing us to drop `dump_active()` which was kinda redundant anyway.. --- piker/pp.py | 166 ++++++++++++++++++++++++++++------------------------ 1 file changed, 89 insertions(+), 77 deletions(-) diff --git a/piker/pp.py b/piker/pp.py index 4d1be0c4..bdd2fae0 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -20,11 +20,13 @@ that doesn't try to cuk most humans who prefer to not lose their moneys.. (looking at you `ib` and dirt-bird friends) ''' +from collections import deque from contextlib import contextmanager as cm # from pprint import pformat import os from os import path import re +import time from typing import ( Any, Optional, @@ -75,7 +77,9 @@ def open_trade_ledger( with open(tradesfile, 'w') as cf: pass # touch with open(tradesfile, 'rb') as cf: + start = time.time() ledger = tomli.load(cf) + print(f'Ledger load took {time.time() - start}s') cpy = ledger.copy() try: yield cpy @@ -205,6 +209,18 @@ class Position(Struct): ''' return self.be_price * self.size + def update( + self, + t: Transaction, + + ) -> None: + self.clears[t.tid] = { + 'cost': t.cost, + 'price': t.price, + 'size': t.size, + 'dt': str(t.dt), + } + def lifo_update( self, size: float, @@ -257,6 +273,32 @@ class Position(Struct): return new_size, self.be_price + def minimize_clears( + self, + + ) -> dict[str, dict]: + ''' + Minimize the position's clears entries by removing + all transactions before the last net zero size to avoid + unecessary history irrelevant to the current pp state. + + + ''' + size: float = 0 + clears_since_zero: deque[tuple(str, dict)] = deque() + + # scan for the last "net zero" position by + # iterating clears in reverse. + for tid, clear in reversed(self.clears.items()): + size += clear['size'] + clears_since_zero.appendleft((tid, clear)) + + if size == 0: + break + + self.clears = dict(clears_since_zero) + return self.clears + def update_pps( records: dict[str, Transaction], @@ -310,62 +352,14 @@ def update_pps( cost=2*r.cost, ) - if pp.size == 0: - pp.clears.clear() + # track clearing data + pp.update(r) - else: - # track clearing data - pp.clears[r.tid] = { - 'cost': r.cost, - 'price': r.price, - 'size': r.size, - 'dt': str(r.dt), - } + assert len(set(pp.clears)) == len(pp.clears) - assert len(set(pp.clears)) == len(pp.clears) return pps -def dump_active( - pps: dict[str, Position], - -) -> tuple[ - dict[str, Any], - dict[str, Any], -]: - ''' - Split pps into those that are "active" (non-zero size) and "closed" - (zero size) and return in 2 dicts. - - Returning the "closed" set is important for updating the pps state - in any ``pps.toml`` such that we remove entries which are no longer - part of any "VaR" set (well presumably, except of course your liquidity - asset could be full of "risk" XD ). - - ''' - active = {} - closed = {} - - for k, pp in pps.items(): - - asdict = pp.to_pretoml() - - if pp.expiry is None: - asdict.pop('expiry', None) - - if ( - pp.size == 0 - - # drop time-expired positions (normally derivatives) - or (pp.expiry and pp.expiry < now()) - ): - closed[k] = asdict - else: - active[k] = asdict - - return active, closed - - def load_pps_from_ledger( brokername: str, @@ -391,11 +385,13 @@ def load_pps_from_ledger( return {} brokermod = get_brokermod(brokername) - records = brokermod.norm_trade_records(ledger) + src_records = brokermod.norm_trade_records(ledger) if filter_by: bsuids = set(filter_by) - records = filter(lambda r: r.bsuid in bsuids, records) + records = list(filter(lambda r: r.bsuid in bsuids, src_records)) + else: + records = src_records return update_pps(records) @@ -709,34 +705,50 @@ def update_pps_conf( pps=pp_objs, ) + pp_entries = {} # dict-serialize all active pps # NOTE: newly closed position are also important to report/return # since a consumer, like an order mode UI ;), might want to react # based on the closure. - active, closed = dump_active(pp_objs) - - # dict-serialize all active pps - pp_entries = {} - - for bsuid, pp_dict in active.items(): - - # normalize to a simpler flat dict format - s = pp_dict.pop('symbol') - # TODO: we need to figure out how to have one top level - # listing venue here even when the backend isn't providing - # it via the trades ledger.. - fqsn = s.front_fqsn() - - print(f'Updating active pp: {fqsn}') - # XXX: ugh, it's cuz we push the section under - # the broker name.. maybe we need to rethink this? - brokerless_key = fqsn.rstrip(f'.{brokername}') - pp_entries[brokerless_key] = pp_dict - closed_pp_objs: dict[str, Position] = {} - for bsuid in closed: - closed_pp = pp_objs.pop(bsuid, None) - if closed_pp: - closed_pp_objs[bsuid] = closed_pp + + for bsuid in list(pp_objs): + pp = pp_objs[bsuid] + pp.minimize_clears() + + if ( + pp.size == 0 + + # drop time-expired positions (normally derivatives) + or (pp.expiry and pp.expiry < now()) + ): + # if expired the position is closed + pp.size = 0 + + # position is already closed aka "net zero" + closed_pp = pp_objs.pop(bsuid, None) + if closed_pp: + closed_pp_objs[bsuid] = closed_pp + + else: + # serialize to pre-toml form + asdict = pp.to_pretoml() + + if pp.expiry is None: + asdict.pop('expiry', None) + + # TODO: we need to figure out how to have one top level + # listing venue here even when the backend isn't providing + # it via the trades ledger.. + # drop symbol obj in serialized form + s = asdict.pop('symbol') + fqsn = s.front_fqsn() + print(f'Updating active pp: {fqsn}') + + # XXX: ugh, it's cuz we push the section under + # the broker name.. maybe we need to rethink this? + brokerless_key = fqsn.rstrip(f'.{brokername}') + + pp_entries[brokerless_key] = asdict conf[brokername][acctid] = pp_entries