Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet c53071e43a WIP adding draft-commented code to try and get splits workin.. 2022-10-19 13:18:19 -04:00
Tyler Goodlet 41ffccc59e Add data-reset-task global state var
Allows keeping mutex state around data reset requests which (if more
then one are sent) can cause a throttling condition where ib's servers
will get slower and slower to conduct a reconnect. With this you can
have multiple ongoing contract requests without hitting that issue and
we can go back to having a nice 3s timeout on the history queries before
activating the hack.
2022-10-19 13:03:47 -04:00
Tyler Goodlet 55856f5e8b Add back adhoc symbol lookup support, some exchs info is off 2022-10-16 14:34:34 -04:00
Tyler Goodlet e8ab28e456 Order ledger entries by processed datetime
To make it easier to manually read/decipher long ledger files this adds
`dict` sorting based on record-type-specific (api vs. flex report)
datetime processing prior to ledger file write.

- break up parsers into separate routines for flex and api record
  processing.
- add `parse_flex_dt()` for special handling of the weird semicolon
  stamps in flex reports.
2022-10-10 09:27:30 -04:00
Tyler Goodlet d2b6216994 Comment format tweak 2022-10-10 09:27:16 -04:00
Tyler Goodlet eb743759a4 Subtract duration instead of passing to `.subtract()` (facepalm) 2022-10-10 09:27:16 -04:00
Tyler Goodlet 74910ba56c Fix `piker services`; `tractor.run()` is done.. 2022-10-10 09:27:16 -04:00
Tyler Goodlet 28535fa977 Re-request quote feed on data reset events
When a network outage or data feed connection is reset often the
`ib_insync` task will hang until some kind of (internal?) timeout takes
place or, in some (worst) cases it never re-establishes (the event
stream) and thus the backend needs to restart or the live feed will
never resume..

In order to avoid this issue once and for all this patch implements an
additional (extremely simple) task that is started with the  real-time
feed and simply waits for any market data reset events; when detected
restarts the `open_aio_quote_stream()` call in a loop using
a surrounding cancel scope.

Been meaning to implement this for ages and it's finally working!
2022-10-10 09:27:16 -04:00
Tyler Goodlet 1d7e642dbd Support no-disconnect on `open_aio_clients()` exit
Allows for easier restarts of certain `trio` side tasks without killing
the `asyncio`-side clients; support via flag.

Also fix a bug in `Client.bars()`: we need to return the duration on the
empty bars case..
2022-10-10 09:27:16 -04:00
Tyler Goodlet 69be65237f Drop duplicate frame request
Must have gotten left in during refactor from the `trimeter` version?
Drop down to 6 years for 1m sampling.
2022-10-10 09:27:16 -04:00
7 changed files with 376 additions and 201 deletions

View File

@ -195,9 +195,8 @@ async def open_piker_runtime(
) -> Optional[tractor._portal.Portal]: ) -> Optional[tractor._portal.Portal]:
''' '''
Start a piker actor who's runtime will automatically Start a piker actor who's runtime will automatically sync with
sync with existing piker actors in local network existing piker actors on the local link based on configuration.
based on configuration.
''' '''
global _services global _services

View File

