Compare commits

..

No commits in common. "c53071e43a291251924f42cba84d8e01672b729f" and "96f5a8abb878c391c875e6c0b3853e397c8d50d2" have entirely different histories.

7 changed files with 193 additions and 368 deletions

View File

@ -195,8 +195,9 @@ async def open_piker_runtime(
) -> Optional[tractor._portal.Portal]: ) -> Optional[tractor._portal.Portal]:
''' '''
Start a piker actor who's runtime will automatically sync with Start a piker actor who's runtime will automatically
existing piker actors on the local link based on configuration. sync with existing piker actors in local network
based on configuration.
''' '''
global _services global _services

View File

@ -412,7 +412,7 @@ class Client:
# ``end_dt`` which exceeds the ``duration``, # ``end_dt`` which exceeds the ``duration``,
# - a timeout occurred in which case insync internals return # - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()... # an empty list thing with bars.clear()...
return [], np.empty(0), dt_duration return [], np.empty(0)
# TODO: we could maybe raise ``NoData`` instead if we # TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no # rewrite the method in the first case? right now there's no
# way to detect a timeout. # way to detect a timeout.
@ -483,7 +483,7 @@ class Client:
self, self,
pattern: str, pattern: str,
# how many contracts to search "up to" # how many contracts to search "up to"
upto: int = 16, upto: int = 6,
asdicts: bool = True, asdicts: bool = True,
) -> dict[str, ContractDetails]: ) -> dict[str, ContractDetails]:
@ -518,16 +518,6 @@ class Client:
exch = tract.exchange exch = tract.exchange
if exch not in _exch_skip_list: if exch not in _exch_skip_list:
# try to lookup any contracts from our adhoc set
# since often the exchange/venue is named slightly
# different (eg. BRR.CMECRYPTO` instead of just
# `.CME`).
info = _adhoc_symbol_map.get(sym)
if info:
con_kwargs, bars_kwargs = info
exch = con_kwargs['exchange']
# try get all possible contracts for symbol as per, # try get all possible contracts for symbol as per,
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
con = ibis.Future( con = ibis.Future(
@ -1096,7 +1086,6 @@ async def load_aio_clients(
# retry a few times to get the client going.. # retry a few times to get the client going..
connect_retries: int = 3, connect_retries: int = 3,
connect_timeout: float = 0.5, connect_timeout: float = 0.5,
disconnect_on_exit: bool = True,
) -> dict[str, Client]: ) -> dict[str, Client]:
''' '''
@ -1238,7 +1227,6 @@ async def load_aio_clients(
finally: finally:
# TODO: for re-scans we'll want to not teardown clients which # TODO: for re-scans we'll want to not teardown clients which
# are up and stable right? # are up and stable right?
if disconnect_on_exit:
for acct, client in _accounts2clients.items(): for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}') log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect() client.ib.disconnect()

View File

@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch entry['listingExchange'] = pexch
conf = get_config() conf = get_config()
entries = api_trades_to_ledger_entries( entries = trades_to_ledger_entries(
conf['accounts'].inverse, conf['accounts'].inverse,
trade_entries, trade_entries,
) )
@ -371,8 +371,8 @@ async def update_and_audit_msgs(
else: else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}' entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
raise ValueError( # raise ValueError(
# log.error( log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n' f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n' f'ib: {ibppmsg}\n'
f'piker: {msg}\n' f'piker: {msg}\n'
@ -1123,16 +1123,18 @@ def norm_trade_records(
continue continue
# timestamping is way different in API records # timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date') date = record.get('date')
flex_dtstr = record.get('dateTime') if not date:
if dtstr or date:
dt = pendulum.parse(dtstr or date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp.. # probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime']) date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
else:
# epoch_dt = pendulum.from_timestamp(record.get('time'))
dt = pendulum.parse(date)
# special handling of symbol extraction from # special handling of symbol extraction from
# flex records using some ad-hoc schema parsing. # flex records using some ad-hoc schema parsing.
@ -1181,29 +1183,41 @@ def norm_trade_records(
return {r.tid: r for r in records} return {r.tid: r for r in records}
def parse_flex_dt( def trades_to_ledger_entries(
record: str,
) -> pendulum.datetime:
date, ts = record.split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
def api_trades_to_ledger_entries(
accounts: bidict, accounts: bidict,
trade_entries: list[object], trade_entries: list[object],
source_type: str = 'api',
) -> dict: ) -> dict:
''' '''
Convert API execution objects entry objects into ``dict`` form, Convert either of API execution objects or flex report
pretty much straight up without modification except add entry objects into ``dict`` form, pretty much straight up
a `pydatetime` field from the parsed timestamp. without modification.
''' '''
trades_by_account = {} trades_by_account = {}
for t in trade_entries: for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
elif source_type == 'api':
# NOTE: example of schema we pull from the API client. # NOTE: example of schema we pull from the API client.
# { # {
# 'commissionReport': CommissionReport(... # 'commissionReport': CommissionReport(...
@ -1230,8 +1244,7 @@ def api_trades_to_ledger_entries(
tid = str(entry['execId']) tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time']) dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str? # TODO: why isn't this showing seconds in the str?
entry['pydatetime'] = dt entry['date'] = str(dt)
entry['datetime'] = str(dt)
acctid = accounts[entry['acctNumber']] acctid = accounts[entry['acctNumber']]
if not tid: if not tid:
@ -1250,73 +1263,6 @@ def api_trades_to_ledger_entries(
acctid, {} acctid, {}
)[tid] = entry )[tid] = entry
# sort entries in output by python based datetime
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
return trades_by_account
def flex_records_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
) -> dict:
'''
Convert flex report entry objects into ``dict`` form, pretty much
straight up without modification except add a `pydatetime` field
from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
# probably a flex record with a wonky non-std timestamp..
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
entry['datetime'] = str(dt)
if not tid:
# this is likely some kind of internal adjustment
# transaction, likely one of the following:
# - an expiry event that will show a "book trade" indicating
# some adjustment to cash balances: zeroing or itm settle.
# - a manual cash balance position adjustment likely done by
# the user from the accounts window in TWS where they can
# manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
continue
trades_by_account.setdefault(
acctid, {}
)[tid] = entry
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1]['pydatetime'],
))
return trades_by_account return trades_by_account
@ -1363,16 +1309,15 @@ def load_flex_trades(
ln = len(trade_entries) ln = len(trade_entries)
log.info(f'Loaded {ln} trades from flex query') log.info(f'Loaded {ln} trades from flex query')
trades_by_account = flex_records_to_ledger_entries( trades_by_account = trades_to_ledger_entries(
conf['accounts'].inverse, # reverse map to user account names # get reverse map to user account names
conf['accounts'].inverse,
trade_entries, trade_entries,
source_type='flex',
) )
ledger_dict: Optional[dict] = None
for acctid in trades_by_account: for acctid in trades_by_account:
trades_by_id = trades_by_account[acctid] trades_by_id = trades_by_account[acctid]
with open_trade_ledger('ib', acctid) as ledger_dict: with open_trade_ledger('ib', acctid) as ledger_dict:
tid_delta = set(trades_by_id) - set(ledger_dict) tid_delta = set(trades_by_id) - set(ledger_dict)
log.info( log.info(
@ -1380,11 +1325,9 @@ def load_flex_trades(
f'{pformat(tid_delta)}' f'{pformat(tid_delta)}'
) )
if tid_delta: if tid_delta:
sorted_delta = dict(sorted( ledger_dict.update(
{tid: trades_by_id[tid] for tid in tid_delta}.items(), {tid: trades_by_id[tid] for tid in tid_delta}
key=lambda entry: entry[1].pop('pydatetime'), )
))
ledger_dict.update(sorted_delta)
return ledger_dict return ledger_dict

View File

@ -254,9 +254,6 @@ async def wait_on_data_reset(
return False return False
_data_resetter_task: trio.Task | None = None
async def get_bars( async def get_bars(
proxy: MethodProxy, proxy: MethodProxy,
@ -267,7 +264,7 @@ async def get_bars(
end_dt: str = '', end_dt: str = '',
# TODO: make this more dynamic based on measured frame rx latency.. # TODO: make this more dynamic based on measured frame rx latency..
timeout: float = 3, # how long before we trigger a feed reset timeout: float = 1.5, # how long before we trigger a feed reset
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -277,15 +274,13 @@ async def get_bars(
a ``MethoProxy``. a ``MethoProxy``.
''' '''
global _data_resetter_task data_cs: Optional[trio.CancelScope] = None
result: Optional[tuple[
data_cs: trio.CancelScope | None = None
result: tuple[
ibis.objects.BarDataList, ibis.objects.BarDataList,
np.ndarray, np.ndarray,
datetime, datetime,
datetime, datetime,
] | None = None ]] = None
result_ready = trio.Event() result_ready = trio.Event()
async def query(): async def query():
@ -312,7 +307,7 @@ async def get_bars(
log.warning( log.warning(
f'History is blank for {dt_duration} from {end_dt}' f'History is blank for {dt_duration} from {end_dt}'
) )
end_dt -= dt_duration end_dt = end_dt.subtract(dt_duration)
continue continue
if bars_array is None: if bars_array is None:
@ -405,10 +400,6 @@ async def get_bars(
else: else:
raise raise
# TODO: make this global across all history task/requests
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
# start history request that we allow # start history request that we allow
@ -423,14 +414,6 @@ async def get_bars(
await result_ready.wait() await result_ready.wait()
break break
if _data_resetter_task:
# don't double invoke the reset hack if another
# requester task already has it covered.
continue
else:
_data_resetter_task = trio.lowlevel.current_task()
unset_resetter = True
# spawn new data reset task # spawn new data reset task
data_cs, reset_done = await nurse.start( data_cs, reset_done = await nurse.start(
partial( partial(
@ -442,7 +425,6 @@ async def get_bars(
# sync wait on reset to complete # sync wait on reset to complete
await reset_done.wait() await reset_done.wait()
_data_resetter_task = None if unset_resetter else _data_resetter_task
return result, data_cs is not None return result, data_cs is not None
@ -501,9 +483,7 @@ async def _setup_quote_stream(
to_trio.send_nowait(None) to_trio.send_nowait(None)
async with load_aio_clients( async with load_aio_clients() as accts2clients:
disconnect_on_exit=False,
) as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients) caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -583,8 +563,7 @@ async def open_aio_quote_stream(
from_aio = _quote_streams.get(symbol) from_aio = _quote_streams.get(symbol)
if from_aio: if from_aio:
# if we already have a cached feed deliver a rx side clone # if we already have a cached feed deliver a rx side clone to consumer
# to consumer
async with broadcast_receiver( async with broadcast_receiver(
from_aio, from_aio,
2**6, 2**6,
@ -775,45 +754,17 @@ async def stream_quotes(
await trio.sleep_forever() await trio.sleep_forever()
return # we never expect feed to come up? return # we never expect feed to come up?
cs: Optional[trio.CancelScope] = None async with open_aio_quote_stream(
startup: bool = True
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
open_aio_quote_stream(
symbol=sym, symbol=sym,
contract=con, contract=con,
) as stream, ) as stream:
):
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
first_ticker.ticks = [] first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
startup = False
task_status.started((init_msgs, first_quote)) task_status.started((init_msgs, first_quote))
# start a stream restarter task which monitors the
# data feed event.
async def reset_on_feed():
# TODO: this seems to be surpressed from the
# traceback in ``tractor``?
# assert 0
rt_ev = proxy.status_event(
'Market data farm connection is OK:usfarm'
)
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
async with aclosing(stream): async with aclosing(stream):
if syminfo.get('no_vlm', False): if syminfo.get('no_vlm', False):
@ -821,31 +772,29 @@ async def stream_quotes(
# include vlm data. # include vlm data.
atype = syminfo['asset_type'] atype = syminfo['asset_type']
log.info( log.info(
f'No-vlm {sym}@{atype}, skipping quote poll' f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
) )
else: else:
# wait for real volume on feed (trading might be # wait for real volume on feed (trading might be closed)
# closed)
while True: while True:
ticker = await stream.receive() ticker = await stream.receive()
# for a real volume contract we rait for # for a real volume contract we rait for the first
# the first "real" trade to take place # "real" trade to take place
if ( if (
# not calc_price # not calc_price
# and not ticker.rtTime # and not ticker.rtTime
not ticker.rtTime not ticker.rtTime
): ):
# spin consuming tickers until we # spin consuming tickers until we get a real
# get a real market datum # market datum
log.debug(f"New unsent ticker: {ticker}") log.debug(f"New unsent ticker: {ticker}")
continue continue
else: else:
log.debug("Received first volume tick") log.debug("Received first real volume tick")
# ugh, clear ticks since we've # ugh, clear ticks since we've consumed them
# consumed them (ahem, ib_insync is # (ahem, ib_insync is truly stateful trash)
# truly stateful trash)
ticker.ticks = [] ticker.ticks = []
# XXX: this works because we don't use # XXX: this works because we don't use
@ -973,14 +922,7 @@ async def open_symbol_search(
except trio.WouldBlock: except trio.WouldBlock:
pass pass
if ( if not pattern or pattern.isspace():
not pattern
or pattern.isspace()
# XXX: not sure if this is a bad assumption but it
# seems to make search snappier?
or len(pattern) < 1
):
log.warning('empty pattern received, skipping..') log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client # TODO: *BUG* if nothing is returned here the client

View File

@ -138,26 +138,25 @@ def cli(ctx, brokers, loglevel, tl, configdir):
@click.pass_obj @click.pass_obj
def services(config, tl, names): def services(config, tl, names):
from .._daemon import open_piker_runtime
async def list_services(): async def list_services():
async with (
open_piker_runtime( async with tractor.get_arbiter(
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
*_tractor_kwargs['arbiter_addr'] *_tractor_kwargs['arbiter_addr']
) as portal ) as portal:
):
registry = await portal.run_from_ns('self', 'get_registry') registry = await portal.run_from_ns('self', 'get_registry')
json_d = {} json_d = {}
for key, socket in registry.items(): for key, socket in registry.items():
# name, uuid = uid
host, port = socket host, port = socket
json_d[key] = f'{host}:{port}' json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}") click.echo(f"{colorize_json(json_d)}")
trio.run(list_services) tractor.run(
list_services,
name='service_query',
loglevel=config['loglevel'] if tl else None,
arbiter_addr=_tractor_kwargs['arbiter_addr'],
)
def _load_clis() -> None: def _load_clis() -> None:

View File

@ -333,7 +333,7 @@ async def start_backfill(
# do a decently sized backfill and load it into storage. # do a decently sized backfill and load it into storage.
periods = { periods = {
1: {'days': 6}, 1: {'days': 6},
60: {'years': 6}, 60: {'years': 10},
} }
kwargs = periods[step_size_s] kwargs = periods[step_size_s]
@ -348,45 +348,36 @@ async def start_backfill(
# last retrieved start dt to the next request as # last retrieved start dt to the next request as
# it's end dt. # it's end dt.
starts: set[datetime] = set() starts: set[datetime] = set()
while start_dt > last_tsdb_dt: while start_dt > last_tsdb_dt:
print(f"QUERY end_dt={start_dt}")
try: try:
log.info( log.info(
f'Requesting {step_size_s}s frame ending in {start_dt}' f'Requesting {step_size_s}s frame ending in {start_dt}'
) )
array, next_start_dt, end_dt = await hist( array, start_dt, end_dt = await hist(
timeframe, timeframe,
end_dt=start_dt, end_dt=start_dt,
) )
if next_start_dt in starts:
start_dt = min(starts)
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
continue
# only update new start point if new
start_dt = next_start_dt
starts.add(start_dt)
assert array['time'][0] == start_dt.timestamp() assert array['time'][0] == start_dt.timestamp()
except NoData: except NoData:
# XXX: unhandled history gap (shouldn't happen?)
log.warning( log.warning(
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?' f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
) )
await tractor.breakpoint() return None # discard signal
except DataUnavailable: # as duerr: except DataUnavailable as duerr:
# broker is being a bish and we can't pull any more.. # broker is being a bish and we can't pull
log.warning( # any more..
f'NO-MORE-DATA: backend {mod.name} halted history!?' log.warning('backend halted on data deliver !?!?')
)
# ugh, what's a better way? # ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle # TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the # condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved? # request loop until the condition is resolved?
return return duerr
diff = end_dt - start_dt diff = end_dt - start_dt
frame_time_diff_s = diff.seconds frame_time_diff_s = diff.seconds
@ -403,6 +394,22 @@ async def start_backfill(
f'{diff} ~= {frame_time_diff_s} seconds' f'{diff} ~= {frame_time_diff_s} seconds'
) )
array, _start_dt, end_dt = await hist(
timeframe,
end_dt=start_dt,
)
if (
_start_dt in starts
):
print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
start_dt = min(starts)
continue
# only update new start point if new
start_dt = _start_dt
starts.add(start_dt)
to_push = diff_history( to_push = diff_history(
array, array,
start_dt, start_dt,
@ -491,10 +498,10 @@ async def manage_history(
readonly=False, readonly=False,
) )
# TODO: history validation # TODO: history validation
# if not opened: if not opened:
# raise RuntimeError( raise RuntimeError(
# "Persistent shm for sym was already open?!" "Persistent shm for sym was already open?!"
# ) )
rt_shm, opened = maybe_open_shm_array( rt_shm, opened = maybe_open_shm_array(
key=f'{fqsn}_rt', key=f'{fqsn}_rt',
@ -506,10 +513,10 @@ async def manage_history(
readonly=False, readonly=False,
size=3*_secs_in_day, size=3*_secs_in_day,
) )
# if not opened: if not opened:
# raise RuntimeError( raise RuntimeError(
# "Persistent shm for sym was already open?!" "Persistent shm for sym was already open?!"
# ) )
log.info('Scanning for existing `marketstored`') log.info('Scanning for existing `marketstored`')

View File

@ -20,7 +20,6 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends) (looking at you `ib` and dirt-bird friends)
''' '''
from __future__ import annotations
from contextlib import contextmanager as cm from contextlib import contextmanager as cm
from pprint import pformat from pprint import pformat
import os import os
@ -139,31 +138,13 @@ class Position(Struct):
# ordered record of known constituent trade messages # ordered record of known constituent trade messages
clears: dict[ clears: dict[
str | int, # trade id Union[str, int, Status], # trade id
dict[str, Any], # transaction history summaries dict[str, Any], # transaction history summaries
] = {} ] = {}
first_clear_dt: Optional[datetime] = None first_clear_dt: Optional[datetime] = None
expiry: Optional[datetime] = None expiry: Optional[datetime] = None
# @property
# def clears(self) -> dict[
# Union[str, int, Status], # trade id
# dict[str, Any], # transaction history summaries
# ]:
# '''
# Datetime sorted reference to internal clears table.
# '''
# # self._clears = {}
# self._clears = dict(sorted(
# self._clears.items(),
# key=lambda entry: entry[1]['dt'],
# ))
# # self._clears[k] = v
# return self._clears
def to_dict(self) -> dict: def to_dict(self) -> dict:
return { return {
f: getattr(self, f) f: getattr(self, f)
@ -238,10 +219,6 @@ class Position(Struct):
''' '''
clears = list(self.clears.values()) clears = list(self.clears.values())
if not clears:
log.warning(f'No clears table for {self.symbol}!?')
return
self.first_clear_dt = min(list(entry['dt'] for entry in clears)) self.first_clear_dt = min(list(entry['dt'] for entry in clears))
last_clear = clears[-1] last_clear = clears[-1]
@ -646,7 +623,6 @@ class PpTable(Struct):
def to_toml( def to_toml(
self, self,
min_clears: bool = True,
) -> dict[str, Any]: ) -> dict[str, Any]:
active, closed = self.dump_active() active, closed = self.dump_active()
@ -659,9 +635,7 @@ class PpTable(Struct):
# keep the minimal amount of clears that make up this # keep the minimal amount of clears that make up this
# position since the last net-zero state. # position since the last net-zero state.
if min_clears:
pos.minimize_clears() pos.minimize_clears()
pos.ensure_state() pos.ensure_state()
# serialize to pre-toml form # serialize to pre-toml form
@ -708,8 +682,6 @@ def load_pps_from_ledger(
brokername: str, brokername: str,
acctname: str, acctname: str,
table: Optional[PpTable] = None,
# post normalization filter on ledger entries to be processed # post normalization filter on ledger entries to be processed
filter_by: Optional[list[dict]] = None, filter_by: Optional[list[dict]] = None,
@ -726,6 +698,7 @@ def load_pps_from_ledger(
''' '''
with ( with (
open_trade_ledger(brokername, acctname) as ledger, open_trade_ledger(brokername, acctname) as ledger,
open_pps(brokername, acctname) as table,
): ):
if not ledger: if not ledger:
# null case, no ledger file with content # null case, no ledger file with content
@ -743,10 +716,6 @@ def load_pps_from_ledger(
else: else:
records = src_records records = src_records
if table is None:
with open_pps(brokername, acctname) as table:
updated = table.update_from_trans(records)
else:
updated = table.update_from_trans(records) updated = table.update_from_trans(records)
return records, updated return records, updated
@ -917,27 +886,15 @@ def open_pps(
conf=conf, conf=conf,
) )
# first pass populate all missing clears record tables
# for fqsn, entry in pps.items():
# # convert clears sub-tables (only in this form
# # for toml re-presentation) back into a master table.
# clears_list = entry.get('clears', [])
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=[entry['bsuid']],
# table=table,
# )
# # breakpoint()
# unmarshal/load ``pps.toml`` config entries into object form # unmarshal/load ``pps.toml`` config entries into object form
# and update `PpTable` obj entries. # and update `PpTable` obj entries.
for fqsn, entry in pps.items(): for fqsn, entry in pps.items():
bsuid = entry['bsuid'] bsuid = entry['bsuid']
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# index clears entries in "object" form by tid in a top # index clears entries in "object" form by tid in a top
# level dict instead of a list (as is presented in our # level dict instead of a list (as is presented in our
# ``pps.toml``). # ``pps.toml``).
@ -949,18 +906,6 @@ def open_pps(
# processing of new clear events. # processing of new clear events.
trans: list[Transaction] = [] trans: list[Transaction] = []
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# table=table,
# )
for clears_table in clears_list: for clears_table in clears_list:
tid = clears_table.pop('tid') tid = clears_table.pop('tid')
dtstr = clears_table['dt'] dtstr = clears_table['dt']