Merge pull request #361 from pikers/pptables

`PpTable`s
historical_breakeven_pp_price
goodboy 2022-07-21 17:54:43 -04:00 committed by GitHub
commit a3d46f713e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 265 additions and 191 deletions

View File

@ -34,7 +34,6 @@ from typing import (
Union,
)
from msgspec import Struct
import pendulum
from pendulum import datetime, now
import tomli
@ -45,6 +44,7 @@ from .brokers import get_brokermod
from .clearing._messages import BrokerdPosition, Status
from .data._source import Symbol
from .log import get_logger
from .data.types import Struct
log = get_logger(__name__)
@ -82,21 +82,21 @@ def open_trade_ledger(
ledger = tomli.load(cf)
print(f'Ledger load took {time.time() - start}s')
cpy = ledger.copy()
try:
yield cpy
finally:
if cpy != ledger:
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ledger for {tradesfile}:\n')
ledger.update(cpy)
# we write on close the mutated ledger data
with open(tradesfile, 'w') as cf:
return toml.dump(ledger, cf)
yield cpy
if cpy != ledger:
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ledger for {tradesfile}:\n')
ledger.update(cpy)
# we write on close the mutated ledger data
with open(tradesfile, 'w') as cf:
toml.dump(ledger, cf)
class Transaction(Struct):
class Transaction(Struct, frozen=True):
# TODO: should this be ``.to`` (see below)?
fqsn: str
@ -302,6 +302,146 @@ class Position(Struct):
return self.clears
class PpTable(Struct):
pps: dict[str, Position]
conf: Optional[dict] = {}
def update_from_trans(
self,
trans: dict[str, Transaction],
) -> dict[str, Position]:
pps = self.pps
updated: dict[str, Position] = {}
# lifo update all pps from records
for tid, r in trans.items():
pp = pps.setdefault(
r.bsuid,
# if no existing pp, allocate fresh one.
Position(
Symbol.from_fqsn(
r.fqsn,
info={},
),
size=0.0,
be_price=0.0,
bsuid=r.bsuid,
expiry=r.expiry,
)
)
# don't do updates for ledger records we already have
# included in the current pps state.
if r.tid in pp.clears:
# NOTE: likely you'll see repeats of the same
# ``Transaction`` passed in here if/when you are restarting
# a ``brokerd.ib`` where the API will re-report trades from
# the current session, so we need to make sure we don't
# "double count" these in pp calculations.
continue
# lifo style "breakeven" price calc
pp.lifo_update(
r.size,
r.price,
# include transaction cost in breakeven price
# and presume the worst case of the same cost
# to exit this transaction (even though in reality
# it will be dynamic based on exit stratetgy).
cost=2*r.cost,
)
# track clearing data
pp.update(r)
updated[r.bsuid] = pp
return updated
def dump_active(
self,
brokername: str,
) -> tuple[
dict[str, Any],
dict[str, Position]
]:
'''
Iterate all tabulated positions, render active positions to
a ``dict`` format amenable to serialization (via TOML) and drop
from state (``.pps``) as well as return in a ``dict`` all
``Position``s which have recently closed.
'''
# ONLY dict-serialize all active positions; those that are closed
# we don't store in the ``pps.toml``.
# NOTE: newly closed position are also important to report/return
# since a consumer, like an order mode UI ;), might want to react
# based on the closure.
pp_entries = {}
closed_pp_objs: dict[str, Position] = {}
pp_objs = self.pps
for bsuid in list(pp_objs):
pp = pp_objs[bsuid]
# XXX: debug hook for size mismatches
# qqqbsuid = 320227571
# if bsuid == qqqbsuid:
# breakpoint()
pp.minimize_clears()
if (
# "net-zero" is a "closed" position
pp.size == 0
# time-expired pps (normally derivatives) are "closed"
or (pp.expiry and pp.expiry < now())
):
# for expired cases
pp.size = 0
# NOTE: we DO NOT pop the pp here since it can still be
# used to check for duplicate clears that may come in as
# new transaction from some backend API and need to be
# ignored; the closed positions won't be written to the
# ``pps.toml`` since ``pp_entries`` above is what's
# written.
# closed_pp = pp_objs.pop(bsuid, None)
closed_pp = pp_objs.get(bsuid)
if closed_pp:
closed_pp_objs[bsuid] = closed_pp
else:
# serialize to pre-toml form
asdict = pp.to_pretoml()
if pp.expiry is None:
asdict.pop('expiry', None)
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
# drop symbol obj in serialized form
s = asdict.pop('symbol')
fqsn = s.front_fqsn()
log.info(f'Updating active pp: {fqsn}')
# XXX: ugh, it's cuz we push the section under
# the broker name.. maybe we need to rethink this?
brokerless_key = fqsn.removeprefix(f'{brokername}.')
pp_entries[brokerless_key] = asdict
return pp_entries, closed_pp_objs
def update_pps(
records: dict[str, Transaction],
pps: Optional[dict[str, Position]] = None
@ -312,55 +452,12 @@ def update_pps(
'''
pps: dict[str, Position] = pps or {}
# lifo update all pps from records
for r in records:
pp = pps.setdefault(
r.bsuid,
# if no existing pp, allocate fresh one.
Position(
Symbol.from_fqsn(
r.fqsn,
info={},
),
size=0.0,
be_price=0.0,
bsuid=r.bsuid,
expiry=r.expiry,
)
)
# don't do updates for ledger records we already have
# included in the current pps state.
if r.tid in pp.clears:
# NOTE: likely you'll see repeats of the same
# ``Transaction`` passed in here if/when you are restarting
# a ``brokerd.ib`` where the API will re-report trades from
# the current session, so we need to make sure we don't
# "double count" these in pp calculations.
continue
# lifo style "breakeven" price calc
pp.lifo_update(
r.size,
r.price,
# include transaction cost in breakeven price
# and presume the worst case of the same cost
# to exit this transaction (even though in reality
# it will be dynamic based on exit stratetgy).
cost=2*r.cost,
)
# track clearing data
pp.update(r)
return pps
table = PpTable(pps)
table.update_from_trans(records)
return table.pps
def load_pps_from_ledger(
def load_trans_from_ledger(
brokername: str,
acctname: str,
@ -385,55 +482,18 @@ def load_pps_from_ledger(
return {}
brokermod = get_brokermod(brokername)
src_records = brokermod.norm_trade_records(ledger)
src_records: dict[str, Transaction] = brokermod.norm_trade_records(ledger)
if filter_by:
records = {}
bsuids = set(filter_by)
records = list(filter(lambda r: r.bsuid in bsuids, src_records))
for tid, r in src_records.items():
if r.bsuid in bsuids:
records[tid] = r
else:
records = src_records
return update_pps(records)
def get_pps(
brokername: str,
acctids: Optional[set[str]] = set(),
) -> dict[str, dict[str, Position]]:
'''
Read out broker-specific position entries from
incremental update file: ``pps.toml``.
'''
conf, path = config.load(
'pps',
# load dicts as inlines to preserve compactness
# _dict=toml.decoder.InlineTableDict,
)
all_active = {}
all_closed = {}
# try to load any ledgers if no section found
bconf, path = config.load('brokers')
accounts = bconf[brokername]['accounts']
for account in accounts:
# TODO: instead of this filter we could
# always send all known pps but just not audit
# them since an active client might not be up?
if (
acctids and
f'{brokername}.{account}' not in acctids
):
continue
active, closed = update_pps_conf(brokername, account)
all_active.setdefault(account, {}).update(active)
all_closed.setdefault(account, {}).update(closed)
return all_active, all_closed
return records
# TODO: instead see if we can hack tomli and tomli-w to do the same:
@ -578,47 +638,77 @@ def load_pps_from_toml(
# caller to pass in a symbol set they'd like to reload from the
# underlying ledger to be reprocessed in computing pps state.
reload_records: Optional[dict[str, str]] = None,
# XXX: this is "global" update from ledger flag which
# does a full refresh of pps from the available ledger.
update_from_ledger: bool = False,
) -> tuple[dict, dict[str, Position]]:
) -> tuple[PpTable, dict[str, str]]:
'''
Load and marshal to objects all pps from either an existing
``pps.toml`` config, or from scratch from a ledger file when
none yet exists.
'''
with open_pps(
brokername,
acctid,
write_on_exit=False,
) as table:
pp_objs = table.pps
# no pps entry yet for this broker/account so parse any available
# ledgers to build a brand new pps state.
if not pp_objs or update_from_ledger:
trans = load_trans_from_ledger(
brokername,
acctid,
)
table.update_from_trans(trans)
# Reload symbol specific ledger entries if requested by the
# caller **AND** none exist in the current pps state table.
elif (
pp_objs and reload_records
):
# no pps entry yet for this broker/account so parse
# any available ledgers to build a pps state.
trans = load_trans_from_ledger(
brokername,
acctid,
filter_by=reload_records,
)
table.update_from_trans(trans)
if not table.pps:
log.warning(
f'No `pps.toml` values could be loaded {brokername}:{acctid}'
)
return table, table.conf
@cm
def open_pps(
brokername: str,
acctid: str,
write_on_exit: bool = True,
) -> PpTable:
'''
Read out broker-specific position entries from
incremental update file: ``pps.toml``.
'''
conf, path = config.load('pps')
brokersection = conf.setdefault(brokername, {})
pps = brokersection.setdefault(acctid, {})
pp_objs = {}
table = PpTable(pp_objs, conf=conf)
# no pps entry yet for this broker/account so parse any available
# ledgers to build a brand new pps state.
if not pps or update_from_ledger:
pp_objs = load_pps_from_ledger(
brokername,
acctid,
)
# Reload symbol specific ledger entries if requested by the
# caller **AND** none exist in the current pps state table.
elif (
pps and reload_records
):
# no pps entry yet for this broker/account so parse
# any available ledgers to build a pps state.
pp_objs = load_pps_from_ledger(
brokername,
acctid,
filter_by=reload_records,
)
if not pps:
log.warning(
f'No `pps.toml` positions could be loaded {brokername}:{acctid}'
)
# unmarshal/load ``pps.toml`` config entries into object form.
# unmarshal/load ``pps.toml`` config entries into object form
# and update `PpTable` obj entries.
for fqsn, entry in pps.items():
bsuid = entry['bsuid']
@ -674,29 +764,62 @@ def load_pps_from_toml(
clears=clears,
)
return conf, pp_objs
yield table
if not write_on_exit:
return
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ``pps.toml`` for {path}:\n')
pp_entries, closed_pp_objs = table.dump_active(brokername)
conf[brokername][acctid] = pp_entries
# TODO: why tf haven't they already done this for inline
# tables smh..
enc = PpsEncoder(preserve=True)
# table_bs_type = type(toml.TomlDecoder().get_empty_inline_table())
enc.dump_funcs[
toml.decoder.InlineTableDict
] = enc.dump_inline_table
config.write(
conf,
'pps',
encoder=enc,
)
def update_pps_conf(
brokername: str,
acctid: str,
trade_records: Optional[list[Transaction]] = None,
trade_records: Optional[dict[str, Transaction]] = None,
ledger_reload: Optional[dict[str, str]] = None,
) -> tuple[
dict[str, Position],
dict[str, Position],
]:
# this maps `.bsuid` values to positions
pp_objs: dict[Union[str, int], Position]
# TODO: ideally we can pass in an existing
# pps state to this right? such that we
# don't have to do a ledger reload all the
# time.. a couple ideas I can think of,
# - load pps once after backend ledger state
# is loaded and keep maintainend in memory
# inside a with block,
# - mirror this in some client side actor which
# does the actual ledger updates (say the paper
# engine proc if we decide to always spawn it?),
# - do diffs against updates from the ledger writer
# actor and the in-mem state here?
if trade_records and ledger_reload:
for r in trade_records:
for tid, r in trade_records.items():
ledger_reload[r.bsuid] = r.fqsn
conf, pp_objs = load_pps_from_toml(
table, conf = load_pps_from_toml(
brokername,
acctid,
reload_records=ledger_reload,
@ -705,60 +828,11 @@ def update_pps_conf(
# update all pp objects from any (new) trade records which
# were passed in (aka incremental update case).
if trade_records:
pp_objs = update_pps(
trade_records,
pps=pp_objs,
)
table.update_from_trans(trade_records)
pp_entries = {} # dict-serialize all active pps
# NOTE: newly closed position are also important to report/return
# since a consumer, like an order mode UI ;), might want to react
# based on the closure.
closed_pp_objs: dict[str, Position] = {}
for bsuid in list(pp_objs):
pp = pp_objs[bsuid]
# XXX: debug hook for size mismatches
# if bsuid == 447767096:
# breakpoint()
pp.minimize_clears()
if (
pp.size == 0
# drop time-expired positions (normally derivatives)
or (pp.expiry and pp.expiry < now())
):
# if expired the position is closed
pp.size = 0
# position is already closed aka "net zero"
closed_pp = pp_objs.pop(bsuid, None)
if closed_pp:
closed_pp_objs[bsuid] = closed_pp
else:
# serialize to pre-toml form
asdict = pp.to_pretoml()
if pp.expiry is None:
asdict.pop('expiry', None)
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
# drop symbol obj in serialized form
s = asdict.pop('symbol')
fqsn = s.front_fqsn()
log.info(f'Updating active pp: {fqsn}')
# XXX: ugh, it's cuz we push the section under
# the broker name.. maybe we need to rethink this?
brokerless_key = fqsn.removeprefix(f'{brokername}.')
pp_entries[brokerless_key] = asdict
# this maps `.bsuid` values to positions
pp_entries, closed_pp_objs = table.dump_active(brokername)
pp_objs: dict[Union[str, int], Position] = table.pps
conf[brokername][acctid] = pp_entries