@ -412,7 +412,7 @@ class Client:
# ``end_dt`` which exceeds the ``duration``, # ``end_dt`` which exceeds the ``duration``,
# - a timeout occurred in which case insync internals return # - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()... # an empty list thing with bars.clear()...
return [], np.empty(0) return [], np.empty(0), dt_duration
# TODO: we could maybe raise ``NoData`` instead if we # TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no # rewrite the method in the first case? right now there's no
# way to detect a timeout. # way to detect a timeout.
@ -483,7 +483,7 @@ class Client:
self, self,
pattern: str, pattern: str,
# how many contracts to search "up to" # how many contracts to search "up to"
upto: int = 6, upto: int = 16,
asdicts: bool = True, asdicts: bool = True,
) -> dict[str, ContractDetails]: ) -> dict[str, ContractDetails]:
@ -518,6 +518,16 @@ class Client:
exch = tract.exchange exch = tract.exchange
if exch not in _exch_skip_list: if exch not in _exch_skip_list:
# try to lookup any contracts from our adhoc set
# since often the exchange/venue is named slightly
# different (eg. BRR.CMECRYPTO` instead of just
# `.CME`).
info = _adhoc_symbol_map.get(sym)
if info:
con_kwargs, bars_kwargs = info
exch = con_kwargs['exchange']
# try get all possible contracts for symbol as per, # try get all possible contracts for symbol as per,
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
con = ibis.Future( con = ibis.Future(
@ -1086,6 +1096,7 @@ async def load_aio_clients(
# retry a few times to get the client going.. # retry a few times to get the client going..
connect_retries: int = 3, connect_retries: int = 3,
connect_timeout: float = 0.5, connect_timeout: float = 0.5,
disconnect_on_exit: bool = True,
) -> dict[str, Client]: ) -> dict[str, Client]:
''' '''
@ -1227,6 +1238,7 @@ async def load_aio_clients(
finally: finally:
# TODO: for re-scans we'll want to not teardown clients which # TODO: for re-scans we'll want to not teardown clients which
# are up and stable right? # are up and stable right?
if disconnect_on_exit:
for acct, client in _accounts2clients.items(): for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}') log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect() client.ib.disconnect()

View File

@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch entry['listingExchange'] = pexch
conf = get_config() conf = get_config()
entries = trades_to_ledger_entries( entries = api_trades_to_ledger_entries(
conf['accounts'].inverse, conf['accounts'].inverse,
trade_entries, trade_entries,
) )
@ -371,8 +371,8 @@ async def update_and_audit_msgs(
else: else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}' entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
# raise ValueError( raise ValueError(
log.error( # log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n' f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n' f'ib: {ibppmsg}\n'
f'piker: {msg}\n' f'piker: {msg}\n'
@ -1123,18 +1123,16 @@ def norm_trade_records(
continue continue
# timestamping is way different in API records # timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date') date = record.get('date')
if not date: flex_dtstr = record.get('dateTime')
# probably a flex record with a wonky non-std timestamp..
date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
else: if dtstr or date:
# epoch_dt = pendulum.from_timestamp(record.get('time')) dt = pendulum.parse(dtstr or date)
dt = pendulum.parse(date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from # special handling of symbol extraction from
# flex records using some ad-hoc schema parsing. # flex records using some ad-hoc schema parsing.
@ -1183,41 +1181,29 @@ def norm_trade_records(
return {r.tid: r for r in records} return {r.tid: r for r in records}
def trades_to_ledger_entries( def parse_flex_dt(
record: str,
) -> pendulum.datetime:
date, ts = record.split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
def api_trades_to_ledger_entries(
accounts: bidict, accounts: bidict,
trade_entries: list[object], trade_entries: list[object],
source_type: str = 'api',
) -> dict: ) -> dict:
''' '''
Convert either of API execution objects or flex report Convert API execution objects entry objects into ``dict`` form,
entry objects into ``dict`` form, pretty much straight up pretty much straight up without modification except add
without modification. a `pydatetime` field from the parsed timestamp.
''' '''
trades_by_account = {} trades_by_account = {}
for t in trade_entries: for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
elif source_type == 'api':
# NOTE: example of schema we pull from the API client. # NOTE: example of schema we pull from the API client.
# { # {
# 'commissionReport': CommissionReport(... # 'commissionReport': CommissionReport(...
@ -1244,7 +1230,8 @@ def trades_to_ledger_entries(
tid = str(entry['execId']) tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time']) dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str? # TODO: why isn't this showing seconds in the str?
entry['date'] = str(dt) entry['pydatetime'] = dt
entry['datetime'] = str(dt)
acctid = accounts[entry['acctNumber']] acctid = accounts[entry['acctNumber']]
if not tid: if not tid:
@ -1263,6 +1250,73 @@ def trades_to_ledger_entries(
acctid, {} acctid, {}
)[tid] = entry )[tid] = entry
# sort entries in output by python based datetime
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
return trades_by_account
def flex_records_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
) -> dict:
'''
Convert flex report entry objects into ``dict`` form, pretty much
straight up without modification except add a `pydatetime` field
from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
# probably a flex record with a wonky non-std timestamp..
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
entry['datetime'] = str(dt)
if not tid:
# this is likely some kind of internal adjustment
# transaction, likely one of the following:
# - an expiry event that will show a "book trade" indicating
# some adjustment to cash balances: zeroing or itm settle.
# - a manual cash balance position adjustment likely done by
# the user from the accounts window in TWS where they can
# manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
continue
trades_by_account.setdefault(
acctid, {}
)[tid] = entry
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1]['pydatetime'],
))
return trades_by_account return trades_by_account
@ -1309,15 +1363,16 @@ def load_flex_trades(
ln = len(trade_entries) ln = len(trade_entries)
log.info(f'Loaded {ln} trades from flex query') log.info(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_ledger_entries( trades_by_account = flex_records_to_ledger_entries(
# get reverse map to user account names conf['accounts'].inverse, # reverse map to user account names
conf['accounts'].inverse,
trade_entries, trade_entries,
source_type='flex',
) )
ledger_dict: Optional[dict] = None
for acctid in trades_by_account: for acctid in trades_by_account:
trades_by_id = trades_by_account[acctid] trades_by_id = trades_by_account[acctid]
with open_trade_ledger('ib', acctid) as ledger_dict: with open_trade_ledger('ib', acctid) as ledger_dict:
tid_delta = set(trades_by_id) - set(ledger_dict) tid_delta = set(trades_by_id) - set(ledger_dict)
log.info( log.info(
@ -1325,9 +1380,11 @@ def load_flex_trades(
f'{pformat(tid_delta)}' f'{pformat(tid_delta)}'
) )
if tid_delta: if tid_delta:
ledger_dict.update( sorted_delta = dict(sorted(
{tid: trades_by_id[tid] for tid in tid_delta} {tid: trades_by_id[tid] for tid in tid_delta}.items(),
) key=lambda entry: entry[1].pop('pydatetime'),
))
ledger_dict.update(sorted_delta)
return ledger_dict return ledger_dict

View File

@ -254,6 +254,9 @@ async def wait_on_data_reset(
return False return False
_data_resetter_task: trio.Task | None = None
async def get_bars( async def get_bars(
proxy: MethodProxy, proxy: MethodProxy,
@ -264,7 +267,7 @@ async def get_bars(
end_dt: str = '', end_dt: str = '',
# TODO: make this more dynamic based on measured frame rx latency.. # TODO: make this more dynamic based on measured frame rx latency..
timeout: float = 1.5, # how long before we trigger a feed reset timeout: float = 3, # how long before we trigger a feed reset
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -274,13 +277,15 @@ async def get_bars(
a ``MethoProxy``. a ``MethoProxy``.
''' '''
data_cs: Optional[trio.CancelScope] = None global _data_resetter_task
result: Optional[tuple[
data_cs: trio.CancelScope | None = None
result: tuple[
ibis.objects.BarDataList, ibis.objects.BarDataList,
np.ndarray, np.ndarray,
datetime, datetime,
datetime, datetime,
]] = None ] | None = None
result_ready = trio.Event() result_ready = trio.Event()
async def query(): async def query():
@ -307,7 +312,7 @@ async def get_bars(
log.warning( log.warning(
f'History is blank for {dt_duration} from {end_dt}' f'History is blank for {dt_duration} from {end_dt}'
) )
end_dt = end_dt.subtract(dt_duration) end_dt -= dt_duration
continue continue
if bars_array is None: if bars_array is None:
@ -400,6 +405,10 @@ async def get_bars(
else: else:
raise raise
# TODO: make this global across all history task/requests
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
# start history request that we allow # start history request that we allow
@ -414,6 +423,14 @@ async def get_bars(
await result_ready.wait() await result_ready.wait()
break break
if _data_resetter_task:
# don't double invoke the reset hack if another
# requester task already has it covered.
continue
else:
_data_resetter_task = trio.lowlevel.current_task()
unset_resetter = True
# spawn new data reset task # spawn new data reset task
data_cs, reset_done = await nurse.start( data_cs, reset_done = await nurse.start(
partial( partial(
@ -425,6 +442,7 @@ async def get_bars(
# sync wait on reset to complete # sync wait on reset to complete
await reset_done.wait() await reset_done.wait()
_data_resetter_task = None if unset_resetter else _data_resetter_task
return result, data_cs is not None return result, data_cs is not None
@ -483,7 +501,9 @@ async def _setup_quote_stream(
to_trio.send_nowait(None) to_trio.send_nowait(None)
async with load_aio_clients() as accts2clients: async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients) caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -563,7 +583,8 @@ async def open_aio_quote_stream(
from_aio = _quote_streams.get(symbol) from_aio = _quote_streams.get(symbol)
if from_aio: if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer # if we already have a cached feed deliver a rx side clone
# to consumer
async with broadcast_receiver( async with broadcast_receiver(
from_aio, from_aio,
2**6, 2**6,
@ -754,17 +775,45 @@ async def stream_quotes(
await trio.sleep_forever() await trio.sleep_forever()
return # we never expect feed to come up? return # we never expect feed to come up?
async with open_aio_quote_stream( cs: Optional[trio.CancelScope] = None
startup: bool = True
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
open_aio_quote_stream(
symbol=sym, symbol=sym,
contract=con, contract=con,
) as stream: ) as stream,
):
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
first_ticker.ticks = [] first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
startup = False
task_status.started((init_msgs, first_quote)) task_status.started((init_msgs, first_quote))
# start a stream restarter task which monitors the
# data feed event.
async def reset_on_feed():
# TODO: this seems to be surpressed from the
# traceback in ``tractor``?
# assert 0
rt_ev = proxy.status_event(
'Market data farm connection is OK:usfarm'
)
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
async with aclosing(stream): async with aclosing(stream):
if syminfo.get('no_vlm', False): if syminfo.get('no_vlm', False):
@ -772,29 +821,31 @@ async def stream_quotes(
# include vlm data. # include vlm data.
atype = syminfo['asset_type'] atype = syminfo['asset_type']
log.info( log.info(
f'Non-vlm asset {sym}@{atype}, skipping quote poll...' f'No-vlm {sym}@{atype}, skipping quote poll'
) )
else: else:
# wait for real volume on feed (trading might be closed) # wait for real volume on feed (trading might be
# closed)
while True: while True:
ticker = await stream.receive() ticker = await stream.receive()
# for a real volume contract we rait for the first # for a real volume contract we rait for
# "real" trade to take place # the first "real" trade to take place
if ( if (
# not calc_price # not calc_price
# and not ticker.rtTime # and not ticker.rtTime
not ticker.rtTime not ticker.rtTime
): ):
# spin consuming tickers until we get a real # spin consuming tickers until we
# market datum # get a real market datum
log.debug(f"New unsent ticker: {ticker}") log.debug(f"New unsent ticker: {ticker}")
continue continue
else: else:
log.debug("Received first real volume tick") log.debug("Received first volume tick")
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've
# (ahem, ib_insync is truly stateful trash) # consumed them (ahem, ib_insync is
# truly stateful trash)
ticker.ticks = [] ticker.ticks = []
# XXX: this works because we don't use # XXX: this works because we don't use
@ -922,7 +973,14 @@ async def open_symbol_search(
except trio.WouldBlock: except trio.WouldBlock:
pass pass
if not pattern or pattern.isspace(): if (
not pattern
or pattern.isspace()
# XXX: not sure if this is a bad assumption but it
# seems to make search snappier?
or len(pattern) < 1
):
log.warning('empty pattern received, skipping..') log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client # TODO: *BUG* if nothing is returned here the client

View File

@ -138,25 +138,26 @@ def cli(ctx, brokers, loglevel, tl, configdir):
@click.pass_obj @click.pass_obj
def services(config, tl, names): def services(config, tl, names):
async def list_services(): from .._daemon import open_piker_runtime
async with tractor.get_arbiter( async def list_services():
async with (
open_piker_runtime(
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
*_tractor_kwargs['arbiter_addr'] *_tractor_kwargs['arbiter_addr']
) as portal: ) as portal
):
registry = await portal.run_from_ns('self', 'get_registry') registry = await portal.run_from_ns('self', 'get_registry')
json_d = {} json_d = {}
for key, socket in registry.items(): for key, socket in registry.items():
# name, uuid = uid
host, port = socket host, port = socket
json_d[key] = f'{host}:{port}' json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}") click.echo(f"{colorize_json(json_d)}")
tractor.run( trio.run(list_services)
list_services,
name='service_query',
loglevel=config['loglevel'] if tl else None,
arbiter_addr=_tractor_kwargs['arbiter_addr'],
)
def _load_clis() -> None: def _load_clis() -> None:

View File

@ -333,7 +333,7 @@ async def start_backfill(
# do a decently sized backfill and load it into storage. # do a decently sized backfill and load it into storage.
periods = { periods = {
1: {'days': 6}, 1: {'days': 6},
60: {'years': 10}, 60: {'years': 6},
} }
kwargs = periods[step_size_s] kwargs = periods[step_size_s]
@ -348,36 +348,45 @@ async def start_backfill(
# last retrieved start dt to the next request as # last retrieved start dt to the next request as
# it's end dt. # it's end dt.
starts: set[datetime] = set() starts: set[datetime] = set()
while start_dt > last_tsdb_dt: while start_dt > last_tsdb_dt:
print(f"QUERY end_dt={start_dt}")
try: try:
log.info( log.info(
f'Requesting {step_size_s}s frame ending in {start_dt}' f'Requesting {step_size_s}s frame ending in {start_dt}'
) )
array, start_dt, end_dt = await hist( array, next_start_dt, end_dt = await hist(
timeframe, timeframe,
end_dt=start_dt, end_dt=start_dt,
) )
if next_start_dt in starts:
start_dt = min(starts)
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
continue
# only update new start point if new
start_dt = next_start_dt
starts.add(start_dt)
assert array['time'][0] == start_dt.timestamp() assert array['time'][0] == start_dt.timestamp()
except NoData: except NoData:
# XXX: unhandled history gap (shouldn't happen?)
log.warning( log.warning(
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?' f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
) )
return None # discard signal await tractor.breakpoint()
except DataUnavailable as duerr: except DataUnavailable: # as duerr:
# broker is being a bish and we can't pull # broker is being a bish and we can't pull any more..
# any more.. log.warning(
log.warning('backend halted on data deliver !?!?') f'NO-MORE-DATA: backend {mod.name} halted history!?'
)
# ugh, what's a better way? # ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle # TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the # condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved? # request loop until the condition is resolved?
return duerr return
diff = end_dt - start_dt diff = end_dt - start_dt
frame_time_diff_s = diff.seconds frame_time_diff_s = diff.seconds
@ -394,22 +403,6 @@ async def start_backfill(
f'{diff} ~= {frame_time_diff_s} seconds' f'{diff} ~= {frame_time_diff_s} seconds'
) )
array, _start_dt, end_dt = await hist(
timeframe,
end_dt=start_dt,
)
if (
_start_dt in starts
):
print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
start_dt = min(starts)
continue
# only update new start point if new
start_dt = _start_dt
starts.add(start_dt)
to_push = diff_history( to_push = diff_history(
array, array,
start_dt, start_dt,
@ -498,10 +491,10 @@ async def manage_history(
readonly=False, readonly=False,
) )
# TODO: history validation # TODO: history validation
if not opened: # if not opened:
raise RuntimeError( # raise RuntimeError(
"Persistent shm for sym was already open?!" # "Persistent shm for sym was already open?!"
) # )
rt_shm, opened = maybe_open_shm_array( rt_shm, opened = maybe_open_shm_array(
key=f'{fqsn}_rt', key=f'{fqsn}_rt',
@ -513,10 +506,10 @@ async def manage_history(
readonly=False, readonly=False,
size=3*_secs_in_day, size=3*_secs_in_day,
) )
if not opened: # if not opened:
raise RuntimeError( # raise RuntimeError(
"Persistent shm for sym was already open?!" # "Persistent shm for sym was already open?!"
) # )
log.info('Scanning for existing `marketstored`') log.info('Scanning for existing `marketstored`')

View File

@ -20,6 +20,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends) (looking at you `ib` and dirt-bird friends)
''' '''
from __future__ import annotations
from contextlib import contextmanager as cm from contextlib import contextmanager as cm
from pprint import pformat from pprint import pformat
import os import os
@ -138,13 +139,31 @@ class Position(Struct):
# ordered record of known constituent trade messages # ordered record of known constituent trade messages
clears: dict[ clears: dict[
Union[str, int, Status], # trade id str | int, # trade id
dict[str, Any], # transaction history summaries dict[str, Any], # transaction history summaries
] = {} ] = {}
first_clear_dt: Optional[datetime] = None first_clear_dt: Optional[datetime] = None
expiry: Optional[datetime] = None expiry: Optional[datetime] = None
# @property
# def clears(self) -> dict[
# Union[str, int, Status], # trade id
# dict[str, Any], # transaction history summaries
# ]:
# '''
# Datetime sorted reference to internal clears table.
# '''
# # self._clears = {}
# self._clears = dict(sorted(
# self._clears.items(),
# key=lambda entry: entry[1]['dt'],
# ))
# # self._clears[k] = v
# return self._clears
def to_dict(self) -> dict: def to_dict(self) -> dict:
return { return {
f: getattr(self, f) f: getattr(self, f)
@ -219,6 +238,10 @@ class Position(Struct):
''' '''
clears = list(self.clears.values()) clears = list(self.clears.values())
if not clears:
log.warning(f'No clears table for {self.symbol}!?')
return
self.first_clear_dt = min(list(entry['dt'] for entry in clears)) self.first_clear_dt = min(list(entry['dt'] for entry in clears))
last_clear = clears[-1] last_clear = clears[-1]
@ -623,6 +646,7 @@ class PpTable(Struct):
def to_toml( def to_toml(
self, self,
min_clears: bool = True,
) -> dict[str, Any]: ) -> dict[str, Any]:
active, closed = self.dump_active() active, closed = self.dump_active()
@ -635,7 +659,9 @@ class PpTable(Struct):
# keep the minimal amount of clears that make up this # keep the minimal amount of clears that make up this
# position since the last net-zero state. # position since the last net-zero state.
if min_clears:
pos.minimize_clears() pos.minimize_clears()
pos.ensure_state() pos.ensure_state()
# serialize to pre-toml form # serialize to pre-toml form
@ -682,6 +708,8 @@ def load_pps_from_ledger(
brokername: str, brokername: str,
acctname: str, acctname: str,
table: Optional[PpTable] = None,
# post normalization filter on ledger entries to be processed # post normalization filter on ledger entries to be processed
filter_by: Optional[list[dict]] = None, filter_by: Optional[list[dict]] = None,
@ -698,7 +726,6 @@ def load_pps_from_ledger(
''' '''
with ( with (
open_trade_ledger(brokername, acctname) as ledger, open_trade_ledger(brokername, acctname) as ledger,
open_pps(brokername, acctname) as table,
): ):
if not ledger: if not ledger:
# null case, no ledger file with content # null case, no ledger file with content
@ -716,6 +743,10 @@ def load_pps_from_ledger(
else: else:
records = src_records records = src_records
if table is None:
with open_pps(brokername, acctname) as table:
updated = table.update_from_trans(records)
else:
updated = table.update_from_trans(records) updated = table.update_from_trans(records)
return records, updated return records, updated
@ -886,15 +917,27 @@ def open_pps(
conf=conf, conf=conf,
) )
# first pass populate all missing clears record tables
# for fqsn, entry in pps.items():
# # convert clears sub-tables (only in this form
# # for toml re-presentation) back into a master table.
# clears_list = entry.get('clears', [])
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=[entry['bsuid']],
# table=table,
# )
# # breakpoint()
# unmarshal/load ``pps.toml`` config entries into object form # unmarshal/load ``pps.toml`` config entries into object form
# and update `PpTable` obj entries. # and update `PpTable` obj entries.
for fqsn, entry in pps.items(): for fqsn, entry in pps.items():
bsuid = entry['bsuid'] bsuid = entry['bsuid']
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# index clears entries in "object" form by tid in a top # index clears entries in "object" form by tid in a top
# level dict instead of a list (as is presented in our # level dict instead of a list (as is presented in our
# ``pps.toml``). # ``pps.toml``).
@ -906,6 +949,18 @@ def open_pps(
# processing of new clear events. # processing of new clear events.
trans: list[Transaction] = [] trans: list[Transaction] = []
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# table=table,
# )
for clears_table in clears_list: for clears_table in clears_list:
tid = clears_table.pop('tid') tid = clears_table.pop('tid')
dtstr = clears_table['dt'] dtstr = clears_table['dt']