Support re-processing a filtered ledger entry set

This makes it possible to refresh a single fqsn-position in one's
`pps.toml` by simply deleting the file entry, in which case, if there is
new trade records passed to `load_pps_from_toml()` via the new
`reload_records` kwarg, then the backend ledger entries matching that
symbol will be filtered and used to recompute a fresh position.

This turns out to be super handy when you have crashes that prevent
a `pps.toml` entry from being updated correctly but where the ledger
does have all the data necessary to calculate a fresh correct entry.
lifo_pps_ib
Tyler Goodlet 2022-06-21 12:37:33 -04:00
parent f32b4d37cb
commit 4fdfb81876
1 changed files with 75 additions and 41 deletions

View File

@ -33,6 +33,7 @@ from typing import (
from msgspec import Struct from msgspec import Struct
import pendulum import pendulum
from pendulum import datetime, now from pendulum import datetime, now
# import tomli
import toml import toml
from . import config from . import config
@ -357,6 +358,9 @@ def load_pps_from_ledger(
brokername: str, brokername: str,
acctname: str, acctname: str,
# post normalization filter on ledger entries to be processed
filter_by: Optional[list[Transaction]] = None,
) -> dict[str, Position]: ) -> dict[str, Position]:
''' '''
Open a ledger file by broker name and account and read in and Open a ledger file by broker name and account and read in and
@ -369,14 +373,17 @@ def load_pps_from_ledger(
brokername, brokername,
acctname, acctname,
) as ledger: ) as ledger:
pass # readonly if not ledger:
# null case, no ledger file with content
if not ledger: return {}
# null case, no ledger file with content
return {}
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
records = brokermod.norm_trade_records(ledger) records = brokermod.norm_trade_records(ledger)
if filter_by:
bsuids = set(r.bsuid for r in filter_by)
records = filter(lambda r: r.bsuid in bsuids, records)
return update_pps(records) return update_pps(records)
@ -551,7 +558,15 @@ class PpsEncoder(toml.TomlEncoder):
def load_pps_from_toml( def load_pps_from_toml(
brokername: str, brokername: str,
acctid: str acctid: str,
# XXX: there is an edge case here where we may want to either audit
# the retrieved ``pps.toml`` output or reprocess it since there was
# an error on write on the last attempt to update the state file
# even though the ledger *was* updated. For this cases we allow the
# caller to pass in a symbol set they'd like to reload from the
# underlying ledger to be reprocessed in computing pps state.
reload_records: Optional[list[Transaction]] = None,
) -> tuple[dict, dict[str, Position]]: ) -> tuple[dict, dict[str, Position]]:
''' '''
@ -563,6 +578,7 @@ def load_pps_from_toml(
conf, path = config.load('pps') conf, path = config.load('pps')
brokersection = conf.setdefault(brokername, {}) brokersection = conf.setdefault(brokername, {})
pps = brokersection.setdefault(acctid, {}) pps = brokersection.setdefault(acctid, {})
pp_objs = {}
if not pps: if not pps:
# no pps entry yet for this broker/account so parse # no pps entry yet for this broker/account so parse
@ -571,46 +587,59 @@ def load_pps_from_toml(
brokername, brokername,
acctid, acctid,
) )
if not pps:
log.warning(
f'No trade history could be loaded for {brokername}:{acctid}'
)
else: # Reload symbol specific ledger entries if requested by the
# unmarshal/load ``pps.toml`` config entries into object form. # caller **AND** none exist in the current pps state table.
pp_objs = {} elif (
for fqsn, entry in pps.items(): pps and reload_records and
not any(r.fqsn in pps for r in reload_records)
):
# no pps entry yet for this broker/account so parse
# any available ledgers to build a pps state.
pp_objs = load_pps_from_ledger(
brokername,
acctid,
filter_by=reload_records,
)
# convert clears sub-tables (only in this form if not pps:
# for toml re-presentation) back into a master table. log.warning(
clears_list = entry['clears'] f'No trade history could be loaded for {brokername}:{acctid}'
)
# index clears entries in "object" form by tid in a top # unmarshal/load ``pps.toml`` config entries into object form.
# level dict instead of a list (as is presented in our for fqsn, entry in pps.items():
# ``pps.toml``).
clears = {}
for clears_table in clears_list:
tid = clears_table.pop('tid')
clears[tid] = clears_table
expiry = entry.get('expiry') # convert clears sub-tables (only in this form
if expiry: # for toml re-presentation) back into a master table.
expiry = pendulum.parse(expiry) clears_list = entry['clears']
pp_objs[fqsn] = Position( # index clears entries in "object" form by tid in a top
Symbol.from_fqsn(fqsn, info={}), # level dict instead of a list (as is presented in our
size=entry['size'], # ``pps.toml``).
be_price=entry['be_price'], clears = {}
expiry=expiry, for clears_table in clears_list:
bsuid=entry['bsuid'], tid = clears_table.pop('tid')
clears[tid] = clears_table
# XXX: super critical, we need to be sure to include expiry = entry.get('expiry')
# all pps.toml clears to avoid reusing clears that were if expiry:
# already included in the current incremental update expiry = pendulum.parse(expiry)
# state, since today's records may have already been
# processed! pp_objs[fqsn] = Position(
clears=clears, Symbol.from_fqsn(fqsn, info={}),
) size=entry['size'],
be_price=entry['be_price'],
expiry=expiry,
bsuid=entry['bsuid'],
# XXX: super critical, we need to be sure to include
# all pps.toml clears to avoid reusing clears that were
# already included in the current incremental update
# state, since today's records may have already been
# processed!
clears=clears,
)
return conf, pp_objs return conf, pp_objs
@ -618,12 +647,17 @@ def load_pps_from_toml(
def update_pps_conf( def update_pps_conf(
brokername: str, brokername: str,
acctid: str, acctid: str,
trade_records: Optional[list[Transaction]] = None, trade_records: Optional[list[Transaction]] = None,
key_by: Optional[str] = None, key_by: Optional[str] = None,
) -> dict[str, Position]: ) -> dict[str, Position]:
conf, pp_objs = load_pps_from_toml(brokername, acctid) conf, pp_objs = load_pps_from_toml(
brokername,
acctid,
reload_records=trade_records,
)
# update all pp objects from any (new) trade records which # update all pp objects from any (new) trade records which
# were passed in (aka incremental update case). # were passed in (aka incremental update case).