From c53071e43a291251924f42cba84d8e01672b729f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 19 Oct 2022 13:18:19 -0400 Subject: [PATCH] WIP adding draft-commented code to try and get splits workin.. --- piker/data/feed.py | 16 +++++------ piker/pp.py | 71 ++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 16 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index c2fbbd5e..f03eb668 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -491,10 +491,10 @@ async def manage_history( readonly=False, ) # TODO: history validation - if not opened: - raise RuntimeError( - "Persistent shm for sym was already open?!" - ) + # if not opened: + # raise RuntimeError( + # "Persistent shm for sym was already open?!" + # ) rt_shm, opened = maybe_open_shm_array( key=f'{fqsn}_rt', @@ -506,10 +506,10 @@ async def manage_history( readonly=False, size=3*_secs_in_day, ) - if not opened: - raise RuntimeError( - "Persistent shm for sym was already open?!" - ) + # if not opened: + # raise RuntimeError( + # "Persistent shm for sym was already open?!" + # ) log.info('Scanning for existing `marketstored`') diff --git a/piker/pp.py b/piker/pp.py index 8fdaaa4d..10f62d7c 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -20,6 +20,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys.. (looking at you `ib` and dirt-bird friends) ''' +from __future__ import annotations from contextlib import contextmanager as cm from pprint import pformat import os @@ -138,13 +139,31 @@ class Position(Struct): # ordered record of known constituent trade messages clears: dict[ - Union[str, int, Status], # trade id + str | int, # trade id dict[str, Any], # transaction history summaries ] = {} first_clear_dt: Optional[datetime] = None expiry: Optional[datetime] = None + # @property + # def clears(self) -> dict[ + # Union[str, int, Status], # trade id + # dict[str, Any], # transaction history summaries + # ]: + # ''' + # Datetime sorted reference to internal clears table. + + # ''' + # # self._clears = {} + # self._clears = dict(sorted( + # self._clears.items(), + # key=lambda entry: entry[1]['dt'], + # )) + # # self._clears[k] = v + + # return self._clears + def to_dict(self) -> dict: return { f: getattr(self, f) @@ -219,6 +238,10 @@ class Position(Struct): ''' clears = list(self.clears.values()) + if not clears: + log.warning(f'No clears table for {self.symbol}!?') + return + self.first_clear_dt = min(list(entry['dt'] for entry in clears)) last_clear = clears[-1] @@ -623,6 +646,7 @@ class PpTable(Struct): def to_toml( self, + min_clears: bool = True, ) -> dict[str, Any]: active, closed = self.dump_active() @@ -635,7 +659,9 @@ class PpTable(Struct): # keep the minimal amount of clears that make up this # position since the last net-zero state. - pos.minimize_clears() + if min_clears: + pos.minimize_clears() + pos.ensure_state() # serialize to pre-toml form @@ -682,6 +708,8 @@ def load_pps_from_ledger( brokername: str, acctname: str, + table: Optional[PpTable] = None, + # post normalization filter on ledger entries to be processed filter_by: Optional[list[dict]] = None, @@ -698,7 +726,6 @@ def load_pps_from_ledger( ''' with ( open_trade_ledger(brokername, acctname) as ledger, - open_pps(brokername, acctname) as table, ): if not ledger: # null case, no ledger file with content @@ -716,7 +743,11 @@ def load_pps_from_ledger( else: records = src_records - updated = table.update_from_trans(records) + if table is None: + with open_pps(brokername, acctname) as table: + updated = table.update_from_trans(records) + else: + updated = table.update_from_trans(records) return records, updated @@ -886,15 +917,27 @@ def open_pps( conf=conf, ) + # first pass populate all missing clears record tables + # for fqsn, entry in pps.items(): + # # convert clears sub-tables (only in this form + # # for toml re-presentation) back into a master table. + # clears_list = entry.get('clears', []) + + # # attempt to reload from ledger + # if not clears_list: + # trans, pos = load_pps_from_ledger( + # brokername, + # acctid, + # filter_by=[entry['bsuid']], + # table=table, + # ) + # # breakpoint() + # unmarshal/load ``pps.toml`` config entries into object form # and update `PpTable` obj entries. for fqsn, entry in pps.items(): bsuid = entry['bsuid'] - # convert clears sub-tables (only in this form - # for toml re-presentation) back into a master table. - clears_list = entry['clears'] - # index clears entries in "object" form by tid in a top # level dict instead of a list (as is presented in our # ``pps.toml``). @@ -906,6 +949,18 @@ def open_pps( # processing of new clear events. trans: list[Transaction] = [] + # convert clears sub-tables (only in this form + # for toml re-presentation) back into a master table. + clears_list = entry['clears'] + + # # attempt to reload from ledger + # if not clears_list: + # trans, pos = load_pps_from_ledger( + # brokername, + # acctid, + # table=table, + # ) + for clears_table in clears_list: tid = clears_table.pop('tid') dtstr = clears_table['dt']