`Account` api update and refine

Rename `open_pps()` -> `open_account()` for obvious reasons as well as
expect a bit tighter integration with `SymbologyCache` and consequently
`LedgerTransaction` in order to drop `Transaction.sym: MktPair`
dependence when compiling / allocating new `Position`s from a ledger.

Also we drop a bunch of prior attrs and do some cleaning,
- `Position.first_clear_dt` we no longer sort during insert.
- `._clears` is now replaced by the `._events` table.
- drop the now masked `.ensure_state()` method (eventually moved to
  `.calc` submod for maybe-later-use).
- drop `.sym=` from all remaining txns init calls.
- clean out the `Position.add_clear()` method and only add the provided
  txn directly to the `._events` table.

Improve some `Account` docs and interface:
- fill out the main type descr.
- add the backend broker modules as `Account.mod` allowing to drop
  `.brokername` as input and instead wrap as a `@property`.
- make `.update_from_trans()` now a new `.update_from_ledger()` and
  expect either of a `TransactionLedger` (user-dict) or a dict of txns;
  in the latter case, if we have not also been passed a symcache as input
  then runtime error since the symcache is necessary to allocate
  positions.
  - also, delegate to `TransactionLedger.iter_txns()` instead of
    a manual datetime sorted iter-loop.
- drop all the clears datetime don't-insert-if-earlier-than-first
  logic.
- rename `.to_toml()` -> `.prep_toml()`.
- drop old `PpTable` alias.
- rename `load_pps_from_ledger()` -> `load_account_from_ledger()` and
  make it only deliver the account instance and also move out all the
  `polars.DataFrame` related stuff (to `.calc`).

And tweak some account clears table formatting,
- store datetimes as TOML native equivs.
- drop `be_price` fixing.
- obvsly drop `.ensure_state()` call to pps.
account_tests
Tyler Goodlet 2023-07-07 22:22:06 -04:00
parent 0e94e89373
commit f5d4f58610
1 changed files with 148 additions and 246 deletions

View File

