Merge pull request #365 from pikers/ppu_history

Ppu history
pydantic_zombie
goodboy 2022-07-27 12:25:23 -04:00 committed by GitHub
commit d81e629c29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 429 additions and 428 deletions

View File

@ -383,7 +383,7 @@ async def update_and_audit_msgs(
symbol=ibppmsg.symbol, symbol=ibppmsg.symbol,
currency=ibppmsg.currency, currency=ibppmsg.currency,
size=p.size, size=p.size,
avg_price=p.be_price, avg_price=p.ppu,
) )
msgs.append(msg) msgs.append(msg)
@ -430,7 +430,7 @@ async def update_and_audit_msgs(
symbol=p.symbol.front_fqsn(), symbol=p.symbol.front_fqsn(),
# currency=ibppmsg.currency, # currency=ibppmsg.currency,
size=p.size, size=p.size,
avg_price=p.be_price, avg_price=p.ppu,
) )
if validate and p.size: if validate and p.size:
raise ValueError( raise ValueError(

View File

@ -126,7 +126,7 @@ class Allocator(Struct):
l_sub_pp = self.units_limit - abs_live_size l_sub_pp = self.units_limit - abs_live_size
elif size_unit == 'currency': elif size_unit == 'currency':
live_cost_basis = abs_live_size * live_pp.be_price live_cost_basis = abs_live_size * live_pp.ppu
slot_size = currency_per_slot / price slot_size = currency_per_slot / price
l_sub_pp = (self.currency_limit - live_cost_basis) / price l_sub_pp = (self.currency_limit - live_cost_basis) / price
@ -158,7 +158,7 @@ class Allocator(Struct):
if size_unit == 'currency': if size_unit == 'currency':
# compute the "projected" limit's worth of units at the # compute the "projected" limit's worth of units at the
# current pp (weighted) price: # current pp (weighted) price:
slot_size = currency_per_slot / live_pp.be_price slot_size = currency_per_slot / live_pp.ppu
else: else:
slot_size = u_per_slot slot_size = u_per_slot
@ -200,7 +200,7 @@ class Allocator(Struct):
Position( Position(
symbol=sym, symbol=sym,
size=order_size, size=order_size,
be_price=price, ppu=price,
bsuid=sym, bsuid=sym,
) )
) )
@ -229,8 +229,8 @@ class Allocator(Struct):
abs_pp_size = abs(pp.size) abs_pp_size = abs(pp.size)
if self.size_unit == 'currency': if self.size_unit == 'currency':
# live_currency_size = size or (abs_pp_size * pp.be_price) # live_currency_size = size or (abs_pp_size * pp.ppu)
live_currency_size = abs_pp_size * pp.be_price live_currency_size = abs_pp_size * pp.ppu
prop = live_currency_size / self.currency_limit prop = live_currency_size / self.currency_limit
else: else:
@ -303,7 +303,7 @@ def mk_allocator(
# if the current position is already greater then the limit # if the current position is already greater then the limit
# settings, increase the limit to the current position # settings, increase the limit to the current position
if alloc.size_unit == 'currency': if alloc.size_unit == 'currency':
startup_size = startup_pp.size * startup_pp.be_price startup_size = startup_pp.size * startup_pp.ppu
if startup_size > alloc.currency_limit: if startup_size > alloc.currency_limit:
alloc.currency_limit = round(startup_size, ndigits=2) alloc.currency_limit = round(startup_size, ndigits=2)

View File

@ -22,17 +22,25 @@ from contextlib import asynccontextmanager
from datetime import datetime from datetime import datetime
from operator import itemgetter from operator import itemgetter
import time import time
from typing import Tuple, Optional, Callable from typing import (
Any,
Optional,
Callable,
)
import uuid import uuid
from bidict import bidict from bidict import bidict
import pendulum
import trio import trio
import tractor import tractor
from dataclasses import dataclass from dataclasses import dataclass
from .. import data from .. import data
from ..data._source import Symbol from ..data._source import Symbol
from ..pp import Position from ..pp import (
Position,
Transaction,
)
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data._source import unpack_fqsn from ..data._source import unpack_fqsn
from ..log import get_logger from ..log import get_logger
@ -63,11 +71,12 @@ class PaperBoi:
_buys: bidict _buys: bidict
_sells: bidict _sells: bidict
_reqids: bidict _reqids: bidict
_positions: dict[str, BrokerdPosition] _positions: dict[str, Position]
_trade_ledger: dict[str, Any]
# init edge case L1 spread # init edge case L1 spread
last_ask: Tuple[float, float] = (float('inf'), 0) # price, size last_ask: tuple[float, float] = (float('inf'), 0) # price, size
last_bid: Tuple[float, float] = (0, 0) last_bid: tuple[float, float] = (0, 0)
async def submit_limit( async def submit_limit(
self, self,
@ -77,20 +86,21 @@ class PaperBoi:
action: str, action: str,
size: float, size: float,
reqid: Optional[str], reqid: Optional[str],
) -> int: ) -> int:
"""Place an order and return integer request id provided by client. '''
Place an order and return integer request id provided by client.
""" '''
is_modify: bool = False is_modify: bool = False
if reqid is None:
reqid = str(uuid.uuid4())
else: entry = self._reqids.get(reqid)
if entry:
# order is already existing, this is a modify # order is already existing, this is a modify
(oid, symbol, action, old_price) = self._reqids[reqid] (oid, symbol, action, old_price) = entry
assert old_price != price assert old_price != price
is_modify = True is_modify = True
else:
# register order internally # register order internally
self._reqids[reqid] = (oid, symbol, action, price) self._reqids[reqid] = (oid, symbol, action, price)
@ -197,16 +207,15 @@ class PaperBoi:
""" """
# TODO: net latency model # TODO: net latency model
await trio.sleep(0.05) await trio.sleep(0.05)
fill_time_ns = time.time_ns()
fill_time_s = time.time()
msg = BrokerdFill( fill_msg = BrokerdFill(
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=fill_time_ns,
action=action, action=action,
size=size, size=size,
price=price, price=price,
broker_time=datetime.now().timestamp(), broker_time=datetime.now().timestamp(),
broker_details={ broker_details={
'paper_info': { 'paper_info': {
@ -216,7 +225,9 @@ class PaperBoi:
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(fill_msg)
self._trade_ledger.update(fill_msg.to_dict())
if order_complete: if order_complete:
@ -243,9 +254,27 @@ class PaperBoi:
# lookup any existing position # lookup any existing position
token = f'{symbol}.{self.broker}' token = f'{symbol}.{self.broker}'
pp_msg = self._positions.setdefault( pp = self._positions.setdefault(
token, token,
BrokerdPosition( Position(
Symbol(key=symbol),
size=size,
ppu=price,
bsuid=symbol,
)
)
t = Transaction(
fqsn=symbol,
tid=oid,
size=size,
price=price,
cost=1., # todo cost model
dt=pendulum.from_timestamp(fill_time_s),
bsuid=symbol,
)
pp.add_clear(t)
pp_msg = BrokerdPosition(
broker=self.broker, broker=self.broker,
account='paper', account='paper',
symbol=symbol, symbol=symbol,
@ -253,19 +282,9 @@ class PaperBoi:
# broker info. i guess for crypto this can be # broker info. i guess for crypto this can be
# inferred from the pair? # inferred from the pair?
currency='', currency='',
size=0.0, size=pp.size,
avg_price=0, avg_price=pp.ppu,
) )
)
# delegate update to `.pp.Position.lifo_update()`
pp = Position(
Symbol(key=symbol),
size=pp_msg.size,
be_price=pp_msg.avg_price,
bsuid=symbol,
)
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg) await self.ems_trades_stream.send(pp_msg)
@ -273,6 +292,7 @@ class PaperBoi:
async def simulate_fills( async def simulate_fills(
quote_stream: 'tractor.ReceiveStream', # noqa quote_stream: 'tractor.ReceiveStream', # noqa
client: PaperBoi, client: PaperBoi,
) -> None: ) -> None:
# TODO: more machinery to better simulate real-world market things: # TODO: more machinery to better simulate real-world market things:
@ -389,6 +409,24 @@ async def handle_order_requests(
# validate # validate
order = BrokerdOrder(**request_msg) order = BrokerdOrder(**request_msg)
if order.reqid is None:
reqid = str(uuid.uuid4())
else:
reqid = order.reqid
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
)
)
# call our client api to submit the order # call our client api to submit the order
reqid = await client.submit_limit( reqid = await client.submit_limit(
@ -402,20 +440,7 @@ async def handle_order_requests(
# there is no existing order so ask the client to create # there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int # a new one (which it seems to do by allocating an int
# counter - collision prone..) # counter - collision prone..)
reqid=order.reqid,
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid, reqid=reqid,
)
) )
elif action == 'cancel': elif action == 'cancel':
@ -468,6 +493,9 @@ async def trades_dialogue(
# TODO: load paper positions from ``positions.toml`` # TODO: load paper positions from ``positions.toml``
_positions={}, _positions={},
# TODO: load postions from ledger file
_trade_ledger={},
) )
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(handle_order_requests, client, ems_stream)
@ -510,5 +538,4 @@ async def open_paperboi(
loglevel=loglevel, loglevel=loglevel,
) as (ctx, first): ) as (ctx, first):
yield ctx, first yield ctx, first

View File

@ -20,9 +20,8 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends) (looking at you `ib` and dirt-bird friends)
''' '''
from collections import deque
from contextlib import contextmanager as cm from contextlib import contextmanager as cm
# from pprint import pformat from pprint import pformat
import os import os
from os import path from os import path
from math import copysign from math import copysign
@ -130,7 +129,7 @@ class Position(Struct):
# "breakeven price" above or below which pnl moves above and below # "breakeven price" above or below which pnl moves above and below
# zero for the entirety of the current "trade state". # zero for the entirety of the current "trade state".
be_price: float ppu: float
# unique backend symbol id # unique backend symbol id
bsuid: str bsuid: str
@ -149,7 +148,7 @@ class Position(Struct):
for f in self.__struct_fields__ for f in self.__struct_fields__
} }
def to_pretoml(self) -> dict: def to_pretoml(self) -> tuple[str, dict]:
''' '''
Prep this position's data contents for export to toml including Prep this position's data contents for export to toml including
re-structuring of the ``.clears`` table to an array of re-structuring of the ``.clears`` table to an array of
@ -160,23 +159,79 @@ class Position(Struct):
clears = d.pop('clears') clears = d.pop('clears')
expiry = d.pop('expiry') expiry = d.pop('expiry')
if expiry: # TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
# drop symbol obj in serialized form
s = d.pop('symbol')
fqsn = s.front_fqsn()
size = d.pop('size')
ppu = d.pop('ppu')
d['size'], d['ppu'] = self.audit_sizing(size, ppu)
if self.expiry is None:
d.pop('expiry', None)
elif expiry:
d['expiry'] = str(expiry) d['expiry'] = str(expiry)
clears_list = [] toml_clears_list = []
for tid, data in sorted(
list(clears.items()),
for tid, data in clears.items(): # sort by datetime
key=lambda item: item[1]['dt'],
):
inline_table = toml.TomlDecoder().get_empty_inline_table() inline_table = toml.TomlDecoder().get_empty_inline_table()
inline_table['dt'] = data['dt']
# insert optional clear fields in column order
for k in ['ppu', 'accum_size']:
val = data.get(k)
if val:
inline_table[k] = val
# insert required fields
for k in ['price', 'size', 'cost']:
inline_table[k] = data[k]
inline_table['tid'] = tid inline_table['tid'] = tid
toml_clears_list.append(inline_table)
for k, v in data.items(): d['clears'] = toml_clears_list
inline_table[k] = v
clears_list.append(inline_table) return fqsn, d
d['clears'] = clears_list def audit_sizing(
self,
size: Optional[float] = None,
ppu: Optional[float] = None,
return d ) -> tuple[float, float]:
'''
Audit either the `.size` and `.ppu` values or equvialent
passed in values against the clears table calculations and
return the calc-ed values if they differ and log warnings to
console.
'''
size = size or self.size
ppu = ppu or self.ppu
csize = self.calc_size()
cppu = self.calc_ppu()
if size != csize:
log.warning(f'size != calculated size: {size} != {csize}')
size = csize
if ppu != cppu:
log.warning(
f'ppu != calculated ppu: {ppu} != {cppu}'
)
ppu = cppu
return size, ppu
def update_from_msg( def update_from_msg(
self, self,
@ -188,7 +243,7 @@ class Position(Struct):
symbol = self.symbol symbol = self.symbol
lot_size_digits = symbol.lot_size_digits lot_size_digits = symbol.lot_size_digits
be_price, size = ( ppu, size = (
round( round(
msg['avg_price'], msg['avg_price'],
ndigits=symbol.tick_size_digits ndigits=symbol.tick_size_digits
@ -199,7 +254,7 @@ class Position(Struct):
), ),
) )
self.be_price = be_price self.ppu = ppu
self.size = size self.size = size
@property @property
@ -209,25 +264,7 @@ class Position(Struct):
terms. terms.
''' '''
return self.be_price * self.size return self.ppu * self.size
def update(
self,
t: Transaction,
) -> None:
self.clears[t.tid] = {
'cost': t.cost,
'price': t.price,
'size': t.size,
'dt': str(t.dt),
}
def lifo_update(
self,
size: float,
price: float,
cost: float = 0,
# TODO: idea: "real LIFO" dynamic positioning. # TODO: idea: "real LIFO" dynamic positioning.
# - when a trade takes place where the pnl for # - when a trade takes place where the pnl for
@ -237,95 +274,117 @@ class Position(Struct):
# - in this case we could recalc the be price to # - in this case we could recalc the be price to
# be reverted back to it's prior value before the nearest term # be reverted back to it's prior value before the nearest term
# trade was opened.? # trade was opened.?
# dynamic_breakeven_price: bool = False, # def lifo_price() -> float:
# ...
) -> (float, float): def calc_ppu(
self,
# include transaction cost in breakeven price
# and presume the worst case of the same cost
# to exit this transaction (even though in reality
# it will be dynamic based on exit stratetgy).
cost_scalar: float = 2,
) -> float:
''' '''
Incremental update using a LIFO-style weighted mean. Compute the "price-per-unit" price for the given non-zero sized
rolling position.
The recurrence relation which computes this (exponential) mean
per new clear which **increases** the accumulative postiion size
is:
ppu[-1] = (
ppu[-2] * accum_size[-2]
+
ppu[-1] * size
) / accum_size[-1]
where `cost_basis` for the current step is simply the price
* size of the most recent clearing transaction.
''' '''
# "avg position price" calcs asize_h: list[float] = [] # historical accumulative size
# TODO: eventually it'd be nice to have a small set of routines ppu_h: list[float] = [] # historical price-per-unit
# to do this stuff from a sequence of cleared orders to enable
# so called "contextual positions".
new_size = self.size + size
# old size minus the new size gives us size diff with clears = list(self.clears.items())
# +ve -> increase in pp size
# -ve -> decrease in pp size
size_diff = abs(new_size) - abs(self.size)
if new_size == 0: for i, (tid, entry) in enumerate(clears):
self.be_price = 0
elif size_diff > 0:
# XXX: LOFI incremental update:
# only update the "average price" when
# the size increases not when it decreases (i.e. the
# position is being made smaller)
self.be_price = (
# weight of current exec = (size * price) + cost
(abs(size) * price)
+
(copysign(1, new_size) * cost) # transaction cost
+
# weight of existing be price
self.be_price * abs(self.size) # weight of previous pp
) / abs(new_size) # normalized by the new size: weighted mean.
self.size = new_size
return new_size, self.be_price
def calc_be_price(self) -> float:
size: float = 0
cb_tot_size: float = 0
cost_basis: float = 0
be_price: float = 0
for tid, entry in self.clears.items():
clear_size = entry['size'] clear_size = entry['size']
clear_price = entry['price'] clear_price = entry['price']
new_size = size + clear_size
last_accum_size = asize_h[-1] if asize_h else 0
accum_size = last_accum_size + clear_size
accum_sign = copysign(1, accum_size)
sign_change: bool = False
if accum_size == 0:
ppu_h.append(0)
asize_h.append(0)
continue
# test if the pp somehow went "passed" a net zero size state
# resulting in a change of the "sign" of the size (+ve for
# long, -ve for short).
sign_change = (
copysign(1, last_accum_size) + accum_sign == 0
and last_accum_size != 0
)
# since we passed the net-zero-size state the new size
# after sum should be the remaining size the new
# "direction" (aka, long vs. short) for this clear.
if sign_change:
clear_size = accum_size
abs_diff = abs(accum_size)
asize_h.append(0)
ppu_h.append(0)
else:
# old size minus the new size gives us size diff with # old size minus the new size gives us size diff with
# +ve -> increase in pp size # +ve -> increase in pp size
# -ve -> decrease in pp size # -ve -> decrease in pp size
size_diff = abs(new_size) - abs(size) abs_diff = abs(accum_size) - abs(last_accum_size)
if new_size == 0: # XXX: LIFO breakeven price update. only an increaze in size
cost_basis = 0 # of the position contributes the breakeven price,
cb_tot_size = 0 # a decrease does not (i.e. the position is being made
be_price = 0 # smaller).
# abs_clear_size = abs(clear_size)
abs_new_size = abs(accum_size)
elif size_diff > 0: if abs_diff > 0:
# only an increaze in size of the position contributes
# the breakeven price, a decrease does not.
cost_basis += ( cost_basis = (
# weighted price per unit of # cost basis for this clear
clear_price * abs(clear_size) clear_price * abs(clear_size)
+ +
# transaction cost # transaction cost
(copysign(1, new_size) * entry['cost'] * 2) accum_sign * cost_scalar * entry['cost']
) )
cb_tot_size += abs(clear_size)
be_price = cost_basis / cb_tot_size
size = new_size if asize_h:
size_last = abs(asize_h[-1])
cb_last = ppu_h[-1] * size_last
ppu = (cost_basis + cb_last) / abs_new_size
# print( else:
# f'cb: {cost_basis}\n' ppu = cost_basis / abs_new_size
# f'size: {size}\n'
# f'clear_size: {clear_size}\n'
# f'clear_price: {clear_price}\n\n'
# f'cb_tot_size: {cb_tot_size}\n' ppu_h.append(ppu)
# f'be_price: {be_price}\n\n' asize_h.append(accum_size)
# )
return be_price else:
# on "exit" clears from a given direction,
# only the size changes not the price-per-unit
# need to be updated since the ppu remains constant
# and gets weighted by the new size.
asize_h.append(accum_size)
ppu_h.append(ppu_h[-1])
return ppu_h[-1] if ppu_h else 0
def calc_size(self) -> float: def calc_size(self) -> float:
size: float = 0 size: float = 0
@ -343,24 +402,57 @@ class Position(Struct):
unecessary history irrelevant to the current pp state. unecessary history irrelevant to the current pp state.
''' '''
size: float = self.size size: float = 0
clears_since_zero: deque[tuple(str, dict)] = deque() clears_since_zero: list[tuple(str, dict)] = []
# scan for the last "net zero" position by # TODO: we might just want to always do this when iterating
# iterating clears in reverse. # a ledger? keep a state of the last net-zero and only do the
for tid, clear in reversed(self.clears.items()): # full iterate when no state was stashed?
size -= clear['size']
clears_since_zero.appendleft((tid, clear)) # scan for the last "net zero" position by iterating
# transactions until the next net-zero size, rinse, repeat.
for tid, clear in self.clears.items():
size += clear['size']
clears_since_zero.append((tid, clear))
if size == 0: if size == 0:
break clears_since_zero.clear()
self.clears = dict(clears_since_zero) self.clears = dict(clears_since_zero)
return self.clears return self.clears
def add_clear(
self,
t: Transaction,
) -> dict:
'''
Update clearing table and populate rolling ppu and accumulative
size in both the clears entry and local attrs state.
'''
clear = self.clears[t.tid] = {
'cost': t.cost,
'price': t.price,
'size': t.size,
'dt': str(t.dt),
}
# TODO: compute these incrementally instead
# of re-looping through each time resulting in O(n**2)
# behaviour..
# compute these **after** adding the entry
# in order to make the recurrence relation math work
# inside ``.calc_size()``.
self.size = clear['accum_size'] = self.calc_size()
self.ppu = clear['ppu'] = self.calc_ppu()
return clear
class PpTable(Struct): class PpTable(Struct):
brokername: str
acctid: str
pps: dict[str, Position] pps: dict[str, Position]
conf: Optional[dict] = {} conf: Optional[dict] = {}
@ -372,31 +464,30 @@ class PpTable(Struct):
) -> dict[str, Position]: ) -> dict[str, Position]:
pps = self.pps pps = self.pps
updated: dict[str, Position] = {} updated: dict[str, Position] = {}
# lifo update all pps from records # lifo update all pps from records
for tid, r in trans.items(): for tid, t in trans.items():
pp = pps.setdefault( pp = pps.setdefault(
r.bsuid, t.bsuid,
# if no existing pp, allocate fresh one. # if no existing pp, allocate fresh one.
Position( Position(
Symbol.from_fqsn( Symbol.from_fqsn(
r.fqsn, t.fqsn,
info={}, info={},
), ),
size=0.0, size=0.0,
be_price=0.0, ppu=0.0,
bsuid=r.bsuid, bsuid=t.bsuid,
expiry=r.expiry, expiry=t.expiry,
) )
) )
# don't do updates for ledger records we already have # don't do updates for ledger records we already have
# included in the current pps state. # included in the current pps state.
if r.tid in pp.clears: if t.tid in pp.clears:
# NOTE: likely you'll see repeats of the same # NOTE: likely you'll see repeats of the same
# ``Transaction`` passed in here if/when you are restarting # ``Transaction`` passed in here if/when you are restarting
# a ``brokerd.ib`` where the API will re-report trades from # a ``brokerd.ib`` where the API will re-report trades from
@ -404,30 +495,20 @@ class PpTable(Struct):
# "double count" these in pp calculations. # "double count" these in pp calculations.
continue continue
# lifo style "breakeven" price calc # update clearing table
pp.lifo_update( pp.add_clear(t)
r.size, updated[t.bsuid] = pp
r.price,
# include transaction cost in breakeven price # minimize clears tables and update sizing.
# and presume the worst case of the same cost for bsuid, pp in updated.items():
# to exit this transaction (even though in reality pp.size, pp.ppu = pp.audit_sizing()
# it will be dynamic based on exit stratetgy).
cost=cost_scalar*r.cost,
)
# track clearing data
pp.update(r)
updated[r.bsuid] = pp
return updated return updated
def dump_active( def dump_active(
self, self,
brokername: str,
) -> tuple[ ) -> tuple[
dict[str, Any], dict[str, Position],
dict[str, Position] dict[str, Position]
]: ]:
''' '''
@ -437,13 +518,12 @@ class PpTable(Struct):
``Position``s which have recently closed. ``Position``s which have recently closed.
''' '''
# ONLY dict-serialize all active positions; those that are closed
# we don't store in the ``pps.toml``.
# NOTE: newly closed position are also important to report/return # NOTE: newly closed position are also important to report/return
# since a consumer, like an order mode UI ;), might want to react # since a consumer, like an order mode UI ;), might want to react
# based on the closure. # based on the closure (for example removing the breakeven line
pp_entries = {} # and clearing the entry from any lists/monitors).
closed_pp_objs: dict[str, Position] = {} closed_pp_objs: dict[str, Position] = {}
open_pp_objs: dict[str, Position] = {}
pp_objs = self.pps pp_objs = self.pps
for bsuid in list(pp_objs): for bsuid in list(pp_objs):
@ -454,7 +534,7 @@ class PpTable(Struct):
# if bsuid == qqqbsuid: # if bsuid == qqqbsuid:
# breakpoint() # breakpoint()
pp.minimize_clears() pp.size, pp.ppu = pp.audit_sizing()
if ( if (
# "net-zero" is a "closed" position # "net-zero" is a "closed" position
@ -470,53 +550,71 @@ class PpTable(Struct):
# used to check for duplicate clears that may come in as # used to check for duplicate clears that may come in as
# new transaction from some backend API and need to be # new transaction from some backend API and need to be
# ignored; the closed positions won't be written to the # ignored; the closed positions won't be written to the
# ``pps.toml`` since ``pp_entries`` above is what's # ``pps.toml`` since ``pp_active_entries`` above is what's
# written. # written.
# closed_pp = pp_objs.pop(bsuid, None) closed_pp_objs[bsuid] = pp
closed_pp = pp_objs.get(bsuid)
if closed_pp:
closed_pp_objs[bsuid] = closed_pp
else: else:
open_pp_objs[bsuid] = pp
return open_pp_objs, closed_pp_objs
def to_toml(
self,
) -> dict[str, Any]:
active, closed = self.dump_active()
# ONLY dict-serialize all active positions; those that are closed
# we don't store in the ``pps.toml``.
to_toml_dict = {}
for bsuid, pos in active.items():
# keep the minimal amount of clears that make up this
# position since the last net-zero state.
pos.minimize_clears()
# serialize to pre-toml form # serialize to pre-toml form
asdict = pp.to_pretoml() fqsn, asdict = pos.to_pretoml()
if pp.expiry is None:
asdict.pop('expiry', None)
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
# drop symbol obj in serialized form
s = asdict.pop('symbol')
fqsn = s.front_fqsn()
log.info(f'Updating active pp: {fqsn}') log.info(f'Updating active pp: {fqsn}')
# XXX: ugh, it's cuz we push the section under # XXX: ugh, it's cuz we push the section under
# the broker name.. maybe we need to rethink this? # the broker name.. maybe we need to rethink this?
brokerless_key = fqsn.removeprefix(f'{brokername}.') brokerless_key = fqsn.removeprefix(f'{self.brokername}.')
to_toml_dict[brokerless_key] = asdict
pp_entries[brokerless_key] = asdict return to_toml_dict
return pp_entries, closed_pp_objs def write_config(self) -> None:
def update_pps(
records: dict[str, Transaction],
pps: Optional[dict[str, Position]] = None
) -> dict[str, Position]:
''' '''
Compile a set of positions from a trades ledger. Write the current position table to the user's ``pps.toml``.
''' '''
pps: dict[str, Position] = pps or {} # TODO: show diff output?
table = PpTable(pps) # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
table.update_from_trans(records) print(f'Updating ``pps.toml`` for {path}:\n')
return table.pps
# active, closed_pp_objs = table.dump_active()
pp_entries = self.to_toml()
self.conf[self.brokername][self.acctid] = pp_entries
# TODO: why tf haven't they already done this for inline
# tables smh..
enc = PpsEncoder(preserve=True)
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
enc.dump_funcs[
toml.decoder.InlineTableDict
] = enc.dump_inline_table
config.write(
self.conf,
'pps',
encoder=enc,
)
def load_trans_from_ledger( def load_pps_from_ledger(
brokername: str, brokername: str,
acctname: str, acctname: str,
@ -524,24 +622,27 @@ def load_trans_from_ledger(
# post normalization filter on ledger entries to be processed # post normalization filter on ledger entries to be processed
filter_by: Optional[list[dict]] = None, filter_by: Optional[list[dict]] = None,
) -> dict[str, Position]: ) -> tuple[
dict[str, Transaction],
dict[str, Position],
]:
''' '''
Open a ledger file by broker name and account and read in and Open a ledger file by broker name and account and read in and
process any trade records into our normalized ``Transaction`` process any trade records into our normalized ``Transaction`` form
form and then pass these into the position processing routine and then update the equivalent ``Pptable`` and deliver the two
and deliver the two dict-sets of the active and closed pps. bsuid-mapped dict-sets of the transactions and pps.
''' '''
with open_trade_ledger( with (
brokername, open_trade_ledger(brokername, acctname) as ledger,
acctname, open_pps(brokername, acctname) as table,
) as ledger: ):
if not ledger: if not ledger:
# null case, no ledger file with content # null case, no ledger file with content
return {} return {}
brokermod = get_brokermod(brokername) mod = get_brokermod(brokername)
src_records: dict[str, Transaction] = brokermod.norm_trade_records(ledger) src_records: dict[str, Transaction] = mod.norm_trade_records(ledger)
if filter_by: if filter_by:
records = {} records = {}
@ -552,7 +653,9 @@ def load_trans_from_ledger(
else: else:
records = src_records records = src_records
return records updated = table.update_from_trans(records)
return records, updated
# TODO: instead see if we can hack tomli and tomli-w to do the same: # TODO: instead see if we can hack tomli and tomli-w to do the same:
@ -686,67 +789,6 @@ class PpsEncoder(toml.TomlEncoder):
return (retstr, retdict) return (retstr, retdict)
def load_pps_from_toml(
brokername: str,
acctid: str,
# XXX: there is an edge case here where we may want to either audit
# the retrieved ``pps.toml`` output or reprocess it since there was
# an error on write on the last attempt to update the state file
# even though the ledger *was* updated. For this cases we allow the
# caller to pass in a symbol set they'd like to reload from the
# underlying ledger to be reprocessed in computing pps state.
reload_records: Optional[dict[str, str]] = None,
# XXX: this is "global" update from ledger flag which
# does a full refresh of pps from the available ledger.
update_from_ledger: bool = False,
) -> tuple[PpTable, dict[str, str]]:
'''
Load and marshal to objects all pps from either an existing
``pps.toml`` config, or from scratch from a ledger file when
none yet exists.
'''
with open_pps(
brokername,
acctid,
write_on_exit=False,
) as table:
pp_objs = table.pps
# no pps entry yet for this broker/account so parse any available
# ledgers to build a brand new pps state.
if not pp_objs or update_from_ledger:
trans = load_trans_from_ledger(
brokername,
acctid,
)
table.update_from_trans(trans)
# Reload symbol specific ledger entries if requested by the
# caller **AND** none exist in the current pps state table.
elif (
pp_objs and reload_records
):
# no pps entry yet for this broker/account so parse
# any available ledgers to build a pps state.
trans = load_trans_from_ledger(
brokername,
acctid,
filter_by=reload_records,
)
table.update_from_trans(trans)
if not table.pps:
log.warning(
f'No `pps.toml` values could be loaded {brokername}:{acctid}'
)
return table, table.conf
@cm @cm
def open_pps( def open_pps(
brokername: str, brokername: str,
@ -763,8 +805,23 @@ def open_pps(
brokersection = conf.setdefault(brokername, {}) brokersection = conf.setdefault(brokername, {})
pps = brokersection.setdefault(acctid, {}) pps = brokersection.setdefault(acctid, {})
# TODO: ideally we can pass in an existing
# pps state to this right? such that we
# don't have to do a ledger reload all the
# time.. a couple ideas I can think of,
# - mirror this in some client side actor which
# does the actual ledger updates (say the paper
# engine proc if we decide to always spawn it?),
# - do diffs against updates from the ledger writer
# actor and the in-mem state here?
pp_objs = {} pp_objs = {}
table = PpTable(pp_objs, conf=conf) table = PpTable(
brokername,
acctid,
pp_objs,
conf=conf,
)
# unmarshal/load ``pps.toml`` config entries into object form # unmarshal/load ``pps.toml`` config entries into object form
# and update `PpTable` obj entries. # and update `PpTable` obj entries.
@ -789,29 +846,17 @@ def open_pps(
clears[tid] = clears_table clears[tid] = clears_table
size = entry['size'] size = entry['size']
# TODO: remove but, handle old field name for now
# TODO: an audit system for existing pps entries? ppu = entry.get('ppu', entry.get('be_price', 0))
# if not len(clears) == abs(size):
# pp_objs = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=reload_records,
# )
# reason = 'size <-> len(clears) mismatch'
# raise ValueError(
# '`pps.toml` entry is invalid:\n'
# f'{fqsn}\n'
# f'{pformat(entry)}'
# )
expiry = entry.get('expiry') expiry = entry.get('expiry')
if expiry: if expiry:
expiry = pendulum.parse(expiry) expiry = pendulum.parse(expiry)
pp_objs[bsuid] = Position( pp = pp_objs[bsuid] = Position(
Symbol.from_fqsn(fqsn, info={}), Symbol.from_fqsn(fqsn, info={}),
size=size, size=size,
be_price=entry['be_price'], ppu=ppu,
expiry=expiry, expiry=expiry,
bsuid=entry['bsuid'], bsuid=entry['bsuid'],
@ -823,90 +868,14 @@ def open_pps(
clears=clears, clears=clears,
) )
# audit entries loaded from toml
pp.size, pp.ppu = pp.audit_sizing()
try: try:
yield table yield table
finally: finally:
if write_on_exit: if write_on_exit:
# TODO: show diff output? table.write_config()
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ``pps.toml`` for {path}:\n')
pp_entries, closed_pp_objs = table.dump_active(brokername)
conf[brokername][acctid] = pp_entries
# TODO: why tf haven't they already done this for inline
# tables smh..
enc = PpsEncoder(preserve=True)
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
enc.dump_funcs[
toml.decoder.InlineTableDict
] = enc.dump_inline_table
config.write(
conf,
'pps',
encoder=enc,
)
def update_pps_conf(
brokername: str,
acctid: str,
trade_records: Optional[dict[str, Transaction]] = None,
ledger_reload: Optional[dict[str, str]] = None,
) -> tuple[
dict[str, Position],
dict[str, Position],
]:
# TODO: ideally we can pass in an existing
# pps state to this right? such that we
# don't have to do a ledger reload all the
# time.. a couple ideas I can think of,
# - load pps once after backend ledger state
# is loaded and keep maintainend in memory
# inside a with block,
# - mirror this in some client side actor which
# does the actual ledger updates (say the paper
# engine proc if we decide to always spawn it?),
# - do diffs against updates from the ledger writer
# actor and the in-mem state here?
if trade_records and ledger_reload:
for tid, r in trade_records.items():
ledger_reload[r.bsuid] = r.fqsn
table, conf = load_pps_from_toml(
brokername,
acctid,
reload_records=ledger_reload,
)
# update all pp objects from any (new) trade records which
# were passed in (aka incremental update case).
if trade_records:
table.update_from_trans(trade_records)
# this maps `.bsuid` values to positions
pp_entries, closed_pp_objs = table.dump_active(brokername)
pp_objs: dict[Union[str, int], Position] = table.pps
conf[brokername][acctid] = pp_entries
# TODO: why tf haven't they already done this for inline tables smh..
enc = PpsEncoder(preserve=True)
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
enc.dump_funcs[toml.decoder.InlineTableDict] = enc.dump_inline_table
config.write(
conf,
'pps',
encoder=enc,
)
# deliver object form of all pps in table to caller
return pp_objs, closed_pp_objs
if __name__ == '__main__': if __name__ == '__main__':
@ -917,4 +886,9 @@ if __name__ == '__main__':
args = args[1:] args = args[1:]
for acctid in args: for acctid in args:
broker, name = acctid.split('.') broker, name = acctid.split('.')
update_pps_conf(broker, name) trans, updated_pps = load_pps_from_ledger(broker, name)
print(
f'Processing transactions into pps for {broker}:{acctid}\n'
f'{pformat(trans)}\n\n'
f'{pformat(updated_pps)}'
)

View File

@ -106,8 +106,8 @@ async def update_pnl_from_feed(
# compute and display pnl status # compute and display pnl status
order_mode.pane.pnl_label.format( order_mode.pane.pnl_label.format(
pnl=copysign(1, size) * pnl( pnl=copysign(1, size) * pnl(
# live.be_price, # live.ppu,
order_mode.current_pp.live_pp.be_price, order_mode.current_pp.live_pp.ppu,
tick['price'], tick['price'],
), ),
) )
@ -357,7 +357,7 @@ class SettingsPane:
# last historical close price # last historical close price
last = feed.shm.array[-1][['close']][0] last = feed.shm.array[-1][['close']][0]
pnl_value = copysign(1, size) * pnl( pnl_value = copysign(1, size) * pnl(
tracker.live_pp.be_price, tracker.live_pp.ppu,
last, last,
) )
@ -557,7 +557,7 @@ class PositionTracker:
pp = position or self.live_pp pp = position or self.live_pp
self.update_line( self.update_line(
pp.be_price, pp.ppu,
pp.size, pp.size,
self.chart.linked.symbol.lot_size_digits, self.chart.linked.symbol.lot_size_digits,
) )
@ -571,7 +571,7 @@ class PositionTracker:
self.hide() self.hide()
else: else:
self._level_marker.level = pp.be_price self._level_marker.level = pp.ppu
# these updates are critical to avoid lag on view/scene changes # these updates are critical to avoid lag on view/scene changes
self._level_marker.update() # trigger paint self._level_marker.update() # trigger paint

View File

@ -610,7 +610,7 @@ async def open_order_mode(
startup_pp = Position( startup_pp = Position(
symbol=symbol, symbol=symbol,
size=0, size=0,
be_price=0, ppu=0,
# XXX: BLEH, do we care about this on the client side? # XXX: BLEH, do we care about this on the client side?
bsuid=symbol, bsuid=symbol,