Support per-symbol reload from ledger pp loading

- use `tomli` package for reading since it's the fastest pure python
  reader available apparently.
- add new fields to each pp's clears table: price, size, dt
- make `load_pps_from_toml()`'s `reload_records` a dict that can be
  passed in by the caller and is verbatim used to re-read a ledger and
  filter to the specified symbol set to build out fresh pp objects.
- add a `update_from_ledger: bool` flag to `load_pps_from_toml()`
  to allow forcing a full backend ledger read.
- if a set of trades records is passed into `update_pps_conf()` parse
  out the meta data required to cause a ledger reload as per 2 bullets
  above.
- return active and closed pps in separate by-account maps from
  `update_pps_conf()`.
- drop the `key_by` kwarg.
lifo_pps_ib
Tyler Goodlet 2022-06-22 15:41:26 -04:00
parent cc68501c7a
commit a12e6800ff
1 changed files with 66 additions and 25 deletions

View File

@ -21,6 +21,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
''' '''
from contextlib import contextmanager as cm from contextlib import contextmanager as cm
# from pprint import pformat
import os import os
from os import path from os import path
import re import re
@ -33,7 +34,7 @@ from typing import (
from msgspec import Struct from msgspec import Struct
import pendulum import pendulum
from pendulum import datetime, now from pendulum import datetime, now
# import tomli import tomli
import toml import toml
from . import config from . import config
@ -73,8 +74,8 @@ def open_trade_ledger(
) )
with open(tradesfile, 'w') as cf: with open(tradesfile, 'w') as cf:
pass # touch pass # touch
with open(tradesfile, 'r') as cf: with open(tradesfile, 'rb') as cf:
ledger = toml.load(tradesfile) ledger = tomli.load(cf)
cpy = ledger.copy() cpy = ledger.copy()
try: try:
yield cpy yield cpy
@ -91,7 +92,9 @@ def open_trade_ledger(
class Transaction(Struct): class Transaction(Struct):
fqsn: str # normally fqsn # TODO: should this be ``.to`` (see below)?
fqsn: str
tid: Union[str, int] # unique transaction id tid: Union[str, int] # unique transaction id
size: float size: float
price: float price: float
@ -104,6 +107,9 @@ class Transaction(Struct):
# is for is truly unique. # is for is truly unique.
bsuid: Optional[Union[str, int]] = None bsuid: Optional[Union[str, int]] = None
# optional fqsn for the source "asset"/money symbol?
# from: Optional[str] = None
class Position(Struct): class Position(Struct):
''' '''
@ -307,6 +313,9 @@ def update_pps(
# track clearing data # track clearing data
pp.clears[r.tid] = { pp.clears[r.tid] = {
'cost': r.cost, 'cost': r.cost,
'price': r.price,
'size': r.size,
'dt': str(r.dt),
} }
assert len(set(pp.clears)) == len(pp.clears) assert len(set(pp.clears)) == len(pp.clears)
@ -359,7 +368,7 @@ def load_pps_from_ledger(
acctname: str, acctname: str,
# post normalization filter on ledger entries to be processed # post normalization filter on ledger entries to be processed
filter_by: Optional[list[Transaction]] = None, filter_by: Optional[list[dict]] = None,
) -> dict[str, Position]: ) -> dict[str, Position]:
''' '''
@ -381,7 +390,7 @@ def load_pps_from_ledger(
records = brokermod.norm_trade_records(ledger) records = brokermod.norm_trade_records(ledger)
if filter_by: if filter_by:
bsuids = set(r.bsuid for r in filter_by) bsuids = set(filter_by)
records = filter(lambda r: r.bsuid in bsuids, records) records = filter(lambda r: r.bsuid in bsuids, records)
return update_pps(records) return update_pps(records)
@ -390,7 +399,6 @@ def load_pps_from_ledger(
def get_pps( def get_pps(
brokername: str, brokername: str,
acctids: Optional[set[str]] = set(), acctids: Optional[set[str]] = set(),
key_by: Optional[str] = None,
) -> dict[str, dict[str, Position]]: ) -> dict[str, dict[str, Position]]:
''' '''
@ -403,7 +411,9 @@ def get_pps(
# load dicts as inlines to preserve compactness # load dicts as inlines to preserve compactness
# _dict=toml.decoder.InlineTableDict, # _dict=toml.decoder.InlineTableDict,
) )
all_active = {} all_active = {}
all_closed = {}
# try to load any ledgers if no section found # try to load any ledgers if no section found
bconf, path = config.load('brokers') bconf, path = config.load('brokers')
@ -419,10 +429,11 @@ def get_pps(
): ):
continue continue
active = update_pps_conf(brokername, account, key_by=key_by) active, closed = update_pps_conf(brokername, account)
all_active.setdefault(account, {}).update(active) all_active.setdefault(account, {}).update(active)
all_closed.setdefault(account, {}).update(closed)
return all_active return all_active, all_closed
# TODO: instead see if we can hack tomli and tomli-w to do the same: # TODO: instead see if we can hack tomli and tomli-w to do the same:
@ -566,7 +577,8 @@ def load_pps_from_toml(
# even though the ledger *was* updated. For this cases we allow the # even though the ledger *was* updated. For this cases we allow the
# caller to pass in a symbol set they'd like to reload from the # caller to pass in a symbol set they'd like to reload from the
# underlying ledger to be reprocessed in computing pps state. # underlying ledger to be reprocessed in computing pps state.
reload_records: Optional[list[Transaction]] = None, reload_records: Optional[dict[str, str]] = None,
update_from_ledger: bool = False,
) -> tuple[dict, dict[str, Position]]: ) -> tuple[dict, dict[str, Position]]:
''' '''
@ -580,9 +592,9 @@ def load_pps_from_toml(
pps = brokersection.setdefault(acctid, {}) pps = brokersection.setdefault(acctid, {})
pp_objs = {} pp_objs = {}
if not pps: # no pps entry yet for this broker/account so parse any available
# no pps entry yet for this broker/account so parse # ledgers to build a brand new pps state.
# any available ledgers to build a pps state. if not pps or update_from_ledger:
pp_objs = load_pps_from_ledger( pp_objs = load_pps_from_ledger(
brokername, brokername,
acctid, acctid,
@ -591,8 +603,7 @@ def load_pps_from_toml(
# Reload symbol specific ledger entries if requested by the # Reload symbol specific ledger entries if requested by the
# caller **AND** none exist in the current pps state table. # caller **AND** none exist in the current pps state table.
elif ( elif (
pps and reload_records and pps and reload_records
not any(r.fqsn in pps for r in reload_records)
): ):
# no pps entry yet for this broker/account so parse # no pps entry yet for this broker/account so parse
# any available ledgers to build a pps state. # any available ledgers to build a pps state.
@ -609,6 +620,7 @@ def load_pps_from_toml(
# unmarshal/load ``pps.toml`` config entries into object form. # unmarshal/load ``pps.toml`` config entries into object form.
for fqsn, entry in pps.items(): for fqsn, entry in pps.items():
bsuid = entry['bsuid']
# convert clears sub-tables (only in this form # convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table. # for toml re-presentation) back into a master table.
@ -622,13 +634,29 @@ def load_pps_from_toml(
tid = clears_table.pop('tid') tid = clears_table.pop('tid')
clears[tid] = clears_table clears[tid] = clears_table
size = entry['size']
# TODO: an audit system for existing pps entries?
# if not len(clears) == abs(size):
# pp_objs = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=reload_records,
# )
# reason = 'size <-> len(clears) mismatch'
# raise ValueError(
# '`pps.toml` entry is invalid:\n'
# f'{fqsn}\n'
# f'{pformat(entry)}'
# )
expiry = entry.get('expiry') expiry = entry.get('expiry')
if expiry: if expiry:
expiry = pendulum.parse(expiry) expiry = pendulum.parse(expiry)
pp_objs[fqsn] = Position( pp_objs[bsuid] = Position(
Symbol.from_fqsn(fqsn, info={}), Symbol.from_fqsn(fqsn, info={}),
size=entry['size'], size=size,
be_price=entry['be_price'], be_price=entry['be_price'],
expiry=expiry, expiry=expiry,
bsuid=entry['bsuid'], bsuid=entry['bsuid'],
@ -649,14 +677,24 @@ def update_pps_conf(
acctid: str, acctid: str,
trade_records: Optional[list[Transaction]] = None, trade_records: Optional[list[Transaction]] = None,
key_by: Optional[str] = None, ledger_reload: Optional[dict[str, str]] = None,
) -> dict[str, Position]: ) -> tuple[
dict[str, Position],
dict[str, Position],
]:
# this maps `.bsuid` values to positions
pp_objs: dict[Union[str, int], Position]
if trade_records and ledger_reload:
for r in trade_records:
ledger_reload[r.bsuid] = r.fqsn
conf, pp_objs = load_pps_from_toml( conf, pp_objs = load_pps_from_toml(
brokername, brokername,
acctid, acctid,
reload_records=trade_records, reload_records=ledger_reload,
) )
# update all pp objects from any (new) trade records which # update all pp objects from any (new) trade records which
@ -667,6 +705,9 @@ def update_pps_conf(
pps=pp_objs, pps=pp_objs,
) )
# NOTE: newly closed position are also important to report/return
# since a consumer, like an order mode UI ;), might want to react
# based on the closure.
active, closed = dump_active(pp_objs) active, closed = dump_active(pp_objs)
# dict-serialize all active pps # dict-serialize all active pps
@ -687,8 +728,11 @@ def update_pps_conf(
brokerless_key = fqsn.rstrip(f'.{brokername}') brokerless_key = fqsn.rstrip(f'.{brokername}')
pp_entries[brokerless_key] = pp_dict pp_entries[brokerless_key] = pp_dict
closed_pp_objs: dict[str, Position] = {}
for bsuid in closed: for bsuid in closed:
pp_objs.pop(bsuid, None) closed_pp = pp_objs.pop(bsuid, None)
if closed_pp:
closed_pp_objs[bsuid] = closed_pp
conf[brokername][acctid] = pp_entries conf[brokername][acctid] = pp_entries
@ -703,11 +747,8 @@ def update_pps_conf(
encoder=enc, encoder=enc,
) )
if key_by:
pp_objs = {getattr(pp, key_by): pp for pp in pp_objs}
# deliver object form of all pps in table to caller # deliver object form of all pps in table to caller
return pp_objs return pp_objs, closed_pp_objs
if __name__ == '__main__': if __name__ == '__main__':