diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index bc712677..93128240 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -383,7 +383,7 @@ async def update_and_audit_msgs( symbol=ibppmsg.symbol, currency=ibppmsg.currency, size=p.size, - avg_price=p.be_price, + avg_price=p.ppu, ) msgs.append(msg) @@ -430,7 +430,7 @@ async def update_and_audit_msgs( symbol=p.symbol.front_fqsn(), # currency=ibppmsg.currency, size=p.size, - avg_price=p.be_price, + avg_price=p.ppu, ) if validate and p.size: raise ValueError( diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py index 8e51ed16..62b8c115 100644 --- a/piker/clearing/_allocate.py +++ b/piker/clearing/_allocate.py @@ -126,7 +126,7 @@ class Allocator(Struct): l_sub_pp = self.units_limit - abs_live_size elif size_unit == 'currency': - live_cost_basis = abs_live_size * live_pp.be_price + live_cost_basis = abs_live_size * live_pp.ppu slot_size = currency_per_slot / price l_sub_pp = (self.currency_limit - live_cost_basis) / price @@ -158,7 +158,7 @@ class Allocator(Struct): if size_unit == 'currency': # compute the "projected" limit's worth of units at the # current pp (weighted) price: - slot_size = currency_per_slot / live_pp.be_price + slot_size = currency_per_slot / live_pp.ppu else: slot_size = u_per_slot @@ -200,7 +200,7 @@ class Allocator(Struct): Position( symbol=sym, size=order_size, - be_price=price, + ppu=price, bsuid=sym, ) ) @@ -229,8 +229,8 @@ class Allocator(Struct): abs_pp_size = abs(pp.size) if self.size_unit == 'currency': - # live_currency_size = size or (abs_pp_size * pp.be_price) - live_currency_size = abs_pp_size * pp.be_price + # live_currency_size = size or (abs_pp_size * pp.ppu) + live_currency_size = abs_pp_size * pp.ppu prop = live_currency_size / self.currency_limit else: @@ -303,7 +303,7 @@ def mk_allocator( # if the current position is already greater then the limit # settings, increase the limit to the current position if alloc.size_unit == 'currency': - startup_size = startup_pp.size * startup_pp.be_price + startup_size = startup_pp.size * startup_pp.ppu if startup_size > alloc.currency_limit: alloc.currency_limit = round(startup_size, ndigits=2) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 59c05e96..7d2ceebd 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -22,17 +22,25 @@ from contextlib import asynccontextmanager from datetime import datetime from operator import itemgetter import time -from typing import Tuple, Optional, Callable +from typing import ( + Any, + Optional, + Callable, +) import uuid from bidict import bidict +import pendulum import trio import tractor from dataclasses import dataclass from .. import data from ..data._source import Symbol -from ..pp import Position +from ..pp import ( + Position, + Transaction, +) from ..data._normalize import iterticks from ..data._source import unpack_fqsn from ..log import get_logger @@ -63,11 +71,12 @@ class PaperBoi: _buys: bidict _sells: bidict _reqids: bidict - _positions: dict[str, BrokerdPosition] + _positions: dict[str, Position] + _trade_ledger: dict[str, Any] # init edge case L1 spread - last_ask: Tuple[float, float] = (float('inf'), 0) # price, size - last_bid: Tuple[float, float] = (0, 0) + last_ask: tuple[float, float] = (float('inf'), 0) # price, size + last_bid: tuple[float, float] = (0, 0) async def submit_limit( self, @@ -77,22 +86,23 @@ class PaperBoi: action: str, size: float, reqid: Optional[str], + ) -> int: - """Place an order and return integer request id provided by client. + ''' + Place an order and return integer request id provided by client. - """ + ''' is_modify: bool = False - if reqid is None: - reqid = str(uuid.uuid4()) - else: + entry = self._reqids.get(reqid) + if entry: # order is already existing, this is a modify - (oid, symbol, action, old_price) = self._reqids[reqid] + (oid, symbol, action, old_price) = entry assert old_price != price is_modify = True - - # register order internally - self._reqids[reqid] = (oid, symbol, action, price) + else: + # register order internally + self._reqids[reqid] = (oid, symbol, action, price) if action == 'alert': # bypass all fill simulation @@ -197,16 +207,15 @@ class PaperBoi: """ # TODO: net latency model await trio.sleep(0.05) + fill_time_ns = time.time_ns() + fill_time_s = time.time() - msg = BrokerdFill( - + fill_msg = BrokerdFill( reqid=reqid, - time_ns=time.time_ns(), - + time_ns=fill_time_ns, action=action, size=size, price=price, - broker_time=datetime.now().timestamp(), broker_details={ 'paper_info': { @@ -216,7 +225,9 @@ class PaperBoi: 'name': self.broker + '_paper', }, ) - await self.ems_trades_stream.send(msg) + await self.ems_trades_stream.send(fill_msg) + + self._trade_ledger.update(fill_msg.to_dict()) if order_complete: @@ -243,29 +254,37 @@ class PaperBoi: # lookup any existing position token = f'{symbol}.{self.broker}' - pp_msg = self._positions.setdefault( + pp = self._positions.setdefault( token, - BrokerdPosition( - broker=self.broker, - account='paper', - symbol=symbol, - # TODO: we need to look up the asset currency from - # broker info. i guess for crypto this can be - # inferred from the pair? - currency='', - size=0.0, - avg_price=0, + Position( + Symbol(key=symbol), + size=size, + ppu=price, + bsuid=symbol, ) ) - - # delegate update to `.pp.Position.lifo_update()` - pp = Position( - Symbol(key=symbol), - size=pp_msg.size, - be_price=pp_msg.avg_price, + t = Transaction( + fqsn=symbol, + tid=oid, + size=size, + price=price, + cost=1., # todo cost model + dt=pendulum.from_timestamp(fill_time_s), bsuid=symbol, ) - pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) + pp.add_clear(t) + + pp_msg = BrokerdPosition( + broker=self.broker, + account='paper', + symbol=symbol, + # TODO: we need to look up the asset currency from + # broker info. i guess for crypto this can be + # inferred from the pair? + currency='', + size=pp.size, + avg_price=pp.ppu, + ) await self.ems_trades_stream.send(pp_msg) @@ -273,6 +292,7 @@ class PaperBoi: async def simulate_fills( quote_stream: 'tractor.ReceiveStream', # noqa client: PaperBoi, + ) -> None: # TODO: more machinery to better simulate real-world market things: @@ -389,6 +409,24 @@ async def handle_order_requests( # validate order = BrokerdOrder(**request_msg) + if order.reqid is None: + reqid = str(uuid.uuid4()) + else: + reqid = order.reqid + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=order.oid, + + # broker specific request id + reqid=reqid, + + ) + ) + # call our client api to submit the order reqid = await client.submit_limit( @@ -402,20 +440,7 @@ async def handle_order_requests( # there is no existing order so ask the client to create # a new one (which it seems to do by allocating an int # counter - collision prone..) - reqid=order.reqid, - ) - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=order.oid, - - # broker specific request id - reqid=reqid, - - ) + reqid=reqid, ) elif action == 'cancel': @@ -468,6 +493,9 @@ async def trades_dialogue( # TODO: load paper positions from ``positions.toml`` _positions={}, + + # TODO: load postions from ledger file + _trade_ledger={}, ) n.start_soon(handle_order_requests, client, ems_stream) @@ -510,5 +538,4 @@ async def open_paperboi( loglevel=loglevel, ) as (ctx, first): - yield ctx, first diff --git a/piker/pp.py b/piker/pp.py index 5f78db55..456c2c4f 100644 --- a/piker/pp.py +++ b/piker/pp.py @@ -20,9 +20,8 @@ that doesn't try to cuk most humans who prefer to not lose their moneys.. (looking at you `ib` and dirt-bird friends) ''' -from collections import deque from contextlib import contextmanager as cm -# from pprint import pformat +from pprint import pformat import os from os import path from math import copysign @@ -130,7 +129,7 @@ class Position(Struct): # "breakeven price" above or below which pnl moves above and below # zero for the entirety of the current "trade state". - be_price: float + ppu: float # unique backend symbol id bsuid: str @@ -149,7 +148,7 @@ class Position(Struct): for f in self.__struct_fields__ } - def to_pretoml(self) -> dict: + def to_pretoml(self) -> tuple[str, dict]: ''' Prep this position's data contents for export to toml including re-structuring of the ``.clears`` table to an array of @@ -160,23 +159,79 @@ class Position(Struct): clears = d.pop('clears') expiry = d.pop('expiry') - if expiry: + # TODO: we need to figure out how to have one top level + # listing venue here even when the backend isn't providing + # it via the trades ledger.. + # drop symbol obj in serialized form + s = d.pop('symbol') + fqsn = s.front_fqsn() + + size = d.pop('size') + ppu = d.pop('ppu') + d['size'], d['ppu'] = self.audit_sizing(size, ppu) + + if self.expiry is None: + d.pop('expiry', None) + elif expiry: d['expiry'] = str(expiry) - clears_list = [] + toml_clears_list = [] + for tid, data in sorted( + list(clears.items()), - for tid, data in clears.items(): + # sort by datetime + key=lambda item: item[1]['dt'], + ): inline_table = toml.TomlDecoder().get_empty_inline_table() + + inline_table['dt'] = data['dt'] + + # insert optional clear fields in column order + for k in ['ppu', 'accum_size']: + val = data.get(k) + if val: + inline_table[k] = val + + # insert required fields + for k in ['price', 'size', 'cost']: + inline_table[k] = data[k] + inline_table['tid'] = tid + toml_clears_list.append(inline_table) - for k, v in data.items(): - inline_table[k] = v + d['clears'] = toml_clears_list - clears_list.append(inline_table) + return fqsn, d - d['clears'] = clears_list + def audit_sizing( + self, + size: Optional[float] = None, + ppu: Optional[float] = None, - return d + ) -> tuple[float, float]: + ''' + Audit either the `.size` and `.ppu` values or equvialent + passed in values against the clears table calculations and + return the calc-ed values if they differ and log warnings to + console. + + ''' + size = size or self.size + ppu = ppu or self.ppu + csize = self.calc_size() + cppu = self.calc_ppu() + + if size != csize: + log.warning(f'size != calculated size: {size} != {csize}') + size = csize + + if ppu != cppu: + log.warning( + f'ppu != calculated ppu: {ppu} != {cppu}' + ) + ppu = cppu + + return size, ppu def update_from_msg( self, @@ -188,7 +243,7 @@ class Position(Struct): symbol = self.symbol lot_size_digits = symbol.lot_size_digits - be_price, size = ( + ppu, size = ( round( msg['avg_price'], ndigits=symbol.tick_size_digits @@ -199,7 +254,7 @@ class Position(Struct): ), ) - self.be_price = be_price + self.ppu = ppu self.size = size @property @@ -209,123 +264,127 @@ class Position(Struct): terms. ''' - return self.be_price * self.size + return self.ppu * self.size - def update( + # TODO: idea: "real LIFO" dynamic positioning. + # - when a trade takes place where the pnl for + # the (set of) trade(s) is below the breakeven price + # it may be that the trader took a +ve pnl on a short(er) + # term trade in the same account. + # - in this case we could recalc the be price to + # be reverted back to it's prior value before the nearest term + # trade was opened.? + # def lifo_price() -> float: + # ... + + def calc_ppu( self, - t: Transaction, + # include transaction cost in breakeven price + # and presume the worst case of the same cost + # to exit this transaction (even though in reality + # it will be dynamic based on exit stratetgy). + cost_scalar: float = 2, - ) -> None: - self.clears[t.tid] = { - 'cost': t.cost, - 'price': t.price, - 'size': t.size, - 'dt': str(t.dt), - } - - def lifo_update( - self, - size: float, - price: float, - cost: float = 0, - - # TODO: idea: "real LIFO" dynamic positioning. - # - when a trade takes place where the pnl for - # the (set of) trade(s) is below the breakeven price - # it may be that the trader took a +ve pnl on a short(er) - # term trade in the same account. - # - in this case we could recalc the be price to - # be reverted back to it's prior value before the nearest term - # trade was opened.? - # dynamic_breakeven_price: bool = False, - - ) -> (float, float): + ) -> float: ''' - Incremental update using a LIFO-style weighted mean. + Compute the "price-per-unit" price for the given non-zero sized + rolling position. + + The recurrence relation which computes this (exponential) mean + per new clear which **increases** the accumulative postiion size + is: + + ppu[-1] = ( + ppu[-2] * accum_size[-2] + + + ppu[-1] * size + ) / accum_size[-1] + + where `cost_basis` for the current step is simply the price + * size of the most recent clearing transaction. ''' - # "avg position price" calcs - # TODO: eventually it'd be nice to have a small set of routines - # to do this stuff from a sequence of cleared orders to enable - # so called "contextual positions". - new_size = self.size + size + asize_h: list[float] = [] # historical accumulative size + ppu_h: list[float] = [] # historical price-per-unit - # old size minus the new size gives us size diff with - # +ve -> increase in pp size - # -ve -> decrease in pp size - size_diff = abs(new_size) - abs(self.size) + clears = list(self.clears.items()) - if new_size == 0: - self.be_price = 0 + for i, (tid, entry) in enumerate(clears): - elif size_diff > 0: - # XXX: LOFI incremental update: - # only update the "average price" when - # the size increases not when it decreases (i.e. the - # position is being made smaller) - self.be_price = ( - # weight of current exec = (size * price) + cost - (abs(size) * price) - + - (copysign(1, new_size) * cost) # transaction cost - + - # weight of existing be price - self.be_price * abs(self.size) # weight of previous pp - ) / abs(new_size) # normalized by the new size: weighted mean. - - self.size = new_size - - return new_size, self.be_price - - def calc_be_price(self) -> float: - - size: float = 0 - cb_tot_size: float = 0 - cost_basis: float = 0 - be_price: float = 0 - - for tid, entry in self.clears.items(): clear_size = entry['size'] clear_price = entry['price'] - new_size = size + clear_size - # old size minus the new size gives us size diff with - # +ve -> increase in pp size - # -ve -> decrease in pp size - size_diff = abs(new_size) - abs(size) + last_accum_size = asize_h[-1] if asize_h else 0 + accum_size = last_accum_size + clear_size + accum_sign = copysign(1, accum_size) - if new_size == 0: - cost_basis = 0 - cb_tot_size = 0 - be_price = 0 + sign_change: bool = False - elif size_diff > 0: - # only an increaze in size of the position contributes - # the breakeven price, a decrease does not. + if accum_size == 0: + ppu_h.append(0) + asize_h.append(0) + continue - cost_basis += ( - # weighted price per unit of + # test if the pp somehow went "passed" a net zero size state + # resulting in a change of the "sign" of the size (+ve for + # long, -ve for short). + sign_change = ( + copysign(1, last_accum_size) + accum_sign == 0 + and last_accum_size != 0 + ) + + # since we passed the net-zero-size state the new size + # after sum should be the remaining size the new + # "direction" (aka, long vs. short) for this clear. + if sign_change: + clear_size = accum_size + abs_diff = abs(accum_size) + asize_h.append(0) + ppu_h.append(0) + + else: + # old size minus the new size gives us size diff with + # +ve -> increase in pp size + # -ve -> decrease in pp size + abs_diff = abs(accum_size) - abs(last_accum_size) + + # XXX: LIFO breakeven price update. only an increaze in size + # of the position contributes the breakeven price, + # a decrease does not (i.e. the position is being made + # smaller). + # abs_clear_size = abs(clear_size) + abs_new_size = abs(accum_size) + + if abs_diff > 0: + + cost_basis = ( + # cost basis for this clear clear_price * abs(clear_size) + # transaction cost - (copysign(1, new_size) * entry['cost'] * 2) + accum_sign * cost_scalar * entry['cost'] ) - cb_tot_size += abs(clear_size) - be_price = cost_basis / cb_tot_size - size = new_size + if asize_h: + size_last = abs(asize_h[-1]) + cb_last = ppu_h[-1] * size_last + ppu = (cost_basis + cb_last) / abs_new_size - # print( - # f'cb: {cost_basis}\n' - # f'size: {size}\n' - # f'clear_size: {clear_size}\n' - # f'clear_price: {clear_price}\n\n' + else: + ppu = cost_basis / abs_new_size - # f'cb_tot_size: {cb_tot_size}\n' - # f'be_price: {be_price}\n\n' - # ) + ppu_h.append(ppu) + asize_h.append(accum_size) - return be_price + else: + # on "exit" clears from a given direction, + # only the size changes not the price-per-unit + # need to be updated since the ppu remains constant + # and gets weighted by the new size. + asize_h.append(accum_size) + ppu_h.append(ppu_h[-1]) + + return ppu_h[-1] if ppu_h else 0 def calc_size(self) -> float: size: float = 0 @@ -343,24 +402,57 @@ class Position(Struct): unecessary history irrelevant to the current pp state. ''' - size: float = self.size - clears_since_zero: deque[tuple(str, dict)] = deque() + size: float = 0 + clears_since_zero: list[tuple(str, dict)] = [] - # scan for the last "net zero" position by - # iterating clears in reverse. - for tid, clear in reversed(self.clears.items()): - size -= clear['size'] - clears_since_zero.appendleft((tid, clear)) + # TODO: we might just want to always do this when iterating + # a ledger? keep a state of the last net-zero and only do the + # full iterate when no state was stashed? + + # scan for the last "net zero" position by iterating + # transactions until the next net-zero size, rinse, repeat. + for tid, clear in self.clears.items(): + size += clear['size'] + clears_since_zero.append((tid, clear)) if size == 0: - break + clears_since_zero.clear() self.clears = dict(clears_since_zero) return self.clears + def add_clear( + self, + t: Transaction, + ) -> dict: + ''' + Update clearing table and populate rolling ppu and accumulative + size in both the clears entry and local attrs state. + + ''' + clear = self.clears[t.tid] = { + 'cost': t.cost, + 'price': t.price, + 'size': t.size, + 'dt': str(t.dt), + } + + # TODO: compute these incrementally instead + # of re-looping through each time resulting in O(n**2) + # behaviour.. + # compute these **after** adding the entry + # in order to make the recurrence relation math work + # inside ``.calc_size()``. + self.size = clear['accum_size'] = self.calc_size() + self.ppu = clear['ppu'] = self.calc_ppu() + + return clear + class PpTable(Struct): + brokername: str + acctid: str pps: dict[str, Position] conf: Optional[dict] = {} @@ -372,31 +464,30 @@ class PpTable(Struct): ) -> dict[str, Position]: pps = self.pps - updated: dict[str, Position] = {} # lifo update all pps from records - for tid, r in trans.items(): + for tid, t in trans.items(): pp = pps.setdefault( - r.bsuid, + t.bsuid, # if no existing pp, allocate fresh one. Position( Symbol.from_fqsn( - r.fqsn, + t.fqsn, info={}, ), size=0.0, - be_price=0.0, - bsuid=r.bsuid, - expiry=r.expiry, + ppu=0.0, + bsuid=t.bsuid, + expiry=t.expiry, ) ) # don't do updates for ledger records we already have # included in the current pps state. - if r.tid in pp.clears: + if t.tid in pp.clears: # NOTE: likely you'll see repeats of the same # ``Transaction`` passed in here if/when you are restarting # a ``brokerd.ib`` where the API will re-report trades from @@ -404,30 +495,20 @@ class PpTable(Struct): # "double count" these in pp calculations. continue - # lifo style "breakeven" price calc - pp.lifo_update( - r.size, - r.price, + # update clearing table + pp.add_clear(t) + updated[t.bsuid] = pp - # include transaction cost in breakeven price - # and presume the worst case of the same cost - # to exit this transaction (even though in reality - # it will be dynamic based on exit stratetgy). - cost=cost_scalar*r.cost, - ) - - # track clearing data - pp.update(r) - - updated[r.bsuid] = pp + # minimize clears tables and update sizing. + for bsuid, pp in updated.items(): + pp.size, pp.ppu = pp.audit_sizing() return updated def dump_active( self, - brokername: str, ) -> tuple[ - dict[str, Any], + dict[str, Position], dict[str, Position] ]: ''' @@ -437,13 +518,12 @@ class PpTable(Struct): ``Position``s which have recently closed. ''' - # ONLY dict-serialize all active positions; those that are closed - # we don't store in the ``pps.toml``. # NOTE: newly closed position are also important to report/return # since a consumer, like an order mode UI ;), might want to react - # based on the closure. - pp_entries = {} + # based on the closure (for example removing the breakeven line + # and clearing the entry from any lists/monitors). closed_pp_objs: dict[str, Position] = {} + open_pp_objs: dict[str, Position] = {} pp_objs = self.pps for bsuid in list(pp_objs): @@ -454,7 +534,7 @@ class PpTable(Struct): # if bsuid == qqqbsuid: # breakpoint() - pp.minimize_clears() + pp.size, pp.ppu = pp.audit_sizing() if ( # "net-zero" is a "closed" position @@ -470,53 +550,71 @@ class PpTable(Struct): # used to check for duplicate clears that may come in as # new transaction from some backend API and need to be # ignored; the closed positions won't be written to the - # ``pps.toml`` since ``pp_entries`` above is what's + # ``pps.toml`` since ``pp_active_entries`` above is what's # written. - # closed_pp = pp_objs.pop(bsuid, None) - closed_pp = pp_objs.get(bsuid) - if closed_pp: - closed_pp_objs[bsuid] = closed_pp + closed_pp_objs[bsuid] = pp else: - # serialize to pre-toml form - asdict = pp.to_pretoml() + open_pp_objs[bsuid] = pp - if pp.expiry is None: - asdict.pop('expiry', None) + return open_pp_objs, closed_pp_objs - # TODO: we need to figure out how to have one top level - # listing venue here even when the backend isn't providing - # it via the trades ledger.. - # drop symbol obj in serialized form - s = asdict.pop('symbol') - fqsn = s.front_fqsn() - log.info(f'Updating active pp: {fqsn}') + def to_toml( + self, + ) -> dict[str, Any]: - # XXX: ugh, it's cuz we push the section under - # the broker name.. maybe we need to rethink this? - brokerless_key = fqsn.removeprefix(f'{brokername}.') + active, closed = self.dump_active() - pp_entries[brokerless_key] = asdict + # ONLY dict-serialize all active positions; those that are closed + # we don't store in the ``pps.toml``. + to_toml_dict = {} - return pp_entries, closed_pp_objs + for bsuid, pos in active.items(): + + # keep the minimal amount of clears that make up this + # position since the last net-zero state. + pos.minimize_clears() + + # serialize to pre-toml form + fqsn, asdict = pos.to_pretoml() + log.info(f'Updating active pp: {fqsn}') + + # XXX: ugh, it's cuz we push the section under + # the broker name.. maybe we need to rethink this? + brokerless_key = fqsn.removeprefix(f'{self.brokername}.') + to_toml_dict[brokerless_key] = asdict + + return to_toml_dict + + def write_config(self) -> None: + ''' + Write the current position table to the user's ``pps.toml``. + + ''' + # TODO: show diff output? + # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries + print(f'Updating ``pps.toml`` for {path}:\n') + + # active, closed_pp_objs = table.dump_active() + pp_entries = self.to_toml() + self.conf[self.brokername][self.acctid] = pp_entries + + # TODO: why tf haven't they already done this for inline + # tables smh.. + enc = PpsEncoder(preserve=True) + # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) + enc.dump_funcs[ + toml.decoder.InlineTableDict + ] = enc.dump_inline_table + + config.write( + self.conf, + 'pps', + encoder=enc, + ) -def update_pps( - records: dict[str, Transaction], - pps: Optional[dict[str, Position]] = None - -) -> dict[str, Position]: - ''' - Compile a set of positions from a trades ledger. - - ''' - pps: dict[str, Position] = pps or {} - table = PpTable(pps) - table.update_from_trans(records) - return table.pps - - -def load_trans_from_ledger( +def load_pps_from_ledger( brokername: str, acctname: str, @@ -524,35 +622,40 @@ def load_trans_from_ledger( # post normalization filter on ledger entries to be processed filter_by: Optional[list[dict]] = None, -) -> dict[str, Position]: +) -> tuple[ + dict[str, Transaction], + dict[str, Position], +]: ''' Open a ledger file by broker name and account and read in and - process any trade records into our normalized ``Transaction`` - form and then pass these into the position processing routine - and deliver the two dict-sets of the active and closed pps. + process any trade records into our normalized ``Transaction`` form + and then update the equivalent ``Pptable`` and deliver the two + bsuid-mapped dict-sets of the transactions and pps. ''' - with open_trade_ledger( - brokername, - acctname, - ) as ledger: + with ( + open_trade_ledger(brokername, acctname) as ledger, + open_pps(brokername, acctname) as table, + ): if not ledger: # null case, no ledger file with content return {} - brokermod = get_brokermod(brokername) - src_records: dict[str, Transaction] = brokermod.norm_trade_records(ledger) + mod = get_brokermod(brokername) + src_records: dict[str, Transaction] = mod.norm_trade_records(ledger) - if filter_by: - records = {} - bsuids = set(filter_by) - for tid, r in src_records.items(): - if r.bsuid in bsuids: - records[tid] = r - else: - records = src_records + if filter_by: + records = {} + bsuids = set(filter_by) + for tid, r in src_records.items(): + if r.bsuid in bsuids: + records[tid] = r + else: + records = src_records - return records + updated = table.update_from_trans(records) + + return records, updated # TODO: instead see if we can hack tomli and tomli-w to do the same: @@ -686,67 +789,6 @@ class PpsEncoder(toml.TomlEncoder): return (retstr, retdict) -def load_pps_from_toml( - brokername: str, - acctid: str, - - # XXX: there is an edge case here where we may want to either audit - # the retrieved ``pps.toml`` output or reprocess it since there was - # an error on write on the last attempt to update the state file - # even though the ledger *was* updated. For this cases we allow the - # caller to pass in a symbol set they'd like to reload from the - # underlying ledger to be reprocessed in computing pps state. - reload_records: Optional[dict[str, str]] = None, - - # XXX: this is "global" update from ledger flag which - # does a full refresh of pps from the available ledger. - update_from_ledger: bool = False, - -) -> tuple[PpTable, dict[str, str]]: - ''' - Load and marshal to objects all pps from either an existing - ``pps.toml`` config, or from scratch from a ledger file when - none yet exists. - - ''' - with open_pps( - brokername, - acctid, - write_on_exit=False, - ) as table: - pp_objs = table.pps - - # no pps entry yet for this broker/account so parse any available - # ledgers to build a brand new pps state. - if not pp_objs or update_from_ledger: - trans = load_trans_from_ledger( - brokername, - acctid, - ) - table.update_from_trans(trans) - - # Reload symbol specific ledger entries if requested by the - # caller **AND** none exist in the current pps state table. - elif ( - pp_objs and reload_records - ): - # no pps entry yet for this broker/account so parse - # any available ledgers to build a pps state. - trans = load_trans_from_ledger( - brokername, - acctid, - filter_by=reload_records, - ) - table.update_from_trans(trans) - - if not table.pps: - log.warning( - f'No `pps.toml` values could be loaded {brokername}:{acctid}' - ) - - return table, table.conf - - @cm def open_pps( brokername: str, @@ -763,8 +805,23 @@ def open_pps( brokersection = conf.setdefault(brokername, {}) pps = brokersection.setdefault(acctid, {}) + # TODO: ideally we can pass in an existing + # pps state to this right? such that we + # don't have to do a ledger reload all the + # time.. a couple ideas I can think of, + # - mirror this in some client side actor which + # does the actual ledger updates (say the paper + # engine proc if we decide to always spawn it?), + # - do diffs against updates from the ledger writer + # actor and the in-mem state here? + pp_objs = {} - table = PpTable(pp_objs, conf=conf) + table = PpTable( + brokername, + acctid, + pp_objs, + conf=conf, + ) # unmarshal/load ``pps.toml`` config entries into object form # and update `PpTable` obj entries. @@ -789,29 +846,17 @@ def open_pps( clears[tid] = clears_table size = entry['size'] - - # TODO: an audit system for existing pps entries? - # if not len(clears) == abs(size): - # pp_objs = load_pps_from_ledger( - # brokername, - # acctid, - # filter_by=reload_records, - # ) - # reason = 'size <-> len(clears) mismatch' - # raise ValueError( - # '`pps.toml` entry is invalid:\n' - # f'{fqsn}\n' - # f'{pformat(entry)}' - # ) + # TODO: remove but, handle old field name for now + ppu = entry.get('ppu', entry.get('be_price', 0)) expiry = entry.get('expiry') if expiry: expiry = pendulum.parse(expiry) - pp_objs[bsuid] = Position( + pp = pp_objs[bsuid] = Position( Symbol.from_fqsn(fqsn, info={}), size=size, - be_price=entry['be_price'], + ppu=ppu, expiry=expiry, bsuid=entry['bsuid'], @@ -823,90 +868,14 @@ def open_pps( clears=clears, ) + # audit entries loaded from toml + pp.size, pp.ppu = pp.audit_sizing() + try: yield table finally: if write_on_exit: - # TODO: show diff output? - # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries - print(f'Updating ``pps.toml`` for {path}:\n') - - pp_entries, closed_pp_objs = table.dump_active(brokername) - conf[brokername][acctid] = pp_entries - - # TODO: why tf haven't they already done this for inline - # tables smh.. - enc = PpsEncoder(preserve=True) - # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) - enc.dump_funcs[ - toml.decoder.InlineTableDict - ] = enc.dump_inline_table - - config.write( - conf, - 'pps', - encoder=enc, - ) - - -def update_pps_conf( - brokername: str, - acctid: str, - - trade_records: Optional[dict[str, Transaction]] = None, - ledger_reload: Optional[dict[str, str]] = None, - -) -> tuple[ - dict[str, Position], - dict[str, Position], -]: - # TODO: ideally we can pass in an existing - # pps state to this right? such that we - # don't have to do a ledger reload all the - # time.. a couple ideas I can think of, - # - load pps once after backend ledger state - # is loaded and keep maintainend in memory - # inside a with block, - # - mirror this in some client side actor which - # does the actual ledger updates (say the paper - # engine proc if we decide to always spawn it?), - # - do diffs against updates from the ledger writer - # actor and the in-mem state here? - - if trade_records and ledger_reload: - for tid, r in trade_records.items(): - ledger_reload[r.bsuid] = r.fqsn - - table, conf = load_pps_from_toml( - brokername, - acctid, - reload_records=ledger_reload, - ) - - # update all pp objects from any (new) trade records which - # were passed in (aka incremental update case). - if trade_records: - table.update_from_trans(trade_records) - - # this maps `.bsuid` values to positions - pp_entries, closed_pp_objs = table.dump_active(brokername) - pp_objs: dict[Union[str, int], Position] = table.pps - - conf[brokername][acctid] = pp_entries - - # TODO: why tf haven't they already done this for inline tables smh.. - enc = PpsEncoder(preserve=True) - # table_bs_type = type(toml.TomlDecoder().get_empty_inline_table()) - enc.dump_funcs[toml.decoder.InlineTableDict] = enc.dump_inline_table - - config.write( - conf, - 'pps', - encoder=enc, - ) - - # deliver object form of all pps in table to caller - return pp_objs, closed_pp_objs + table.write_config() if __name__ == '__main__': @@ -917,4 +886,9 @@ if __name__ == '__main__': args = args[1:] for acctid in args: broker, name = acctid.split('.') - update_pps_conf(broker, name) + trans, updated_pps = load_pps_from_ledger(broker, name) + print( + f'Processing transactions into pps for {broker}:{acctid}\n' + f'{pformat(trans)}\n\n' + f'{pformat(updated_pps)}' + ) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 6a1ab01e..e02187da 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -106,8 +106,8 @@ async def update_pnl_from_feed( # compute and display pnl status order_mode.pane.pnl_label.format( pnl=copysign(1, size) * pnl( - # live.be_price, - order_mode.current_pp.live_pp.be_price, + # live.ppu, + order_mode.current_pp.live_pp.ppu, tick['price'], ), ) @@ -357,7 +357,7 @@ class SettingsPane: # last historical close price last = feed.shm.array[-1][['close']][0] pnl_value = copysign(1, size) * pnl( - tracker.live_pp.be_price, + tracker.live_pp.ppu, last, ) @@ -557,7 +557,7 @@ class PositionTracker: pp = position or self.live_pp self.update_line( - pp.be_price, + pp.ppu, pp.size, self.chart.linked.symbol.lot_size_digits, ) @@ -571,7 +571,7 @@ class PositionTracker: self.hide() else: - self._level_marker.level = pp.be_price + self._level_marker.level = pp.ppu # these updates are critical to avoid lag on view/scene changes self._level_marker.update() # trigger paint diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 83a0ed48..ce08a64c 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -610,7 +610,7 @@ async def open_order_mode( startup_pp = Position( symbol=symbol, size=0, - be_price=0, + ppu=0, # XXX: BLEH, do we care about this on the client side? bsuid=symbol,