Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet c53071e43a WIP adding draft-commented code to try and get splits workin.. 2022-10-19 13:18:19 -04:00
Tyler Goodlet 41ffccc59e Add data-reset-task global state var
Allows keeping mutex state around data reset requests which (if more
then one are sent) can cause a throttling condition where ib's servers
will get slower and slower to conduct a reconnect. With this you can
have multiple ongoing contract requests without hitting that issue and
we can go back to having a nice 3s timeout on the history queries before
activating the hack.
2022-10-19 13:03:47 -04:00
Tyler Goodlet 55856f5e8b Add back adhoc symbol lookup support, some exchs info is off 2022-10-16 14:34:34 -04:00
Tyler Goodlet e8ab28e456 Order ledger entries by processed datetime
To make it easier to manually read/decipher long ledger files this adds
`dict` sorting based on record-type-specific (api vs. flex report)
datetime processing prior to ledger file write.

- break up parsers into separate routines for flex and api record
  processing.
- add `parse_flex_dt()` for special handling of the weird semicolon
  stamps in flex reports.
2022-10-10 09:27:30 -04:00
Tyler Goodlet d2b6216994 Comment format tweak 2022-10-10 09:27:16 -04:00
Tyler Goodlet eb743759a4 Subtract duration instead of passing to `.subtract()` (facepalm) 2022-10-10 09:27:16 -04:00
Tyler Goodlet 74910ba56c Fix `piker services`; `tractor.run()` is done.. 2022-10-10 09:27:16 -04:00
Tyler Goodlet 28535fa977 Re-request quote feed on data reset events
When a network outage or data feed connection is reset often the
`ib_insync` task will hang until some kind of (internal?) timeout takes
place or, in some (worst) cases it never re-establishes (the event
stream) and thus the backend needs to restart or the live feed will
never resume..

In order to avoid this issue once and for all this patch implements an
additional (extremely simple) task that is started with the  real-time
feed and simply waits for any market data reset events; when detected
restarts the `open_aio_quote_stream()` call in a loop using
a surrounding cancel scope.

Been meaning to implement this for ages and it's finally working!
2022-10-10 09:27:16 -04:00
Tyler Goodlet 1d7e642dbd Support no-disconnect on `open_aio_clients()` exit
Allows for easier restarts of certain `trio` side tasks without killing
the `asyncio`-side clients; support via flag.

Also fix a bug in `Client.bars()`: we need to return the duration on the
empty bars case..
2022-10-10 09:27:16 -04:00
Tyler Goodlet 69be65237f Drop duplicate frame request
Must have gotten left in during refactor from the `trimeter` version?
Drop down to 6 years for 1m sampling.
2022-10-10 09:27:16 -04:00
7 changed files with 376 additions and 201 deletions

View File

@ -195,9 +195,8 @@ async def open_piker_runtime(
) -> Optional[tractor._portal.Portal]:
'''
Start a piker actor who's runtime will automatically
sync with existing piker actors in local network
based on configuration.
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.
'''
global _services

View File

