Finally, just drop `Transaction.sym`

Turns out we don't really need it directly for most "txn processing" AND
if we do it's usually related to some `Account`-ing related calcs; which
means we can instead just rely on the new `SymbologyCache` lookup to get
it when needed. So, basically just get rid of it and rely instead on the
`.fqme` to be the god-key to getting `MktPair` info (from the cache).

Further, extend the `TransactionLedger` to contain much more info on the
pertaining backend:
- `.mod` mapping to the (pkg) py mod.
- `.filepath` pointing to the actual ledger TOML file.
- `_symcache` for doing any needed asset or mkt lookup as mentioned
  above.
- rename `.iter_trans()` -> `.iter_txns()` and allow passing in
  a symcache or using the init provided one.
  - rename `.to_trans()` similarly.
- delegate paper account txn processing to the `.clearing._paper_engine`
  mod's `norm_trade()` (and expect this similarly from other backends!)
- use new `SymbologyCache.search()` to find the best but
  un-fully-qualified fqme for a given `txdict` being processed when
  writing a config (aka always try to expand to the most verbose `.fqme`
  possible).
- add a `rewrite: bool` control to `open_trade_ledger()`.
account_tests
Tyler Goodlet 2023-07-07 19:40:51 -04:00
parent 520414a096
commit 0e94e89373
1 changed files with 95 additions and 83 deletions

View File

