Change `Position.clears` -> `._clears[list[dict]]`

When you look at usage we don't really need clear entries to be keyed
by their `Transaction.tid`; instead it's much more important to ensure
the time-sorted order of trade-clearing transactions such that position
properties such as the size and ppu are calculated correctly. Thus,
this change simplifies the `.clears` table to a list of clear-entry
dicts, making a bunch of things simpler:
- object form `Position._clears` compared to the offline TOML schema
  (saved in account files) is now data-structure-symmetrical.
- `Position.add_clear()` now uses `bisect.insort()` to
  datetime-field-sort-insert into the *list* which saves having to worry
  about sorting on every sequence *read*.

Further details:
- adjust `.accounting._ledger.iter_by_dt()` to expect an input `list`.
- change `Position.iter_clears()` to iterate only the clearing entry
  dicts without yielding a key/tid; no more tuples.
- drop `Position.to_dict()` since parent `Struct` already implements it.
basic_buy_bot
Tyler Goodlet 2023-06-27 12:58:50 -04:00
parent 66d402b80e
commit c0d575c009
2 changed files with 106 additions and 94 deletions

View File

@ -160,15 +160,16 @@ class TransactionLedger(UserDict):
# normer = mod.norm_trade_record(txdict)
# TODO: use tx_sort here yah?
for tid, txdict in self.data.items():
for txdict in self.tx_sort(self.data.values()):
# for tid, txdict in self.data.items():
# special field handling for datetimes
# to ensure pendulum is used!
fqme = txdict.get('fqme') or txdict['fqsn']
dt = parse(txdict['dt'])
expiry = txdict.get('expiry')
tid: str = txdict['tid']
fqme: str = txdict.get('fqme') or txdict['fqsn']
dt: DateTime = parse(txdict['dt'])
expiry: str | None = txdict.get('expiry')
mkt = mkt_by_fqme.get(fqme)
if not mkt:
if not (mkt := mkt_by_fqme.get(fqme)):
# we can't build a trans if we don't have
# the ``.sys: MktPair`` info, so skip.
continue
@ -229,7 +230,7 @@ class TransactionLedger(UserDict):
def iter_by_dt(
records: dict[str, Any],
records: dict[str, dict[str, Any]] | list[dict],
# NOTE: parsers are looked up in the insert order
# so if you know that the record stats show some field
@ -247,21 +248,20 @@ def iter_by_dt(
datetime presumably set at the ``'dt'`` field in each entry.
'''
def dyn_parse_to_dt(
pair: tuple[str, dict],
) -> DateTime:
_, txdict = pair
def dyn_parse_to_dt(txdict: dict[str, Any]) -> DateTime:
k, v, parser = next(
(k, txdict[k], parsers[k]) for k in parsers if k in txdict
)
return parser(v) if parser else v
for tid, data in sorted(
records.items(),
if isinstance(records, dict):
records = records.values()
for entry in sorted(
records,
key=key or dyn_parse_to_dt,
):
yield tid, data
yield entry
def load_ledger(

View File

@ -22,6 +22,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
'''
from __future__ import annotations
from bisect import insort
from contextlib import contextmanager as cm
from decimal import Decimal
from math import copysign
@ -30,7 +31,6 @@ from pathlib import Path
from typing import (
Any,
Iterator,
Union,
Generator
)
@ -52,7 +52,6 @@ from ._mktinfo import (
from .. import config
from ..clearing._messages import (
BrokerdPosition,
Status,
)
from ..data.types import Struct
from ..log import get_logger
@ -66,16 +65,17 @@ class Position(Struct):
A financial "position" in `piker` terms is a summary of accounting
metrics computed from a transaction ledger; generally it describes
some acumulative "size" and "average price" from the summarized
some accumulative "size" and "average price" from the summarized
underlying transaction set.
In piker we focus on the `.ppu` (price per unit) and the `.bep`
(break even price) including all transaction entries and exits since
the last "net-zero" size of the destination asset's holding.
This interface serves as an object API for computing and tracking
positions as well as supports serialization for storage in the local
file system (in TOML) and to interchange as a msg over IPC.
This interface serves as an object API for computing and
tracking positions as well as supports serialization for
storage in the local file system (in TOML) and to interchange
as a msg over IPC.
'''
mkt: MktPair
@ -100,10 +100,9 @@ class Position(Struct):
split_ratio: int | None = None
# ordered record of known constituent trade messages
clears: dict[
Union[str, int, Status], # trade id
_clears: list[
dict[str, Any], # transaction history summaries
] = {}
] = []
first_clear_dt: datetime | None = None
expiry: datetime | None = None
@ -111,34 +110,30 @@ class Position(Struct):
def __repr__(self) -> str:
return pformat(self.to_dict())
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def to_pretoml(self) -> tuple[str, dict]:
'''
Prep this position's data contents for export to toml including
re-structuring of the ``.clears`` table to an array of
inline-subtables for better ``pps.toml`` compactness.
Prep this position's data contents for export as an entry
in a TOML "account file" (such as
`account.binance.paper.toml`) including re-structuring of
the ``._clears`` entries as an array of inline-subtables
for better ``pps.toml`` compactness.
'''
d = self.to_dict()
clears = d.pop('clears')
expiry = d.pop('expiry')
asdict = self.to_dict()
clears: list[dict] = asdict.pop('_clears')
expiry = asdict.pop('expiry')
if self.split_ratio is None:
d.pop('split_ratio')
asdict.pop('split_ratio')
# should be obvious from clears/event table
d.pop('first_clear_dt')
asdict.pop('first_clear_dt')
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
# drop symbol obj in serialized form
mkt: MktPair = d.pop('mkt')
mkt: MktPair = asdict.pop('mkt')
assert isinstance(mkt, MktPair)
fqme = mkt.fqme
@ -148,15 +143,15 @@ class Position(Struct):
# each tradeable asset in the market.
if mkt.resolved:
dst: Asset = mkt.dst
d['asset_type'] = dst.atype
asdict['asset_type'] = dst.atype
d['price_tick'] = mkt.price_tick
d['size_tick'] = mkt.size_tick
asdict['price_tick'] = mkt.price_tick
asdict['size_tick'] = mkt.size_tick
if self.expiry is None:
d.pop('expiry', None)
asdict.pop('expiry', None)
elif expiry:
d['expiry'] = str(expiry)
asdict['expiry'] = str(expiry)
clears_table: tomlkit.Array = tomlkit.array()
clears_table.multiline(
@ -165,30 +160,29 @@ class Position(Struct):
)
# reverse sort so latest clears are at top of section?
for tid, data in iter_by_dt(clears):
for entry in iter_by_dt(clears):
inline_table = tomlkit.inline_table()
# serialize datetime to parsable `str`
dtstr = inline_table['dt'] = data['dt'].isoformat('T')
dtstr = inline_table['dt'] = entry['dt'].isoformat('T')
assert 'Datetime' not in dtstr
# insert optional clear fields in column order
for k in ['ppu', 'accum_size']:
val = data.get(k)
if val:
if val := entry.get(k):
inline_table[k] = val
# insert required fields
for k in ['price', 'size', 'cost']:
inline_table[k] = data[k]
inline_table[k] = entry[k]
inline_table['tid'] = tid
inline_table['tid'] = entry['tid']
clears_table.append(inline_table)
d['clears'] = clears_table
asdict['clears'] = clears_table
return fqme, d
return fqme, asdict
def ensure_state(self) -> None:
'''
@ -197,18 +191,16 @@ class Position(Struct):
they differ and log warnings to console.
'''
clears = list(self.clears.values())
self.first_clear_dt = min(
list(entry['dt'] for entry in clears)
)
last_clear = clears[-1]
clears: list[dict] = self._clears
self.first_clear_dt = min(clears, key=lambda e: e['dt'])['dt']
last_clear: dict = clears[-1]
csize: float = self.calc_size()
accum: float = last_clear['accum_size']
csize = self.calc_size()
accum = last_clear['accum_size']
if not self.expired():
if (
csize != accum
and csize != round(accum * self.split_ratio or 1)
and csize != round(accum * (self.split_ratio or 1))
):
raise ValueError(f'Size mismatch: {csize}')
else:
@ -221,11 +213,12 @@ class Position(Struct):
)
self.size = csize
cppu = self.calc_ppu()
ppu = last_clear['ppu']
cppu: float = self.calc_ppu()
ppu: float = last_clear['ppu']
if (
cppu != ppu
and self.split_ratio is not None
# handle any split info entered (for now) manually by user
and cppu != (ppu / self.split_ratio)
):
@ -281,15 +274,15 @@ class Position(Struct):
def iter_clears(self) -> Iterator[tuple[str, dict]]:
'''
Iterate the internally managed ``.clears: dict`` table in
Iterate the internally managed ``._clears: dict`` table in
datetime-stamped order.
'''
# sort on the already existing datetime that should have
# been generated for the entry's table
return iter_by_dt(
self.clears,
key=lambda entry: entry[1]['dt']
self._clears,
key=lambda entry: entry['dt']
)
def calc_ppu(
@ -323,9 +316,8 @@ class Position(Struct):
asize_h: list[float] = [] # historical accumulative size
ppu_h: list[float] = [] # historical price-per-unit
tid: str
entry: dict[str, Any]
for (tid, entry) in self.iter_clears():
for entry in self.iter_clears():
clear_size = entry['size']
clear_price: str | float = entry['price']
is_clear: bool = not isinstance(clear_price, str)
@ -451,7 +443,7 @@ class Position(Struct):
if self.expired():
return 0.
for tid, entry in self.clears.items():
for entry in self._clears:
size += entry['size']
# XXX: do we need it every step?
# no right since rounding is an LT?
@ -474,11 +466,11 @@ class Position(Struct):
'''
Minimize the position's clears entries by removing
all transactions before the last net zero size to avoid
unecessary history irrelevant to the current pp state.
unnecessary history irrelevant to the current pp state.
'''
size: float = 0
clears_since_zero: list[tuple(str, dict)] = []
clears_since_zero: list[dict] = []
# TODO: we might just want to always do this when iterating
# a ledger? keep a state of the last net-zero and only do the
@ -486,34 +478,44 @@ class Position(Struct):
# scan for the last "net zero" position by iterating
# transactions until the next net-zero size, rinse, repeat.
for tid, clear in self.clears.items():
for clear in self._clears:
size = float(
self.mkt.quantize(size + clear['size'])
)
clears_since_zero.append((tid, clear))
clears_since_zero.append(clear)
if size == 0:
clears_since_zero.clear()
self.clears = dict(clears_since_zero)
return self.clears
self._clears = clears_since_zero
return self._clears
def add_clear(
self,
t: Transaction,
) -> dict:
'''
Update clearing table and populate rolling ppu and accumulative
size in both the clears entry and local attrs state.
Update clearing table by calculating the rolling ppu and
(accumulative) size in both the clears entry and local
attrs state.
Inserts are always done in datetime sorted order.
'''
clear = self.clears[t.tid] = {
clear: dict[str, float | str | int] = {
'tid': t.tid,
'cost': t.cost,
'price': t.price,
'size': t.size,
'dt': t.dt
}
insort(
self._clears,
clear,
key=lambda entry: entry['dt']
)
# TODO: compute these incrementally instead
# of re-looping through each time resulting in O(n**2)
# behaviour..?
@ -526,10 +528,14 @@ class Position(Struct):
return clear
# def sugest_split(self) -> float:
# TODO: once we have an `.events` table with diff
# mkt event types..?
# def suggest_split(self) -> float:
# ...
# TODO: maybe a better name is just `Account` and we include
# a table of asset balances as `.balances: dict[Asset, float]`?
class PpTable(Struct):
brokername: str
@ -544,7 +550,12 @@ class PpTable(Struct):
cost_scalar: float = 2,
) -> dict[str, Position]:
'''
Update the internal `.pps[str, Position]` table from input
transactions recomputing the price-per-unit (ppu) and
accumulative size for each entry.
'''
pps = self.pps
updated: dict[str, Position] = {}
@ -553,7 +564,7 @@ class PpTable(Struct):
for t in sorted(
trans.values(),
key=lambda t: t.dt,
reverse=True,
# reverse=True,
):
fqme = t.fqme
bs_mktid = t.bs_mktid
@ -561,10 +572,10 @@ class PpTable(Struct):
# template the mkt-info presuming a legacy market ticks
# if no info exists in the transactions..
mkt: MktPair = t.sys
pp = pps.get(bs_mktid)
if not pp:
# if no existing pp, allocate fresh one.
pp = pps[bs_mktid] = Position(
pos = pps.get(bs_mktid)
if not pos:
# if no existing pos, allocate fresh one.
pos = pps[bs_mktid] = Position(
mkt=mkt,
size=0.0,
ppu=0.0,
@ -577,12 +588,12 @@ class PpTable(Struct):
# a shorter string), instead use the one from the
# transaction since it likely has (more) full
# information from the provider.
if len(pp.mkt.fqme) < len(fqme):
pp.mkt = mkt
if len(pos.mkt.fqme) < len(fqme):
pos.mkt = mkt
clears = pp.clears
clears: list[dict] = pos._clears
if clears:
first_clear_dt = pp.first_clear_dt
first_clear_dt = pos.first_clear_dt
# don't do updates for ledger records we already have
# included in the current pps state.
@ -601,15 +612,16 @@ class PpTable(Struct):
continue
# update clearing table
pp.add_clear(t)
updated[t.bs_mktid] = pp
pos.add_clear(t)
updated[t.bs_mktid] = pos
# minimize clears tables and update sizing.
for bs_mktid, pp in updated.items():
pp.ensure_state()
# re-calc ppu and accumulative sizing.
for bs_mktid, pos in updated.items():
pos.ensure_state()
# deliver only the position entries that were actually updated
# (modified the state) from the input transaction set.
# NOTE: deliver only the position entries that were
# actually updated (modified the state) from the input
# transaction set.
return updated
def dump_active(