diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index 65eb67a8..fbb6997f 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -22,18 +22,17 @@ that doesn't try to cuk most humans who prefer to not lose their moneys.. ''' from __future__ import annotations -# from bisect import insort from contextlib import contextmanager as cm from decimal import Decimal from pprint import pformat from pathlib import Path +from types import ModuleType from typing import ( Any, Iterator, Generator ) -import polars as pl import pendulum from pendulum import ( datetime, @@ -43,7 +42,6 @@ import tomlkit from ._ledger import ( Transaction, - open_trade_ledger, TransactionLedger, ) from ._mktinfo import ( @@ -60,6 +58,7 @@ from ..clearing._messages import ( BrokerdPosition, ) from ..data.types import Struct +from ..data._symcache import SymbologyCache from ..log import get_logger log = get_logger(__name__) @@ -105,19 +104,12 @@ class Position(Struct): split_ratio: int | None = None - # ordered record of known constituent trade messages - _clears: list[ - dict[str, Any], # transaction history summaries - ] = [] - - # _events: pl.DataFrame | None = None + # TODO: use a `pl.DataFrame` intead? _events: dict[str, Transaction | dict] = {} - # first_clear_dt: datetime | None = None - @property def expiry(self) -> datetime | None: - exp: str = self.mkt.expiry + exp: str = self.mkt.expiry.lower() match exp: # empty str, 'perp' (contract) or simply a null # signifies instrument with NO expiry. @@ -188,7 +180,7 @@ class Position(Struct): ''' # scan for the last "net zero" position by iterating - # transactions until the next net-zero accum_size, rinse, + # transactions until the next net-zero cumsize, rinse, # repeat. cumsize: float = 0 clears_since_zero: list[dict] = [] @@ -223,6 +215,7 @@ class Position(Struct): ''' mkt: MktPair = self.mkt assert isinstance(mkt, MktPair) + # TODO: we need to figure out how to have one top level # listing venue here even when the backend isn't providing # it via the trades ledger.. @@ -239,16 +232,19 @@ class Position(Struct): asdict: dict[str, Any] = { 'bs_mktid': self.bs_mktid, - 'expiry': self.expiry or '', + # 'expiry': self.expiry or '', 'asset_type': asset_type, 'price_tick': mkt.price_tick, 'size_tick': mkt.size_tick, } - if exp := self.expiry: asdict['expiry'] = exp clears_since_zero: list[dict] = self.minimized_clears() + + # setup a "multi-line array of inline tables" which we call + # the "clears table", contained by each position entry in + # an "account file". clears_table: tomlkit.Array = tomlkit.array() clears_table.multiline( multiline=True, @@ -267,69 +263,21 @@ class Position(Struct): for k in ['price', 'size', 'cost']: inline_table[k] = entry[k] - # serialize datetime to parsable `str` - inline_table['dt'] = entry['dt']#.isoformat('T') - # assert 'Datetime' not in inline_table['dt'] + # NOTE: we don't actually need to serialize datetime to parsable `str` + # since `tomlkit` supports a native `DateTime` but + # seems like we're not doing it entirely in clearing + # tables yet? + inline_table['dt'] = entry['dt'] # .isoformat('T') tid: str = entry['tid'] inline_table['tid'] = tid clears_table.append(inline_table) - # if val < 0: - # breakpoint() # assert not events asdict['clears'] = clears_table return fqme, asdict - # def ensure_state(self) -> None: - # ''' - # Audit either the `.cumsize` and `.ppu` local instance vars against - # the clears table calculations and return the calc-ed values if - # they differ and log warnings to console. - - # ''' - # # clears: list[dict] = self._clears - - # # self.first_clear_dt = min(clears, key=lambda e: e['dt'])['dt'] - # last_clear: dict = clears[-1] - # csize: float = self.calc_size() - # accum: float = last_clear['accum_size'] - - # if not self.expired(): - # if ( - # csize != accum - # and csize != round(accum * (self.split_ratio or 1)) - # ): - # raise ValueError(f'Size mismatch: {csize}') - # else: - # assert csize == 0, 'Contract is expired but non-zero size?' - - # if self.cumsize != csize: - # log.warning( - # 'Position state mismatch:\n' - # f'{self.cumsize} => {csize}' - # ) - # self.cumsize = csize - - # cppu: float = self.calc_ppu() - # ppu: float = last_clear['ppu'] - # if ( - # cppu != ppu - # and self.split_ratio is not None - - # # handle any split info entered (for now) manually by user - # and cppu != (ppu / self.split_ratio) - # ): - # raise ValueError(f'PPU mismatch: {cppu}') - - # if self.ppu != cppu: - # log.warning( - # 'Position state mismatch:\n' - # f'{self.ppu} => {cppu}' - # ) - # self.ppu = cppu - def update_from_msg( self, msg: BrokerdPosition, @@ -337,12 +285,13 @@ class Position(Struct): ) -> None: mkt: MktPair = self.mkt - # we summarize the pos with a single summary transaction - # (for now) until we either pass THIS type as msg directly - # from emsd or come up with a better way? + + # NOTE WARNING XXX: we summarize the pos with a single + # summary transaction (for now) until we either pass THIS + # type as msg directly from emsd or come up with a better + # way? t = Transaction( - fqme=mkt.bs_mktid, - sym=mkt, + fqme=mkt.fqme, bs_mktid=mkt.bs_mktid, tid='unknown', size=msg['size'], @@ -357,15 +306,16 @@ class Position(Struct): @property def dsize(self) -> float: ''' - The "dollar" size of the pp, normally in trading (fiat) unit - terms. + The "dollar" size of the pp, normally in source asset + (fiat) units. ''' return self.ppu * self.size def expired(self) -> bool: ''' - Predicate which checks if the contract/instrument is past its expiry. + Predicate which checks if the contract/instrument is past + its expiry. ''' return bool(self.expiry) and self.expiry < now() @@ -388,36 +338,23 @@ class Position(Struct): log.warning(f'{t} is already added?!') return added - # clear: dict[str, float | str | int] = { - # 'tid': t.tid, - # 'cost': t.cost, - # 'price': t.price, - # 'size': t.size, - # 'dt': t.dt - # } - self._events[tid] = t - return True + # TODO: apparently this IS possible with a dict but not + # common and probably not that beneficial unless we're also + # going to do cum-calcs on each insert? + # https://stackoverflow.com/questions/38079171/python-insert-new-element-into-sorted-list-of-dictionaries + # from bisect import insort # insort( # self._clears, # clear, # key=lambda entry: entry['dt'] # ) + self._events[tid] = t + return True - # TODO: compute these incrementally instead - # of re-looping through each time resulting in O(n**2) - # behaviour..? - - # NOTE: we compute these **after** adding the entry in order to - # make the recurrence relation math work inside - # ``.calc_size()``. - # self.size = clear['accum_size'] = self.calc_size() - # self.ppu = clear['ppu'] = self.calc_ppu() - # self.size: float = self.calc_size() - # self.ppu: float = self.calc_ppu() - - # assert len(self._events) == len(self._clears) - # return clear - + # TODO: compute these incrementally instead + # of re-looping through each time resulting in O(n**2) + # behaviour..? Can we have some kinda clears len to cached + # output subsys? def calc_ppu(self) -> float: return ppu(self.iter_by_type('clear')) @@ -487,20 +424,50 @@ class Position(Struct): class Account(Struct): + ''' + The real-time (double-entry accounting) state of + a given **asset ownership tracking system**, normally offered + or measured from some brokerage, CEX or (implied virtual) + summary crypto$ "wallets" aggregated and tracked over some set + of DEX-es. - brokername: str + Both market-mapped and ledger-system-native (aka inter-account + "transfers") transactions are accounted and they pertain to + (implied) PnL relatve to any other accountable asset. + + More specifically in piker terms, an account tracks all of: + + - the *balances* of all assets currently available for use either + in (future) market or (inter-account/wallet) transfer + transactions. + - a transaction *ledger* from a given brokerd backend whic + is a recording of all (know) such transactions from the past. + - a set of financial *positions* as measured from the current + ledger state. + + See the semantic origins from double-bookeeping: + https://en.wikipedia.org/wiki/Double-entry_bookkeeping + + ''' + mod: ModuleType acctid: str pps: dict[str, Position] + conf_path: Path conf: dict | None = {} # TODO: track a table of asset balances as `.balances: # dict[Asset, float]`? - def update_from_trans( + @property + def brokername(self) -> str: + return self.mod.name + + def update_from_ledger( self, - trans: dict[str, Transaction], + ledger: TransactionLedger, cost_scalar: float = 2, + symcache: SymbologyCache | None = None, ) -> dict[str, Position]: ''' @@ -509,24 +476,36 @@ class Account(Struct): accumulative size for each entry. ''' + if ( + not isinstance(ledger, TransactionLedger) + and symcache is None + ): + raise RuntimeError( + 'No ledger provided!\n' + 'We can not determine the `MktPair`s without a symcache..\n' + 'Please provide `symcache: SymbologyCache` when ' + 'processing NEW positions!' + ) + pps = self.pps updated: dict[str, Position] = {} # lifo update all pps from records, ensuring # we compute the PPU and size sorted in time! - for t in sorted( - trans.values(), - key=lambda t: t.dt, - # reverse=True, - ): - fqme: str = t.fqme - bs_mktid: str = t.bs_mktid + for tid, txn in ledger.iter_txns(): + # for t in sorted( + # trans.values(), + # key=lambda t: t.dt, + # ): + fqme: str = txn.fqme + bs_mktid: str = txn.bs_mktid # template the mkt-info presuming a legacy market ticks # if no info exists in the transactions.. - mkt: MktPair = t.sys + mkt: MktPair = ledger._symcache.mktmaps[fqme] if not (pos := pps.get(bs_mktid)): + # if no existing pos, allocate fresh one. pos = pps[bs_mktid] = Position( mkt=mkt, @@ -541,33 +520,16 @@ class Account(Struct): if len(pos.mkt.fqme) < len(fqme): pos.mkt = mkt - # clears: list[dict] = pos._clears - # if clears: - # # first_clear_dt = pos.first_clear_dt - - # # don't do updates for ledger records we already have - # # included in the current pps state. - # if ( - # t.tid in clears - # # or ( - # # first_clear_dt - # # and t.dt < first_clear_dt - # # ) - # ): - # # NOTE: likely you'll see repeats of the same - # # ``Transaction`` passed in here if/when you are restarting - # # a ``brokerd.ib`` where the API will re-report trades from - # # the current session, so we need to make sure we don't - # # "double count" these in pp calculations. - # continue - - # update clearing table - pos.add_clear(t) - updated[t.bs_mktid] = pos - - # re-calc ppu and accumulative sizing. - # for bs_mktid, pos in updated.items(): - # pos.ensure_state() + # update clearing table! + # NOTE: likely you'll see repeats of the same + # ``Transaction`` passed in here if/when you are restarting + # a ``brokerd.ib`` where the API will re-report trades from + # the current session, so we need to make sure we don't + # "double count" these in pp calculations; + # `Position.add_clear()` stores txs in a `dict[tid, + # tx]` which should always ensure this is true B) + pos.add_clear(txn) + updated[txn.bs_mktid] = pos # NOTE: deliver only the position entries that were # actually updated (modified the state) from the input @@ -614,7 +576,7 @@ class Account(Struct): return open_pp_objs, closed_pp_objs - def to_toml( + def prep_toml( self, active: dict[str, Position] | None = None, @@ -629,12 +591,12 @@ class Account(Struct): pos: Position for bs_mktid, pos in active.items(): - # NOTE: we only store the minimal amount of clears that make up this - # position since the last net-zero state. - # pos.minimize_clears() # pos.ensure_state() # serialize to pre-toml form + # NOTE: we only store the minimal amount of clears that + # make up this position since the last net-zero state, + # see `Position.to_pretoml()` for details fqme, asdict = pos.to_pretoml() # clears: list[dict] = asdict['clears'] @@ -650,7 +612,8 @@ class Account(Struct): def write_config(self) -> None: ''' - Write the current position table to the user's ``pps.toml``. + Write the current account state to the user's account TOML file, normally + something like ``pps.toml``. ''' # TODO: show diff output? @@ -658,7 +621,7 @@ class Account(Struct): # active, closed_pp_objs = table.dump_active() active, closed = self.dump_active() - pp_entries = self.to_toml(active=active) + pp_entries = self.prep_toml(active=active) if pp_entries: log.info( f'Updating positions in ``{self.conf_path}``:\n' @@ -705,24 +668,12 @@ class Account(Struct): # super weird --1 thing going on for cumsize!?1! # NOTE: the fix was to always float() the size value loaded # in open_pps() below! - - # confclears = self.conf["tsla.nasdaq.ib"]['clears'] - # firstcum = confclears[0]['cumsize'] - # if firstcum: - # breakpoint() - config.write( config=self.conf, path=self.conf_path, fail_empty=False, ) - # breakpoint() - - -# TODO: move over all broker backend usage to new name.. -PpTable = Account - def load_account( brokername: str, @@ -784,12 +735,12 @@ def load_account( @cm -def open_pps( +def open_account( brokername: str, acctid: str, write_on_exit: bool = False, -) -> Generator[PpTable, None, None]: +) -> Generator[Account, None, None]: ''' Read out broker-specific position entries from incremental update file: ``pps.toml``. @@ -820,10 +771,12 @@ def open_pps( # engine proc if we decide to always spawn it?), # - do diffs against updates from the ledger writer # actor and the in-mem state here? + from ..brokers import get_brokermod + mod: ModuleType = get_brokermod(brokername) - pp_objs = {} - table = PpTable( - brokername, + pp_objs: dict[str, Position] = {} + table = Account( + mod, acctid, pp_objs, conf_path, @@ -831,12 +784,10 @@ def open_pps( ) # unmarshal/load ``pps.toml`` config entries into object form - # and update `PpTable` obj entries. + # and update `Account` obj entries. for fqme, entry in conf.items(): - # atype = entry.get('asset_type', '') - - # unique broker market id + # unique broker-backend-system market id bs_mktid = str( entry.get('bsuid') or entry.get('bs_mktid') @@ -860,7 +811,7 @@ def open_pps( fqme, price_tick=price_tick, size_tick=size_tick, - bs_mktid=bs_mktid + bs_mktid=bs_mktid, ) # TODO: RE: general "events" instead of just "clears": @@ -875,6 +826,7 @@ def open_pps( # for toml re-presentation) back into a master table. toml_clears_list: list[dict[str, Any]] = entry['clears'] trans: list[Transaction] = [] + for clears_table in toml_clears_list: tid = clears_table['tid'] dt: tomlkit.items.DateTime | str = clears_table['dt'] @@ -887,23 +839,18 @@ def open_pps( clears_table['dt'] = dt trans.append(Transaction( fqme=bs_mktid, - sym=mkt, + # sym=mkt, bs_mktid=bs_mktid, tid=tid, + # XXX: not sure why sometimes these are loaded as + # `tomlkit.Integer` and are eventually written with + # an extra `-` in front like `--1`? size=float(clears_table['size']), price=float(clears_table['price']), cost=clears_table['cost'], dt=dt, )) - # size = entry['size'] - - # # TODO: remove but, handle old field name for now - # ppu = entry.get( - # 'ppu', - # entry.get('be_price', 0), - # ) - split_ratio = entry.get('split_ratio') # if a string-ified expiry field is loaded we try to parse @@ -929,9 +876,6 @@ def open_pps( for t in trans: pp.add_clear(t) - # audit entries loaded from toml - # pp.ensure_state() - try: yield table finally: @@ -939,7 +883,21 @@ def open_pps( table.write_config() -def load_pps_from_ledger( +# TODO: drop the old name and THIS! +@cm +def open_pps( + *args, + **kwargs, +) -> Generator[Account, None, None]: + log.warning( + '`open_pps()` is now deprecated!\n' + 'Please use `with open_account() as cnt:`' + ) + with open_account(*args, **kwargs) as acnt: + yield acnt + + +def load_account_from_ledger( brokername: str, acctname: str, @@ -947,10 +905,9 @@ def load_pps_from_ledger( # post normalization filter on ledger entries to be processed filter_by_ids: dict[str, list[str]] | None = None, -) -> tuple[ - pl.DataFrame, - PpTable, -]: + ledger: TransactionLedger | None = None, + +) -> Account: ''' Open a ledger file by broker name and account and read in and process any trade records into our normalized ``Transaction`` form @@ -958,67 +915,12 @@ def load_pps_from_ledger( bs_mktid-mapped dict-sets of the transactions and pps. ''' - ledger: TransactionLedger - table: PpTable - with ( - open_trade_ledger(brokername, acctname) as ledger, - open_pps(brokername, acctname) as table, - ): - if not ledger: - # null case, no ledger file with content - return {} + acnt: Account + with open_pps( + brokername, + acctname, + ) as acnt: + if ledger is not None: + acnt.update_from_ledger(ledger) - from ..brokers import get_brokermod - mod = get_brokermod(brokername) - src_records: dict[str, Transaction] = mod.norm_trade_records( - ledger - ) - table.update_from_trans(src_records) - - fdf = df = pl.DataFrame( - list(rec.to_dict() for rec in src_records.values()), - # schema=[ - # ('tid', str), - # ('fqme', str), - # ('dt', str), - # ('size', pl.Float64), - # ('price', pl.Float64), - # ('cost', pl.Float64), - # ('expiry', str), - # ('bs_mktid', str), - # ], - ).sort('dt').select([ - pl.col('fqme'), - pl.col('dt').str.to_datetime(), - # pl.col('expiry').dt.datetime(), - pl.col('bs_mktid'), - pl.col('size'), - pl.col('price'), - ]) - # ppt = df.groupby('fqme').agg([ - # # TODO: ppu and bep !! - # pl.cumsum('size').alias('cumsum'), - # ]) - acts = df.partition_by('fqme', as_dict=True) - # ppt: dict[str, pl.DataFrame] = {} - # for fqme, ppt in act.items(): - # ppt.with_columuns - # # TODO: ppu and bep !! - # pl.cumsum('size').alias('cumsum'), - # ]) - - # filter out to the columns matching values filter passed - # as input. - if filter_by_ids: - for col, vals in filter_by_ids.items(): - str_vals = set(map(str, vals)) - pred: pl.Expr = pl.col(col).eq(str_vals.pop()) - for val in str_vals: - pred |= pl.col(col).eq(val) - - fdf = df.filter(pred) - - bs_mktid: str = fdf[0]['bs_mktid'] - # pos: Position = table.pps[bs_mktid] - - return fdf, acts, table + return acnt