@ -22,18 +22,17 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
''' '''
from __future__ import annotations from __future__ import annotations
# from bisect import insort
from contextlib import contextmanager as cm from contextlib import contextmanager as cm
from decimal import Decimal from decimal import Decimal
from pprint import pformat from pprint import pformat
from pathlib import Path from pathlib import Path
from types import ModuleType
from typing import ( from typing import (
Any, Any,
Iterator, Iterator,
Generator Generator
) )
import polars as pl
import pendulum import pendulum
from pendulum import ( from pendulum import (
datetime, datetime,
@ -43,7 +42,6 @@ import tomlkit
from ._ledger import ( from ._ledger import (
Transaction, Transaction,
open_trade_ledger,
TransactionLedger, TransactionLedger,
) )
from ._mktinfo import ( from ._mktinfo import (
@ -60,6 +58,7 @@ from ..clearing._messages import (
BrokerdPosition, BrokerdPosition,
) )
from ..data.types import Struct from ..data.types import Struct
from ..data._symcache import SymbologyCache
from ..log import get_logger from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
@ -105,19 +104,12 @@ class Position(Struct):
split_ratio: int | None = None split_ratio: int | None = None
# ordered record of known constituent trade messages # TODO: use a `pl.DataFrame` intead?
_clears: list[
dict[str, Any], # transaction history summaries
] = []
# _events: pl.DataFrame | None = None
_events: dict[str, Transaction | dict] = {} _events: dict[str, Transaction | dict] = {}
# first_clear_dt: datetime | None = None
@property @property
def expiry(self) -> datetime | None: def expiry(self) -> datetime | None:
exp: str = self.mkt.expiry exp: str = self.mkt.expiry.lower()
match exp: match exp:
# empty str, 'perp' (contract) or simply a null # empty str, 'perp' (contract) or simply a null
# signifies instrument with NO expiry. # signifies instrument with NO expiry.
@ -188,7 +180,7 @@ class Position(Struct):
''' '''
# scan for the last "net zero" position by iterating # scan for the last "net zero" position by iterating
# transactions until the next net-zero accum_size, rinse, # transactions until the next net-zero cumsize, rinse,
# repeat. # repeat.
cumsize: float = 0 cumsize: float = 0
clears_since_zero: list[dict] = [] clears_since_zero: list[dict] = []
@ -223,6 +215,7 @@ class Position(Struct):
''' '''
mkt: MktPair = self.mkt mkt: MktPair = self.mkt
assert isinstance(mkt, MktPair) assert isinstance(mkt, MktPair)
# TODO: we need to figure out how to have one top level # TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing # listing venue here even when the backend isn't providing
# it via the trades ledger.. # it via the trades ledger..
@ -239,16 +232,19 @@ class Position(Struct):
asdict: dict[str, Any] = { asdict: dict[str, Any] = {
'bs_mktid': self.bs_mktid, 'bs_mktid': self.bs_mktid,
'expiry': self.expiry or '', # 'expiry': self.expiry or '',
'asset_type': asset_type, 'asset_type': asset_type,
'price_tick': mkt.price_tick, 'price_tick': mkt.price_tick,
'size_tick': mkt.size_tick, 'size_tick': mkt.size_tick,
} }
if exp := self.expiry: if exp := self.expiry:
asdict['expiry'] = exp asdict['expiry'] = exp
clears_since_zero: list[dict] = self.minimized_clears() clears_since_zero: list[dict] = self.minimized_clears()
# setup a "multi-line array of inline tables" which we call
# the "clears table", contained by each position entry in
# an "account file".
clears_table: tomlkit.Array = tomlkit.array() clears_table: tomlkit.Array = tomlkit.array()
clears_table.multiline( clears_table.multiline(
multiline=True, multiline=True,
@ -267,69 +263,21 @@ class Position(Struct):
for k in ['price', 'size', 'cost']: for k in ['price', 'size', 'cost']:
inline_table[k] = entry[k] inline_table[k] = entry[k]
# serialize datetime to parsable `str` # NOTE: we don't actually need to serialize datetime to parsable `str`
inline_table['dt'] = entry['dt']#.isoformat('T') # since `tomlkit` supports a native `DateTime` but
# assert 'Datetime' not in inline_table['dt'] # seems like we're not doing it entirely in clearing
# tables yet?
inline_table['dt'] = entry['dt'] # .isoformat('T')
tid: str = entry['tid'] tid: str = entry['tid']
inline_table['tid'] = tid inline_table['tid'] = tid
clears_table.append(inline_table) clears_table.append(inline_table)
# if val < 0:
# breakpoint()
# assert not events # assert not events
asdict['clears'] = clears_table asdict['clears'] = clears_table
return fqme, asdict return fqme, asdict
# def ensure_state(self) -> None:
# '''
# Audit either the `.cumsize` and `.ppu` local instance vars against
# the clears table calculations and return the calc-ed values if
# they differ and log warnings to console.
# '''
# # clears: list[dict] = self._clears
# # self.first_clear_dt = min(clears, key=lambda e: e['dt'])['dt']
# last_clear: dict = clears[-1]
# csize: float = self.calc_size()
# accum: float = last_clear['accum_size']
# if not self.expired():
# if (
# csize != accum
# and csize != round(accum * (self.split_ratio or 1))
# ):
# raise ValueError(f'Size mismatch: {csize}')
# else:
# assert csize == 0, 'Contract is expired but non-zero size?'
# if self.cumsize != csize:
# log.warning(
# 'Position state mismatch:\n'
# f'{self.cumsize} => {csize}'
# )
# self.cumsize = csize
# cppu: float = self.calc_ppu()
# ppu: float = last_clear['ppu']
# if (
# cppu != ppu
# and self.split_ratio is not None
# # handle any split info entered (for now) manually by user
# and cppu != (ppu / self.split_ratio)
# ):
# raise ValueError(f'PPU mismatch: {cppu}')
# if self.ppu != cppu:
# log.warning(
# 'Position state mismatch:\n'
# f'{self.ppu} => {cppu}'
# )
# self.ppu = cppu
def update_from_msg( def update_from_msg(
self, self,
msg: BrokerdPosition, msg: BrokerdPosition,
@ -337,12 +285,13 @@ class Position(Struct):
) -> None: ) -> None:
mkt: MktPair = self.mkt mkt: MktPair = self.mkt
# we summarize the pos with a single summary transaction
# (for now) until we either pass THIS type as msg directly # NOTE WARNING XXX: we summarize the pos with a single
# from emsd or come up with a better way? # summary transaction (for now) until we either pass THIS
# type as msg directly from emsd or come up with a better
# way?
t = Transaction( t = Transaction(
fqme=mkt.bs_mktid, fqme=mkt.fqme,
sym=mkt,
bs_mktid=mkt.bs_mktid, bs_mktid=mkt.bs_mktid,
tid='unknown', tid='unknown',
size=msg['size'], size=msg['size'],
@ -357,15 +306,16 @@ class Position(Struct):
@property @property
def dsize(self) -> float: def dsize(self) -> float:
''' '''
The "dollar" size of the pp, normally in trading (fiat) unit The "dollar" size of the pp, normally in source asset
terms. (fiat) units.
''' '''
return self.ppu * self.size return self.ppu * self.size
def expired(self) -> bool: def expired(self) -> bool:
''' '''
Predicate which checks if the contract/instrument is past its expiry. Predicate which checks if the contract/instrument is past
its expiry.
''' '''
return bool(self.expiry) and self.expiry < now() return bool(self.expiry) and self.expiry < now()
@ -388,36 +338,23 @@ class Position(Struct):
log.warning(f'{t} is already added?!') log.warning(f'{t} is already added?!')
return added return added
# clear: dict[str, float | str | int] = { # TODO: apparently this IS possible with a dict but not
# 'tid': t.tid, # common and probably not that beneficial unless we're also
# 'cost': t.cost, # going to do cum-calcs on each insert?
# 'price': t.price, # https://stackoverflow.com/questions/38079171/python-insert-new-element-into-sorted-list-of-dictionaries
# 'size': t.size, # from bisect import insort
# 'dt': t.dt
# }
self._events[tid] = t
return True
# insort( # insort(
# self._clears, # self._clears,
# clear, # clear,
# key=lambda entry: entry['dt'] # key=lambda entry: entry['dt']
# ) # )
self._events[tid] = t
return True
# TODO: compute these incrementally instead # TODO: compute these incrementally instead
# of re-looping through each time resulting in O(n**2) # of re-looping through each time resulting in O(n**2)
# behaviour..? # behaviour..? Can we have some kinda clears len to cached
# output subsys?
# NOTE: we compute these **after** adding the entry in order to
# make the recurrence relation math work inside
# ``.calc_size()``.
# self.size = clear['accum_size'] = self.calc_size()
# self.ppu = clear['ppu'] = self.calc_ppu()
# self.size: float = self.calc_size()
# self.ppu: float = self.calc_ppu()
# assert len(self._events) == len(self._clears)
# return clear
def calc_ppu(self) -> float: def calc_ppu(self) -> float:
return ppu(self.iter_by_type('clear')) return ppu(self.iter_by_type('clear'))
@ -487,20 +424,50 @@ class Position(Struct):
class Account(Struct): class Account(Struct):
'''
The real-time (double-entry accounting) state of
a given **asset ownership tracking system**, normally offered
or measured from some brokerage, CEX or (implied virtual)
summary crypto$ "wallets" aggregated and tracked over some set
of DEX-es.
brokername: str Both market-mapped and ledger-system-native (aka inter-account
"transfers") transactions are accounted and they pertain to
(implied) PnL relative to any other accountable asset.
More specifically in piker terms, an account tracks all of:
- the *balances* of all assets currently available for use either
in (future) market or (inter-account/wallet) transfer
transactions.
- a transaction *ledger* from a given brokerd backend which
is a recording of all (known) such transactions from the past.
- a set of financial *positions* as measured from the current
ledger state.
See the semantic origins from double-entry bookkeeping:
https://en.wikipedia.org/wiki/Double-entry_bookkeeping
'''
mod: ModuleType
acctid: str acctid: str
pps: dict[str, Position] pps: dict[str, Position]
conf_path: Path conf_path: Path
conf: dict | None = {} conf: dict | None = {}
# TODO: track a table of asset balances as `.balances: # TODO: track a table of asset balances as `.balances:
# dict[Asset, float]`? # dict[Asset, float]`?
def update_from_trans( @property
def brokername(self) -> str:
return self.mod.name
def update_from_ledger(
self, self,
trans: dict[str, Transaction], ledger: TransactionLedger,
cost_scalar: float = 2, cost_scalar: float = 2,
symcache: SymbologyCache | None = None,
) -> dict[str, Position]: ) -> dict[str, Position]:
''' '''
@ -509,24 +476,36 @@ class Account(Struct):
accumulative size for each entry. accumulative size for each entry.
''' '''
if (
not isinstance(ledger, TransactionLedger)
and symcache is None
):
raise RuntimeError(
'No ledger provided!\n'
'We can not determine the `MktPair`s without a symcache..\n'
'Please provide `symcache: SymbologyCache` when '
'processing NEW positions!'
)
pps = self.pps pps = self.pps
updated: dict[str, Position] = {} updated: dict[str, Position] = {}
# lifo update all pps from records, ensuring # lifo update all pps from records, ensuring
# we compute the PPU and size sorted in time! # we compute the PPU and size sorted in time!
for t in sorted( for tid, txn in ledger.iter_txns():
trans.values(), # for t in sorted(
key=lambda t: t.dt, # trans.values(),
# reverse=True, # key=lambda t: t.dt,
): # ):
fqme: str = t.fqme fqme: str = txn.fqme
bs_mktid: str = t.bs_mktid bs_mktid: str = txn.bs_mktid
# template the mkt-info presuming a legacy market ticks # template the mkt-info presuming a legacy market ticks
# if no info exists in the transactions.. # if no info exists in the transactions..
mkt: MktPair = t.sys mkt: MktPair = ledger._symcache.mktmaps[fqme]
if not (pos := pps.get(bs_mktid)): if not (pos := pps.get(bs_mktid)):
# if no existing pos, allocate fresh one. # if no existing pos, allocate fresh one.
pos = pps[bs_mktid] = Position( pos = pps[bs_mktid] = Position(
mkt=mkt, mkt=mkt,
@ -541,33 +520,16 @@ class Account(Struct):
if len(pos.mkt.fqme) < len(fqme): if len(pos.mkt.fqme) < len(fqme):
pos.mkt = mkt pos.mkt = mkt
# clears: list[dict] = pos._clears # update clearing table!
# if clears: # NOTE: likely you'll see repeats of the same
# # first_clear_dt = pos.first_clear_dt # ``Transaction`` passed in here if/when you are restarting
# a ``brokerd.ib`` where the API will re-report trades from
# # don't do updates for ledger records we already have # the current session, so we need to make sure we don't
# # included in the current pps state. # "double count" these in pp calculations;
# if ( # `Position.add_clear()` stores txs in a `dict[tid,
# t.tid in clears # tx]` which should always ensure this is true B)
# # or ( pos.add_clear(txn)
# # first_clear_dt updated[txn.bs_mktid] = pos
# # and t.dt < first_clear_dt
# # )
# ):
# # NOTE: likely you'll see repeats of the same
# # ``Transaction`` passed in here if/when you are restarting
# # a ``brokerd.ib`` where the API will re-report trades from
# # the current session, so we need to make sure we don't
# # "double count" these in pp calculations.
# continue
# update clearing table
pos.add_clear(t)
updated[t.bs_mktid] = pos
# re-calc ppu and accumulative sizing.
# for bs_mktid, pos in updated.items():
# pos.ensure_state()
# NOTE: deliver only the position entries that were # NOTE: deliver only the position entries that were
# actually updated (modified the state) from the input # actually updated (modified the state) from the input
@ -614,7 +576,7 @@ class Account(Struct):
return open_pp_objs, closed_pp_objs return open_pp_objs, closed_pp_objs
def to_toml( def prep_toml(
self, self,
active: dict[str, Position] | None = None, active: dict[str, Position] | None = None,
@ -629,12 +591,12 @@ class Account(Struct):
pos: Position pos: Position
for bs_mktid, pos in active.items(): for bs_mktid, pos in active.items():
# NOTE: we only store the minimal amount of clears that make up this
# position since the last net-zero state.
# pos.minimize_clears()
# pos.ensure_state() # pos.ensure_state()
# serialize to pre-toml form # serialize to pre-toml form
# NOTE: we only store the minimal amount of clears that
# make up this position since the last net-zero state,
# see `Position.to_pretoml()` for details
fqme, asdict = pos.to_pretoml() fqme, asdict = pos.to_pretoml()
# clears: list[dict] = asdict['clears'] # clears: list[dict] = asdict['clears']
@ -650,7 +612,8 @@ class Account(Struct):
def write_config(self) -> None: def write_config(self) -> None:
''' '''
Write the current position table to the user's ``pps.toml``. Write the current account state to the user's account TOML file, normally
something like ``pps.toml``.
''' '''
# TODO: show diff output? # TODO: show diff output?
@ -658,7 +621,7 @@ class Account(Struct):
# active, closed_pp_objs = table.dump_active() # active, closed_pp_objs = table.dump_active()
active, closed = self.dump_active() active, closed = self.dump_active()
pp_entries = self.to_toml(active=active) pp_entries = self.prep_toml(active=active)
if pp_entries: if pp_entries:
log.info( log.info(
f'Updating positions in ``{self.conf_path}``:\n' f'Updating positions in ``{self.conf_path}``:\n'
@ -705,24 +668,12 @@ class Account(Struct):
# super weird --1 thing going on for cumsize!?1! # super weird --1 thing going on for cumsize!?1!
# NOTE: the fix was to always float() the size value loaded # NOTE: the fix was to always float() the size value loaded
# in open_pps() below! # in open_pps() below!
# confclears = self.conf["tsla.nasdaq.ib"]['clears']
# firstcum = confclears[0]['cumsize']
# if firstcum:
# breakpoint()
config.write( config.write(
config=self.conf, config=self.conf,
path=self.conf_path, path=self.conf_path,
fail_empty=False, fail_empty=False,
) )
# breakpoint()
# TODO: move over all broker backend usage to new name..
PpTable = Account
def load_account( def load_account(
brokername: str, brokername: str,
@ -784,12 +735,12 @@ def load_account(
@cm @cm
def open_pps( def open_account(
brokername: str, brokername: str,
acctid: str, acctid: str,
write_on_exit: bool = False, write_on_exit: bool = False,
) -> Generator[PpTable, None, None]: ) -> Generator[Account, None, None]:
''' '''
Read out broker-specific position entries from Read out broker-specific position entries from
incremental update file: ``pps.toml``. incremental update file: ``pps.toml``.
@ -820,10 +771,12 @@ def open_pps(
# engine proc if we decide to always spawn it?), # engine proc if we decide to always spawn it?),
# - do diffs against updates from the ledger writer # - do diffs against updates from the ledger writer
# actor and the in-mem state here? # actor and the in-mem state here?
from ..brokers import get_brokermod
mod: ModuleType = get_brokermod(brokername)
pp_objs = {} pp_objs: dict[str, Position] = {}
table = PpTable( table = Account(
brokername, mod,
acctid, acctid,
pp_objs, pp_objs,
conf_path, conf_path,
@ -831,12 +784,10 @@ def open_pps(
) )
# unmarshal/load ``pps.toml`` config entries into object form # unmarshal/load ``pps.toml`` config entries into object form
# and update `PpTable` obj entries. # and update `Account` obj entries.
for fqme, entry in conf.items(): for fqme, entry in conf.items():
# atype = entry.get('asset_type', '<unknown>') # unique broker-backend-system market id
# unique broker market id
bs_mktid = str( bs_mktid = str(
entry.get('bsuid') entry.get('bsuid')
or entry.get('bs_mktid') or entry.get('bs_mktid')
@ -860,7 +811,7 @@ def open_pps(
fqme, fqme,
price_tick=price_tick, price_tick=price_tick,
size_tick=size_tick, size_tick=size_tick,
bs_mktid=bs_mktid bs_mktid=bs_mktid,
) )
# TODO: RE: general "events" instead of just "clears": # TODO: RE: general "events" instead of just "clears":
@ -875,6 +826,7 @@ def open_pps(
# for toml re-presentation) back into a master table. # for toml re-presentation) back into a master table.
toml_clears_list: list[dict[str, Any]] = entry['clears'] toml_clears_list: list[dict[str, Any]] = entry['clears']
trans: list[Transaction] = [] trans: list[Transaction] = []
for clears_table in toml_clears_list: for clears_table in toml_clears_list:
tid = clears_table['tid'] tid = clears_table['tid']
dt: tomlkit.items.DateTime | str = clears_table['dt'] dt: tomlkit.items.DateTime | str = clears_table['dt']
@ -887,23 +839,18 @@ def open_pps(
clears_table['dt'] = dt clears_table['dt'] = dt
trans.append(Transaction( trans.append(Transaction(
fqme=bs_mktid, fqme=bs_mktid,
sym=mkt, # sym=mkt,
bs_mktid=bs_mktid, bs_mktid=bs_mktid,
tid=tid, tid=tid,
# XXX: not sure why sometimes these are loaded as
# `tomlkit.Integer` and are eventually written with
# an extra `-` in front like `--1`?
size=float(clears_table['size']), size=float(clears_table['size']),
price=float(clears_table['price']), price=float(clears_table['price']),
cost=clears_table['cost'], cost=clears_table['cost'],
dt=dt, dt=dt,
)) ))
# size = entry['size']
# # TODO: remove but, handle old field name for now
# ppu = entry.get(
# 'ppu',
# entry.get('be_price', 0),
# )
split_ratio = entry.get('split_ratio') split_ratio = entry.get('split_ratio')
# if a string-ified expiry field is loaded we try to parse # if a string-ified expiry field is loaded we try to parse
@ -929,9 +876,6 @@ def open_pps(
for t in trans: for t in trans:
pp.add_clear(t) pp.add_clear(t)
# audit entries loaded from toml
# pp.ensure_state()
try: try:
yield table yield table
finally: finally:
@ -939,7 +883,21 @@ def open_pps(
table.write_config() table.write_config()
def load_pps_from_ledger( # TODO: drop the old name and THIS!
@cm
def open_pps(
*args,
**kwargs,
) -> Generator[Account, None, None]:
log.warning(
'`open_pps()` is now deprecated!\n'
'Please use `with open_account() as cnt:`'
)
with open_account(*args, **kwargs) as acnt:
yield acnt
def load_account_from_ledger(
brokername: str, brokername: str,
acctname: str, acctname: str,
@ -947,10 +905,9 @@ def load_pps_from_ledger(
# post normalization filter on ledger entries to be processed # post normalization filter on ledger entries to be processed
filter_by_ids: dict[str, list[str]] | None = None, filter_by_ids: dict[str, list[str]] | None = None,
) -> tuple[ ledger: TransactionLedger | None = None,
pl.DataFrame,
PpTable, ) -> Account:
]:
''' '''
Open a ledger file by broker name and account and read in and Open a ledger file by broker name and account and read in and
process any trade records into our normalized ``Transaction`` form process any trade records into our normalized ``Transaction`` form
@ -958,67 +915,12 @@ def load_pps_from_ledger(
bs_mktid-mapped dict-sets of the transactions and pps. bs_mktid-mapped dict-sets of the transactions and pps.
''' '''
ledger: TransactionLedger acnt: Account
table: PpTable with open_pps(
with ( brokername,
open_trade_ledger(brokername, acctname) as ledger, acctname,
open_pps(brokername, acctname) as table, ) as acnt:
): if ledger is not None:
if not ledger: acnt.update_from_ledger(ledger)
# null case, no ledger file with content
return {}
from ..brokers import get_brokermod return acnt
mod = get_brokermod(brokername)
src_records: dict[str, Transaction] = mod.norm_trade_records(
ledger
)
table.update_from_trans(src_records)
fdf = df = pl.DataFrame(
list(rec.to_dict() for rec in src_records.values()),
# schema=[
# ('tid', str),
# ('fqme', str),
# ('dt', str),
# ('size', pl.Float64),
# ('price', pl.Float64),
# ('cost', pl.Float64),
# ('expiry', str),
# ('bs_mktid', str),
# ],
).sort('dt').select([
pl.col('fqme'),
pl.col('dt').str.to_datetime(),
# pl.col('expiry').dt.datetime(),
pl.col('bs_mktid'),
pl.col('size'),
pl.col('price'),
])
# ppt = df.groupby('fqme').agg([
# # TODO: ppu and bep !!
# pl.cumsum('size').alias('cumsum'),
# ])
acts = df.partition_by('fqme', as_dict=True)
# ppt: dict[str, pl.DataFrame] = {}
# for fqme, ppt in act.items():
# ppt.with_columuns
# # TODO: ppu and bep !!
# pl.cumsum('size').alias('cumsum'),
# ])
# filter out to the columns matching values filter passed
# as input.
if filter_by_ids:
for col, vals in filter_by_ids.items():
str_vals = set(map(str, vals))
pred: pl.Expr = pl.col(col).eq(str_vals.pop())
for val in str_vals:
pred |= pl.col(col).eq(val)
fdf = df.filter(pred)
bs_mktid: str = fdf[0]['bs_mktid']
# pos: Position = table.pps[bs_mktid]
return fdf, acts, table