Simplify updates to single-pass, fix clears minimizing
Gah, was a remaining bug where if you tried to update the pps state with both new trades and from the ledger you'd do a double add of transactions that were cleared during a `update_pps()` loop. Instead now keep all clears in tact until ready to serialize to the `pps.toml` file in which cases we call a new method `Position.minimize_clears()` which does the work of only keep clears since the last net-zero size. Re-implement `update_pps_conf()` update logic as a single pass loop which does expiry and size checking for closed pps all in one pass thus allowing us to drop `dump_active()` which was kinda redundant anyway..lifo_pps_ib
parent
566a54ffb6
commit
87f301500d
158
piker/pp.py
158
piker/pp.py
|
@ -20,11 +20,13 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
|
|||
(looking at you `ib` and dirt-bird friends)
|
||||
|
||||
'''
|
||||
from collections import deque
|
||||
from contextlib import contextmanager as cm
|
||||
# from pprint import pformat
|
||||
import os
|
||||
from os import path
|
||||
import re
|
||||
import time
|
||||
from typing import (
|
||||
Any,
|
||||
Optional,
|
||||
|
@ -75,7 +77,9 @@ def open_trade_ledger(
|
|||
with open(tradesfile, 'w') as cf:
|
||||
pass # touch
|
||||
with open(tradesfile, 'rb') as cf:
|
||||
start = time.time()
|
||||
ledger = tomli.load(cf)
|
||||
print(f'Ledger load took {time.time() - start}s')
|
||||
cpy = ledger.copy()
|
||||
try:
|
||||
yield cpy
|
||||
|
@ -205,6 +209,18 @@ class Position(Struct):
|
|||
'''
|
||||
return self.be_price * self.size
|
||||
|
||||
def update(
|
||||
self,
|
||||
t: Transaction,
|
||||
|
||||
) -> None:
|
||||
self.clears[t.tid] = {
|
||||
'cost': t.cost,
|
||||
'price': t.price,
|
||||
'size': t.size,
|
||||
'dt': str(t.dt),
|
||||
}
|
||||
|
||||
def lifo_update(
|
||||
self,
|
||||
size: float,
|
||||
|
@ -257,6 +273,32 @@ class Position(Struct):
|
|||
|
||||
return new_size, self.be_price
|
||||
|
||||
def minimize_clears(
|
||||
self,
|
||||
|
||||
) -> dict[str, dict]:
|
||||
'''
|
||||
Minimize the position's clears entries by removing
|
||||
all transactions before the last net zero size to avoid
|
||||
unecessary history irrelevant to the current pp state.
|
||||
|
||||
|
||||
'''
|
||||
size: float = 0
|
||||
clears_since_zero: deque[tuple(str, dict)] = deque()
|
||||
|
||||
# scan for the last "net zero" position by
|
||||
# iterating clears in reverse.
|
||||
for tid, clear in reversed(self.clears.items()):
|
||||
size += clear['size']
|
||||
clears_since_zero.appendleft((tid, clear))
|
||||
|
||||
if size == 0:
|
||||
break
|
||||
|
||||
self.clears = dict(clears_since_zero)
|
||||
return self.clears
|
||||
|
||||
|
||||
def update_pps(
|
||||
records: dict[str, Transaction],
|
||||
|
@ -310,62 +352,14 @@ def update_pps(
|
|||
cost=2*r.cost,
|
||||
)
|
||||
|
||||
if pp.size == 0:
|
||||
pp.clears.clear()
|
||||
|
||||
else:
|
||||
# track clearing data
|
||||
pp.clears[r.tid] = {
|
||||
'cost': r.cost,
|
||||
'price': r.price,
|
||||
'size': r.size,
|
||||
'dt': str(r.dt),
|
||||
}
|
||||
pp.update(r)
|
||||
|
||||
assert len(set(pp.clears)) == len(pp.clears)
|
||||
|
||||
return pps
|
||||
|
||||
|
||||
def dump_active(
|
||||
pps: dict[str, Position],
|
||||
|
||||
) -> tuple[
|
||||
dict[str, Any],
|
||||
dict[str, Any],
|
||||
]:
|
||||
'''
|
||||
Split pps into those that are "active" (non-zero size) and "closed"
|
||||
(zero size) and return in 2 dicts.
|
||||
|
||||
Returning the "closed" set is important for updating the pps state
|
||||
in any ``pps.toml`` such that we remove entries which are no longer
|
||||
part of any "VaR" set (well presumably, except of course your liquidity
|
||||
asset could be full of "risk" XD ).
|
||||
|
||||
'''
|
||||
active = {}
|
||||
closed = {}
|
||||
|
||||
for k, pp in pps.items():
|
||||
|
||||
asdict = pp.to_pretoml()
|
||||
|
||||
if pp.expiry is None:
|
||||
asdict.pop('expiry', None)
|
||||
|
||||
if (
|
||||
pp.size == 0
|
||||
|
||||
# drop time-expired positions (normally derivatives)
|
||||
or (pp.expiry and pp.expiry < now())
|
||||
):
|
||||
closed[k] = asdict
|
||||
else:
|
||||
active[k] = asdict
|
||||
|
||||
return active, closed
|
||||
|
||||
|
||||
def load_pps_from_ledger(
|
||||
|
||||
brokername: str,
|
||||
|
@ -391,11 +385,13 @@ def load_pps_from_ledger(
|
|||
return {}
|
||||
|
||||
brokermod = get_brokermod(brokername)
|
||||
records = brokermod.norm_trade_records(ledger)
|
||||
src_records = brokermod.norm_trade_records(ledger)
|
||||
|
||||
if filter_by:
|
||||
bsuids = set(filter_by)
|
||||
records = filter(lambda r: r.bsuid in bsuids, records)
|
||||
records = list(filter(lambda r: r.bsuid in bsuids, src_records))
|
||||
else:
|
||||
records = src_records
|
||||
|
||||
return update_pps(records)
|
||||
|
||||
|
@ -709,35 +705,51 @@ def update_pps_conf(
|
|||
pps=pp_objs,
|
||||
)
|
||||
|
||||
pp_entries = {} # dict-serialize all active pps
|
||||
# NOTE: newly closed position are also important to report/return
|
||||
# since a consumer, like an order mode UI ;), might want to react
|
||||
# based on the closure.
|
||||
active, closed = dump_active(pp_objs)
|
||||
|
||||
# dict-serialize all active pps
|
||||
pp_entries = {}
|
||||
|
||||
for bsuid, pp_dict in active.items():
|
||||
|
||||
# normalize to a simpler flat dict format
|
||||
s = pp_dict.pop('symbol')
|
||||
# TODO: we need to figure out how to have one top level
|
||||
# listing venue here even when the backend isn't providing
|
||||
# it via the trades ledger..
|
||||
fqsn = s.front_fqsn()
|
||||
|
||||
print(f'Updating active pp: {fqsn}')
|
||||
# XXX: ugh, it's cuz we push the section under
|
||||
# the broker name.. maybe we need to rethink this?
|
||||
brokerless_key = fqsn.rstrip(f'.{brokername}')
|
||||
pp_entries[brokerless_key] = pp_dict
|
||||
|
||||
closed_pp_objs: dict[str, Position] = {}
|
||||
for bsuid in closed:
|
||||
|
||||
for bsuid in list(pp_objs):
|
||||
pp = pp_objs[bsuid]
|
||||
pp.minimize_clears()
|
||||
|
||||
if (
|
||||
pp.size == 0
|
||||
|
||||
# drop time-expired positions (normally derivatives)
|
||||
or (pp.expiry and pp.expiry < now())
|
||||
):
|
||||
# if expired the position is closed
|
||||
pp.size = 0
|
||||
|
||||
# position is already closed aka "net zero"
|
||||
closed_pp = pp_objs.pop(bsuid, None)
|
||||
if closed_pp:
|
||||
closed_pp_objs[bsuid] = closed_pp
|
||||
|
||||
else:
|
||||
# serialize to pre-toml form
|
||||
asdict = pp.to_pretoml()
|
||||
|
||||
if pp.expiry is None:
|
||||
asdict.pop('expiry', None)
|
||||
|
||||
# TODO: we need to figure out how to have one top level
|
||||
# listing venue here even when the backend isn't providing
|
||||
# it via the trades ledger..
|
||||
# drop symbol obj in serialized form
|
||||
s = asdict.pop('symbol')
|
||||
fqsn = s.front_fqsn()
|
||||
print(f'Updating active pp: {fqsn}')
|
||||
|
||||
# XXX: ugh, it's cuz we push the section under
|
||||
# the broker name.. maybe we need to rethink this?
|
||||
brokerless_key = fqsn.rstrip(f'.{brokername}')
|
||||
|
||||
pp_entries[brokerless_key] = asdict
|
||||
|
||||
conf[brokername][acctid] = pp_entries
|
||||
|
||||
# TODO: why tf haven't they already done this for inline tables smh..
|
||||
|
|
Loading…
Reference in New Issue