Compare commits
No commits in common. "c53071e43a291251924f42cba84d8e01672b729f" and "96f5a8abb878c391c875e6c0b3853e397c8d50d2" have entirely different histories.
c53071e43a
...
96f5a8abb8
|
@ -195,8 +195,9 @@ async def open_piker_runtime(
|
||||||
|
|
||||||
) -> Optional[tractor._portal.Portal]:
|
) -> Optional[tractor._portal.Portal]:
|
||||||
'''
|
'''
|
||||||
Start a piker actor who's runtime will automatically sync with
|
Start a piker actor who's runtime will automatically
|
||||||
existing piker actors on the local link based on configuration.
|
sync with existing piker actors in local network
|
||||||
|
based on configuration.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _services
|
global _services
|
||||||
|
|
|
@ -412,7 +412,7 @@ class Client:
|
||||||
# ``end_dt`` which exceeds the ``duration``,
|
# ``end_dt`` which exceeds the ``duration``,
|
||||||
# - a timeout occurred in which case insync internals return
|
# - a timeout occurred in which case insync internals return
|
||||||
# an empty list thing with bars.clear()...
|
# an empty list thing with bars.clear()...
|
||||||
return [], np.empty(0), dt_duration
|
return [], np.empty(0)
|
||||||
# TODO: we could maybe raise ``NoData`` instead if we
|
# TODO: we could maybe raise ``NoData`` instead if we
|
||||||
# rewrite the method in the first case? right now there's no
|
# rewrite the method in the first case? right now there's no
|
||||||
# way to detect a timeout.
|
# way to detect a timeout.
|
||||||
|
@ -483,7 +483,7 @@ class Client:
|
||||||
self,
|
self,
|
||||||
pattern: str,
|
pattern: str,
|
||||||
# how many contracts to search "up to"
|
# how many contracts to search "up to"
|
||||||
upto: int = 16,
|
upto: int = 6,
|
||||||
asdicts: bool = True,
|
asdicts: bool = True,
|
||||||
|
|
||||||
) -> dict[str, ContractDetails]:
|
) -> dict[str, ContractDetails]:
|
||||||
|
@ -518,16 +518,6 @@ class Client:
|
||||||
|
|
||||||
exch = tract.exchange
|
exch = tract.exchange
|
||||||
if exch not in _exch_skip_list:
|
if exch not in _exch_skip_list:
|
||||||
|
|
||||||
# try to lookup any contracts from our adhoc set
|
|
||||||
# since often the exchange/venue is named slightly
|
|
||||||
# different (eg. BRR.CMECRYPTO` instead of just
|
|
||||||
# `.CME`).
|
|
||||||
info = _adhoc_symbol_map.get(sym)
|
|
||||||
if info:
|
|
||||||
con_kwargs, bars_kwargs = info
|
|
||||||
exch = con_kwargs['exchange']
|
|
||||||
|
|
||||||
# try get all possible contracts for symbol as per,
|
# try get all possible contracts for symbol as per,
|
||||||
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
|
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
|
||||||
con = ibis.Future(
|
con = ibis.Future(
|
||||||
|
@ -1096,7 +1086,6 @@ async def load_aio_clients(
|
||||||
# retry a few times to get the client going..
|
# retry a few times to get the client going..
|
||||||
connect_retries: int = 3,
|
connect_retries: int = 3,
|
||||||
connect_timeout: float = 0.5,
|
connect_timeout: float = 0.5,
|
||||||
disconnect_on_exit: bool = True,
|
|
||||||
|
|
||||||
) -> dict[str, Client]:
|
) -> dict[str, Client]:
|
||||||
'''
|
'''
|
||||||
|
@ -1238,11 +1227,10 @@ async def load_aio_clients(
|
||||||
finally:
|
finally:
|
||||||
# TODO: for re-scans we'll want to not teardown clients which
|
# TODO: for re-scans we'll want to not teardown clients which
|
||||||
# are up and stable right?
|
# are up and stable right?
|
||||||
if disconnect_on_exit:
|
for acct, client in _accounts2clients.items():
|
||||||
for acct, client in _accounts2clients.items():
|
log.info(f'Disconnecting {acct}@{client}')
|
||||||
log.info(f'Disconnecting {acct}@{client}')
|
client.ib.disconnect()
|
||||||
client.ib.disconnect()
|
_client_cache.pop((host, port), None)
|
||||||
_client_cache.pop((host, port), None)
|
|
||||||
|
|
||||||
|
|
||||||
async def load_clients_for_trio(
|
async def load_clients_for_trio(
|
||||||
|
|
|
@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
|
||||||
entry['listingExchange'] = pexch
|
entry['listingExchange'] = pexch
|
||||||
|
|
||||||
conf = get_config()
|
conf = get_config()
|
||||||
entries = api_trades_to_ledger_entries(
|
entries = trades_to_ledger_entries(
|
||||||
conf['accounts'].inverse,
|
conf['accounts'].inverse,
|
||||||
trade_entries,
|
trade_entries,
|
||||||
)
|
)
|
||||||
|
@ -371,8 +371,8 @@ async def update_and_audit_msgs(
|
||||||
else:
|
else:
|
||||||
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
|
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
|
||||||
|
|
||||||
raise ValueError(
|
# raise ValueError(
|
||||||
# log.error(
|
log.error(
|
||||||
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
||||||
f'ib: {ibppmsg}\n'
|
f'ib: {ibppmsg}\n'
|
||||||
f'piker: {msg}\n'
|
f'piker: {msg}\n'
|
||||||
|
@ -1123,16 +1123,18 @@ def norm_trade_records(
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# timestamping is way different in API records
|
# timestamping is way different in API records
|
||||||
dtstr = record.get('datetime')
|
|
||||||
date = record.get('date')
|
date = record.get('date')
|
||||||
flex_dtstr = record.get('dateTime')
|
if not date:
|
||||||
|
|
||||||
if dtstr or date:
|
|
||||||
dt = pendulum.parse(dtstr or date)
|
|
||||||
|
|
||||||
elif flex_dtstr:
|
|
||||||
# probably a flex record with a wonky non-std timestamp..
|
# probably a flex record with a wonky non-std timestamp..
|
||||||
dt = parse_flex_dt(record['dateTime'])
|
date, ts = record['dateTime'].split(';')
|
||||||
|
dt = pendulum.parse(date)
|
||||||
|
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
|
||||||
|
tsdt = pendulum.parse(ts)
|
||||||
|
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# epoch_dt = pendulum.from_timestamp(record.get('time'))
|
||||||
|
dt = pendulum.parse(date)
|
||||||
|
|
||||||
# special handling of symbol extraction from
|
# special handling of symbol extraction from
|
||||||
# flex records using some ad-hoc schema parsing.
|
# flex records using some ad-hoc schema parsing.
|
||||||
|
@ -1181,58 +1183,69 @@ def norm_trade_records(
|
||||||
return {r.tid: r for r in records}
|
return {r.tid: r for r in records}
|
||||||
|
|
||||||
|
|
||||||
def parse_flex_dt(
|
def trades_to_ledger_entries(
|
||||||
record: str,
|
|
||||||
) -> pendulum.datetime:
|
|
||||||
date, ts = record.split(';')
|
|
||||||
dt = pendulum.parse(date)
|
|
||||||
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
|
|
||||||
tsdt = pendulum.parse(ts)
|
|
||||||
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
|
|
||||||
|
|
||||||
|
|
||||||
def api_trades_to_ledger_entries(
|
|
||||||
accounts: bidict,
|
accounts: bidict,
|
||||||
trade_entries: list[object],
|
trade_entries: list[object],
|
||||||
|
source_type: str = 'api',
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
'''
|
'''
|
||||||
Convert API execution objects entry objects into ``dict`` form,
|
Convert either of API execution objects or flex report
|
||||||
pretty much straight up without modification except add
|
entry objects into ``dict`` form, pretty much straight up
|
||||||
a `pydatetime` field from the parsed timestamp.
|
without modification.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
trades_by_account = {}
|
trades_by_account = {}
|
||||||
|
|
||||||
for t in trade_entries:
|
for t in trade_entries:
|
||||||
# NOTE: example of schema we pull from the API client.
|
if source_type == 'flex':
|
||||||
# {
|
entry = t.__dict__
|
||||||
# 'commissionReport': CommissionReport(...
|
|
||||||
# 'contract': {...
|
|
||||||
# 'execution': Execution(...
|
|
||||||
# 'time': 1654801166.0
|
|
||||||
# }
|
|
||||||
|
|
||||||
# flatten all sub-dicts and values into one top level entry.
|
# XXX: LOL apparently ``toml`` has a bug
|
||||||
entry = {}
|
# where a section key error will show up in the write
|
||||||
for section, val in t.items():
|
# if you leave a table key as an `int`? So i guess
|
||||||
match section:
|
# cast to strs for all keys..
|
||||||
case 'contract' | 'execution' | 'commissionReport':
|
|
||||||
# sub-dict cases
|
|
||||||
entry.update(val)
|
|
||||||
|
|
||||||
case 'time':
|
# oddly for some so-called "BookTrade" entries
|
||||||
# ib has wack ns timestamps, or is that us?
|
# this field seems to be blank, no cuckin clue.
|
||||||
continue
|
# trade['ibExecID']
|
||||||
|
tid = str(entry.get('ibExecID') or entry['tradeID'])
|
||||||
|
# date = str(entry['tradeDate'])
|
||||||
|
|
||||||
case _:
|
# XXX: is it going to cause problems if a account name
|
||||||
entry[section] = val
|
# get's lost? The user should be able to find it based
|
||||||
|
# on the actual exec history right?
|
||||||
|
acctid = accounts[str(entry['accountId'])]
|
||||||
|
|
||||||
tid = str(entry['execId'])
|
elif source_type == 'api':
|
||||||
dt = pendulum.from_timestamp(entry['time'])
|
# NOTE: example of schema we pull from the API client.
|
||||||
# TODO: why isn't this showing seconds in the str?
|
# {
|
||||||
entry['pydatetime'] = dt
|
# 'commissionReport': CommissionReport(...
|
||||||
entry['datetime'] = str(dt)
|
# 'contract': {...
|
||||||
acctid = accounts[entry['acctNumber']]
|
# 'execution': Execution(...
|
||||||
|
# 'time': 1654801166.0
|
||||||
|
# }
|
||||||
|
|
||||||
|
# flatten all sub-dicts and values into one top level entry.
|
||||||
|
entry = {}
|
||||||
|
for section, val in t.items():
|
||||||
|
match section:
|
||||||
|
case 'contract' | 'execution' | 'commissionReport':
|
||||||
|
# sub-dict cases
|
||||||
|
entry.update(val)
|
||||||
|
|
||||||
|
case 'time':
|
||||||
|
# ib has wack ns timestamps, or is that us?
|
||||||
|
continue
|
||||||
|
|
||||||
|
case _:
|
||||||
|
entry[section] = val
|
||||||
|
|
||||||
|
tid = str(entry['execId'])
|
||||||
|
dt = pendulum.from_timestamp(entry['time'])
|
||||||
|
# TODO: why isn't this showing seconds in the str?
|
||||||
|
entry['date'] = str(dt)
|
||||||
|
acctid = accounts[entry['acctNumber']]
|
||||||
|
|
||||||
if not tid:
|
if not tid:
|
||||||
# this is likely some kind of internal adjustment
|
# this is likely some kind of internal adjustment
|
||||||
|
@ -1250,73 +1263,6 @@ def api_trades_to_ledger_entries(
|
||||||
acctid, {}
|
acctid, {}
|
||||||
)[tid] = entry
|
)[tid] = entry
|
||||||
|
|
||||||
# sort entries in output by python based datetime
|
|
||||||
for acctid in trades_by_account:
|
|
||||||
trades_by_account[acctid] = dict(sorted(
|
|
||||||
trades_by_account[acctid].items(),
|
|
||||||
key=lambda entry: entry[1].pop('pydatetime'),
|
|
||||||
))
|
|
||||||
|
|
||||||
return trades_by_account
|
|
||||||
|
|
||||||
|
|
||||||
def flex_records_to_ledger_entries(
|
|
||||||
accounts: bidict,
|
|
||||||
trade_entries: list[object],
|
|
||||||
|
|
||||||
) -> dict:
|
|
||||||
'''
|
|
||||||
Convert flex report entry objects into ``dict`` form, pretty much
|
|
||||||
straight up without modification except add a `pydatetime` field
|
|
||||||
from the parsed timestamp.
|
|
||||||
|
|
||||||
'''
|
|
||||||
trades_by_account = {}
|
|
||||||
for t in trade_entries:
|
|
||||||
entry = t.__dict__
|
|
||||||
|
|
||||||
# XXX: LOL apparently ``toml`` has a bug
|
|
||||||
# where a section key error will show up in the write
|
|
||||||
# if you leave a table key as an `int`? So i guess
|
|
||||||
# cast to strs for all keys..
|
|
||||||
|
|
||||||
# oddly for some so-called "BookTrade" entries
|
|
||||||
# this field seems to be blank, no cuckin clue.
|
|
||||||
# trade['ibExecID']
|
|
||||||
tid = str(entry.get('ibExecID') or entry['tradeID'])
|
|
||||||
# date = str(entry['tradeDate'])
|
|
||||||
|
|
||||||
# XXX: is it going to cause problems if a account name
|
|
||||||
# get's lost? The user should be able to find it based
|
|
||||||
# on the actual exec history right?
|
|
||||||
acctid = accounts[str(entry['accountId'])]
|
|
||||||
|
|
||||||
# probably a flex record with a wonky non-std timestamp..
|
|
||||||
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
|
|
||||||
entry['datetime'] = str(dt)
|
|
||||||
|
|
||||||
if not tid:
|
|
||||||
# this is likely some kind of internal adjustment
|
|
||||||
# transaction, likely one of the following:
|
|
||||||
# - an expiry event that will show a "book trade" indicating
|
|
||||||
# some adjustment to cash balances: zeroing or itm settle.
|
|
||||||
# - a manual cash balance position adjustment likely done by
|
|
||||||
# the user from the accounts window in TWS where they can
|
|
||||||
# manually set the avg price and size:
|
|
||||||
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
|
|
||||||
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
|
|
||||||
continue
|
|
||||||
|
|
||||||
trades_by_account.setdefault(
|
|
||||||
acctid, {}
|
|
||||||
)[tid] = entry
|
|
||||||
|
|
||||||
for acctid in trades_by_account:
|
|
||||||
trades_by_account[acctid] = dict(sorted(
|
|
||||||
trades_by_account[acctid].items(),
|
|
||||||
key=lambda entry: entry[1]['pydatetime'],
|
|
||||||
))
|
|
||||||
|
|
||||||
return trades_by_account
|
return trades_by_account
|
||||||
|
|
||||||
|
|
||||||
|
@ -1363,16 +1309,15 @@ def load_flex_trades(
|
||||||
ln = len(trade_entries)
|
ln = len(trade_entries)
|
||||||
log.info(f'Loaded {ln} trades from flex query')
|
log.info(f'Loaded {ln} trades from flex query')
|
||||||
|
|
||||||
trades_by_account = flex_records_to_ledger_entries(
|
trades_by_account = trades_to_ledger_entries(
|
||||||
conf['accounts'].inverse, # reverse map to user account names
|
# get reverse map to user account names
|
||||||
|
conf['accounts'].inverse,
|
||||||
trade_entries,
|
trade_entries,
|
||||||
|
source_type='flex',
|
||||||
)
|
)
|
||||||
|
|
||||||
ledger_dict: Optional[dict] = None
|
|
||||||
|
|
||||||
for acctid in trades_by_account:
|
for acctid in trades_by_account:
|
||||||
trades_by_id = trades_by_account[acctid]
|
trades_by_id = trades_by_account[acctid]
|
||||||
|
|
||||||
with open_trade_ledger('ib', acctid) as ledger_dict:
|
with open_trade_ledger('ib', acctid) as ledger_dict:
|
||||||
tid_delta = set(trades_by_id) - set(ledger_dict)
|
tid_delta = set(trades_by_id) - set(ledger_dict)
|
||||||
log.info(
|
log.info(
|
||||||
|
@ -1380,11 +1325,9 @@ def load_flex_trades(
|
||||||
f'{pformat(tid_delta)}'
|
f'{pformat(tid_delta)}'
|
||||||
)
|
)
|
||||||
if tid_delta:
|
if tid_delta:
|
||||||
sorted_delta = dict(sorted(
|
ledger_dict.update(
|
||||||
{tid: trades_by_id[tid] for tid in tid_delta}.items(),
|
{tid: trades_by_id[tid] for tid in tid_delta}
|
||||||
key=lambda entry: entry[1].pop('pydatetime'),
|
)
|
||||||
))
|
|
||||||
ledger_dict.update(sorted_delta)
|
|
||||||
|
|
||||||
return ledger_dict
|
return ledger_dict
|
||||||
|
|
||||||
|
|
|
@ -254,9 +254,6 @@ async def wait_on_data_reset(
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
_data_resetter_task: trio.Task | None = None
|
|
||||||
|
|
||||||
|
|
||||||
async def get_bars(
|
async def get_bars(
|
||||||
|
|
||||||
proxy: MethodProxy,
|
proxy: MethodProxy,
|
||||||
|
@ -267,7 +264,7 @@ async def get_bars(
|
||||||
end_dt: str = '',
|
end_dt: str = '',
|
||||||
|
|
||||||
# TODO: make this more dynamic based on measured frame rx latency..
|
# TODO: make this more dynamic based on measured frame rx latency..
|
||||||
timeout: float = 3, # how long before we trigger a feed reset
|
timeout: float = 1.5, # how long before we trigger a feed reset
|
||||||
|
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
@ -277,15 +274,13 @@ async def get_bars(
|
||||||
a ``MethoProxy``.
|
a ``MethoProxy``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _data_resetter_task
|
data_cs: Optional[trio.CancelScope] = None
|
||||||
|
result: Optional[tuple[
|
||||||
data_cs: trio.CancelScope | None = None
|
|
||||||
result: tuple[
|
|
||||||
ibis.objects.BarDataList,
|
ibis.objects.BarDataList,
|
||||||
np.ndarray,
|
np.ndarray,
|
||||||
datetime,
|
datetime,
|
||||||
datetime,
|
datetime,
|
||||||
] | None = None
|
]] = None
|
||||||
result_ready = trio.Event()
|
result_ready = trio.Event()
|
||||||
|
|
||||||
async def query():
|
async def query():
|
||||||
|
@ -312,7 +307,7 @@ async def get_bars(
|
||||||
log.warning(
|
log.warning(
|
||||||
f'History is blank for {dt_duration} from {end_dt}'
|
f'History is blank for {dt_duration} from {end_dt}'
|
||||||
)
|
)
|
||||||
end_dt -= dt_duration
|
end_dt = end_dt.subtract(dt_duration)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if bars_array is None:
|
if bars_array is None:
|
||||||
|
@ -405,10 +400,6 @@ async def get_bars(
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# TODO: make this global across all history task/requests
|
|
||||||
# such that simultaneous symbol queries don't try data resettingn
|
|
||||||
# too fast..
|
|
||||||
unset_resetter: bool = False
|
|
||||||
async with trio.open_nursery() as nurse:
|
async with trio.open_nursery() as nurse:
|
||||||
|
|
||||||
# start history request that we allow
|
# start history request that we allow
|
||||||
|
@ -423,14 +414,6 @@ async def get_bars(
|
||||||
await result_ready.wait()
|
await result_ready.wait()
|
||||||
break
|
break
|
||||||
|
|
||||||
if _data_resetter_task:
|
|
||||||
# don't double invoke the reset hack if another
|
|
||||||
# requester task already has it covered.
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
_data_resetter_task = trio.lowlevel.current_task()
|
|
||||||
unset_resetter = True
|
|
||||||
|
|
||||||
# spawn new data reset task
|
# spawn new data reset task
|
||||||
data_cs, reset_done = await nurse.start(
|
data_cs, reset_done = await nurse.start(
|
||||||
partial(
|
partial(
|
||||||
|
@ -442,7 +425,6 @@ async def get_bars(
|
||||||
# sync wait on reset to complete
|
# sync wait on reset to complete
|
||||||
await reset_done.wait()
|
await reset_done.wait()
|
||||||
|
|
||||||
_data_resetter_task = None if unset_resetter else _data_resetter_task
|
|
||||||
return result, data_cs is not None
|
return result, data_cs is not None
|
||||||
|
|
||||||
|
|
||||||
|
@ -501,9 +483,7 @@ async def _setup_quote_stream(
|
||||||
|
|
||||||
to_trio.send_nowait(None)
|
to_trio.send_nowait(None)
|
||||||
|
|
||||||
async with load_aio_clients(
|
async with load_aio_clients() as accts2clients:
|
||||||
disconnect_on_exit=False,
|
|
||||||
) as accts2clients:
|
|
||||||
caccount_name, client = get_preferred_data_client(accts2clients)
|
caccount_name, client = get_preferred_data_client(accts2clients)
|
||||||
contract = contract or (await client.find_contract(symbol))
|
contract = contract or (await client.find_contract(symbol))
|
||||||
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
|
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
|
||||||
|
@ -583,8 +563,7 @@ async def open_aio_quote_stream(
|
||||||
from_aio = _quote_streams.get(symbol)
|
from_aio = _quote_streams.get(symbol)
|
||||||
if from_aio:
|
if from_aio:
|
||||||
|
|
||||||
# if we already have a cached feed deliver a rx side clone
|
# if we already have a cached feed deliver a rx side clone to consumer
|
||||||
# to consumer
|
|
||||||
async with broadcast_receiver(
|
async with broadcast_receiver(
|
||||||
from_aio,
|
from_aio,
|
||||||
2**6,
|
2**6,
|
||||||
|
@ -775,97 +754,67 @@ async def stream_quotes(
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
return # we never expect feed to come up?
|
return # we never expect feed to come up?
|
||||||
|
|
||||||
cs: Optional[trio.CancelScope] = None
|
async with open_aio_quote_stream(
|
||||||
startup: bool = True
|
symbol=sym,
|
||||||
while (
|
contract=con,
|
||||||
startup
|
) as stream:
|
||||||
or cs.cancel_called
|
|
||||||
):
|
|
||||||
with trio.CancelScope() as cs:
|
|
||||||
async with (
|
|
||||||
trio.open_nursery() as nurse,
|
|
||||||
open_aio_quote_stream(
|
|
||||||
symbol=sym,
|
|
||||||
contract=con,
|
|
||||||
) as stream,
|
|
||||||
):
|
|
||||||
# ugh, clear ticks since we've consumed them
|
|
||||||
# (ahem, ib_insync is stateful trash)
|
|
||||||
first_ticker.ticks = []
|
|
||||||
|
|
||||||
# only on first entry at feed boot up
|
# ugh, clear ticks since we've consumed them
|
||||||
if startup:
|
# (ahem, ib_insync is stateful trash)
|
||||||
startup = False
|
first_ticker.ticks = []
|
||||||
task_status.started((init_msgs, first_quote))
|
|
||||||
|
|
||||||
# start a stream restarter task which monitors the
|
task_status.started((init_msgs, first_quote))
|
||||||
# data feed event.
|
|
||||||
async def reset_on_feed():
|
|
||||||
|
|
||||||
# TODO: this seems to be surpressed from the
|
async with aclosing(stream):
|
||||||
# traceback in ``tractor``?
|
if syminfo.get('no_vlm', False):
|
||||||
# assert 0
|
|
||||||
|
|
||||||
rt_ev = proxy.status_event(
|
# generally speaking these feeds don't
|
||||||
'Market data farm connection is OK:usfarm'
|
# include vlm data.
|
||||||
)
|
atype = syminfo['asset_type']
|
||||||
await rt_ev.wait()
|
log.info(
|
||||||
cs.cancel() # cancel called should now be set
|
f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
|
||||||
|
)
|
||||||
|
|
||||||
nurse.start_soon(reset_on_feed)
|
else:
|
||||||
|
# wait for real volume on feed (trading might be closed)
|
||||||
async with aclosing(stream):
|
while True:
|
||||||
if syminfo.get('no_vlm', False):
|
ticker = await stream.receive()
|
||||||
|
|
||||||
# generally speaking these feeds don't
|
|
||||||
# include vlm data.
|
|
||||||
atype = syminfo['asset_type']
|
|
||||||
log.info(
|
|
||||||
f'No-vlm {sym}@{atype}, skipping quote poll'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
# for a real volume contract we rait for the first
|
||||||
|
# "real" trade to take place
|
||||||
|
if (
|
||||||
|
# not calc_price
|
||||||
|
# and not ticker.rtTime
|
||||||
|
not ticker.rtTime
|
||||||
|
):
|
||||||
|
# spin consuming tickers until we get a real
|
||||||
|
# market datum
|
||||||
|
log.debug(f"New unsent ticker: {ticker}")
|
||||||
|
continue
|
||||||
else:
|
else:
|
||||||
# wait for real volume on feed (trading might be
|
log.debug("Received first real volume tick")
|
||||||
# closed)
|
|
||||||
while True:
|
|
||||||
ticker = await stream.receive()
|
|
||||||
|
|
||||||
# for a real volume contract we rait for
|
|
||||||
# the first "real" trade to take place
|
|
||||||
if (
|
|
||||||
# not calc_price
|
|
||||||
# and not ticker.rtTime
|
|
||||||
not ticker.rtTime
|
|
||||||
):
|
|
||||||
# spin consuming tickers until we
|
|
||||||
# get a real market datum
|
|
||||||
log.debug(f"New unsent ticker: {ticker}")
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
log.debug("Received first volume tick")
|
|
||||||
# ugh, clear ticks since we've
|
|
||||||
# consumed them (ahem, ib_insync is
|
|
||||||
# truly stateful trash)
|
|
||||||
ticker.ticks = []
|
|
||||||
|
|
||||||
# XXX: this works because we don't use
|
|
||||||
# ``aclosing()`` above?
|
|
||||||
break
|
|
||||||
|
|
||||||
quote = normalize(ticker)
|
|
||||||
log.debug(f"First ticker received {quote}")
|
|
||||||
|
|
||||||
# tell caller quotes are now coming in live
|
|
||||||
feed_is_live.set()
|
|
||||||
|
|
||||||
# last = time.time()
|
|
||||||
async for ticker in stream:
|
|
||||||
quote = normalize(ticker)
|
|
||||||
await send_chan.send({quote['fqsn']: quote})
|
|
||||||
|
|
||||||
# ugh, clear ticks since we've consumed them
|
# ugh, clear ticks since we've consumed them
|
||||||
|
# (ahem, ib_insync is truly stateful trash)
|
||||||
ticker.ticks = []
|
ticker.ticks = []
|
||||||
# last = time.time()
|
|
||||||
|
# XXX: this works because we don't use
|
||||||
|
# ``aclosing()`` above?
|
||||||
|
break
|
||||||
|
|
||||||
|
quote = normalize(ticker)
|
||||||
|
log.debug(f"First ticker received {quote}")
|
||||||
|
|
||||||
|
# tell caller quotes are now coming in live
|
||||||
|
feed_is_live.set()
|
||||||
|
|
||||||
|
# last = time.time()
|
||||||
|
async for ticker in stream:
|
||||||
|
quote = normalize(ticker)
|
||||||
|
await send_chan.send({quote['fqsn']: quote})
|
||||||
|
|
||||||
|
# ugh, clear ticks since we've consumed them
|
||||||
|
ticker.ticks = []
|
||||||
|
# last = time.time()
|
||||||
|
|
||||||
|
|
||||||
async def data_reset_hack(
|
async def data_reset_hack(
|
||||||
|
@ -973,14 +922,7 @@ async def open_symbol_search(
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if (
|
if not pattern or pattern.isspace():
|
||||||
not pattern
|
|
||||||
or pattern.isspace()
|
|
||||||
|
|
||||||
# XXX: not sure if this is a bad assumption but it
|
|
||||||
# seems to make search snappier?
|
|
||||||
or len(pattern) < 1
|
|
||||||
):
|
|
||||||
log.warning('empty pattern received, skipping..')
|
log.warning('empty pattern received, skipping..')
|
||||||
|
|
||||||
# TODO: *BUG* if nothing is returned here the client
|
# TODO: *BUG* if nothing is returned here the client
|
||||||
|
|
|
@ -138,26 +138,25 @@ def cli(ctx, brokers, loglevel, tl, configdir):
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def services(config, tl, names):
|
def services(config, tl, names):
|
||||||
|
|
||||||
from .._daemon import open_piker_runtime
|
|
||||||
|
|
||||||
async def list_services():
|
async def list_services():
|
||||||
async with (
|
|
||||||
open_piker_runtime(
|
async with tractor.get_arbiter(
|
||||||
name='service_query',
|
*_tractor_kwargs['arbiter_addr']
|
||||||
loglevel=config['loglevel'] if tl else None,
|
) as portal:
|
||||||
),
|
|
||||||
tractor.get_arbiter(
|
|
||||||
*_tractor_kwargs['arbiter_addr']
|
|
||||||
) as portal
|
|
||||||
):
|
|
||||||
registry = await portal.run_from_ns('self', 'get_registry')
|
registry = await portal.run_from_ns('self', 'get_registry')
|
||||||
json_d = {}
|
json_d = {}
|
||||||
for key, socket in registry.items():
|
for key, socket in registry.items():
|
||||||
|
# name, uuid = uid
|
||||||
host, port = socket
|
host, port = socket
|
||||||
json_d[key] = f'{host}:{port}'
|
json_d[key] = f'{host}:{port}'
|
||||||
click.echo(f"{colorize_json(json_d)}")
|
click.echo(f"{colorize_json(json_d)}")
|
||||||
|
|
||||||
trio.run(list_services)
|
tractor.run(
|
||||||
|
list_services,
|
||||||
|
name='service_query',
|
||||||
|
loglevel=config['loglevel'] if tl else None,
|
||||||
|
arbiter_addr=_tractor_kwargs['arbiter_addr'],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _load_clis() -> None:
|
def _load_clis() -> None:
|
||||||
|
|
|
@ -333,7 +333,7 @@ async def start_backfill(
|
||||||
# do a decently sized backfill and load it into storage.
|
# do a decently sized backfill and load it into storage.
|
||||||
periods = {
|
periods = {
|
||||||
1: {'days': 6},
|
1: {'days': 6},
|
||||||
60: {'years': 6},
|
60: {'years': 10},
|
||||||
}
|
}
|
||||||
|
|
||||||
kwargs = periods[step_size_s]
|
kwargs = periods[step_size_s]
|
||||||
|
@ -348,45 +348,36 @@ async def start_backfill(
|
||||||
# last retrieved start dt to the next request as
|
# last retrieved start dt to the next request as
|
||||||
# it's end dt.
|
# it's end dt.
|
||||||
starts: set[datetime] = set()
|
starts: set[datetime] = set()
|
||||||
|
|
||||||
while start_dt > last_tsdb_dt:
|
while start_dt > last_tsdb_dt:
|
||||||
|
|
||||||
|
print(f"QUERY end_dt={start_dt}")
|
||||||
try:
|
try:
|
||||||
log.info(
|
log.info(
|
||||||
f'Requesting {step_size_s}s frame ending in {start_dt}'
|
f'Requesting {step_size_s}s frame ending in {start_dt}'
|
||||||
)
|
)
|
||||||
array, next_start_dt, end_dt = await hist(
|
array, start_dt, end_dt = await hist(
|
||||||
timeframe,
|
timeframe,
|
||||||
end_dt=start_dt,
|
end_dt=start_dt,
|
||||||
)
|
)
|
||||||
|
|
||||||
if next_start_dt in starts:
|
|
||||||
start_dt = min(starts)
|
|
||||||
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# only update new start point if new
|
|
||||||
start_dt = next_start_dt
|
|
||||||
starts.add(start_dt)
|
|
||||||
|
|
||||||
assert array['time'][0] == start_dt.timestamp()
|
assert array['time'][0] == start_dt.timestamp()
|
||||||
|
|
||||||
except NoData:
|
except NoData:
|
||||||
# XXX: unhandled history gap (shouldn't happen?)
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
|
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
|
||||||
)
|
)
|
||||||
await tractor.breakpoint()
|
return None # discard signal
|
||||||
|
|
||||||
except DataUnavailable: # as duerr:
|
except DataUnavailable as duerr:
|
||||||
# broker is being a bish and we can't pull any more..
|
# broker is being a bish and we can't pull
|
||||||
log.warning(
|
# any more..
|
||||||
f'NO-MORE-DATA: backend {mod.name} halted history!?'
|
log.warning('backend halted on data deliver !?!?')
|
||||||
)
|
|
||||||
|
|
||||||
# ugh, what's a better way?
|
# ugh, what's a better way?
|
||||||
# TODO: fwiw, we probably want a way to signal a throttle
|
# TODO: fwiw, we probably want a way to signal a throttle
|
||||||
# condition (eg. with ib) so that we can halt the
|
# condition (eg. with ib) so that we can halt the
|
||||||
# request loop until the condition is resolved?
|
# request loop until the condition is resolved?
|
||||||
return
|
return duerr
|
||||||
|
|
||||||
diff = end_dt - start_dt
|
diff = end_dt - start_dt
|
||||||
frame_time_diff_s = diff.seconds
|
frame_time_diff_s = diff.seconds
|
||||||
|
@ -403,6 +394,22 @@ async def start_backfill(
|
||||||
f'{diff} ~= {frame_time_diff_s} seconds'
|
f'{diff} ~= {frame_time_diff_s} seconds'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
array, _start_dt, end_dt = await hist(
|
||||||
|
timeframe,
|
||||||
|
end_dt=start_dt,
|
||||||
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
_start_dt in starts
|
||||||
|
):
|
||||||
|
print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
|
||||||
|
start_dt = min(starts)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# only update new start point if new
|
||||||
|
start_dt = _start_dt
|
||||||
|
starts.add(start_dt)
|
||||||
|
|
||||||
to_push = diff_history(
|
to_push = diff_history(
|
||||||
array,
|
array,
|
||||||
start_dt,
|
start_dt,
|
||||||
|
@ -491,10 +498,10 @@ async def manage_history(
|
||||||
readonly=False,
|
readonly=False,
|
||||||
)
|
)
|
||||||
# TODO: history validation
|
# TODO: history validation
|
||||||
# if not opened:
|
if not opened:
|
||||||
# raise RuntimeError(
|
raise RuntimeError(
|
||||||
# "Persistent shm for sym was already open?!"
|
"Persistent shm for sym was already open?!"
|
||||||
# )
|
)
|
||||||
|
|
||||||
rt_shm, opened = maybe_open_shm_array(
|
rt_shm, opened = maybe_open_shm_array(
|
||||||
key=f'{fqsn}_rt',
|
key=f'{fqsn}_rt',
|
||||||
|
@ -506,10 +513,10 @@ async def manage_history(
|
||||||
readonly=False,
|
readonly=False,
|
||||||
size=3*_secs_in_day,
|
size=3*_secs_in_day,
|
||||||
)
|
)
|
||||||
# if not opened:
|
if not opened:
|
||||||
# raise RuntimeError(
|
raise RuntimeError(
|
||||||
# "Persistent shm for sym was already open?!"
|
"Persistent shm for sym was already open?!"
|
||||||
# )
|
)
|
||||||
|
|
||||||
log.info('Scanning for existing `marketstored`')
|
log.info('Scanning for existing `marketstored`')
|
||||||
|
|
||||||
|
|
71
piker/pp.py
71
piker/pp.py
|
@ -20,7 +20,6 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
|
||||||
(looking at you `ib` and dirt-bird friends)
|
(looking at you `ib` and dirt-bird friends)
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
|
||||||
from contextlib import contextmanager as cm
|
from contextlib import contextmanager as cm
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import os
|
import os
|
||||||
|
@ -139,31 +138,13 @@ class Position(Struct):
|
||||||
|
|
||||||
# ordered record of known constituent trade messages
|
# ordered record of known constituent trade messages
|
||||||
clears: dict[
|
clears: dict[
|
||||||
str | int, # trade id
|
Union[str, int, Status], # trade id
|
||||||
dict[str, Any], # transaction history summaries
|
dict[str, Any], # transaction history summaries
|
||||||
] = {}
|
] = {}
|
||||||
first_clear_dt: Optional[datetime] = None
|
first_clear_dt: Optional[datetime] = None
|
||||||
|
|
||||||
expiry: Optional[datetime] = None
|
expiry: Optional[datetime] = None
|
||||||
|
|
||||||
# @property
|
|
||||||
# def clears(self) -> dict[
|
|
||||||
# Union[str, int, Status], # trade id
|
|
||||||
# dict[str, Any], # transaction history summaries
|
|
||||||
# ]:
|
|
||||||
# '''
|
|
||||||
# Datetime sorted reference to internal clears table.
|
|
||||||
|
|
||||||
# '''
|
|
||||||
# # self._clears = {}
|
|
||||||
# self._clears = dict(sorted(
|
|
||||||
# self._clears.items(),
|
|
||||||
# key=lambda entry: entry[1]['dt'],
|
|
||||||
# ))
|
|
||||||
# # self._clears[k] = v
|
|
||||||
|
|
||||||
# return self._clears
|
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
def to_dict(self) -> dict:
|
||||||
return {
|
return {
|
||||||
f: getattr(self, f)
|
f: getattr(self, f)
|
||||||
|
@ -238,10 +219,6 @@ class Position(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
clears = list(self.clears.values())
|
clears = list(self.clears.values())
|
||||||
if not clears:
|
|
||||||
log.warning(f'No clears table for {self.symbol}!?')
|
|
||||||
return
|
|
||||||
|
|
||||||
self.first_clear_dt = min(list(entry['dt'] for entry in clears))
|
self.first_clear_dt = min(list(entry['dt'] for entry in clears))
|
||||||
last_clear = clears[-1]
|
last_clear = clears[-1]
|
||||||
|
|
||||||
|
@ -646,7 +623,6 @@ class PpTable(Struct):
|
||||||
|
|
||||||
def to_toml(
|
def to_toml(
|
||||||
self,
|
self,
|
||||||
min_clears: bool = True,
|
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
|
|
||||||
active, closed = self.dump_active()
|
active, closed = self.dump_active()
|
||||||
|
@ -659,9 +635,7 @@ class PpTable(Struct):
|
||||||
|
|
||||||
# keep the minimal amount of clears that make up this
|
# keep the minimal amount of clears that make up this
|
||||||
# position since the last net-zero state.
|
# position since the last net-zero state.
|
||||||
if min_clears:
|
pos.minimize_clears()
|
||||||
pos.minimize_clears()
|
|
||||||
|
|
||||||
pos.ensure_state()
|
pos.ensure_state()
|
||||||
|
|
||||||
# serialize to pre-toml form
|
# serialize to pre-toml form
|
||||||
|
@ -708,8 +682,6 @@ def load_pps_from_ledger(
|
||||||
brokername: str,
|
brokername: str,
|
||||||
acctname: str,
|
acctname: str,
|
||||||
|
|
||||||
table: Optional[PpTable] = None,
|
|
||||||
|
|
||||||
# post normalization filter on ledger entries to be processed
|
# post normalization filter on ledger entries to be processed
|
||||||
filter_by: Optional[list[dict]] = None,
|
filter_by: Optional[list[dict]] = None,
|
||||||
|
|
||||||
|
@ -726,6 +698,7 @@ def load_pps_from_ledger(
|
||||||
'''
|
'''
|
||||||
with (
|
with (
|
||||||
open_trade_ledger(brokername, acctname) as ledger,
|
open_trade_ledger(brokername, acctname) as ledger,
|
||||||
|
open_pps(brokername, acctname) as table,
|
||||||
):
|
):
|
||||||
if not ledger:
|
if not ledger:
|
||||||
# null case, no ledger file with content
|
# null case, no ledger file with content
|
||||||
|
@ -743,11 +716,7 @@ def load_pps_from_ledger(
|
||||||
else:
|
else:
|
||||||
records = src_records
|
records = src_records
|
||||||
|
|
||||||
if table is None:
|
updated = table.update_from_trans(records)
|
||||||
with open_pps(brokername, acctname) as table:
|
|
||||||
updated = table.update_from_trans(records)
|
|
||||||
else:
|
|
||||||
updated = table.update_from_trans(records)
|
|
||||||
|
|
||||||
return records, updated
|
return records, updated
|
||||||
|
|
||||||
|
@ -917,27 +886,15 @@ def open_pps(
|
||||||
conf=conf,
|
conf=conf,
|
||||||
)
|
)
|
||||||
|
|
||||||
# first pass populate all missing clears record tables
|
|
||||||
# for fqsn, entry in pps.items():
|
|
||||||
# # convert clears sub-tables (only in this form
|
|
||||||
# # for toml re-presentation) back into a master table.
|
|
||||||
# clears_list = entry.get('clears', [])
|
|
||||||
|
|
||||||
# # attempt to reload from ledger
|
|
||||||
# if not clears_list:
|
|
||||||
# trans, pos = load_pps_from_ledger(
|
|
||||||
# brokername,
|
|
||||||
# acctid,
|
|
||||||
# filter_by=[entry['bsuid']],
|
|
||||||
# table=table,
|
|
||||||
# )
|
|
||||||
# # breakpoint()
|
|
||||||
|
|
||||||
# unmarshal/load ``pps.toml`` config entries into object form
|
# unmarshal/load ``pps.toml`` config entries into object form
|
||||||
# and update `PpTable` obj entries.
|
# and update `PpTable` obj entries.
|
||||||
for fqsn, entry in pps.items():
|
for fqsn, entry in pps.items():
|
||||||
bsuid = entry['bsuid']
|
bsuid = entry['bsuid']
|
||||||
|
|
||||||
|
# convert clears sub-tables (only in this form
|
||||||
|
# for toml re-presentation) back into a master table.
|
||||||
|
clears_list = entry['clears']
|
||||||
|
|
||||||
# index clears entries in "object" form by tid in a top
|
# index clears entries in "object" form by tid in a top
|
||||||
# level dict instead of a list (as is presented in our
|
# level dict instead of a list (as is presented in our
|
||||||
# ``pps.toml``).
|
# ``pps.toml``).
|
||||||
|
@ -949,18 +906,6 @@ def open_pps(
|
||||||
# processing of new clear events.
|
# processing of new clear events.
|
||||||
trans: list[Transaction] = []
|
trans: list[Transaction] = []
|
||||||
|
|
||||||
# convert clears sub-tables (only in this form
|
|
||||||
# for toml re-presentation) back into a master table.
|
|
||||||
clears_list = entry['clears']
|
|
||||||
|
|
||||||
# # attempt to reload from ledger
|
|
||||||
# if not clears_list:
|
|
||||||
# trans, pos = load_pps_from_ledger(
|
|
||||||
# brokername,
|
|
||||||
# acctid,
|
|
||||||
# table=table,
|
|
||||||
# )
|
|
||||||
|
|
||||||
for clears_table in clears_list:
|
for clears_table in clears_list:
|
||||||
tid = clears_table.pop('tid')
|
tid = clears_table.pop('tid')
|
||||||
dtstr = clears_table['dt']
|
dtstr = clears_table['dt']
|
||||||
|
|
Loading…
Reference in New Issue