diff --git a/piker/pp.py b/piker/pp.py index 3860bf36..fd8d5cda 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -21,6 +21,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys.. ''' from contextlib import contextmanager as cm +# from pprint import pformat import os from os import path import re @@ -33,7 +34,7 @@ from typing import ( from msgspec import Struct import pendulum from pendulum import datetime, now -# import tomli +import tomli import toml from . import config @@ -73,8 +74,8 @@ def open_trade_ledger( ) with open(tradesfile, 'w') as cf: pass # touch - with open(tradesfile, 'r') as cf: - ledger = toml.load(tradesfile) + with open(tradesfile, 'rb') as cf: + ledger = tomli.load(cf) cpy = ledger.copy() try: yield cpy @@ -91,7 +92,9 @@ def open_trade_ledger( class Transaction(Struct): - fqsn: str # normally fqsn + # TODO: should this be ``.to`` (see below)? + fqsn: str + tid: Union[str, int] # unique transaction id size: float price: float @@ -104,6 +107,9 @@ class Transaction(Struct): # is for is truly unique. bsuid: Optional[Union[str, int]] = None + # optional fqsn for the source "asset"/money symbol? + # from: Optional[str] = None + class Position(Struct): ''' @@ -307,6 +313,9 @@ def update_pps( # track clearing data pp.clears[r.tid] = { 'cost': r.cost, + 'price': r.price, + 'size': r.size, + 'dt': str(r.dt), } assert len(set(pp.clears)) == len(pp.clears) @@ -359,7 +368,7 @@ def load_pps_from_ledger( acctname: str, # post normalization filter on ledger entries to be processed - filter_by: Optional[list[Transaction]] = None, + filter_by: Optional[list[dict]] = None, ) -> dict[str, Position]: ''' @@ -381,7 +390,7 @@ def load_pps_from_ledger( records = brokermod.norm_trade_records(ledger) if filter_by: - bsuids = set(r.bsuid for r in filter_by) + bsuids = set(filter_by) records = filter(lambda r: r.bsuid in bsuids, records) return update_pps(records) @@ -390,7 +399,6 @@ def load_pps_from_ledger( def get_pps( brokername: str, acctids: Optional[set[str]] = set(), - key_by: Optional[str] = None, ) -> dict[str, dict[str, Position]]: ''' @@ -403,7 +411,9 @@ def get_pps( # load dicts as inlines to preserve compactness # _dict=toml.decoder.InlineTableDict, ) + all_active = {} + all_closed = {} # try to load any ledgers if no section found bconf, path = config.load('brokers') @@ -419,10 +429,11 @@ def get_pps( ): continue - active = update_pps_conf(brokername, account, key_by=key_by) + active, closed = update_pps_conf(brokername, account) all_active.setdefault(account, {}).update(active) + all_closed.setdefault(account, {}).update(closed) - return all_active + return all_active, all_closed # TODO: instead see if we can hack tomli and tomli-w to do the same: @@ -566,7 +577,8 @@ def load_pps_from_toml( # even though the ledger *was* updated. For this cases we allow the # caller to pass in a symbol set they'd like to reload from the # underlying ledger to be reprocessed in computing pps state. - reload_records: Optional[list[Transaction]] = None, + reload_records: Optional[dict[str, str]] = None, + update_from_ledger: bool = False, ) -> tuple[dict, dict[str, Position]]: ''' @@ -580,9 +592,9 @@ def load_pps_from_toml( pps = brokersection.setdefault(acctid, {}) pp_objs = {} - if not pps: - # no pps entry yet for this broker/account so parse - # any available ledgers to build a pps state. + # no pps entry yet for this broker/account so parse any available + # ledgers to build a brand new pps state. + if not pps or update_from_ledger: pp_objs = load_pps_from_ledger( brokername, acctid, @@ -591,8 +603,7 @@ def load_pps_from_toml( # Reload symbol specific ledger entries if requested by the # caller **AND** none exist in the current pps state table. elif ( - pps and reload_records and - not any(r.fqsn in pps for r in reload_records) + pps and reload_records ): # no pps entry yet for this broker/account so parse # any available ledgers to build a pps state. @@ -609,6 +620,7 @@ def load_pps_from_toml( # unmarshal/load ``pps.toml`` config entries into object form. for fqsn, entry in pps.items(): + bsuid = entry['bsuid'] # convert clears sub-tables (only in this form # for toml re-presentation) back into a master table. @@ -622,13 +634,29 @@ def load_pps_from_toml( tid = clears_table.pop('tid') clears[tid] = clears_table + size = entry['size'] + + # TODO: an audit system for existing pps entries? + # if not len(clears) == abs(size): + # pp_objs = load_pps_from_ledger( + # brokername, + # acctid, + # filter_by=reload_records, + # ) + # reason = 'size <-> len(clears) mismatch' + # raise ValueError( + # '`pps.toml` entry is invalid:\n' + # f'{fqsn}\n' + # f'{pformat(entry)}' + # ) + expiry = entry.get('expiry') if expiry: expiry = pendulum.parse(expiry) - pp_objs[fqsn] = Position( + pp_objs[bsuid] = Position( Symbol.from_fqsn(fqsn, info={}), - size=entry['size'], + size=size, be_price=entry['be_price'], expiry=expiry, bsuid=entry['bsuid'], @@ -649,14 +677,24 @@ def update_pps_conf( acctid: str, trade_records: Optional[list[Transaction]] = None, - key_by: Optional[str] = None, + ledger_reload: Optional[dict[str, str]] = None, -) -> dict[str, Position]: +) -> tuple[ + dict[str, Position], + dict[str, Position], +]: + + # this maps `.bsuid` values to positions + pp_objs: dict[Union[str, int], Position] + + if trade_records and ledger_reload: + for r in trade_records: + ledger_reload[r.bsuid] = r.fqsn conf, pp_objs = load_pps_from_toml( brokername, acctid, - reload_records=trade_records, + reload_records=ledger_reload, ) # update all pp objects from any (new) trade records which @@ -667,6 +705,9 @@ def update_pps_conf( pps=pp_objs, ) + # NOTE: newly closed position are also important to report/return + # since a consumer, like an order mode UI ;), might want to react + # based on the closure. active, closed = dump_active(pp_objs) # dict-serialize all active pps @@ -687,8 +728,11 @@ def update_pps_conf( brokerless_key = fqsn.rstrip(f'.{brokername}') pp_entries[brokerless_key] = pp_dict + closed_pp_objs: dict[str, Position] = {} for bsuid in closed: - pp_objs.pop(bsuid, None) + closed_pp = pp_objs.pop(bsuid, None) + if closed_pp: + closed_pp_objs[bsuid] = closed_pp conf[brokername][acctid] = pp_entries @@ -703,11 +747,8 @@ def update_pps_conf( encoder=enc, ) - if key_by: - pp_objs = {getattr(pp, key_by): pp for pp in pp_objs} - # deliver object form of all pps in table to caller - return pp_objs + return pp_objs, closed_pp_objs if __name__ == '__main__':