@ -22,16 +22,19 @@ from __future__ import annotations
from collections import UserDict
from contextlib import contextmanager as cm
from pathlib import Path
from pprint import pformat
from types import ModuleType
from typing import (
Any,
Callable,
Generator
Generator,
TYPE_CHECKING,
)
from pendulum import (
datetime,
DateTime,
parse,
# DateTime,
# parse,
)
import tomli_w # for fast ledger writing
@ -41,27 +44,37 @@ from ..log import get_logger
from .calc import (
iter_by_dt,
)
from ._mktinfo import (
Symbol, # legacy
MktPair,
Asset,
)
if TYPE_CHECKING:
from ..data._symcache import (
SymbologyCache,
)
log = get_logger(__name__)
class Transaction(Struct, frozen=True):
# TODO: unify this with the `MktPair`,
# once we have that as a required field,
# we don't really need the fqme any more..
# NOTE: this is a unified acronym also used in our `MktPair`
# and can stand for any of a
# "fully qualified <blank> endpoint":
# - "market" in the case of financial trades
# (btcusdt.spot.binance).
# - "merkel (tree)" aka a blockchain system "wallet tranfers"
# (btc.blockchain)
# - "money" for tradtitional (digital databases)
# *bank accounts* (usd.swift, eur.sepa)
fqme: str
tid: str | int # unique transaction id
size: float
price: float
cost: float # commisions or other additional costs
dt: datetime
# the "event type" in terms of "market events" see
# https://github.com/pikers/piker/issues/510 for where we're
# probably going with this.
etype: str = 'clear'
# TODO: we can drop this right since we
@ -69,19 +82,6 @@ class Transaction(Struct, frozen=True):
# via the `MktPair`?
expiry: datetime | None = None
# TODO: drop the Symbol type, construct using
# t.sys (the transaction system)
# the underlying "transaction system", normally one of a ``MktPair``
# (a description of a tradable double auction) or a ledger-recorded
# ("ledger" in any sense as long as you can record transfers) of any
# sort) ``Asset``.
sym: MktPair | Asset | Symbol | None = None
@property
def sys(self) -> Symbol:
return self.sym
# (optional) key-id defined by the broker-service backend which
# ensures the instrument-symbol market key for this record is unique
# in the "their backend/system" sense; i.e. this uid for the market
@ -90,14 +90,12 @@ class Transaction(Struct, frozen=True):
bs_mktid: str | int | None = None
def to_dict(self) -> dict:
dct = super().to_dict()
# TODO: switch to sys!
dct.pop('sym')
dct: dict[str, Any] = super().to_dict()
# ensure we use a pendulum formatted
# ISO style str here!@
dct['dt'] = str(self.dt)
return dct
@ -113,13 +111,35 @@ class TransactionLedger(UserDict):
self,
ledger_dict: dict,
file_path: Path,
account: str,
mod: ModuleType, # broker mod
tx_sort: Callable,
symcache: SymbologyCache,
) -> None:
self.file_path = file_path
self.tx_sort = tx_sort
self.account: str = account
self.file_path: Path = file_path
self.mod: ModuleType = mod
self.tx_sort: Callable = tx_sort
self._symcache: SymbologyCache = symcache
# any added txns we keep in that form for meta-data
# gathering purposes
self._txns: dict[str, Transaction] = {}
super().__init__(ledger_dict)
def __repr__(self) -> str:
return (
f'TransactionLedger: {len(self)}\n'
f'{pformat(list(self.data))}'
)
@property
def symcache(self) -> SymbologyCache:
return self._symcache
def update_from_t(
self,
t: Transaction,
@ -130,11 +150,11 @@ class TransactionLedger(UserDict):
'''
self.data[t.tid] = t.to_dict()
self._txns[t.tid] = t
def iter_trans(
def iter_txns(
self,
mkt_by_fqme: dict[str, MktPair],
broker: str = 'paper',
symcache: SymbologyCache | None = None,
) -> Generator[
tuple[str, Transaction],
@ -146,73 +166,42 @@ class TransactionLedger(UserDict):
form via generator.
'''
if broker != 'paper':
raise NotImplementedError('Per broker support not dun yet!')
symcache = symcache or self._symcache
# TODO: lookup some standard normalizer
# func in the backend?
# from ..brokers import get_brokermod
# mod = get_brokermod(broker)
# trans_dict = mod.norm_trade_records(self.data)
# NOTE: instead i propose the normalizer is
# a one shot routine (that can be lru cached)
# and instead call it for each entry incrementally:
# normer = mod.norm_trade_record(txdict)
if self.account == 'paper':
from piker.clearing import _paper_engine
norm_trade = _paper_engine.norm_trade
else:
norm_trade = self.mod.norm_trade
# datetime-sort and pack into txs
for txdict in self.tx_sort(self.data.values()):
txn = norm_trade(txdict)
yield txn.tid, txn
# special field handling for datetimes
# to ensure pendulum is used!
tid: str = txdict['tid']
fqme: str = txdict.get('fqme') or txdict['fqsn']
dt: DateTime = parse(txdict['dt'])
expiry: str | None = txdict.get('expiry')
if not (mkt := mkt_by_fqme.get(fqme)):
# we can't build a trans if we don't have
# the ``.sys: MktPair`` info, so skip.
continue
tx = Transaction(
fqme=fqme,
tid=txdict['tid'],
dt=dt,
price=txdict['price'],
size=txdict['size'],
cost=txdict.get('cost', 0),
bs_mktid=txdict['bs_mktid'],
# TODO: change to .sys!
sym=mkt,
expiry=parse(expiry) if expiry else None,
etype='clear',
)
yield tid, tx
def to_trans(
def to_txns(
self,
**kwargs,
symcache: SymbologyCache | None = None,
) -> dict[str, Transaction]:
'''
Return entire output from ``.iter_trans()`` in a ``dict``.
Return entire output from ``.iter_txns()`` in a ``dict``.
'''
return dict(self.iter_trans(**kwargs))
return dict(self.iter_txns(symcache=symcache))
def write_config(
self,
) -> None:
'''
Render the self.data ledger dict to it's TOML file form.
Render the self.data ledger dict to its TOML file form.
ALWAYS order datetime sorted!
'''
towrite: dict[str, Any] = {}
for tid, txdict in self.tx_sort(self.data.copy()):
# write blank-str expiry for non-expiring assets
if (
'expiry' in txdict
@ -221,9 +210,18 @@ class TransactionLedger(UserDict):
txdict['expiry'] = ''
# re-write old acro-key
if fqme := txdict.get('fqsn'):
txdict['fqme'] = fqme
fqme: str = txdict.pop('fqsn', None) or txdict['fqme']
if fqme not in self._symcache.mktmaps:
best_fqme: str = list(self._symcache.search(fqme))[0]
log.warning(
f'Could not find FQME: {fqme} in qualified set?\n'
f'Qualifying and expanding {fqme} -> {best_fqme}'
)
fqme = best_fqme
txdict['fqme'] = fqme
towrite[tid] = txdict
with self.file_path.open(mode='wb') as fp:
@ -276,6 +274,7 @@ def open_trade_ledger(
# default is to sort by detected datetime-ish field
tx_sort: Callable = iter_by_dt,
rewrite: bool = False,
) -> Generator[TransactionLedger, None, None]:
'''
@ -287,18 +286,31 @@ def open_trade_ledger(
name as defined in the user's ``brokers.toml`` config.
'''
from ..brokers import get_brokermod
mod: ModuleType = get_brokermod(broker)
ledger_dict, fpath = load_ledger(broker, account)
cpy = ledger_dict.copy()
from ..data._symcache import (
get_symcache,
)
symcache: SymbologyCache = get_symcache(broker)
ledger = TransactionLedger(
ledger_dict=cpy,
file_path=fpath,
account=account,
mod=mod,
symcache=symcache,
tx_sort=tx_sort,
)
try:
yield ledger
finally:
if ledger.data != ledger_dict:
if (
ledger.data != ledger_dict
or rewrite
):
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
log.info(f'Updating ledger for {fpath}:\n')