Compare commits
No commits in common. "c53071e43a291251924f42cba84d8e01672b729f" and "96f5a8abb878c391c875e6c0b3853e397c8d50d2" have entirely different histories.
c53071e43a
...
96f5a8abb8
|
@ -195,8 +195,9 @@ async def open_piker_runtime(
|
|||
|
||||
) -> Optional[tractor._portal.Portal]:
|
||||
'''
|
||||
Start a piker actor who's runtime will automatically sync with
|
||||
existing piker actors on the local link based on configuration.
|
||||
Start a piker actor who's runtime will automatically
|
||||
sync with existing piker actors in local network
|
||||
based on configuration.
|
||||
|
||||
'''
|
||||
global _services
|
||||
|
|
|
@ -412,7 +412,7 @@ class Client:
|
|||
# ``end_dt`` which exceeds the ``duration``,
|
||||
# - a timeout occurred in which case insync internals return
|
||||
# an empty list thing with bars.clear()...
|
||||
return [], np.empty(0), dt_duration
|
||||
return [], np.empty(0)
|
||||
# TODO: we could maybe raise ``NoData`` instead if we
|
||||
# rewrite the method in the first case? right now there's no
|
||||
# way to detect a timeout.
|
||||
|
@ -483,7 +483,7 @@ class Client:
|
|||
self,
|
||||
pattern: str,
|
||||
# how many contracts to search "up to"
|
||||
upto: int = 16,
|
||||
upto: int = 6,
|
||||
asdicts: bool = True,
|
||||
|
||||
) -> dict[str, ContractDetails]:
|
||||
|
@ -518,16 +518,6 @@ class Client:
|
|||
|
||||
exch = tract.exchange
|
||||
if exch not in _exch_skip_list:
|
||||
|
||||
# try to lookup any contracts from our adhoc set
|
||||
# since often the exchange/venue is named slightly
|
||||
# different (eg. BRR.CMECRYPTO` instead of just
|
||||
# `.CME`).
|
||||
info = _adhoc_symbol_map.get(sym)
|
||||
if info:
|
||||
con_kwargs, bars_kwargs = info
|
||||
exch = con_kwargs['exchange']
|
||||
|
||||
# try get all possible contracts for symbol as per,
|
||||
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
|
||||
con = ibis.Future(
|
||||
|
@ -1096,7 +1086,6 @@ async def load_aio_clients(
|
|||
# retry a few times to get the client going..
|
||||
connect_retries: int = 3,
|
||||
connect_timeout: float = 0.5,
|
||||
disconnect_on_exit: bool = True,
|
||||
|
||||
) -> dict[str, Client]:
|
||||
'''
|
||||
|
@ -1238,11 +1227,10 @@ async def load_aio_clients(
|
|||
finally:
|
||||
# TODO: for re-scans we'll want to not teardown clients which
|
||||
# are up and stable right?
|
||||
if disconnect_on_exit:
|
||||
for acct, client in _accounts2clients.items():
|
||||
log.info(f'Disconnecting {acct}@{client}')
|
||||
client.ib.disconnect()
|
||||
_client_cache.pop((host, port), None)
|
||||
for acct, client in _accounts2clients.items():
|
||||
log.info(f'Disconnecting {acct}@{client}')
|
||||
client.ib.disconnect()
|
||||
_client_cache.pop((host, port), None)
|
||||
|
||||
|
||||
async def load_clients_for_trio(
|
||||
|
|
|
@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
|
|||
entry['listingExchange'] = pexch
|
||||
|
||||
conf = get_config()
|
||||
entries = api_trades_to_ledger_entries(
|
||||
entries = trades_to_ledger_entries(
|
||||
conf['accounts'].inverse,
|
||||
trade_entries,
|
||||
)
|
||||
|
@ -371,8 +371,8 @@ async def update_and_audit_msgs(
|
|||
else:
|
||||
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
|
||||
|
||||
raise ValueError(
|
||||
# log.error(
|
||||
# raise ValueError(
|
||||
log.error(
|
||||
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
||||
f'ib: {ibppmsg}\n'
|
||||
f'piker: {msg}\n'
|
||||
|
@ -1123,16 +1123,18 @@ def norm_trade_records(
|
|||
continue
|
||||
|
||||
# timestamping is way different in API records
|
||||
dtstr = record.get('datetime')
|
||||
date = record.get('date')
|
||||
flex_dtstr = record.get('dateTime')
|
||||
|
||||
if dtstr or date:
|
||||
dt = pendulum.parse(dtstr or date)
|
||||
|
||||
elif flex_dtstr:
|
||||
if not date:
|
||||
# probably a flex record with a wonky non-std timestamp..
|
||||
dt = parse_flex_dt(record['dateTime'])
|
||||
date, ts = record['dateTime'].split(';')
|
||||
dt = pendulum.parse(date)
|
||||
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
|
||||
tsdt = pendulum.parse(ts)
|
||||
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
|
||||
|
||||
else:
|
||||
# epoch_dt = pendulum.from_timestamp(record.get('time'))
|
||||
dt = pendulum.parse(date)
|
||||
|
||||
# special handling of symbol extraction from
|
||||
# flex records using some ad-hoc schema parsing.
|
||||
|
@ -1181,58 +1183,69 @@ def norm_trade_records(
|
|||
return {r.tid: r for r in records}
|
||||
|
||||
|
||||
def parse_flex_dt(
|
||||
record: str,
|
||||
) -> pendulum.datetime:
|
||||
date, ts = record.split(';')
|
||||
dt = pendulum.parse(date)
|
||||
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
|
||||
tsdt = pendulum.parse(ts)
|
||||
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
|
||||
|
||||
|
||||
def api_trades_to_ledger_entries(
|
||||
def trades_to_ledger_entries(
|
||||
accounts: bidict,
|
||||
trade_entries: list[object],
|
||||
source_type: str = 'api',
|
||||
|
||||
) -> dict:
|
||||
'''
|
||||
Convert API execution objects entry objects into ``dict`` form,
|
||||
pretty much straight up without modification except add
|
||||
a `pydatetime` field from the parsed timestamp.
|
||||
Convert either of API execution objects or flex report
|
||||
entry objects into ``dict`` form, pretty much straight up
|
||||
without modification.
|
||||
|
||||
'''
|
||||
trades_by_account = {}
|
||||
|
||||
for t in trade_entries:
|
||||
# NOTE: example of schema we pull from the API client.
|
||||
# {
|
||||
# 'commissionReport': CommissionReport(...
|
||||
# 'contract': {...
|
||||
# 'execution': Execution(...
|
||||
# 'time': 1654801166.0
|
||||
# }
|
||||
if source_type == 'flex':
|
||||
entry = t.__dict__
|
||||
|
||||
# flatten all sub-dicts and values into one top level entry.
|
||||
entry = {}
|
||||
for section, val in t.items():
|
||||
match section:
|
||||
case 'contract' | 'execution' | 'commissionReport':
|
||||
# sub-dict cases
|
||||
entry.update(val)
|
||||
# XXX: LOL apparently ``toml`` has a bug
|
||||
# where a section key error will show up in the write
|
||||
# if you leave a table key as an `int`? So i guess
|
||||
# cast to strs for all keys..
|
||||
|
||||
case 'time':
|
||||
# ib has wack ns timestamps, or is that us?
|
||||
continue
|
||||
# oddly for some so-called "BookTrade" entries
|
||||
# this field seems to be blank, no cuckin clue.
|
||||
# trade['ibExecID']
|
||||
tid = str(entry.get('ibExecID') or entry['tradeID'])
|
||||
# date = str(entry['tradeDate'])
|
||||
|
||||
case _:
|
||||
entry[section] = val
|
||||
# XXX: is it going to cause problems if a account name
|
||||
# get's lost? The user should be able to find it based
|
||||
# on the actual exec history right?
|
||||
acctid = accounts[str(entry['accountId'])]
|
||||
|
||||
tid = str(entry['execId'])
|
||||
dt = pendulum.from_timestamp(entry['time'])
|
||||
# TODO: why isn't this showing seconds in the str?
|
||||
entry['pydatetime'] = dt
|
||||
entry['datetime'] = str(dt)
|
||||
acctid = accounts[entry['acctNumber']]
|
||||
elif source_type == 'api':
|
||||
# NOTE: example of schema we pull from the API client.
|
||||
# {
|
||||
# 'commissionReport': CommissionReport(...
|
||||
# 'contract': {...
|
||||
# 'execution': Execution(...
|
||||
# 'time': 1654801166.0
|
||||
# }
|
||||
|
||||
# flatten all sub-dicts and values into one top level entry.
|
||||
entry = {}
|
||||
for section, val in t.items():
|
||||
match section:
|
||||
case 'contract' | 'execution' | 'commissionReport':
|
||||
# sub-dict cases
|
||||
entry.update(val)
|
||||
|
||||
case 'time':
|
||||
# ib has wack ns timestamps, or is that us?
|
||||
continue
|
||||
|
||||
case _:
|
||||
entry[section] = val
|
||||
|
||||
tid = str(entry['execId'])
|
||||
dt = pendulum.from_timestamp(entry['time'])
|
||||
# TODO: why isn't this showing seconds in the str?
|
||||
entry['date'] = str(dt)
|
||||
acctid = accounts[entry['acctNumber']]
|
||||
|
||||
if not tid:
|
||||
# this is likely some kind of internal adjustment
|
||||
|
@ -1250,73 +1263,6 @@ def api_trades_to_ledger_entries(
|
|||
acctid, {}
|
||||
)[tid] = entry
|
||||
|
||||
# sort entries in output by python based datetime
|
||||
for acctid in trades_by_account:
|
||||
trades_by_account[acctid] = dict(sorted(
|
||||
trades_by_account[acctid].items(),
|
||||
key=lambda entry: entry[1].pop('pydatetime'),
|
||||
))
|
||||
|
||||
return trades_by_account
|
||||
|
||||
|
||||
def flex_records_to_ledger_entries(
|
||||
accounts: bidict,
|
||||
trade_entries: list[object],
|
||||
|
||||
) -> dict:
|
||||
'''
|
||||
Convert flex report entry objects into ``dict`` form, pretty much
|
||||
straight up without modification except add a `pydatetime` field
|
||||
from the parsed timestamp.
|
||||
|
||||
'''
|
||||
trades_by_account = {}
|
||||
for t in trade_entries:
|
||||
entry = t.__dict__
|
||||
|
||||
# XXX: LOL apparently ``toml`` has a bug
|
||||
# where a section key error will show up in the write
|
||||
# if you leave a table key as an `int`? So i guess
|
||||
# cast to strs for all keys..
|
||||
|
||||
# oddly for some so-called "BookTrade" entries
|
||||
# this field seems to be blank, no cuckin clue.
|
||||
# trade['ibExecID']
|
||||
tid = str(entry.get('ibExecID') or entry['tradeID'])
|
||||
# date = str(entry['tradeDate'])
|
||||
|
||||
# XXX: is it going to cause problems if a account name
|
||||
# get's lost? The user should be able to find it based
|
||||
# on the actual exec history right?
|
||||
acctid = accounts[str(entry['accountId'])]
|
||||
|
||||
# probably a flex record with a wonky non-std timestamp..
|
||||
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
|
||||
entry['datetime'] = str(dt)
|
||||
|
||||
if not tid:
|
||||
# this is likely some kind of internal adjustment
|
||||
# transaction, likely one of the following:
|
||||
# - an expiry event that will show a "book trade" indicating
|
||||
# some adjustment to cash balances: zeroing or itm settle.
|
||||
# - a manual cash balance position adjustment likely done by
|
||||
# the user from the accounts window in TWS where they can
|
||||
# manually set the avg price and size:
|
||||
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
|
||||
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
|
||||
continue
|
||||
|
||||
trades_by_account.setdefault(
|
||||
acctid, {}
|
||||
)[tid] = entry
|
||||
|
||||
for acctid in trades_by_account:
|
||||
trades_by_account[acctid] = dict(sorted(
|
||||
trades_by_account[acctid].items(),
|
||||
key=lambda entry: entry[1]['pydatetime'],
|
||||
))
|
||||
|
||||
return trades_by_account
|
||||
|
||||
|
||||
|
@ -1363,16 +1309,15 @@ def load_flex_trades(
|
|||
ln = len(trade_entries)
|
||||
log.info(f'Loaded {ln} trades from flex query')
|
||||
|
||||
trades_by_account = flex_records_to_ledger_entries(
|
||||
conf['accounts'].inverse, # reverse map to user account names
|
||||
trades_by_account = trades_to_ledger_entries(
|
||||
# get reverse map to user account names
|
||||
conf['accounts'].inverse,
|
||||
trade_entries,
|
||||
source_type='flex',
|
||||
)
|
||||
|
||||
ledger_dict: Optional[dict] = None
|
||||
|
||||
for acctid in trades_by_account:
|
||||
trades_by_id = trades_by_account[acctid]
|
||||
|
||||
with open_trade_ledger('ib', acctid) as ledger_dict:
|
||||
tid_delta = set(trades_by_id) - set(ledger_dict)
|
||||
log.info(
|
||||
|
@ -1380,11 +1325,9 @@ def load_flex_trades(
|
|||
f'{pformat(tid_delta)}'
|
||||
)
|
||||
if tid_delta:
|
||||
sorted_delta = dict(sorted(
|
||||
{tid: trades_by_id[tid] for tid in tid_delta}.items(),
|
||||
key=lambda entry: entry[1].pop('pydatetime'),
|
||||
))
|
||||
ledger_dict.update(sorted_delta)
|
||||
ledger_dict.update(
|
||||
{tid: trades_by_id[tid] for tid in tid_delta}
|
||||
)
|
||||
|
||||
return ledger_dict
|
||||
|
||||
|
|
|
@ -254,9 +254,6 @@ async def wait_on_data_reset(
|
|||
return False
|
||||
|
||||
|
||||
_data_resetter_task: trio.Task | None = None
|
||||
|
||||
|
||||
async def get_bars(
|
||||
|
||||
proxy: MethodProxy,
|
||||
|
@ -267,7 +264,7 @@ async def get_bars(
|
|||
end_dt: str = '',
|
||||
|
||||
# TODO: make this more dynamic based on measured frame rx latency..
|
||||
timeout: float = 3, # how long before we trigger a feed reset
|
||||
timeout: float = 1.5, # how long before we trigger a feed reset
|
||||
|
||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
|
@ -277,15 +274,13 @@ async def get_bars(
|
|||
a ``MethoProxy``.
|
||||
|
||||
'''
|
||||
global _data_resetter_task
|
||||
|
||||
data_cs: trio.CancelScope | None = None
|
||||
result: tuple[
|
||||
data_cs: Optional[trio.CancelScope] = None
|
||||
result: Optional[tuple[
|
||||
ibis.objects.BarDataList,
|
||||
np.ndarray,
|
||||
datetime,
|
||||
datetime,
|
||||
] | None = None
|
||||
]] = None
|
||||
result_ready = trio.Event()
|
||||
|
||||
async def query():
|
||||
|
@ -312,7 +307,7 @@ async def get_bars(
|
|||
log.warning(
|
||||
f'History is blank for {dt_duration} from {end_dt}'
|
||||
)
|
||||
end_dt -= dt_duration
|
||||
end_dt = end_dt.subtract(dt_duration)
|
||||
continue
|
||||
|
||||
if bars_array is None:
|
||||
|
@ -405,10 +400,6 @@ async def get_bars(
|
|||
else:
|
||||
raise
|
||||
|
||||
# TODO: make this global across all history task/requests
|
||||
# such that simultaneous symbol queries don't try data resettingn
|
||||
# too fast..
|
||||
unset_resetter: bool = False
|
||||
async with trio.open_nursery() as nurse:
|
||||
|
||||
# start history request that we allow
|
||||
|
@ -423,14 +414,6 @@ async def get_bars(
|
|||
await result_ready.wait()
|
||||
break
|
||||
|
||||
if _data_resetter_task:
|
||||
# don't double invoke the reset hack if another
|
||||
# requester task already has it covered.
|
||||
continue
|
||||
else:
|
||||
_data_resetter_task = trio.lowlevel.current_task()
|
||||
unset_resetter = True
|
||||
|
||||
# spawn new data reset task
|
||||
data_cs, reset_done = await nurse.start(
|
||||
partial(
|
||||
|
@ -442,7 +425,6 @@ async def get_bars(
|
|||
# sync wait on reset to complete
|
||||
await reset_done.wait()
|
||||
|
||||
_data_resetter_task = None if unset_resetter else _data_resetter_task
|
||||
return result, data_cs is not None
|
||||
|
||||
|
||||
|
@ -501,9 +483,7 @@ async def _setup_quote_stream(
|
|||
|
||||
to_trio.send_nowait(None)
|
||||
|
||||
async with load_aio_clients(
|
||||
disconnect_on_exit=False,
|
||||
) as accts2clients:
|
||||
async with load_aio_clients() as accts2clients:
|
||||
caccount_name, client = get_preferred_data_client(accts2clients)
|
||||
contract = contract or (await client.find_contract(symbol))
|
||||
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
|
||||
|
@ -583,8 +563,7 @@ async def open_aio_quote_stream(
|
|||
from_aio = _quote_streams.get(symbol)
|
||||
if from_aio:
|
||||
|
||||
# if we already have a cached feed deliver a rx side clone
|
||||
# to consumer
|
||||
# if we already have a cached feed deliver a rx side clone to consumer
|
||||
async with broadcast_receiver(
|
||||
from_aio,
|
||||
2**6,
|
||||
|
@ -775,97 +754,67 @@ async def stream_quotes(
|
|||
await trio.sleep_forever()
|
||||
return # we never expect feed to come up?
|
||||
|
||||
cs: Optional[trio.CancelScope] = None
|
||||
startup: bool = True
|
||||
while (
|
||||
startup
|
||||
or cs.cancel_called
|
||||
):
|
||||
with trio.CancelScope() as cs:
|
||||
async with (
|
||||
trio.open_nursery() as nurse,
|
||||
open_aio_quote_stream(
|
||||
symbol=sym,
|
||||
contract=con,
|
||||
) as stream,
|
||||
):
|
||||
# ugh, clear ticks since we've consumed them
|
||||
# (ahem, ib_insync is stateful trash)
|
||||
first_ticker.ticks = []
|
||||
async with open_aio_quote_stream(
|
||||
symbol=sym,
|
||||
contract=con,
|
||||
) as stream:
|
||||
|
||||
# only on first entry at feed boot up
|
||||
if startup:
|
||||
startup = False
|
||||
task_status.started((init_msgs, first_quote))
|
||||
# ugh, clear ticks since we've consumed them
|
||||
# (ahem, ib_insync is stateful trash)
|
||||
first_ticker.ticks = []
|
||||
|
||||
# start a stream restarter task which monitors the
|
||||
# data feed event.
|
||||
async def reset_on_feed():
|
||||
task_status.started((init_msgs, first_quote))
|
||||
|
||||
# TODO: this seems to be surpressed from the
|
||||
# traceback in ``tractor``?
|
||||
# assert 0
|
||||
async with aclosing(stream):
|
||||
if syminfo.get('no_vlm', False):
|
||||
|
||||
rt_ev = proxy.status_event(
|
||||
'Market data farm connection is OK:usfarm'
|
||||
)
|
||||
await rt_ev.wait()
|
||||
cs.cancel() # cancel called should now be set
|
||||
# generally speaking these feeds don't
|
||||
# include vlm data.
|
||||
atype = syminfo['asset_type']
|
||||
log.info(
|
||||
f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
|
||||
)
|
||||
|
||||
nurse.start_soon(reset_on_feed)
|
||||
|
||||
async with aclosing(stream):
|
||||
if syminfo.get('no_vlm', False):
|
||||
|
||||
# generally speaking these feeds don't
|
||||
# include vlm data.
|
||||
atype = syminfo['asset_type']
|
||||
log.info(
|
||||
f'No-vlm {sym}@{atype}, skipping quote poll'
|
||||
)
|
||||
else:
|
||||
# wait for real volume on feed (trading might be closed)
|
||||
while True:
|
||||
ticker = await stream.receive()
|
||||
|
||||
# for a real volume contract we rait for the first
|
||||
# "real" trade to take place
|
||||
if (
|
||||
# not calc_price
|
||||
# and not ticker.rtTime
|
||||
not ticker.rtTime
|
||||
):
|
||||
# spin consuming tickers until we get a real
|
||||
# market datum
|
||||
log.debug(f"New unsent ticker: {ticker}")
|
||||
continue
|
||||
else:
|
||||
# wait for real volume on feed (trading might be
|
||||
# closed)
|
||||
while True:
|
||||
ticker = await stream.receive()
|
||||
|
||||
# for a real volume contract we rait for
|
||||
# the first "real" trade to take place
|
||||
if (
|
||||
# not calc_price
|
||||
# and not ticker.rtTime
|
||||
not ticker.rtTime
|
||||
):
|
||||
# spin consuming tickers until we
|
||||
# get a real market datum
|
||||
log.debug(f"New unsent ticker: {ticker}")
|
||||
continue
|
||||
else:
|
||||
log.debug("Received first volume tick")
|
||||
# ugh, clear ticks since we've
|
||||
# consumed them (ahem, ib_insync is
|
||||
# truly stateful trash)
|
||||
ticker.ticks = []
|
||||
|
||||
# XXX: this works because we don't use
|
||||
# ``aclosing()`` above?
|
||||
break
|
||||
|
||||
quote = normalize(ticker)
|
||||
log.debug(f"First ticker received {quote}")
|
||||
|
||||
# tell caller quotes are now coming in live
|
||||
feed_is_live.set()
|
||||
|
||||
# last = time.time()
|
||||
async for ticker in stream:
|
||||
quote = normalize(ticker)
|
||||
await send_chan.send({quote['fqsn']: quote})
|
||||
|
||||
log.debug("Received first real volume tick")
|
||||
# ugh, clear ticks since we've consumed them
|
||||
# (ahem, ib_insync is truly stateful trash)
|
||||
ticker.ticks = []
|
||||
# last = time.time()
|
||||
|
||||
# XXX: this works because we don't use
|
||||
# ``aclosing()`` above?
|
||||
break
|
||||
|
||||
quote = normalize(ticker)
|
||||
log.debug(f"First ticker received {quote}")
|
||||
|
||||
# tell caller quotes are now coming in live
|
||||
feed_is_live.set()
|
||||
|
||||
# last = time.time()
|
||||
async for ticker in stream:
|
||||
quote = normalize(ticker)
|
||||
await send_chan.send({quote['fqsn']: quote})
|
||||
|
||||
# ugh, clear ticks since we've consumed them
|
||||
ticker.ticks = []
|
||||
# last = time.time()
|
||||
|
||||
|
||||
async def data_reset_hack(
|
||||
|
@ -973,14 +922,7 @@ async def open_symbol_search(
|
|||
except trio.WouldBlock:
|
||||
pass
|
||||
|
||||
if (
|
||||
not pattern
|
||||
or pattern.isspace()
|
||||
|
||||
# XXX: not sure if this is a bad assumption but it
|
||||
# seems to make search snappier?
|
||||
or len(pattern) < 1
|
||||
):
|
||||
if not pattern or pattern.isspace():
|
||||
log.warning('empty pattern received, skipping..')
|
||||
|
||||
# TODO: *BUG* if nothing is returned here the client
|
||||
|
|
|
@ -138,26 +138,25 @@ def cli(ctx, brokers, loglevel, tl, configdir):
|
|||
@click.pass_obj
|
||||
def services(config, tl, names):
|
||||
|
||||
from .._daemon import open_piker_runtime
|
||||
|
||||
async def list_services():
|
||||
async with (
|
||||
open_piker_runtime(
|
||||
name='service_query',
|
||||
loglevel=config['loglevel'] if tl else None,
|
||||
),
|
||||
tractor.get_arbiter(
|
||||
*_tractor_kwargs['arbiter_addr']
|
||||
) as portal
|
||||
):
|
||||
|
||||
async with tractor.get_arbiter(
|
||||
*_tractor_kwargs['arbiter_addr']
|
||||
) as portal:
|
||||
registry = await portal.run_from_ns('self', 'get_registry')
|
||||
json_d = {}
|
||||
for key, socket in registry.items():
|
||||
# name, uuid = uid
|
||||
host, port = socket
|
||||
json_d[key] = f'{host}:{port}'
|
||||
click.echo(f"{colorize_json(json_d)}")
|
||||
|
||||
trio.run(list_services)
|
||||
tractor.run(
|
||||
list_services,
|
||||
name='service_query',
|
||||
loglevel=config['loglevel'] if tl else None,
|
||||
arbiter_addr=_tractor_kwargs['arbiter_addr'],
|
||||
)
|
||||
|
||||
|
||||
def _load_clis() -> None:
|
||||
|
|
|
@ -333,7 +333,7 @@ async def start_backfill(
|
|||
# do a decently sized backfill and load it into storage.
|
||||
periods = {
|
||||
1: {'days': 6},
|
||||
60: {'years': 6},
|
||||
60: {'years': 10},
|
||||
}
|
||||
|
||||
kwargs = periods[step_size_s]
|
||||
|
@ -348,45 +348,36 @@ async def start_backfill(
|
|||
# last retrieved start dt to the next request as
|
||||
# it's end dt.
|
||||
starts: set[datetime] = set()
|
||||
|
||||
while start_dt > last_tsdb_dt:
|
||||
|
||||
print(f"QUERY end_dt={start_dt}")
|
||||
try:
|
||||
log.info(
|
||||
f'Requesting {step_size_s}s frame ending in {start_dt}'
|
||||
)
|
||||
array, next_start_dt, end_dt = await hist(
|
||||
array, start_dt, end_dt = await hist(
|
||||
timeframe,
|
||||
end_dt=start_dt,
|
||||
)
|
||||
|
||||
if next_start_dt in starts:
|
||||
start_dt = min(starts)
|
||||
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
|
||||
continue
|
||||
|
||||
# only update new start point if new
|
||||
start_dt = next_start_dt
|
||||
starts.add(start_dt)
|
||||
|
||||
assert array['time'][0] == start_dt.timestamp()
|
||||
|
||||
except NoData:
|
||||
# XXX: unhandled history gap (shouldn't happen?)
|
||||
log.warning(
|
||||
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
|
||||
)
|
||||
await tractor.breakpoint()
|
||||
return None # discard signal
|
||||
|
||||
except DataUnavailable: # as duerr:
|
||||
# broker is being a bish and we can't pull any more..
|
||||
log.warning(
|
||||
f'NO-MORE-DATA: backend {mod.name} halted history!?'
|
||||
)
|
||||
except DataUnavailable as duerr:
|
||||
# broker is being a bish and we can't pull
|
||||
# any more..
|
||||
log.warning('backend halted on data deliver !?!?')
|
||||
|
||||
# ugh, what's a better way?
|
||||
# TODO: fwiw, we probably want a way to signal a throttle
|
||||
# condition (eg. with ib) so that we can halt the
|
||||
# request loop until the condition is resolved?
|
||||
return
|
||||
return duerr
|
||||
|
||||
diff = end_dt - start_dt
|
||||
frame_time_diff_s = diff.seconds
|
||||
|
@ -403,6 +394,22 @@ async def start_backfill(
|
|||
f'{diff} ~= {frame_time_diff_s} seconds'
|
||||
)
|
||||
|
||||
array, _start_dt, end_dt = await hist(
|
||||
timeframe,
|
||||
end_dt=start_dt,
|
||||
)
|
||||
|
||||
if (
|
||||
_start_dt in starts
|
||||
):
|
||||
print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
|
||||
start_dt = min(starts)
|
||||
continue
|
||||
|
||||
# only update new start point if new
|
||||
start_dt = _start_dt
|
||||
starts.add(start_dt)
|
||||
|
||||
to_push = diff_history(
|
||||
array,
|
||||
start_dt,
|
||||
|
@ -491,10 +498,10 @@ async def manage_history(
|
|||
readonly=False,
|
||||
)
|
||||
# TODO: history validation
|
||||
# if not opened:
|
||||
# raise RuntimeError(
|
||||
# "Persistent shm for sym was already open?!"
|
||||
# )
|
||||
if not opened:
|
||||
raise RuntimeError(
|
||||
"Persistent shm for sym was already open?!"
|
||||
)
|
||||
|
||||
rt_shm, opened = maybe_open_shm_array(
|
||||
key=f'{fqsn}_rt',
|
||||
|
@ -506,10 +513,10 @@ async def manage_history(
|
|||
readonly=False,
|
||||
size=3*_secs_in_day,
|
||||
)
|
||||
# if not opened:
|
||||
# raise RuntimeError(
|
||||
# "Persistent shm for sym was already open?!"
|
||||
# )
|
||||
if not opened:
|
||||
raise RuntimeError(
|
||||
"Persistent shm for sym was already open?!"
|
||||
)
|
||||
|
||||
log.info('Scanning for existing `marketstored`')
|
||||
|
||||
|
|
71
piker/pp.py
71
piker/pp.py
|
@ -20,7 +20,6 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
|
|||
(looking at you `ib` and dirt-bird friends)
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import contextmanager as cm
|
||||
from pprint import pformat
|
||||
import os
|
||||
|
@ -139,31 +138,13 @@ class Position(Struct):
|
|||
|
||||
# ordered record of known constituent trade messages
|
||||
clears: dict[
|
||||
str | int, # trade id
|
||||
Union[str, int, Status], # trade id
|
||||
dict[str, Any], # transaction history summaries
|
||||
] = {}
|
||||
first_clear_dt: Optional[datetime] = None
|
||||
|
||||
expiry: Optional[datetime] = None
|
||||
|
||||
# @property
|
||||
# def clears(self) -> dict[
|
||||
# Union[str, int, Status], # trade id
|
||||
# dict[str, Any], # transaction history summaries
|
||||
# ]:
|
||||
# '''
|
||||
# Datetime sorted reference to internal clears table.
|
||||
|
||||
# '''
|
||||
# # self._clears = {}
|
||||
# self._clears = dict(sorted(
|
||||
# self._clears.items(),
|
||||
# key=lambda entry: entry[1]['dt'],
|
||||
# ))
|
||||
# # self._clears[k] = v
|
||||
|
||||
# return self._clears
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
f: getattr(self, f)
|
||||
|
@ -238,10 +219,6 @@ class Position(Struct):
|
|||
|
||||
'''
|
||||
clears = list(self.clears.values())
|
||||
if not clears:
|
||||
log.warning(f'No clears table for {self.symbol}!?')
|
||||
return
|
||||
|
||||
self.first_clear_dt = min(list(entry['dt'] for entry in clears))
|
||||
last_clear = clears[-1]
|
||||
|
||||
|
@ -646,7 +623,6 @@ class PpTable(Struct):
|
|||
|
||||
def to_toml(
|
||||
self,
|
||||
min_clears: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
|
||||
active, closed = self.dump_active()
|
||||
|
@ -659,9 +635,7 @@ class PpTable(Struct):
|
|||
|
||||
# keep the minimal amount of clears that make up this
|
||||
# position since the last net-zero state.
|
||||
if min_clears:
|
||||
pos.minimize_clears()
|
||||
|
||||
pos.minimize_clears()
|
||||
pos.ensure_state()
|
||||
|
||||
# serialize to pre-toml form
|
||||
|
@ -708,8 +682,6 @@ def load_pps_from_ledger(
|
|||
brokername: str,
|
||||
acctname: str,
|
||||
|
||||
table: Optional[PpTable] = None,
|
||||
|
||||
# post normalization filter on ledger entries to be processed
|
||||
filter_by: Optional[list[dict]] = None,
|
||||
|
||||
|
@ -726,6 +698,7 @@ def load_pps_from_ledger(
|
|||
'''
|
||||
with (
|
||||
open_trade_ledger(brokername, acctname) as ledger,
|
||||
open_pps(brokername, acctname) as table,
|
||||
):
|
||||
if not ledger:
|
||||
# null case, no ledger file with content
|
||||
|
@ -743,11 +716,7 @@ def load_pps_from_ledger(
|
|||
else:
|
||||
records = src_records
|
||||
|
||||
if table is None:
|
||||
with open_pps(brokername, acctname) as table:
|
||||
updated = table.update_from_trans(records)
|
||||
else:
|
||||
updated = table.update_from_trans(records)
|
||||
updated = table.update_from_trans(records)
|
||||
|
||||
return records, updated
|
||||
|
||||
|
@ -917,27 +886,15 @@ def open_pps(
|
|||
conf=conf,
|
||||
)
|
||||
|
||||
# first pass populate all missing clears record tables
|
||||
# for fqsn, entry in pps.items():
|
||||
# # convert clears sub-tables (only in this form
|
||||
# # for toml re-presentation) back into a master table.
|
||||
# clears_list = entry.get('clears', [])
|
||||
|
||||
# # attempt to reload from ledger
|
||||
# if not clears_list:
|
||||
# trans, pos = load_pps_from_ledger(
|
||||
# brokername,
|
||||
# acctid,
|
||||
# filter_by=[entry['bsuid']],
|
||||
# table=table,
|
||||
# )
|
||||
# # breakpoint()
|
||||
|
||||
# unmarshal/load ``pps.toml`` config entries into object form
|
||||
# and update `PpTable` obj entries.
|
||||
for fqsn, entry in pps.items():
|
||||
bsuid = entry['bsuid']
|
||||
|
||||
# convert clears sub-tables (only in this form
|
||||
# for toml re-presentation) back into a master table.
|
||||
clears_list = entry['clears']
|
||||
|
||||
# index clears entries in "object" form by tid in a top
|
||||
# level dict instead of a list (as is presented in our
|
||||
# ``pps.toml``).
|
||||
|
@ -949,18 +906,6 @@ def open_pps(
|
|||
# processing of new clear events.
|
||||
trans: list[Transaction] = []
|
||||
|
||||
# convert clears sub-tables (only in this form
|
||||
# for toml re-presentation) back into a master table.
|
||||
clears_list = entry['clears']
|
||||
|
||||
# # attempt to reload from ledger
|
||||
# if not clears_list:
|
||||
# trans, pos = load_pps_from_ledger(
|
||||
# brokername,
|
||||
# acctid,
|
||||
# table=table,
|
||||
# )
|
||||
|
||||
for clears_table in clears_list:
|
||||
tid = clears_table.pop('tid')
|
||||
dtstr = clears_table['dt']
|
||||
|
|
Loading…
Reference in New Issue