Provide `datetime`-sorted clears table iteration
Likely pertains to helping with stuff in issues #345 and #373 and just generally is handy to have when processing ledgers / clearing event tables. Adds the following helper methods: - `iter_by_dt()` to iter-sort an arbitrary `Transaction`-like table of clear entries. - `Position.iter_clears()` as a convenience wrapper for the above.dark_clearing_improvements
parent
35b097469b
commit
7ef8111381
43
piker/pp.py
43
piker/pp.py
|
@ -29,6 +29,7 @@ import re
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
|
Iterator,
|
||||||
Optional,
|
Optional,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
@ -116,6 +117,21 @@ class Transaction(Struct, frozen=True):
|
||||||
# from: Optional[str] = None
|
# from: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
def iter_by_dt(
|
||||||
|
clears: dict[str, Any],
|
||||||
|
) -> Iterator[tuple[str, dict]]:
|
||||||
|
'''
|
||||||
|
Iterate entries of a ``clears: dict`` table sorted by entry recorded
|
||||||
|
datetime presumably set at the ``'dt'`` field in each entry.
|
||||||
|
|
||||||
|
'''
|
||||||
|
for tid, data in sorted(
|
||||||
|
list(clears.items()),
|
||||||
|
key=lambda item: item[1]['dt'],
|
||||||
|
):
|
||||||
|
yield tid, data
|
||||||
|
|
||||||
|
|
||||||
class Position(Struct):
|
class Position(Struct):
|
||||||
'''
|
'''
|
||||||
Basic pp (personal/piker position) model with attached clearing
|
Basic pp (personal/piker position) model with attached clearing
|
||||||
|
@ -183,12 +199,7 @@ class Position(Struct):
|
||||||
toml_clears_list = []
|
toml_clears_list = []
|
||||||
|
|
||||||
# reverse sort so latest clears are at top of section?
|
# reverse sort so latest clears are at top of section?
|
||||||
for tid, data in sorted(
|
for tid, data in iter_by_dt(clears):
|
||||||
list(clears.items()),
|
|
||||||
|
|
||||||
# sort by datetime
|
|
||||||
key=lambda item: item[1]['dt'],
|
|
||||||
):
|
|
||||||
inline_table = toml.TomlDecoder().get_empty_inline_table()
|
inline_table = toml.TomlDecoder().get_empty_inline_table()
|
||||||
|
|
||||||
# serialize datetime to parsable `str`
|
# serialize datetime to parsable `str`
|
||||||
|
@ -301,6 +312,14 @@ class Position(Struct):
|
||||||
# def lifo_price() -> float:
|
# def lifo_price() -> float:
|
||||||
# ...
|
# ...
|
||||||
|
|
||||||
|
def iter_clears(self) -> Iterator[tuple[str, dict]]:
|
||||||
|
'''
|
||||||
|
Iterate the internally managed ``.clears: dict`` table in
|
||||||
|
datetime-stamped order.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return iter_by_dt(self.clears)
|
||||||
|
|
||||||
def calc_ppu(
|
def calc_ppu(
|
||||||
self,
|
self,
|
||||||
# include transaction cost in breakeven price
|
# include transaction cost in breakeven price
|
||||||
|
@ -331,10 +350,9 @@ class Position(Struct):
|
||||||
asize_h: list[float] = [] # historical accumulative size
|
asize_h: list[float] = [] # historical accumulative size
|
||||||
ppu_h: list[float] = [] # historical price-per-unit
|
ppu_h: list[float] = [] # historical price-per-unit
|
||||||
|
|
||||||
clears = list(self.clears.items())
|
tid: str
|
||||||
|
entry: dict[str, Any]
|
||||||
for i, (tid, entry) in enumerate(clears):
|
for (tid, entry) in self.iter_clears():
|
||||||
|
|
||||||
clear_size = entry['size']
|
clear_size = entry['size']
|
||||||
clear_price = entry['price']
|
clear_price = entry['price']
|
||||||
|
|
||||||
|
@ -344,6 +362,11 @@ class Position(Struct):
|
||||||
|
|
||||||
sign_change: bool = False
|
sign_change: bool = False
|
||||||
|
|
||||||
|
if accum_size == 0:
|
||||||
|
ppu_h.append(0)
|
||||||
|
asize_h.append(0)
|
||||||
|
continue
|
||||||
|
|
||||||
if accum_size == 0:
|
if accum_size == 0:
|
||||||
ppu_h.append(0)
|
ppu_h.append(0)
|
||||||
asize_h.append(0)
|
asize_h.append(0)
|
||||||
|
|
Loading…
Reference in New Issue