Compare commits

..

No commits in common. "c53071e43a291251924f42cba84d8e01672b729f" and "96f5a8abb878c391c875e6c0b3853e397c8d50d2" have entirely different histories.

7 changed files with 193 additions and 368 deletions

View File

@ -195,8 +195,9 @@ async def open_piker_runtime(
) -> Optional[tractor._portal.Portal]:
'''
Start a piker actor whose runtime will automatically sync with
existing piker actors on the local link based on configuration.
Start a piker actor whose runtime will automatically
sync with existing piker actors in local network
based on configuration.
'''
global _services

View File

@ -412,7 +412,7 @@ class Client:
# ``end_dt`` which exceeds the ``duration``,
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
return [], np.empty(0), dt_duration
return [], np.empty(0)
# TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no
# way to detect a timeout.
@ -483,7 +483,7 @@ class Client:
self,
pattern: str,
# how many contracts to search "up to"
upto: int = 16,
upto: int = 6,
asdicts: bool = True,
) -> dict[str, ContractDetails]:
@ -518,16 +518,6 @@ class Client:
exch = tract.exchange
if exch not in _exch_skip_list:
# try to lookup any contracts from our adhoc set
# since often the exchange/venue is named slightly
# different (eg. `BRR.CMECRYPTO` instead of just
# `.CME`).
info = _adhoc_symbol_map.get(sym)
if info:
con_kwargs, bars_kwargs = info
exch = con_kwargs['exchange']
# try get all possible contracts for symbol as per,
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
con = ibis.Future(
@ -1096,7 +1086,6 @@ async def load_aio_clients(
# retry a few times to get the client going..
connect_retries: int = 3,
connect_timeout: float = 0.5,
disconnect_on_exit: bool = True,
) -> dict[str, Client]:
'''
@ -1238,7 +1227,6 @@ async def load_aio_clients(
finally:
# TODO: for re-scans we'll want to not teardown clients which
# are up and stable right?
if disconnect_on_exit:
for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect()

View File

@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch
conf = get_config()
entries = api_trades_to_ledger_entries(
entries = trades_to_ledger_entries(
conf['accounts'].inverse,
trade_entries,
)
@ -371,8 +371,8 @@ async def update_and_audit_msgs(
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
raise ValueError(
# log.error(
# raise ValueError(
log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
@ -1123,16 +1123,18 @@ def norm_trade_records(
continue
# timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date')
flex_dtstr = record.get('dateTime')
if dtstr or date:
dt = pendulum.parse(dtstr or date)
elif flex_dtstr:
if not date:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])
date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
else:
# epoch_dt = pendulum.from_timestamp(record.get('time'))
dt = pendulum.parse(date)
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
@ -1181,29 +1183,41 @@ def norm_trade_records(
return {r.tid: r for r in records}
def parse_flex_dt(
    record: str,
) -> pendulum.datetime:
    '''
    Parse a flex-report timestamp of the form ``<date>;<HHMMSS>``
    into a single datetime.

    The part before the ``;`` is parsed as the date; the part after
    it is re-punctuated to ``HH:MM:SS`` and parsed separately, then
    its hour, minute and second are applied on top of the parsed
    date.

    '''
    day_str, clock_str = record.split(';')
    day = pendulum.parse(day_str)

    # the time portion has no separators, so slice it into fields
    # and insert colons before handing it to the parser.
    hh, mm, ss = clock_str[:2], clock_str[2:4], clock_str[4:]
    clock = pendulum.parse(f'{hh}:{mm}:{ss}')

    return day.set(
        hour=clock.hour,
        minute=clock.minute,
        second=clock.second,
    )
def api_trades_to_ledger_entries(
def trades_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
source_type: str = 'api',
) -> dict:
'''
Convert API execution objects entry objects into ``dict`` form,
pretty much straight up without modification except add
a `pydatetime` field from the parsed timestamp.
Convert either of API execution objects or flex report
entry objects into ``dict`` form, pretty much straight up
without modification.
'''
trades_by_account = {}
for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if an account name
# gets lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
elif source_type == 'api':
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
@ -1230,8 +1244,7 @@ def api_trades_to_ledger_entries(
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['pydatetime'] = dt
entry['datetime'] = str(dt)
entry['date'] = str(dt)
acctid = accounts[entry['acctNumber']]
if not tid:
@ -1250,73 +1263,6 @@ def api_trades_to_ledger_entries(
acctid, {}
)[tid] = entry
# sort entries in output by python based datetime
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
return trades_by_account
def flex_records_to_ledger_entries(
    accounts: bidict,
    trade_entries: list[object],

) -> dict:
    '''
    Turn flex report entry objects into per-account tables of plain
    ``dict`` entries keyed by trade id.

    Each entry is the record's own attribute ``dict`` with two added
    keys: `pydatetime` (the parsed timestamp) and `datetime` (its
    ``str`` form). Every account's table is returned sorted by
    `pydatetime`.

    '''
    by_account = {}

    for record in trade_entries:
        fields = vars(record)

        # XXX: ``toml`` apparently emits a section key error on write
        # when a table key is left as an `int`, so ids are always cast
        # to ``str``.

        # for some so-called "BookTrade" entries the `ibExecID` field
        # seems to be blank, in which case fall back to `tradeID`.
        exec_id = fields.get('ibExecID')
        tid = str(exec_id or fields['tradeID'])

        # XXX: could it cause problems if an account name gets lost
        # here? The user should be able to find it based on the
        # actual exec history.
        acctid = accounts[str(fields['accountId'])]

        # flex records carry a non-standard timestamp format.
        stamp = parse_flex_dt(fields['dateTime'])
        fields['pydatetime'] = stamp
        fields['datetime'] = str(stamp)

        if tid:
            by_account.setdefault(acctid, {})[tid] = fields
            continue

        # this is likely some kind of internal adjustment
        # transaction, likely one of the following:
        # - an expiry event that will show a "book trade" indicating
        #   some adjustment to cash balances: zeroing or itm settle.
        # - a manual cash balance position adjustment likely done by
        #   the user from the accounts window in TWS where they can
        #   manually set the avg price and size:
        #   https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
        log.warning(f'Skipping ID-less ledger entry:\n{pformat(fields)}')

    # order each account's entries by the parsed python datetime
    for acctid in list(by_account):
        table = by_account[acctid]
        by_account[acctid] = dict(sorted(
            table.items(),
            key=lambda item: item[1]['pydatetime'],
        ))

    return by_account
@ -1363,16 +1309,15 @@ def load_flex_trades(
ln = len(trade_entries)
log.info(f'Loaded {ln} trades from flex query')
trades_by_account = flex_records_to_ledger_entries(
conf['accounts'].inverse, # reverse map to user account names
trades_by_account = trades_to_ledger_entries(
# get reverse map to user account names
conf['accounts'].inverse,
trade_entries,
source_type='flex',
)
ledger_dict: Optional[dict] = None
for acctid in trades_by_account:
trades_by_id = trades_by_account[acctid]
with open_trade_ledger('ib', acctid) as ledger_dict:
tid_delta = set(trades_by_id) - set(ledger_dict)
log.info(
@ -1380,11 +1325,9 @@ def load_flex_trades(
f'{pformat(tid_delta)}'
)
if tid_delta:
sorted_delta = dict(sorted(
{tid: trades_by_id[tid] for tid in tid_delta}.items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
ledger_dict.update(sorted_delta)
ledger_dict.update(
{tid: trades_by_id[tid] for tid in tid_delta}
)
return ledger_dict

View File

@ -254,9 +254,6 @@ async def wait_on_data_reset(
return False
_data_resetter_task: trio.Task | None = None
async def get_bars(
proxy: MethodProxy,
@ -267,7 +264,7 @@ async def get_bars(
end_dt: str = '',
# TODO: make this more dynamic based on measured frame rx latency..
timeout: float = 3, # how long before we trigger a feed reset
timeout: float = 1.5, # how long before we trigger a feed reset
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -277,15 +274,13 @@ async def get_bars(
a ``MethodProxy``.
'''
global _data_resetter_task
data_cs: trio.CancelScope | None = None
result: tuple[
data_cs: Optional[trio.CancelScope] = None
result: Optional[tuple[
ibis.objects.BarDataList,
np.ndarray,
datetime,
datetime,
] | None = None
]] = None
result_ready = trio.Event()
async def query():
@ -312,7 +307,7 @@ async def get_bars(
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
)
end_dt -= dt_duration
end_dt = end_dt.subtract(dt_duration)
continue
if bars_array is None:
@ -405,10 +400,6 @@ async def get_bars(
else:
raise
# TODO: make this global across all history task/requests
# such that simultaneous symbol queries don't try data resetting
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse:
# start history request that we allow
@ -423,14 +414,6 @@ async def get_bars(
await result_ready.wait()
break
if _data_resetter_task:
# don't double invoke the reset hack if another
# requester task already has it covered.
continue
else:
_data_resetter_task = trio.lowlevel.current_task()
unset_resetter = True
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
@ -442,7 +425,6 @@ async def get_bars(
# sync wait on reset to complete
await reset_done.wait()
_data_resetter_task = None if unset_resetter else _data_resetter_task
return result, data_cs is not None
@ -501,9 +483,7 @@ async def _setup_quote_stream(
to_trio.send_nowait(None)
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
async with load_aio_clients() as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -583,8 +563,7 @@ async def open_aio_quote_stream(
from_aio = _quote_streams.get(symbol)
if from_aio:
# if we already have a cached feed deliver a rx side clone
# to consumer
# if we already have a cached feed deliver a rx side clone to consumer
async with broadcast_receiver(
from_aio,
2**6,
@ -775,45 +754,17 @@ async def stream_quotes(
await trio.sleep_forever()
return # we never expect feed to come up?
cs: Optional[trio.CancelScope] = None
startup: bool = True
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
open_aio_quote_stream(
async with open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream,
):
) as stream:
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
startup = False
task_status.started((init_msgs, first_quote))
# start a stream restarter task which monitors the
# data feed event.
async def reset_on_feed():
# TODO: this seems to be suppressed from the
# traceback in ``tractor``?
# assert 0
rt_ev = proxy.status_event(
'Market data farm connection is OK:usfarm'
)
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
async with aclosing(stream):
if syminfo.get('no_vlm', False):
@ -821,31 +772,29 @@ async def stream_quotes(
# include vlm data.
atype = syminfo['asset_type']
log.info(
f'No-vlm {sym}@{atype}, skipping quote poll'
f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
)
else:
# wait for real volume on feed (trading might be
# closed)
# wait for real volume on feed (trading might be closed)
while True:
ticker = await stream.receive()
# for a real volume contract we wait for
# the first "real" trade to take place
# for a real volume contract we wait for the first
# "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we
# get a real market datum
# spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first volume tick")
# ugh, clear ticks since we've
# consumed them (ahem, ib_insync is
# truly stateful trash)
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
@ -973,14 +922,7 @@ async def open_symbol_search(
except trio.WouldBlock:
pass
if (
not pattern
or pattern.isspace()
# XXX: not sure if this is a bad assumption but it
# seems to make search snappier?
or len(pattern) < 1
):
if not pattern or pattern.isspace():
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client

View File

@ -138,26 +138,25 @@ def cli(ctx, brokers, loglevel, tl, configdir):
@click.pass_obj
def services(config, tl, names):
from .._daemon import open_piker_runtime
async def list_services():
async with (
open_piker_runtime(
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
async with tractor.get_arbiter(
*_tractor_kwargs['arbiter_addr']
) as portal
):
) as portal:
registry = await portal.run_from_ns('self', 'get_registry')
json_d = {}
for key, socket in registry.items():
# name, uuid = uid
host, port = socket
json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}")
trio.run(list_services)
tractor.run(
list_services,
name='service_query',
loglevel=config['loglevel'] if tl else None,
arbiter_addr=_tractor_kwargs['arbiter_addr'],
)
def _load_clis() -> None:

View File

@ -333,7 +333,7 @@ async def start_backfill(
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 6},
60: {'years': 6},
60: {'years': 10},
}
kwargs = periods[step_size_s]
@ -348,45 +348,36 @@ async def start_backfill(
# last retrieved start dt to the next request as
# it's end dt.
starts: set[datetime] = set()
while start_dt > last_tsdb_dt:
print(f"QUERY end_dt={start_dt}")
try:
log.info(
f'Requesting {step_size_s}s frame ending in {start_dt}'
)
array, next_start_dt, end_dt = await hist(
array, start_dt, end_dt = await hist(
timeframe,
end_dt=start_dt,
)
if next_start_dt in starts:
start_dt = min(starts)
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
continue
# only update new start point if new
start_dt = next_start_dt
starts.add(start_dt)
assert array['time'][0] == start_dt.timestamp()
except NoData:
# XXX: unhandled history gap (shouldn't happen?)
log.warning(
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
)
await tractor.breakpoint()
return None # discard signal
except DataUnavailable: # as duerr:
# broker is being a bish and we can't pull any more..
log.warning(
f'NO-MORE-DATA: backend {mod.name} halted history!?'
)
except DataUnavailable as duerr:
# broker is being a bish and we can't pull
# any more..
log.warning('backend halted on data deliver !?!?')
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
return
return duerr
diff = end_dt - start_dt
frame_time_diff_s = diff.seconds
@ -403,6 +394,22 @@ async def start_backfill(
f'{diff} ~= {frame_time_diff_s} seconds'
)
array, _start_dt, end_dt = await hist(
timeframe,
end_dt=start_dt,
)
if (
_start_dt in starts
):
print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
start_dt = min(starts)
continue
# only update new start point if new
start_dt = _start_dt
starts.add(start_dt)
to_push = diff_history(
array,
start_dt,
@ -491,10 +498,10 @@ async def manage_history(
readonly=False,
)
# TODO: history validation
# if not opened:
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
rt_shm, opened = maybe_open_shm_array(
key=f'{fqsn}_rt',
@ -506,10 +513,10 @@ async def manage_history(
readonly=False,
size=3*_secs_in_day,
)
# if not opened:
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
log.info('Scanning for existing `marketstored`')

View File

@ -20,7 +20,6 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends)
'''
from __future__ import annotations
from contextlib import contextmanager as cm
from pprint import pformat
import os
@ -139,31 +138,13 @@ class Position(Struct):
# ordered record of known constituent trade messages
clears: dict[
str | int, # trade id
Union[str, int, Status], # trade id
dict[str, Any], # transaction history summaries
] = {}
first_clear_dt: Optional[datetime] = None
expiry: Optional[datetime] = None
# @property
# def clears(self) -> dict[
# Union[str, int, Status], # trade id
# dict[str, Any], # transaction history summaries
# ]:
# '''
# Datetime sorted reference to internal clears table.
# '''
# # self._clears = {}
# self._clears = dict(sorted(
# self._clears.items(),
# key=lambda entry: entry[1]['dt'],
# ))
# # self._clears[k] = v
# return self._clears
def to_dict(self) -> dict:
return {
f: getattr(self, f)
@ -238,10 +219,6 @@ class Position(Struct):
'''
clears = list(self.clears.values())
if not clears:
log.warning(f'No clears table for {self.symbol}!?')
return
self.first_clear_dt = min(list(entry['dt'] for entry in clears))
last_clear = clears[-1]
@ -646,7 +623,6 @@ class PpTable(Struct):
def to_toml(
self,
min_clears: bool = True,
) -> dict[str, Any]:
active, closed = self.dump_active()
@ -659,9 +635,7 @@ class PpTable(Struct):
# keep the minimal amount of clears that make up this
# position since the last net-zero state.
if min_clears:
pos.minimize_clears()
pos.ensure_state()
# serialize to pre-toml form
@ -708,8 +682,6 @@ def load_pps_from_ledger(
brokername: str,
acctname: str,
table: Optional[PpTable] = None,
# post normalization filter on ledger entries to be processed
filter_by: Optional[list[dict]] = None,
@ -726,6 +698,7 @@ def load_pps_from_ledger(
'''
with (
open_trade_ledger(brokername, acctname) as ledger,
open_pps(brokername, acctname) as table,
):
if not ledger:
# null case, no ledger file with content
@ -743,10 +716,6 @@ def load_pps_from_ledger(
else:
records = src_records
if table is None:
with open_pps(brokername, acctname) as table:
updated = table.update_from_trans(records)
else:
updated = table.update_from_trans(records)
return records, updated
@ -917,27 +886,15 @@ def open_pps(
conf=conf,
)
# first pass populate all missing clears record tables
# for fqsn, entry in pps.items():
# # convert clears sub-tables (only in this form
# # for toml re-presentation) back into a master table.
# clears_list = entry.get('clears', [])
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=[entry['bsuid']],
# table=table,
# )
# # breakpoint()
# unmarshal/load ``pps.toml`` config entries into object form
# and update `PpTable` obj entries.
for fqsn, entry in pps.items():
bsuid = entry['bsuid']
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# index clears entries in "object" form by tid in a top
# level dict instead of a list (as is presented in our
# ``pps.toml``).
@ -949,18 +906,6 @@ def open_pps(
# processing of new clear events.
trans: list[Transaction] = []
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# table=table,
# )
for clears_table in clears_list:
tid = clears_table.pop('tid')
dtstr = clears_table['dt']