@ -412,7 +412,7 @@ class Client:
# ``end_dt`` which exceeds the ``duration``,
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
return [], np.empty(0)
return [], np.empty(0), dt_duration
# TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no
# way to detect a timeout.
@ -483,7 +483,7 @@ class Client:
self,
pattern: str,
# how many contracts to search "up to"
upto: int = 6,
upto: int = 16,
asdicts: bool = True,
) -> dict[str, ContractDetails]:
@ -518,6 +518,16 @@ class Client:
exch = tract.exchange
if exch not in _exch_skip_list:
# try to lookup any contracts from our adhoc set
# since often the exchange/venue is named slightly
# different (eg. BRR.CMECRYPTO` instead of just
# `.CME`).
info = _adhoc_symbol_map.get(sym)
if info:
con_kwargs, bars_kwargs = info
exch = con_kwargs['exchange']
# try get all possible contracts for symbol as per,
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
con = ibis.Future(
@ -1086,6 +1096,7 @@ async def load_aio_clients(
# retry a few times to get the client going..
connect_retries: int = 3,
connect_timeout: float = 0.5,
disconnect_on_exit: bool = True,
) -> dict[str, Client]:
'''
@ -1227,10 +1238,11 @@ async def load_aio_clients(
finally:
# TODO: for re-scans we'll want to not teardown clients which
# are up and stable right?
for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect()
_client_cache.pop((host, port), None)
if disconnect_on_exit:
for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect()
_client_cache.pop((host, port), None)
async def load_clients_for_trio(

View File

@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch
conf = get_config()
entries = trades_to_ledger_entries(
entries = api_trades_to_ledger_entries(
conf['accounts'].inverse,
trade_entries,
)
@ -371,8 +371,8 @@ async def update_and_audit_msgs(
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
# raise ValueError(
log.error(
raise ValueError(
# log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
@ -1123,18 +1123,16 @@ def norm_trade_records(
continue
# timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date')
if not date:
# probably a flex record with a wonky non-std timestamp..
date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
flex_dtstr = record.get('dateTime')
else:
# epoch_dt = pendulum.from_timestamp(record.get('time'))
dt = pendulum.parse(date)
if dtstr or date:
dt = pendulum.parse(dtstr or date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
@ -1183,69 +1181,58 @@ def norm_trade_records(
return {r.tid: r for r in records}
def trades_to_ledger_entries(
def parse_flex_dt(
record: str,
) -> pendulum.datetime:
date, ts = record.split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
def api_trades_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
source_type: str = 'api',
) -> dict:
'''
Convert either of API execution objects or flex report
entry objects into ``dict`` form, pretty much straight up
without modification.
Convert API execution objects entry objects into ``dict`` form,
pretty much straight up without modification except add
a `pydatetime` field from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
# 'contract': {...
# 'execution': Execution(...
# 'time': 1654801166.0
# }
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# flatten all sub-dicts and values into one top level entry.
entry = {}
for section, val in t.items():
match section:
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
case 'time':
# ib has wack ns timestamps, or is that us?
continue
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
case _:
entry[section] = val
elif source_type == 'api':
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
# 'contract': {...
# 'execution': Execution(...
# 'time': 1654801166.0
# }
# flatten all sub-dicts and values into one top level entry.
entry = {}
for section, val in t.items():
match section:
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)
case 'time':
# ib has wack ns timestamps, or is that us?
continue
case _:
entry[section] = val
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['date'] = str(dt)
acctid = accounts[entry['acctNumber']]
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['pydatetime'] = dt
entry['datetime'] = str(dt)
acctid = accounts[entry['acctNumber']]
if not tid:
# this is likely some kind of internal adjustment
@ -1263,6 +1250,73 @@ def trades_to_ledger_entries(
acctid, {}
)[tid] = entry
# sort entries in output by python based datetime
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
return trades_by_account
def flex_records_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
) -> dict:
'''
Convert flex report entry objects into ``dict`` form, pretty much
straight up without modification except add a `pydatetime` field
from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
# probably a flex record with a wonky non-std timestamp..
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
entry['datetime'] = str(dt)
if not tid:
# this is likely some kind of internal adjustment
# transaction, likely one of the following:
# - an expiry event that will show a "book trade" indicating
# some adjustment to cash balances: zeroing or itm settle.
# - a manual cash balance position adjustment likely done by
# the user from the accounts window in TWS where they can
# manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
continue
trades_by_account.setdefault(
acctid, {}
)[tid] = entry
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1]['pydatetime'],
))
return trades_by_account
@ -1309,15 +1363,16 @@ def load_flex_trades(
ln = len(trade_entries)
log.info(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_ledger_entries(
# get reverse map to user account names
conf['accounts'].inverse,
trades_by_account = flex_records_to_ledger_entries(
conf['accounts'].inverse, # reverse map to user account names
trade_entries,
source_type='flex',
)
ledger_dict: Optional[dict] = None
for acctid in trades_by_account:
trades_by_id = trades_by_account[acctid]
with open_trade_ledger('ib', acctid) as ledger_dict:
tid_delta = set(trades_by_id) - set(ledger_dict)
log.info(
@ -1325,9 +1380,11 @@ def load_flex_trades(
f'{pformat(tid_delta)}'
)
if tid_delta:
ledger_dict.update(
{tid: trades_by_id[tid] for tid in tid_delta}
)
sorted_delta = dict(sorted(
{tid: trades_by_id[tid] for tid in tid_delta}.items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
ledger_dict.update(sorted_delta)
return ledger_dict

View File

@ -254,6 +254,9 @@ async def wait_on_data_reset(
return False
_data_resetter_task: trio.Task | None = None
async def get_bars(
proxy: MethodProxy,
@ -264,7 +267,7 @@ async def get_bars(
end_dt: str = '',
# TODO: make this more dynamic based on measured frame rx latency..
timeout: float = 1.5, # how long before we trigger a feed reset
timeout: float = 3, # how long before we trigger a feed reset
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -274,13 +277,15 @@ async def get_bars(
a ``MethoProxy``.
'''
data_cs: Optional[trio.CancelScope] = None
result: Optional[tuple[
global _data_resetter_task
data_cs: trio.CancelScope | None = None
result: tuple[
ibis.objects.BarDataList,
np.ndarray,
datetime,
datetime,
]] = None
] | None = None
result_ready = trio.Event()
async def query():
@ -307,7 +312,7 @@ async def get_bars(
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
)
end_dt = end_dt.subtract(dt_duration)
end_dt -= dt_duration
continue
if bars_array is None:
@ -400,6 +405,10 @@ async def get_bars(
else:
raise
# TODO: make this global across all history task/requests
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse:
# start history request that we allow
@ -414,6 +423,14 @@ async def get_bars(
await result_ready.wait()
break
if _data_resetter_task:
# don't double invoke the reset hack if another
# requester task already has it covered.
continue
else:
_data_resetter_task = trio.lowlevel.current_task()
unset_resetter = True
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
@ -425,6 +442,7 @@ async def get_bars(
# sync wait on reset to complete
await reset_done.wait()
_data_resetter_task = None if unset_resetter else _data_resetter_task
return result, data_cs is not None
@ -483,7 +501,9 @@ async def _setup_quote_stream(
to_trio.send_nowait(None)
async with load_aio_clients() as accts2clients:
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -563,7 +583,8 @@ async def open_aio_quote_stream(
from_aio = _quote_streams.get(symbol)
if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer
# if we already have a cached feed deliver a rx side clone
# to consumer
async with broadcast_receiver(
from_aio,
2**6,
@ -754,67 +775,97 @@ async def stream_quotes(
await trio.sleep_forever()
return # we never expect feed to come up?
async with open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream:
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
task_status.started((init_msgs, first_quote))
async with aclosing(stream):
if syminfo.get('no_vlm', False):
# generally speaking these feeds don't
# include vlm data.
atype = syminfo['asset_type']
log.info(
f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
)
else:
# wait for real volume on feed (trading might be closed)
while True:
ticker = await stream.receive()
# for a real volume contract we rait for the first
# "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live
feed_is_live.set()
# last = time.time()
async for ticker in stream:
quote = normalize(ticker)
await send_chan.send({quote['fqsn']: quote})
cs: Optional[trio.CancelScope] = None
startup: bool = True
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream,
):
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# last = time.time()
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
startup = False
task_status.started((init_msgs, first_quote))
# start a stream restarter task which monitors the
# data feed event.
async def reset_on_feed():
# TODO: this seems to be surpressed from the
# traceback in ``tractor``?
# assert 0
rt_ev = proxy.status_event(
'Market data farm connection is OK:usfarm'
)
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
async with aclosing(stream):
if syminfo.get('no_vlm', False):
# generally speaking these feeds don't
# include vlm data.
atype = syminfo['asset_type']
log.info(
f'No-vlm {sym}@{atype}, skipping quote poll'
)
else:
# wait for real volume on feed (trading might be
# closed)
while True:
ticker = await stream.receive()
# for a real volume contract we rait for
# the first "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we
# get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first volume tick")
# ugh, clear ticks since we've
# consumed them (ahem, ib_insync is
# truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live
feed_is_live.set()
# last = time.time()
async for ticker in stream:
quote = normalize(ticker)
await send_chan.send({quote['fqsn']: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# last = time.time()
async def data_reset_hack(
@ -922,7 +973,14 @@ async def open_symbol_search(
except trio.WouldBlock:
pass
if not pattern or pattern.isspace():
if (
not pattern
or pattern.isspace()
# XXX: not sure if this is a bad assumption but it
# seems to make search snappier?
or len(pattern) < 1
):
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client

View File

@ -138,25 +138,26 @@ def cli(ctx, brokers, loglevel, tl, configdir):
@click.pass_obj
def services(config, tl, names):
async def list_services():
from .._daemon import open_piker_runtime
async with tractor.get_arbiter(
*_tractor_kwargs['arbiter_addr']
) as portal:
async def list_services():
async with (
open_piker_runtime(
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
*_tractor_kwargs['arbiter_addr']
) as portal
):
registry = await portal.run_from_ns('self', 'get_registry')
json_d = {}
for key, socket in registry.items():
# name, uuid = uid
host, port = socket
json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}")
tractor.run(
list_services,
name='service_query',
loglevel=config['loglevel'] if tl else None,
arbiter_addr=_tractor_kwargs['arbiter_addr'],
)
trio.run(list_services)
def _load_clis() -> None:

View File

@ -333,7 +333,7 @@ async def start_backfill(
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 6},
60: {'years': 10},
60: {'years': 6},
}
kwargs = periods[step_size_s]
@ -348,36 +348,45 @@ async def start_backfill(
# last retrieved start dt to the next request as
# it's end dt.
starts: set[datetime] = set()
while start_dt > last_tsdb_dt:
print(f"QUERY end_dt={start_dt}")
try:
log.info(
f'Requesting {step_size_s}s frame ending in {start_dt}'
)
array, start_dt, end_dt = await hist(
array, next_start_dt, end_dt = await hist(
timeframe,
end_dt=start_dt,
)
if next_start_dt in starts:
start_dt = min(starts)
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
continue
# only update new start point if new
start_dt = next_start_dt
starts.add(start_dt)
assert array['time'][0] == start_dt.timestamp()
except NoData:
# XXX: unhandled history gap (shouldn't happen?)
log.warning(
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
)
return None # discard signal
await tractor.breakpoint()
except DataUnavailable as duerr:
# broker is being a bish and we can't pull
# any more..
log.warning('backend halted on data deliver !?!?')
except DataUnavailable: # as duerr:
# broker is being a bish and we can't pull any more..
log.warning(
f'NO-MORE-DATA: backend {mod.name} halted history!?'
)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
return duerr
return
diff = end_dt - start_dt
frame_time_diff_s = diff.seconds
@ -394,22 +403,6 @@ async def start_backfill(
f'{diff} ~= {frame_time_diff_s} seconds'
)
array, _start_dt, end_dt = await hist(
timeframe,
end_dt=start_dt,
)
if (
_start_dt in starts
):
print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
start_dt = min(starts)
continue
# only update new start point if new
start_dt = _start_dt
starts.add(start_dt)
to_push = diff_history(
array,
start_dt,
@ -498,10 +491,10 @@ async def manage_history(
readonly=False,
)
# TODO: history validation
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
# if not opened:
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
rt_shm, opened = maybe_open_shm_array(
key=f'{fqsn}_rt',
@ -513,10 +506,10 @@ async def manage_history(
readonly=False,
size=3*_secs_in_day,
)
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
# if not opened:
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
log.info('Scanning for existing `marketstored`')

View File

@ -20,6 +20,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends)
'''
from __future__ import annotations
from contextlib import contextmanager as cm
from pprint import pformat
import os
@ -138,13 +139,31 @@ class Position(Struct):
# ordered record of known constituent trade messages
clears: dict[
Union[str, int, Status], # trade id
str | int, # trade id
dict[str, Any], # transaction history summaries
] = {}
first_clear_dt: Optional[datetime] = None
expiry: Optional[datetime] = None
# @property
# def clears(self) -> dict[
# Union[str, int, Status], # trade id
# dict[str, Any], # transaction history summaries
# ]:
# '''
# Datetime sorted reference to internal clears table.
# '''
# # self._clears = {}
# self._clears = dict(sorted(
# self._clears.items(),
# key=lambda entry: entry[1]['dt'],
# ))
# # self._clears[k] = v
# return self._clears
def to_dict(self) -> dict:
return {
f: getattr(self, f)
@ -219,6 +238,10 @@ class Position(Struct):
'''
clears = list(self.clears.values())
if not clears:
log.warning(f'No clears table for {self.symbol}!?')
return
self.first_clear_dt = min(list(entry['dt'] for entry in clears))
last_clear = clears[-1]
@ -623,6 +646,7 @@ class PpTable(Struct):
def to_toml(
self,
min_clears: bool = True,
) -> dict[str, Any]:
active, closed = self.dump_active()
@ -635,7 +659,9 @@ class PpTable(Struct):
# keep the minimal amount of clears that make up this
# position since the last net-zero state.
pos.minimize_clears()
if min_clears:
pos.minimize_clears()
pos.ensure_state()
# serialize to pre-toml form
@ -682,6 +708,8 @@ def load_pps_from_ledger(
brokername: str,
acctname: str,
table: Optional[PpTable] = None,
# post normalization filter on ledger entries to be processed
filter_by: Optional[list[dict]] = None,
@ -698,7 +726,6 @@ def load_pps_from_ledger(
'''
with (
open_trade_ledger(brokername, acctname) as ledger,
open_pps(brokername, acctname) as table,
):
if not ledger:
# null case, no ledger file with content
@ -716,7 +743,11 @@ def load_pps_from_ledger(
else:
records = src_records
updated = table.update_from_trans(records)
if table is None:
with open_pps(brokername, acctname) as table:
updated = table.update_from_trans(records)
else:
updated = table.update_from_trans(records)
return records, updated
@ -886,15 +917,27 @@ def open_pps(
conf=conf,
)
# first pass populate all missing clears record tables
# for fqsn, entry in pps.items():
# # convert clears sub-tables (only in this form
# # for toml re-presentation) back into a master table.
# clears_list = entry.get('clears', [])
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=[entry['bsuid']],
# table=table,
# )
# # breakpoint()
# unmarshal/load ``pps.toml`` config entries into object form
# and update `PpTable` obj entries.
for fqsn, entry in pps.items():
bsuid = entry['bsuid']
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# index clears entries in "object" form by tid in a top
# level dict instead of a list (as is presented in our
# ``pps.toml``).
@ -906,6 +949,18 @@ def open_pps(
# processing of new clear events.
trans: list[Transaction] = []
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# table=table,
# )
for clears_table in clears_list:
tid = clears_table.pop('tid')
dtstr = clears_table['dt']