Expect `<brokermod>.norm_trade_records()` to return `dict`

pptables
Tyler Goodlet 2022-07-18 10:33:56 -04:00
parent 45871d5846
commit 09d9a7ea2b
1 changed files with 74 additions and 33 deletions

View File

@ -314,7 +314,7 @@ def update_pps(
pps: dict[str, Position] = pps or {} pps: dict[str, Position] = pps or {}
# lifo update all pps from records # lifo update all pps from records
for r in records: for tid, r in records.items():
pp = pps.setdefault( pp = pps.setdefault(
r.bsuid, r.bsuid,
@ -385,20 +385,24 @@ def load_pps_from_ledger(
return {} return {}
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
src_records = brokermod.norm_trade_records(ledger) src_records: dict[str, Transaction] = brokermod.norm_trade_records(ledger)
if filter_by: if filter_by:
records = {}
bsuids = set(filter_by) bsuids = set(filter_by)
records = list(filter(lambda r: r.bsuid in bsuids, src_records)) for tid, r in src_records.items():
if r.bsuid in bsuids:
records[tid] = r
else: else:
records = src_records records = src_records
return update_pps(records) return update_pps(records)
def get_pps( @cm
def open_pps(
brokername: str, brokername: str,
acctids: Optional[set[str]] = set(), acctid: str,
) -> dict[str, dict[str, Position]]: ) -> dict[str, dict[str, Position]]:
''' '''
@ -406,34 +410,68 @@ def get_pps(
incremental update file: ``pps.toml``. incremental update file: ``pps.toml``.
''' '''
conf, path = config.load( conf, path = config.load('pps')
'pps', brokersection = conf.setdefault(brokername, {})
# load dicts as inlines to preserve compactness pps = brokersection.setdefault(acctid, {})
# _dict=toml.decoder.InlineTableDict, pp_objs = {}
)
all_active = {} # unmarshal/load ``pps.toml`` config entries into object form.
all_closed = {} for fqsn, entry in pps.items():
bsuid = entry['bsuid']
# try to load any ledgers if no section found # convert clears sub-tables (only in this form
bconf, path = config.load('brokers') # for toml re-presentation) back into a master table.
accounts = bconf[brokername]['accounts'] clears_list = entry['clears']
for account in accounts:
# TODO: instead of this filter we could # index clears entries in "object" form by tid in a top
# always send all known pps but just not audit # level dict instead of a list (as is presented in our
# them since an active client might not be up? # ``pps.toml``).
if ( pp = pp_objs.get(bsuid)
acctids and if pp:
f'{brokername}.{account}' not in acctids clears = pp.clears
): else:
continue clears = {}
active, closed = update_pps_conf(brokername, account) for clears_table in clears_list:
all_active.setdefault(account, {}).update(active) tid = clears_table.pop('tid')
all_closed.setdefault(account, {}).update(closed) clears[tid] = clears_table
return all_active, all_closed size = entry['size']
# TODO: an audit system for existing pps entries?
# if not len(clears) == abs(size):
# pp_objs = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=reload_records,
# )
# reason = 'size <-> len(clears) mismatch'
# raise ValueError(
# '`pps.toml` entry is invalid:\n'
# f'{fqsn}\n'
# f'{pformat(entry)}'
# )
expiry = entry.get('expiry')
if expiry:
expiry = pendulum.parse(expiry)
pp_objs[bsuid] = Position(
Symbol.from_fqsn(fqsn, info={}),
size=size,
be_price=entry['be_price'],
expiry=expiry,
bsuid=entry['bsuid'],
# XXX: super critical, we need to be sure to include
# all pps.toml clears to avoid reusing clears that were
# already included in the current incremental update
# state, since today's records may have already been
# processed!
clears=clears,
)
yield pp_objs
# TODO: instead see if we can hack tomli and tomli-w to do the same: # TODO: instead see if we can hack tomli and tomli-w to do the same:
@ -578,6 +616,9 @@ def load_pps_from_toml(
# caller to pass in a symbol set they'd like to reload from the # caller to pass in a symbol set they'd like to reload from the
# underlying ledger to be reprocessed in computing pps state. # underlying ledger to be reprocessed in computing pps state.
reload_records: Optional[dict[str, str]] = None, reload_records: Optional[dict[str, str]] = None,
# XXX: this is "global" update from ledger flag which
# does a full refresh of pps from the available ledger.
update_from_ledger: bool = False, update_from_ledger: bool = False,
) -> tuple[dict, dict[str, Position]]: ) -> tuple[dict, dict[str, Position]]:
@ -681,7 +722,7 @@ def update_pps_conf(
brokername: str, brokername: str,
acctid: str, acctid: str,
trade_records: Optional[list[Transaction]] = None, trade_records: Optional[dict[str, Transaction]] = None,
ledger_reload: Optional[dict[str, str]] = None, ledger_reload: Optional[dict[str, str]] = None,
) -> tuple[ ) -> tuple[
@ -701,13 +742,13 @@ def update_pps_conf(
# - do diffs against updates from the ledger writer # - do diffs against updates from the ledger writer
# actor and the in-mem state here? # actor and the in-mem state here?
if trade_records and ledger_reload:
for tid, r in trade_records.items():
ledger_reload[r.bsuid] = r.fqsn
# this maps `.bsuid` values to positions # this maps `.bsuid` values to positions
pp_objs: dict[Union[str, int], Position] pp_objs: dict[Union[str, int], Position]
if trade_records and ledger_reload:
for r in trade_records:
ledger_reload[r.bsuid] = r.fqsn
conf, pp_objs = load_pps_from_toml( conf, pp_objs = load_pps_from_toml(
brokername, brokername,
acctid, acctid,