Compare commits

10 Commits: 96f5a8abb8 ... c53071e43a

Author | SHA1 | Date
---|---|---
Tyler Goodlet | c53071e43a |
Tyler Goodlet | 41ffccc59e |
Tyler Goodlet | 55856f5e8b |
Tyler Goodlet | e8ab28e456 |
Tyler Goodlet | d2b6216994 |
Tyler Goodlet | eb743759a4 |
Tyler Goodlet | 74910ba56c |
Tyler Goodlet | 28535fa977 |
Tyler Goodlet | 1d7e642dbd |
Tyler Goodlet | 69be65237f |
@@ -195,9 +195,8 @@ async def open_piker_runtime(

) -> Optional[tractor._portal.Portal]:
    '''
-    Start a piker actor who's runtime will automatically
-    sync with existing piker actors in local network
-    based on configuration.
+    Start a piker actor who's runtime will automatically sync with
+    existing piker actors on the local link based on configuration.

    '''
    global _services
@@ -412,7 +412,7 @@ class Client:
            # ``end_dt`` which exceeds the ``duration``,
            # - a timeout occurred in which case insync internals return
            # an empty list thing with bars.clear()...
-            return [], np.empty(0)
+            return [], np.empty(0), dt_duration
            # TODO: we could maybe raise ``NoData`` instead if we
            # rewrite the method in the first case? right now there's no
            # way to detect a timeout.

@@ -483,7 +483,7 @@ class Client:
        self,
        pattern: str,
        # how many contracts to search "up to"
-        upto: int = 6,
+        upto: int = 16,
        asdicts: bool = True,

    ) -> dict[str, ContractDetails]:

@@ -518,6 +518,16 @@ class Client:

                exch = tract.exchange
                if exch not in _exch_skip_list:
+
+                    # try to lookup any contracts from our adhoc set
+                    # since often the exchange/venue is named slightly
+                    # different (eg. BRR.CMECRYPTO` instead of just
+                    # `.CME`).
+                    info = _adhoc_symbol_map.get(sym)
+                    if info:
+                        con_kwargs, bars_kwargs = info
+                        exch = con_kwargs['exchange']
+
                    # try get all possible contracts for symbol as per,
                    # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
                    con = ibis.Future(

@@ -1086,6 +1096,7 @@ async def load_aio_clients(
    # retry a few times to get the client going..
    connect_retries: int = 3,
    connect_timeout: float = 0.5,
+    disconnect_on_exit: bool = True,

) -> dict[str, Client]:
    '''

@@ -1227,6 +1238,7 @@ async def load_aio_clients(
    finally:
        # TODO: for re-scans we'll want to not teardown clients which
        # are up and stable right?
+        if disconnect_on_exit:
            for acct, client in _accounts2clients.items():
                log.info(f'Disconnecting {acct}@{client}')
                client.ib.disconnect()
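The new `disconnect_on_exit` flag lets a caller opt out of tearing down the IB client connections when `load_aio_clients()` exits; the quote-stream setup further below passes `disconnect_on_exit=False` so a cached connection can be reused. A minimal sketch of the same flag-guarded teardown pattern, with hypothetical names standing in for the real client table:

```python
from contextlib import asynccontextmanager

import trio


@asynccontextmanager
async def open_clients(disconnect_on_exit: bool = True):
    # hypothetical stand-in for the real ib-insync client table
    clients: dict[str, object] = {'paper': object()}
    try:
        yield clients
    finally:
        # only tear down connections when the caller did not ask to
        # keep them alive for later re-use (eg. by a cached quote feed)
        if disconnect_on_exit:
            for name, client in clients.items():
                print(f'disconnecting {name}@{client}')


async def main():
    # connections are left up on exit here, mirroring the new
    # ``load_aio_clients(disconnect_on_exit=False)`` call site
    async with open_clients(disconnect_on_exit=False) as clients:
        assert 'paper' in clients


trio.run(main)
```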
@@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
        entry['listingExchange'] = pexch

    conf = get_config()
-    entries = trades_to_ledger_entries(
+    entries = api_trades_to_ledger_entries(
        conf['accounts'].inverse,
        trade_entries,
    )

@@ -371,8 +371,8 @@ async def update_and_audit_msgs(
            else:
                entry = f'split_ratio = 1/{int(reverse_split_ratio)}'

-            # raise ValueError(
-            log.error(
+            raise ValueError(
+            # log.error(
                f'POSITION MISMATCH ib <-> piker ledger:\n'
                f'ib: {ibppmsg}\n'
                f'piker: {msg}\n'

@@ -1123,18 +1123,16 @@ def norm_trade_records(
            continue

        # timestamping is way different in API records
+        dtstr = record.get('datetime')
        date = record.get('date')
-        if not date:
-            # probably a flex record with a wonky non-std timestamp..
-            date, ts = record['dateTime'].split(';')
-            dt = pendulum.parse(date)
-            ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
-            tsdt = pendulum.parse(ts)
-            dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
-
-        else:
-            # epoch_dt = pendulum.from_timestamp(record.get('time'))
-            dt = pendulum.parse(date)
+        flex_dtstr = record.get('dateTime')
+
+        if dtstr or date:
+            dt = pendulum.parse(dtstr or date)
+
+        elif flex_dtstr:
+            # probably a flex record with a wonky non-std timestamp..
+            dt = parse_flex_dt(record['dateTime'])

        # special handling of symbol extraction from
        # flex records using some ad-hoc schema parsing.
@@ -1183,41 +1181,29 @@ def norm_trade_records(
    return {r.tid: r for r in records}


-def trades_to_ledger_entries(
+def parse_flex_dt(
+    record: str,
+) -> pendulum.datetime:
+    date, ts = record.split(';')
+    dt = pendulum.parse(date)
+    ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
+    tsdt = pendulum.parse(ts)
+    return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
+
+
+def api_trades_to_ledger_entries(
    accounts: bidict,
    trade_entries: list[object],
-    source_type: str = 'api',

) -> dict:
    '''
-    Convert either of API execution objects or flex report
-    entry objects into ``dict`` form, pretty much straight up
-    without modification.
+    Convert API execution objects entry objects into ``dict`` form,
+    pretty much straight up without modification except add
+    a `pydatetime` field from the parsed timestamp.

    '''
    trades_by_account = {}

    for t in trade_entries:
-        if source_type == 'flex':
-            entry = t.__dict__
-
-            # XXX: LOL apparently ``toml`` has a bug
-            # where a section key error will show up in the write
-            # if you leave a table key as an `int`? So i guess
-            # cast to strs for all keys..
-
-            # oddly for some so-called "BookTrade" entries
-            # this field seems to be blank, no cuckin clue.
-            # trade['ibExecID']
-            tid = str(entry.get('ibExecID') or entry['tradeID'])
-            # date = str(entry['tradeDate'])
-
-            # XXX: is it going to cause problems if a account name
-            # get's lost? The user should be able to find it based
-            # on the actual exec history right?
-            acctid = accounts[str(entry['accountId'])]
-
-        elif source_type == 'api':
        # NOTE: example of schema we pull from the API client.
        # {
        #     'commissionReport': CommissionReport(...

@@ -1244,7 +1230,8 @@ def trades_to_ledger_entries(
        tid = str(entry['execId'])
        dt = pendulum.from_timestamp(entry['time'])
        # TODO: why isn't this showing seconds in the str?
-        entry['date'] = str(dt)
+        entry['pydatetime'] = dt
+        entry['datetime'] = str(dt)
        acctid = accounts[entry['acctNumber']]

        if not tid:

@@ -1263,6 +1250,73 @@ def trades_to_ledger_entries(
            acctid, {}
        )[tid] = entry

+    # sort entries in output by python based datetime
+    for acctid in trades_by_account:
+        trades_by_account[acctid] = dict(sorted(
+            trades_by_account[acctid].items(),
+            key=lambda entry: entry[1].pop('pydatetime'),
+        ))
+
+    return trades_by_account
+
+
+def flex_records_to_ledger_entries(
+    accounts: bidict,
+    trade_entries: list[object],
+
+) -> dict:
+    '''
+    Convert flex report entry objects into ``dict`` form, pretty much
+    straight up without modification except add a `pydatetime` field
+    from the parsed timestamp.
+
+    '''
+    trades_by_account = {}
+    for t in trade_entries:
+        entry = t.__dict__
+
+        # XXX: LOL apparently ``toml`` has a bug
+        # where a section key error will show up in the write
+        # if you leave a table key as an `int`? So i guess
+        # cast to strs for all keys..
+
+        # oddly for some so-called "BookTrade" entries
+        # this field seems to be blank, no cuckin clue.
+        # trade['ibExecID']
+        tid = str(entry.get('ibExecID') or entry['tradeID'])
+        # date = str(entry['tradeDate'])
+
+        # XXX: is it going to cause problems if a account name
+        # get's lost? The user should be able to find it based
+        # on the actual exec history right?
+        acctid = accounts[str(entry['accountId'])]
+
+        # probably a flex record with a wonky non-std timestamp..
+        dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
+        entry['datetime'] = str(dt)
+
+        if not tid:
+            # this is likely some kind of internal adjustment
+            # transaction, likely one of the following:
+            # - an expiry event that will show a "book trade" indicating
+            #   some adjustment to cash balances: zeroing or itm settle.
+            # - a manual cash balance position adjustment likely done by
+            #   the user from the accounts window in TWS where they can
+            #   manually set the avg price and size:
+            #   https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
+            log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
+            continue
+
+        trades_by_account.setdefault(
+            acctid, {}
+        )[tid] = entry
+
+    for acctid in trades_by_account:
+        trades_by_account[acctid] = dict(sorted(
+            trades_by_account[acctid].items(),
+            key=lambda entry: entry[1]['pydatetime'],
+        ))
+
    return trades_by_account
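The flex-report timestamp handling is now factored into the standalone `parse_flex_dt()` helper added above, which splits the `date;HHMMSS` shaped `dateTime` field and folds the time back onto the parsed date. A quick usage sketch of that same splitting logic (the sample timestamp value is made up, but it follows the `<date>;<HHMMSS>` shape the splitter expects):

```python
import pendulum


def parse_flex_dt(record: str) -> pendulum.DateTime:
    # same splitting logic as the helper in the diff above: a flex
    # ``dateTime`` is '<date>;<HHMMSS>' with no separators in the time
    date, ts = record.split(';')
    dt = pendulum.parse(date)
    ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
    tsdt = pendulum.parse(ts)
    return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)


# example value is invented but shaped like a flex record timestamp
print(parse_flex_dt('2022-06-02;093015'))
# -> 2022-06-02T09:30:15+00:00 (pendulum defaults to UTC)
```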
@@ -1309,15 +1363,16 @@ def load_flex_trades(
    ln = len(trade_entries)
    log.info(f'Loaded {ln} trades from flex query')

-    trades_by_account = trades_to_ledger_entries(
-        # get reverse map to user account names
-        conf['accounts'].inverse,
+    trades_by_account = flex_records_to_ledger_entries(
+        conf['accounts'].inverse,  # reverse map to user account names
        trade_entries,
-        source_type='flex',
    )

+    ledger_dict: Optional[dict] = None
+
    for acctid in trades_by_account:
        trades_by_id = trades_by_account[acctid]

        with open_trade_ledger('ib', acctid) as ledger_dict:
            tid_delta = set(trades_by_id) - set(ledger_dict)
            log.info(

@@ -1325,9 +1380,11 @@ def load_flex_trades(
                f'{pformat(tid_delta)}'
            )
            if tid_delta:
-                ledger_dict.update(
-                    {tid: trades_by_id[tid] for tid in tid_delta}
-                )
+                sorted_delta = dict(sorted(
+                    {tid: trades_by_id[tid] for tid in tid_delta}.items(),
+                    key=lambda entry: entry[1].pop('pydatetime'),
+                ))
+                ledger_dict.update(sorted_delta)

    return ledger_dict
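Both converters now stamp each entry with a `pydatetime` field so the ledger delta can be merged in time order; the `key=lambda entry: entry[1].pop('pydatetime')` trick sorts by that datetime and strips the non-TOML-serializable field in the same pass before the entries are written back to the ledger file. A small self-contained sketch of that idiom over invented sample data:

```python
import pendulum

# invented sample entries keyed by trade id, each carrying a parsed
# datetime field that is only needed for ordering
delta = {
    'b': {'price': 2.0, 'pydatetime': pendulum.parse('2022-06-02T10:00:00')},
    'a': {'price': 1.0, 'pydatetime': pendulum.parse('2022-06-01T09:30:00')},
}

# sort by the datetime and drop it in the same pass so only
# toml-friendly values remain in each entry
sorted_delta = dict(sorted(
    delta.items(),
    key=lambda entry: entry[1].pop('pydatetime'),
))

print(list(sorted_delta))   # ['a', 'b']
print(sorted_delta['a'])    # {'price': 1.0}
```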
@@ -254,6 +254,9 @@ async def wait_on_data_reset(
        return False


+_data_resetter_task: trio.Task | None = None
+
+
async def get_bars(

    proxy: MethodProxy,

@@ -264,7 +267,7 @@ async def get_bars(
    end_dt: str = '',

    # TODO: make this more dynamic based on measured frame rx latency..
-    timeout: float = 1.5,  # how long before we trigger a feed reset
+    timeout: float = 3,  # how long before we trigger a feed reset

    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

@@ -274,13 +277,15 @@ async def get_bars(
    a ``MethoProxy``.

    '''
-    data_cs: Optional[trio.CancelScope] = None
-    result: Optional[tuple[
+    global _data_resetter_task
+
+    data_cs: trio.CancelScope | None = None
+    result: tuple[
        ibis.objects.BarDataList,
        np.ndarray,
        datetime,
        datetime,
-    ]] = None
+    ] | None = None
    result_ready = trio.Event()

    async def query():

@@ -307,7 +312,7 @@ async def get_bars(
                log.warning(
                    f'History is blank for {dt_duration} from {end_dt}'
                )
-                end_dt = end_dt.subtract(dt_duration)
+                end_dt -= dt_duration
                continue

            if bars_array is None:

@@ -400,6 +405,10 @@ async def get_bars(
            else:
                raise

+    # TODO: make this global across all history task/requests
+    # such that simultaneous symbol queries don't try data resettingn
+    # too fast..
+    unset_resetter: bool = False
    async with trio.open_nursery() as nurse:

        # start history request that we allow

@@ -414,6 +423,14 @@ async def get_bars(
                await result_ready.wait()
                break

+            if _data_resetter_task:
+                # don't double invoke the reset hack if another
+                # requester task already has it covered.
+                continue
+            else:
+                _data_resetter_task = trio.lowlevel.current_task()
+                unset_resetter = True
+
            # spawn new data reset task
            data_cs, reset_done = await nurse.start(
                partial(

@@ -425,6 +442,7 @@ async def get_bars(
            # sync wait on reset to complete
            await reset_done.wait()

+    _data_resetter_task = None if unset_resetter else _data_resetter_task
    return result, data_cs is not None
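The module-level `_data_resetter_task` marker makes the "data reset hack" single-owner: only the first `get_bars()` caller to claim it spawns the reset task, while concurrent requesters just skip it and keep waiting, and the owner clears the marker on the way out. A toy sketch of that single-owner guard pattern in trio (names and the fake work are illustrative only):

```python
import trio

# module-level marker of which task currently "owns" the reset job
_resetter_task: trio.lowlevel.Task | None = None


async def maybe_reset(name: str) -> None:
    global _resetter_task

    if _resetter_task:
        # someone else already has the reset covered; don't double-invoke
        print(f'{name}: reset already owned by {_resetter_task.name}')
        return

    _resetter_task = trio.lowlevel.current_task()
    try:
        print(f'{name}: running the (fake) reset..')
        await trio.sleep(0.1)
    finally:
        # release ownership so a later caller can run the reset again
        _resetter_task = None


async def main():
    async with trio.open_nursery() as nursery:
        for name in ('task-a', 'task-b', 'task-c'):
            nursery.start_soon(maybe_reset, name)


trio.run(main)
```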
@@ -483,7 +501,9 @@ async def _setup_quote_stream(

    to_trio.send_nowait(None)

-    async with load_aio_clients() as accts2clients:
+    async with load_aio_clients(
+        disconnect_on_exit=False,
+    ) as accts2clients:
        caccount_name, client = get_preferred_data_client(accts2clients)
        contract = contract or (await client.find_contract(symbol))
        ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))

@@ -563,7 +583,8 @@ async def open_aio_quote_stream(
    from_aio = _quote_streams.get(symbol)
    if from_aio:

-        # if we already have a cached feed deliver a rx side clone to consumer
+        # if we already have a cached feed deliver a rx side clone
+        # to consumer
        async with broadcast_receiver(
            from_aio,
            2**6,
@@ -754,17 +775,45 @@ async def stream_quotes(
            await trio.sleep_forever()
            return  # we never expect feed to come up?

-    async with open_aio_quote_stream(
+    cs: Optional[trio.CancelScope] = None
+    startup: bool = True
+    while (
+        startup
+        or cs.cancel_called
+    ):
+        with trio.CancelScope() as cs:
+            async with (
+                trio.open_nursery() as nurse,
+                open_aio_quote_stream(
                    symbol=sym,
                    contract=con,
-    ) as stream:
+                ) as stream,
+            ):
                # ugh, clear ticks since we've consumed them
                # (ahem, ib_insync is stateful trash)
                first_ticker.ticks = []

+                # only on first entry at feed boot up
+                if startup:
+                    startup = False
                    task_status.started((init_msgs, first_quote))

+                # start a stream restarter task which monitors the
+                # data feed event.
+                async def reset_on_feed():
+
+                    # TODO: this seems to be surpressed from the
+                    # traceback in ``tractor``?
+                    # assert 0
+
+                    rt_ev = proxy.status_event(
+                        'Market data farm connection is OK:usfarm'
+                    )
+                    await rt_ev.wait()
+                    cs.cancel()  # cancel called should now be set
+
+                nurse.start_soon(reset_on_feed)
+
                async with aclosing(stream):
                    if syminfo.get('no_vlm', False):
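The quote stream is now wrapped in a reconnect loop: a background `reset_on_feed()` task waits for the "data farm OK" status event and cancels the surrounding `trio.CancelScope`, which sets `cs.cancel_called` and sends the `while` loop back around to rebuild the whole stream. A stripped-down sketch of that cancel-scope restart pattern (a timer stands in for the real feed event, and the restart count is capped just to keep the demo finite):

```python
import trio


async def stream_with_restart(max_restarts: int = 3):
    cs: trio.CancelScope | None = None
    startup: bool = True
    restarts = 0

    while startup or cs.cancel_called:
        with trio.CancelScope() as cs:
            startup = False

            async with trio.open_nursery() as nurse:

                async def reset_on_event():
                    # stand-in for waiting on the real data-farm status
                    # event; when it fires, cancel the whole scope so the
                    # outer while loop tears down and rebuilds the stream
                    await trio.sleep(0.1)
                    cs.cancel()

                nurse.start_soon(reset_on_event)

                # the "stream consumer": runs until the scope is cancelled
                await trio.sleep_forever()

        restarts += 1
        if restarts >= max_restarts:
            break

    print(f'stream rebuilt {restarts} times')


trio.run(stream_with_restart)
```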
@@ -772,29 +821,31 @@ async def stream_quotes(
                        # include vlm data.
                        atype = syminfo['asset_type']
                        log.info(
-                            f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
+                            f'No-vlm {sym}@{atype}, skipping quote poll'
                        )

                    else:
-                        # wait for real volume on feed (trading might be closed)
+                        # wait for real volume on feed (trading might be
+                        # closed)
                        while True:
                            ticker = await stream.receive()

-                            # for a real volume contract we rait for the first
-                            # "real" trade to take place
+                            # for a real volume contract we rait for
+                            # the first "real" trade to take place
                            if (
                                # not calc_price
                                # and not ticker.rtTime
                                not ticker.rtTime
                            ):
-                                # spin consuming tickers until we get a real
-                                # market datum
+                                # spin consuming tickers until we
+                                # get a real market datum
                                log.debug(f"New unsent ticker: {ticker}")
                                continue
                            else:
-                                log.debug("Received first real volume tick")
-                                # ugh, clear ticks since we've consumed them
-                                # (ahem, ib_insync is truly stateful trash)
+                                log.debug("Received first volume tick")
+                                # ugh, clear ticks since we've
+                                # consumed them (ahem, ib_insync is
+                                # truly stateful trash)
                                ticker.ticks = []

                                # XXX: this works because we don't use

@@ -922,7 +973,14 @@ async def open_symbol_search(
            except trio.WouldBlock:
                pass

-            if not pattern or pattern.isspace():
+            if (
+                not pattern
+                or pattern.isspace()
+
+                # XXX: not sure if this is a bad assumption but it
+                # seems to make search snappier?
+                or len(pattern) < 1
+            ):
                log.warning('empty pattern received, skipping..')

                # TODO: *BUG* if nothing is returned here the client
@@ -138,25 +138,26 @@ def cli(ctx, brokers, loglevel, tl, configdir):
@click.pass_obj
def services(config, tl, names):

-    async def list_services():
-
-        async with tractor.get_arbiter(
+    from .._daemon import open_piker_runtime
+
+    async def list_services():
+        async with (
+            open_piker_runtime(
+                name='service_query',
+                loglevel=config['loglevel'] if tl else None,
+            ),
+            tractor.get_arbiter(
                *_tractor_kwargs['arbiter_addr']
-        ) as portal:
+            ) as portal
+        ):
            registry = await portal.run_from_ns('self', 'get_registry')
            json_d = {}
            for key, socket in registry.items():
-                # name, uuid = uid
                host, port = socket
                json_d[key] = f'{host}:{port}'
            click.echo(f"{colorize_json(json_d)}")

-    tractor.run(
-        list_services,
-        name='service_query',
-        loglevel=config['loglevel'] if tl else None,
-        arbiter_addr=_tractor_kwargs['arbiter_addr'],
-    )
+    trio.run(list_services)


def _load_clis() -> None:
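The `services` CLI now boots the piker runtime itself and then queries the registrar, stacking both async context managers in a single parenthesized `async with (...)` block and driving the whole thing with a plain `trio.run()` instead of the old `tractor.run()` entrypoint. A generic sketch of that wiring, with placeholder context managers and made-up registry data standing in for the real runtime and arbiter portal:

```python
from contextlib import asynccontextmanager

import trio


@asynccontextmanager
async def open_runtime(name: str):
    # placeholder for the real runtime bootstrap (eg. open_piker_runtime)
    print(f'runtime up as {name!r}')
    yield
    print('runtime torn down')


@asynccontextmanager
async def open_registry_portal():
    # placeholder for the real registrar/arbiter portal; values invented
    yield {'pikerd': ('127.0.0.1', 8888)}


def services_cmd():
    async def list_services():
        # stack both context managers in one parenthesized block,
        # mirroring the new CLI wiring
        async with (
            open_runtime(name='service_query'),
            open_registry_portal() as registry,
        ):
            for key, (host, port) in registry.items():
                print(f'{key}: {host}:{port}')

    trio.run(list_services)


services_cmd()
```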
@@ -333,7 +333,7 @@ async def start_backfill(
        # do a decently sized backfill and load it into storage.
        periods = {
            1: {'days': 6},
-            60: {'years': 10},
+            60: {'years': 6},
        }

        kwargs = periods[step_size_s]

@@ -348,36 +348,45 @@ async def start_backfill(
        # last retrieved start dt to the next request as
        # it's end dt.
        starts: set[datetime] = set()

        while start_dt > last_tsdb_dt:
-            print(f"QUERY end_dt={start_dt}")
            try:
                log.info(
                    f'Requesting {step_size_s}s frame ending in {start_dt}'
                )
-                array, start_dt, end_dt = await hist(
+                array, next_start_dt, end_dt = await hist(
                    timeframe,
                    end_dt=start_dt,
                )
+
+                if next_start_dt in starts:
+                    start_dt = min(starts)
+                    print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
+                    continue
+
+                # only update new start point if new
+                start_dt = next_start_dt
+                starts.add(start_dt)
+
                assert array['time'][0] == start_dt.timestamp()

            except NoData:
+                # XXX: unhandled history gap (shouldn't happen?)
                log.warning(
                    f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
                )
-                return None  # discard signal
+                await tractor.breakpoint()

-            except DataUnavailable as duerr:
-                # broker is being a bish and we can't pull
-                # any more..
-                log.warning('backend halted on data deliver !?!?')
+            except DataUnavailable:  # as duerr:
+                # broker is being a bish and we can't pull any more..
+                log.warning(
+                    f'NO-MORE-DATA: backend {mod.name} halted history!?'
+                )

                # ugh, what's a better way?
                # TODO: fwiw, we probably want a way to signal a throttle
                # condition (eg. with ib) so that we can halt the
                # request loop until the condition is resolved?
-                return duerr
+                return

        diff = end_dt - start_dt
        frame_time_diff_s = diff.seconds
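Frame de-duplication now happens right at the request site: the backfill keeps a `starts` set of every frame start time it has already received, and when the broker hands back a frame whose start was seen before it rewinds to the earliest known start and skips the duplicate instead of pushing it again. A tiny sketch of that dedupe loop over invented frame timestamps:

```python
import pendulum

# fake per-request frame start times returned by a broker; the third
# response repeats an earlier frame start (a "duplicate frame")
responses = [
    pendulum.parse('2022-06-03T00:00:00'),
    pendulum.parse('2022-06-02T00:00:00'),
    pendulum.parse('2022-06-02T00:00:00'),  # duplicate!
    pendulum.parse('2022-06-01T00:00:00'),
]

starts: set = set()
start_dt = None

for next_start_dt in responses:
    if next_start_dt in starts:
        # rewind to the earliest start we know about and skip the frame
        start_dt = min(starts)
        print(f'skipping duplicate frame @ {next_start_dt}')
        continue

    # only update the walk-back pointer when the frame is new
    start_dt = next_start_dt
    starts.add(start_dt)
    print(f'accepted frame starting @ {start_dt}')

print(f'final start_dt: {start_dt}')
```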
@@ -394,22 +403,6 @@ async def start_backfill(
                f'{diff} ~= {frame_time_diff_s} seconds'
            )

-            array, _start_dt, end_dt = await hist(
-                timeframe,
-                end_dt=start_dt,
-            )
-
-            if (
-                _start_dt in starts
-            ):
-                print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
-                start_dt = min(starts)
-                continue
-
-            # only update new start point if new
-            start_dt = _start_dt
-            starts.add(start_dt)
-
            to_push = diff_history(
                array,
                start_dt,

@@ -498,10 +491,10 @@ async def manage_history(
        readonly=False,
    )
    # TODO: history validation
-    if not opened:
-        raise RuntimeError(
-            "Persistent shm for sym was already open?!"
-        )
+    # if not opened:
+    #     raise RuntimeError(
+    #         "Persistent shm for sym was already open?!"
+    #     )

    rt_shm, opened = maybe_open_shm_array(
        key=f'{fqsn}_rt',

@@ -513,10 +506,10 @@ async def manage_history(
        readonly=False,
        size=3*_secs_in_day,
    )
-    if not opened:
-        raise RuntimeError(
-            "Persistent shm for sym was already open?!"
-        )
+    # if not opened:
+    #     raise RuntimeError(
+    #         "Persistent shm for sym was already open?!"
+    #     )

    log.info('Scanning for existing `marketstored`')


piker/pp.py (67 changes)
@@ -20,6 +20,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends)

'''
+from __future__ import annotations
from contextlib import contextmanager as cm
from pprint import pformat
import os

@@ -138,13 +139,31 @@ class Position(Struct):

    # ordered record of known constituent trade messages
    clears: dict[
-        Union[str, int, Status],  # trade id
+        str | int,  # trade id
        dict[str, Any],  # transaction history summaries
    ] = {}
    first_clear_dt: Optional[datetime] = None

    expiry: Optional[datetime] = None

+    # @property
+    # def clears(self) -> dict[
+    #     Union[str, int, Status],  # trade id
+    #     dict[str, Any],  # transaction history summaries
+    # ]:
+    #     '''
+    #     Datetime sorted reference to internal clears table.
+
+    #     '''
+    #     # self._clears = {}
+    #     self._clears = dict(sorted(
+    #         self._clears.items(),
+    #         key=lambda entry: entry[1]['dt'],
+    #     ))
+    #     # self._clears[k] = v
+
+    #     return self._clears
+
    def to_dict(self) -> dict:
        return {
            f: getattr(self, f)

@@ -219,6 +238,10 @@ class Position(Struct):

        '''
        clears = list(self.clears.values())
+        if not clears:
+            log.warning(f'No clears table for {self.symbol}!?')
+            return
+
        self.first_clear_dt = min(list(entry['dt'] for entry in clears))
        last_clear = clears[-1]

@@ -623,6 +646,7 @@ class PpTable(Struct):

    def to_toml(
        self,
+        min_clears: bool = True,
    ) -> dict[str, Any]:

        active, closed = self.dump_active()

@@ -635,7 +659,9 @@ class PpTable(Struct):

            # keep the minimal amount of clears that make up this
            # position since the last net-zero state.
+            if min_clears:
                pos.minimize_clears()
+
            pos.ensure_state()

            # serialize to pre-toml form

@@ -682,6 +708,8 @@ def load_pps_from_ledger(
    brokername: str,
    acctname: str,

+    table: Optional[PpTable] = None,
+
    # post normalization filter on ledger entries to be processed
    filter_by: Optional[list[dict]] = None,

@@ -698,7 +726,6 @@ def load_pps_from_ledger(
    '''
    with (
        open_trade_ledger(brokername, acctname) as ledger,
-        open_pps(brokername, acctname) as table,
    ):
        if not ledger:
            # null case, no ledger file with content

@@ -716,6 +743,10 @@ def load_pps_from_ledger(
    else:
        records = src_records

+    if table is None:
+        with open_pps(brokername, acctname) as table:
+            updated = table.update_from_trans(records)
+    else:
        updated = table.update_from_trans(records)

    return records, updated

@@ -886,15 +917,27 @@ def open_pps(
        conf=conf,
    )

+    # first pass populate all missing clears record tables
+    # for fqsn, entry in pps.items():
+    #     # convert clears sub-tables (only in this form
+    #     # for toml re-presentation) back into a master table.
+    #     clears_list = entry.get('clears', [])
+
+    #     # attempt to reload from ledger
+    #     if not clears_list:
+    #         trans, pos = load_pps_from_ledger(
+    #             brokername,
+    #             acctid,
+    #             filter_by=[entry['bsuid']],
+    #             table=table,
+    #         )
+    #         # breakpoint()
+
    # unmarshal/load ``pps.toml`` config entries into object form
    # and update `PpTable` obj entries.
    for fqsn, entry in pps.items():
        bsuid = entry['bsuid']

-        # convert clears sub-tables (only in this form
-        # for toml re-presentation) back into a master table.
-        clears_list = entry['clears']
-
        # index clears entries in "object" form by tid in a top
        # level dict instead of a list (as is presented in our
        # ``pps.toml``).

@@ -906,6 +949,18 @@ def open_pps(
        # processing of new clear events.
        trans: list[Transaction] = []

+        # convert clears sub-tables (only in this form
+        # for toml re-presentation) back into a master table.
+        clears_list = entry['clears']
+
+        # # attempt to reload from ledger
+        # if not clears_list:
+        #     trans, pos = load_pps_from_ledger(
+        #         brokername,
+        #         acctid,
+        #         table=table,
+        #     )
+
        for clears_table in clears_list:
            tid = clears_table.pop('tid')
            dtstr = clears_table['dt']
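`load_pps_from_ledger()` now accepts an already-open `PpTable`, so a caller that is itself inside an `open_pps()` block can pass its table in rather than re-entering the context manager; only when no table is given does the function open (and flush) one of its own. A generic sketch of that optional-resource pattern, with placeholder types and a made-up account name:

```python
from contextlib import contextmanager
from typing import Iterator, Optional


@contextmanager
def open_table(name: str) -> Iterator[dict]:
    # placeholder for the real ``open_pps()`` table context manager
    table: dict = {'name': name}
    try:
        yield table
    finally:
        print(f'flushed table {name!r}')


def _update(table: dict, records: list) -> list:
    table.setdefault('records', []).extend(records)
    return records


def load_from_ledger(
    name: str,
    records: list,
    table: Optional[dict] = None,
) -> list:
    # reuse a caller-provided table when given, otherwise open (and
    # flush) a fresh one just for this call
    if table is None:
        with open_table(name) as table:
            return _update(table, records)
    else:
        return _update(table, records)


# standalone call opens its own table..
load_from_ledger('demo-acct', [1, 2, 3])

# ..while a caller already holding one passes it in
with open_table('demo-acct') as t:
    load_from_ledger('demo-acct', [4, 5], table=t)
```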