Compare commits
23 Commits
gitea_feat
...
clears_tab
| Author | SHA1 | Date |
|---|---|---|
|
|
c53071e43a | |
|
|
41ffccc59e | |
|
|
55856f5e8b | |
|
|
e8ab28e456 | |
|
|
d2b6216994 | |
|
|
eb743759a4 | |
|
|
74910ba56c | |
|
|
28535fa977 | |
|
|
1d7e642dbd | |
|
|
69be65237f | |
|
|
96f5a8abb8 | |
|
|
13e886c967 | |
|
|
2c6b832e50 | |
|
|
d5c3124722 | |
|
|
c939e75ef9 | |
|
|
844f8beaa7 | |
|
|
ac7ba500be | |
|
|
3301619647 | |
|
|
7f498766af | |
|
|
9270391e01 | |
|
|
0c061d8957 | |
|
|
87f7a03dbe | |
|
|
1adf5fb9c0 |
|
|
@ -195,9 +195,8 @@ async def open_piker_runtime(
|
||||||
|
|
||||||
) -> Optional[tractor._portal.Portal]:
|
) -> Optional[tractor._portal.Portal]:
|
||||||
'''
|
'''
|
||||||
Start a piker actor who's runtime will automatically
|
Start a piker actor who's runtime will automatically sync with
|
||||||
sync with existing piker actors in local network
|
existing piker actors on the local link based on configuration.
|
||||||
based on configuration.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _services
|
global _services
|
||||||
|
|
|
||||||
|
|
@ -388,6 +388,7 @@ async def open_history_client(
|
||||||
async with open_cached_client('binance') as client:
|
async with open_cached_client('binance') as client:
|
||||||
|
|
||||||
async def get_ohlc(
|
async def get_ohlc(
|
||||||
|
timeframe: float,
|
||||||
end_dt: Optional[datetime] = None,
|
end_dt: Optional[datetime] = None,
|
||||||
start_dt: Optional[datetime] = None,
|
start_dt: Optional[datetime] = None,
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,7 @@ from bidict import bidict
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor import to_asyncio
|
from tractor import to_asyncio
|
||||||
|
import pendulum
|
||||||
import ib_insync as ibis
|
import ib_insync as ibis
|
||||||
from ib_insync.contract import (
|
from ib_insync.contract import (
|
||||||
Contract,
|
Contract,
|
||||||
|
|
@ -52,6 +53,7 @@ from ib_insync.contract import (
|
||||||
from ib_insync.order import Order
|
from ib_insync.order import Order
|
||||||
from ib_insync.ticker import Ticker
|
from ib_insync.ticker import Ticker
|
||||||
from ib_insync.objects import (
|
from ib_insync.objects import (
|
||||||
|
BarDataList,
|
||||||
Position,
|
Position,
|
||||||
Fill,
|
Fill,
|
||||||
Execution,
|
Execution,
|
||||||
|
|
@ -78,26 +80,11 @@ _time_units = {
|
||||||
'h': ' hours',
|
'h': ' hours',
|
||||||
}
|
}
|
||||||
|
|
||||||
_time_frames = {
|
_bar_sizes = {
|
||||||
'1s': '1 Sec',
|
1: '1 Sec',
|
||||||
'5s': '5 Sec',
|
60: '1 min',
|
||||||
'30s': '30 Sec',
|
60*60: '1 hour',
|
||||||
'1m': 'OneMinute',
|
24*60*60: '1 day',
|
||||||
'2m': 'TwoMinutes',
|
|
||||||
'3m': 'ThreeMinutes',
|
|
||||||
'4m': 'FourMinutes',
|
|
||||||
'5m': 'FiveMinutes',
|
|
||||||
'10m': 'TenMinutes',
|
|
||||||
'15m': 'FifteenMinutes',
|
|
||||||
'20m': 'TwentyMinutes',
|
|
||||||
'30m': 'HalfHour',
|
|
||||||
'1h': 'OneHour',
|
|
||||||
'2h': 'TwoHours',
|
|
||||||
'4h': 'FourHours',
|
|
||||||
'D': 'OneDay',
|
|
||||||
'W': 'OneWeek',
|
|
||||||
'M': 'OneMonth',
|
|
||||||
'Y': 'OneYear',
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_show_wap_in_history: bool = False
|
_show_wap_in_history: bool = False
|
||||||
|
|
@ -199,7 +186,8 @@ _adhoc_futes_set = {
|
||||||
'lb.nymex', # random len lumber
|
'lb.nymex', # random len lumber
|
||||||
|
|
||||||
# metals
|
# metals
|
||||||
'xauusd.cmdty', # gold spot
|
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
|
||||||
|
'xauusd.cmdty', # london gold spot ^
|
||||||
'gc.nymex',
|
'gc.nymex',
|
||||||
'mgc.nymex', # micro
|
'mgc.nymex', # micro
|
||||||
|
|
||||||
|
|
@ -257,14 +245,12 @@ _exch_skip_list = {
|
||||||
'PSE',
|
'PSE',
|
||||||
}
|
}
|
||||||
|
|
||||||
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
|
|
||||||
|
|
||||||
_enters = 0
|
_enters = 0
|
||||||
|
|
||||||
|
|
||||||
def bars_to_np(bars: list) -> np.ndarray:
|
def bars_to_np(bars: list) -> np.ndarray:
|
||||||
'''
|
'''
|
||||||
Convert a "bars list thing" (``BarsList`` type from ibis)
|
Convert a "bars list thing" (``BarDataList`` type from ibis)
|
||||||
into a numpy struct array.
|
into a numpy struct array.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
@ -284,6 +270,27 @@ def bars_to_np(bars: list) -> np.ndarray:
|
||||||
return nparr
|
return nparr
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE: pacing violations exist for higher sample rates:
|
||||||
|
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
|
||||||
|
# Also see note on duration limits being lifted on 1m+ periods,
|
||||||
|
# but they say "use with discretion":
|
||||||
|
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
|
||||||
|
_samplings: dict[int, tuple[str, str]] = {
|
||||||
|
1: (
|
||||||
|
'1 secs',
|
||||||
|
f'{int(2e3)} S',
|
||||||
|
pendulum.duration(seconds=2e3),
|
||||||
|
),
|
||||||
|
# TODO: benchmark >1 D duration on query to see if
|
||||||
|
# throughput can be made faster during backfilling.
|
||||||
|
60: (
|
||||||
|
'1 min',
|
||||||
|
'1 D',
|
||||||
|
pendulum.duration(days=1),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class Client:
|
class Client:
|
||||||
'''
|
'''
|
||||||
IB wrapped for our broker backend API.
|
IB wrapped for our broker backend API.
|
||||||
|
|
@ -338,19 +345,32 @@ class Client:
|
||||||
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
|
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
|
||||||
end_dt: Union[datetime, str] = "",
|
end_dt: Union[datetime, str] = "",
|
||||||
|
|
||||||
sample_period_s: str = 1, # ohlc sample period
|
# ohlc sample period in seconds
|
||||||
period_count: int = int(2e3), # <- max per 1s sample query
|
sample_period_s: int = 1,
|
||||||
|
|
||||||
) -> list[dict[str, Any]]:
|
# optional "duration of time" equal to the
|
||||||
|
# length of the returned history frame.
|
||||||
|
duration: Optional[str] = None,
|
||||||
|
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> tuple[BarDataList, np.ndarray, pendulum.Duration]:
|
||||||
'''
|
'''
|
||||||
Retreive OHLCV bars for a fqsn over a range to the present.
|
Retreive OHLCV bars for a fqsn over a range to the present.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# See API docs here:
|
||||||
|
# https://interactivebrokers.github.io/tws-api/historical_data.html
|
||||||
bars_kwargs = {'whatToShow': 'TRADES'}
|
bars_kwargs = {'whatToShow': 'TRADES'}
|
||||||
|
bars_kwargs.update(kwargs)
|
||||||
|
bar_size, duration, dt_duration = _samplings[sample_period_s]
|
||||||
|
|
||||||
global _enters
|
global _enters
|
||||||
# log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
|
# log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
|
||||||
print(f'REQUESTING BARS {_enters} @ end={end_dt}')
|
print(
|
||||||
|
f"REQUESTING {duration}'s worth {bar_size} BARS\n"
|
||||||
|
f'{_enters} @ end={end_dt}"'
|
||||||
|
)
|
||||||
|
|
||||||
if not end_dt:
|
if not end_dt:
|
||||||
end_dt = ''
|
end_dt = ''
|
||||||
|
|
@ -360,30 +380,20 @@ class Client:
|
||||||
contract = (await self.find_contracts(fqsn))[0]
|
contract = (await self.find_contracts(fqsn))[0]
|
||||||
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
|
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
|
||||||
|
|
||||||
# _min = min(2000*100, count)
|
|
||||||
bars = await self.ib.reqHistoricalDataAsync(
|
bars = await self.ib.reqHistoricalDataAsync(
|
||||||
contract,
|
contract,
|
||||||
endDateTime=end_dt,
|
endDateTime=end_dt,
|
||||||
formatDate=2,
|
formatDate=2,
|
||||||
|
|
||||||
# time history length values format:
|
|
||||||
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
|
|
||||||
|
|
||||||
# OHLC sampling values:
|
# OHLC sampling values:
|
||||||
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
|
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
|
||||||
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
|
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
|
||||||
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
|
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
|
||||||
# barSizeSetting='1 secs',
|
barSizeSetting=bar_size,
|
||||||
|
|
||||||
# durationStr='{count} S'.format(count=15000 * 5),
|
# time history length values format:
|
||||||
# durationStr='{count} D'.format(count=1),
|
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
|
||||||
# barSizeSetting='5 secs',
|
durationStr=duration,
|
||||||
|
|
||||||
durationStr='{count} S'.format(count=period_count),
|
|
||||||
# barSizeSetting='5 secs',
|
|
||||||
barSizeSetting='1 secs',
|
|
||||||
|
|
||||||
# barSizeSetting='1 min',
|
|
||||||
|
|
||||||
# always use extended hours
|
# always use extended hours
|
||||||
useRTH=False,
|
useRTH=False,
|
||||||
|
|
@ -394,11 +404,21 @@ class Client:
|
||||||
# whatToShow='TRADES',
|
# whatToShow='TRADES',
|
||||||
)
|
)
|
||||||
if not bars:
|
if not bars:
|
||||||
# TODO: raise underlying error here
|
# NOTE: there's 2 cases here to handle (and this should be
|
||||||
raise ValueError(f"No bars retreived for {fqsn}?")
|
# read alongside the implementation of
|
||||||
|
# ``.reqHistoricalDataAsync()``):
|
||||||
|
# - no data is returned for the period likely due to
|
||||||
|
# a weekend, holiday or other non-trading period prior to
|
||||||
|
# ``end_dt`` which exceeds the ``duration``,
|
||||||
|
# - a timeout occurred in which case insync internals return
|
||||||
|
# an empty list thing with bars.clear()...
|
||||||
|
return [], np.empty(0), dt_duration
|
||||||
|
# TODO: we could maybe raise ``NoData`` instead if we
|
||||||
|
# rewrite the method in the first case? right now there's no
|
||||||
|
# way to detect a timeout.
|
||||||
|
|
||||||
nparr = bars_to_np(bars)
|
nparr = bars_to_np(bars)
|
||||||
return bars, nparr
|
return bars, nparr, dt_duration
|
||||||
|
|
||||||
async def con_deats(
|
async def con_deats(
|
||||||
self,
|
self,
|
||||||
|
|
@ -463,7 +483,7 @@ class Client:
|
||||||
self,
|
self,
|
||||||
pattern: str,
|
pattern: str,
|
||||||
# how many contracts to search "up to"
|
# how many contracts to search "up to"
|
||||||
upto: int = 6,
|
upto: int = 16,
|
||||||
asdicts: bool = True,
|
asdicts: bool = True,
|
||||||
|
|
||||||
) -> dict[str, ContractDetails]:
|
) -> dict[str, ContractDetails]:
|
||||||
|
|
@ -498,6 +518,16 @@ class Client:
|
||||||
|
|
||||||
exch = tract.exchange
|
exch = tract.exchange
|
||||||
if exch not in _exch_skip_list:
|
if exch not in _exch_skip_list:
|
||||||
|
|
||||||
|
# try to lookup any contracts from our adhoc set
|
||||||
|
# since often the exchange/venue is named slightly
|
||||||
|
# different (eg. BRR.CMECRYPTO` instead of just
|
||||||
|
# `.CME`).
|
||||||
|
info = _adhoc_symbol_map.get(sym)
|
||||||
|
if info:
|
||||||
|
con_kwargs, bars_kwargs = info
|
||||||
|
exch = con_kwargs['exchange']
|
||||||
|
|
||||||
# try get all possible contracts for symbol as per,
|
# try get all possible contracts for symbol as per,
|
||||||
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
|
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
|
||||||
con = ibis.Future(
|
con = ibis.Future(
|
||||||
|
|
@ -1066,6 +1096,7 @@ async def load_aio_clients(
|
||||||
# retry a few times to get the client going..
|
# retry a few times to get the client going..
|
||||||
connect_retries: int = 3,
|
connect_retries: int = 3,
|
||||||
connect_timeout: float = 0.5,
|
connect_timeout: float = 0.5,
|
||||||
|
disconnect_on_exit: bool = True,
|
||||||
|
|
||||||
) -> dict[str, Client]:
|
) -> dict[str, Client]:
|
||||||
'''
|
'''
|
||||||
|
|
@ -1207,10 +1238,11 @@ async def load_aio_clients(
|
||||||
finally:
|
finally:
|
||||||
# TODO: for re-scans we'll want to not teardown clients which
|
# TODO: for re-scans we'll want to not teardown clients which
|
||||||
# are up and stable right?
|
# are up and stable right?
|
||||||
for acct, client in _accounts2clients.items():
|
if disconnect_on_exit:
|
||||||
log.info(f'Disconnecting {acct}@{client}')
|
for acct, client in _accounts2clients.items():
|
||||||
client.ib.disconnect()
|
log.info(f'Disconnecting {acct}@{client}')
|
||||||
_client_cache.pop((host, port), None)
|
client.ib.disconnect()
|
||||||
|
_client_cache.pop((host, port), None)
|
||||||
|
|
||||||
|
|
||||||
async def load_clients_for_trio(
|
async def load_clients_for_trio(
|
||||||
|
|
|
||||||
|
|
@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
|
||||||
entry['listingExchange'] = pexch
|
entry['listingExchange'] = pexch
|
||||||
|
|
||||||
conf = get_config()
|
conf = get_config()
|
||||||
entries = trades_to_ledger_entries(
|
entries = api_trades_to_ledger_entries(
|
||||||
conf['accounts'].inverse,
|
conf['accounts'].inverse,
|
||||||
trade_entries,
|
trade_entries,
|
||||||
)
|
)
|
||||||
|
|
@ -362,7 +362,7 @@ async def update_and_audit_msgs(
|
||||||
# if ib reports a lesser pp it's not as bad since we can
|
# if ib reports a lesser pp it's not as bad since we can
|
||||||
# presume we're at least not more in the shit then we
|
# presume we're at least not more in the shit then we
|
||||||
# thought.
|
# thought.
|
||||||
if diff:
|
if diff and pikersize:
|
||||||
reverse_split_ratio = pikersize / ibsize
|
reverse_split_ratio = pikersize / ibsize
|
||||||
split_ratio = 1/reverse_split_ratio
|
split_ratio = 1/reverse_split_ratio
|
||||||
|
|
||||||
|
|
@ -372,6 +372,7 @@ async def update_and_audit_msgs(
|
||||||
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
|
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
|
||||||
|
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
|
# log.error(
|
||||||
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
||||||
f'ib: {ibppmsg}\n'
|
f'ib: {ibppmsg}\n'
|
||||||
f'piker: {msg}\n'
|
f'piker: {msg}\n'
|
||||||
|
|
@ -1122,18 +1123,16 @@ def norm_trade_records(
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# timestamping is way different in API records
|
# timestamping is way different in API records
|
||||||
|
dtstr = record.get('datetime')
|
||||||
date = record.get('date')
|
date = record.get('date')
|
||||||
if not date:
|
flex_dtstr = record.get('dateTime')
|
||||||
# probably a flex record with a wonky non-std timestamp..
|
|
||||||
date, ts = record['dateTime'].split(';')
|
|
||||||
dt = pendulum.parse(date)
|
|
||||||
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
|
|
||||||
tsdt = pendulum.parse(ts)
|
|
||||||
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
|
|
||||||
|
|
||||||
else:
|
if dtstr or date:
|
||||||
# epoch_dt = pendulum.from_timestamp(record.get('time'))
|
dt = pendulum.parse(dtstr or date)
|
||||||
dt = pendulum.parse(date)
|
|
||||||
|
elif flex_dtstr:
|
||||||
|
# probably a flex record with a wonky non-std timestamp..
|
||||||
|
dt = parse_flex_dt(record['dateTime'])
|
||||||
|
|
||||||
# special handling of symbol extraction from
|
# special handling of symbol extraction from
|
||||||
# flex records using some ad-hoc schema parsing.
|
# flex records using some ad-hoc schema parsing.
|
||||||
|
|
@ -1182,69 +1181,58 @@ def norm_trade_records(
|
||||||
return {r.tid: r for r in records}
|
return {r.tid: r for r in records}
|
||||||
|
|
||||||
|
|
||||||
def trades_to_ledger_entries(
|
def parse_flex_dt(
|
||||||
|
record: str,
|
||||||
|
) -> pendulum.datetime:
|
||||||
|
date, ts = record.split(';')
|
||||||
|
dt = pendulum.parse(date)
|
||||||
|
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
|
||||||
|
tsdt = pendulum.parse(ts)
|
||||||
|
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
|
||||||
|
|
||||||
|
|
||||||
|
def api_trades_to_ledger_entries(
|
||||||
accounts: bidict,
|
accounts: bidict,
|
||||||
trade_entries: list[object],
|
trade_entries: list[object],
|
||||||
source_type: str = 'api',
|
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
'''
|
'''
|
||||||
Convert either of API execution objects or flex report
|
Convert API execution objects entry objects into ``dict`` form,
|
||||||
entry objects into ``dict`` form, pretty much straight up
|
pretty much straight up without modification except add
|
||||||
without modification.
|
a `pydatetime` field from the parsed timestamp.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
trades_by_account = {}
|
trades_by_account = {}
|
||||||
|
|
||||||
for t in trade_entries:
|
for t in trade_entries:
|
||||||
if source_type == 'flex':
|
# NOTE: example of schema we pull from the API client.
|
||||||
entry = t.__dict__
|
# {
|
||||||
|
# 'commissionReport': CommissionReport(...
|
||||||
|
# 'contract': {...
|
||||||
|
# 'execution': Execution(...
|
||||||
|
# 'time': 1654801166.0
|
||||||
|
# }
|
||||||
|
|
||||||
# XXX: LOL apparently ``toml`` has a bug
|
# flatten all sub-dicts and values into one top level entry.
|
||||||
# where a section key error will show up in the write
|
entry = {}
|
||||||
# if you leave a table key as an `int`? So i guess
|
for section, val in t.items():
|
||||||
# cast to strs for all keys..
|
match section:
|
||||||
|
case 'contract' | 'execution' | 'commissionReport':
|
||||||
|
# sub-dict cases
|
||||||
|
entry.update(val)
|
||||||
|
|
||||||
# oddly for some so-called "BookTrade" entries
|
case 'time':
|
||||||
# this field seems to be blank, no cuckin clue.
|
# ib has wack ns timestamps, or is that us?
|
||||||
# trade['ibExecID']
|
continue
|
||||||
tid = str(entry.get('ibExecID') or entry['tradeID'])
|
|
||||||
# date = str(entry['tradeDate'])
|
|
||||||
|
|
||||||
# XXX: is it going to cause problems if a account name
|
case _:
|
||||||
# get's lost? The user should be able to find it based
|
entry[section] = val
|
||||||
# on the actual exec history right?
|
|
||||||
acctid = accounts[str(entry['accountId'])]
|
|
||||||
|
|
||||||
elif source_type == 'api':
|
tid = str(entry['execId'])
|
||||||
# NOTE: example of schema we pull from the API client.
|
dt = pendulum.from_timestamp(entry['time'])
|
||||||
# {
|
# TODO: why isn't this showing seconds in the str?
|
||||||
# 'commissionReport': CommissionReport(...
|
entry['pydatetime'] = dt
|
||||||
# 'contract': {...
|
entry['datetime'] = str(dt)
|
||||||
# 'execution': Execution(...
|
acctid = accounts[entry['acctNumber']]
|
||||||
# 'time': 1654801166.0
|
|
||||||
# }
|
|
||||||
|
|
||||||
# flatten all sub-dicts and values into one top level entry.
|
|
||||||
entry = {}
|
|
||||||
for section, val in t.items():
|
|
||||||
match section:
|
|
||||||
case 'contract' | 'execution' | 'commissionReport':
|
|
||||||
# sub-dict cases
|
|
||||||
entry.update(val)
|
|
||||||
|
|
||||||
case 'time':
|
|
||||||
# ib has wack ns timestamps, or is that us?
|
|
||||||
continue
|
|
||||||
|
|
||||||
case _:
|
|
||||||
entry[section] = val
|
|
||||||
|
|
||||||
tid = str(entry['execId'])
|
|
||||||
dt = pendulum.from_timestamp(entry['time'])
|
|
||||||
# TODO: why isn't this showing seconds in the str?
|
|
||||||
entry['date'] = str(dt)
|
|
||||||
acctid = accounts[entry['acctNumber']]
|
|
||||||
|
|
||||||
if not tid:
|
if not tid:
|
||||||
# this is likely some kind of internal adjustment
|
# this is likely some kind of internal adjustment
|
||||||
|
|
@ -1262,6 +1250,73 @@ def trades_to_ledger_entries(
|
||||||
acctid, {}
|
acctid, {}
|
||||||
)[tid] = entry
|
)[tid] = entry
|
||||||
|
|
||||||
|
# sort entries in output by python based datetime
|
||||||
|
for acctid in trades_by_account:
|
||||||
|
trades_by_account[acctid] = dict(sorted(
|
||||||
|
trades_by_account[acctid].items(),
|
||||||
|
key=lambda entry: entry[1].pop('pydatetime'),
|
||||||
|
))
|
||||||
|
|
||||||
|
return trades_by_account
|
||||||
|
|
||||||
|
|
||||||
|
def flex_records_to_ledger_entries(
|
||||||
|
accounts: bidict,
|
||||||
|
trade_entries: list[object],
|
||||||
|
|
||||||
|
) -> dict:
|
||||||
|
'''
|
||||||
|
Convert flex report entry objects into ``dict`` form, pretty much
|
||||||
|
straight up without modification except add a `pydatetime` field
|
||||||
|
from the parsed timestamp.
|
||||||
|
|
||||||
|
'''
|
||||||
|
trades_by_account = {}
|
||||||
|
for t in trade_entries:
|
||||||
|
entry = t.__dict__
|
||||||
|
|
||||||
|
# XXX: LOL apparently ``toml`` has a bug
|
||||||
|
# where a section key error will show up in the write
|
||||||
|
# if you leave a table key as an `int`? So i guess
|
||||||
|
# cast to strs for all keys..
|
||||||
|
|
||||||
|
# oddly for some so-called "BookTrade" entries
|
||||||
|
# this field seems to be blank, no cuckin clue.
|
||||||
|
# trade['ibExecID']
|
||||||
|
tid = str(entry.get('ibExecID') or entry['tradeID'])
|
||||||
|
# date = str(entry['tradeDate'])
|
||||||
|
|
||||||
|
# XXX: is it going to cause problems if a account name
|
||||||
|
# get's lost? The user should be able to find it based
|
||||||
|
# on the actual exec history right?
|
||||||
|
acctid = accounts[str(entry['accountId'])]
|
||||||
|
|
||||||
|
# probably a flex record with a wonky non-std timestamp..
|
||||||
|
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
|
||||||
|
entry['datetime'] = str(dt)
|
||||||
|
|
||||||
|
if not tid:
|
||||||
|
# this is likely some kind of internal adjustment
|
||||||
|
# transaction, likely one of the following:
|
||||||
|
# - an expiry event that will show a "book trade" indicating
|
||||||
|
# some adjustment to cash balances: zeroing or itm settle.
|
||||||
|
# - a manual cash balance position adjustment likely done by
|
||||||
|
# the user from the accounts window in TWS where they can
|
||||||
|
# manually set the avg price and size:
|
||||||
|
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
|
||||||
|
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
|
||||||
|
continue
|
||||||
|
|
||||||
|
trades_by_account.setdefault(
|
||||||
|
acctid, {}
|
||||||
|
)[tid] = entry
|
||||||
|
|
||||||
|
for acctid in trades_by_account:
|
||||||
|
trades_by_account[acctid] = dict(sorted(
|
||||||
|
trades_by_account[acctid].items(),
|
||||||
|
key=lambda entry: entry[1]['pydatetime'],
|
||||||
|
))
|
||||||
|
|
||||||
return trades_by_account
|
return trades_by_account
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1308,15 +1363,16 @@ def load_flex_trades(
|
||||||
ln = len(trade_entries)
|
ln = len(trade_entries)
|
||||||
log.info(f'Loaded {ln} trades from flex query')
|
log.info(f'Loaded {ln} trades from flex query')
|
||||||
|
|
||||||
trades_by_account = trades_to_ledger_entries(
|
trades_by_account = flex_records_to_ledger_entries(
|
||||||
# get reverse map to user account names
|
conf['accounts'].inverse, # reverse map to user account names
|
||||||
conf['accounts'].inverse,
|
|
||||||
trade_entries,
|
trade_entries,
|
||||||
source_type='flex',
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ledger_dict: Optional[dict] = None
|
||||||
|
|
||||||
for acctid in trades_by_account:
|
for acctid in trades_by_account:
|
||||||
trades_by_id = trades_by_account[acctid]
|
trades_by_id = trades_by_account[acctid]
|
||||||
|
|
||||||
with open_trade_ledger('ib', acctid) as ledger_dict:
|
with open_trade_ledger('ib', acctid) as ledger_dict:
|
||||||
tid_delta = set(trades_by_id) - set(ledger_dict)
|
tid_delta = set(trades_by_id) - set(ledger_dict)
|
||||||
log.info(
|
log.info(
|
||||||
|
|
@ -1324,9 +1380,11 @@ def load_flex_trades(
|
||||||
f'{pformat(tid_delta)}'
|
f'{pformat(tid_delta)}'
|
||||||
)
|
)
|
||||||
if tid_delta:
|
if tid_delta:
|
||||||
ledger_dict.update(
|
sorted_delta = dict(sorted(
|
||||||
{tid: trades_by_id[tid] for tid in tid_delta}
|
{tid: trades_by_id[tid] for tid in tid_delta}.items(),
|
||||||
)
|
key=lambda entry: entry[1].pop('pydatetime'),
|
||||||
|
))
|
||||||
|
ledger_dict.update(sorted_delta)
|
||||||
|
|
||||||
return ledger_dict
|
return ledger_dict
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ import asyncio
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from dataclasses import asdict
|
from dataclasses import asdict
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from functools import partial
|
||||||
from math import isnan
|
from math import isnan
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
|
|
@ -38,7 +39,6 @@ import tractor
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
|
||||||
from piker.data._sharedmem import ShmArray
|
|
||||||
from .._util import SymbolNotFound, NoData
|
from .._util import SymbolNotFound, NoData
|
||||||
from .api import (
|
from .api import (
|
||||||
# _adhoc_futes_set,
|
# _adhoc_futes_set,
|
||||||
|
|
@ -111,24 +111,54 @@ async def open_history_client(
|
||||||
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
|
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# TODO:
|
||||||
|
# - add logic to handle tradable hours and only grab
|
||||||
|
# valid bars in the range?
|
||||||
|
# - we want to avoid overrunning the underlying shm array buffer and
|
||||||
|
# we should probably calc the number of calls to make depending on
|
||||||
|
# that until we have the `marketstore` daemon in place in which case
|
||||||
|
# the shm size will be driven by user config and available sys
|
||||||
|
# memory.
|
||||||
|
|
||||||
async with open_data_client() as proxy:
|
async with open_data_client() as proxy:
|
||||||
|
|
||||||
|
max_timeout: float = 2.
|
||||||
|
mean: float = 0
|
||||||
|
count: int = 0
|
||||||
|
|
||||||
async def get_hist(
|
async def get_hist(
|
||||||
|
timeframe: float,
|
||||||
end_dt: Optional[datetime] = None,
|
end_dt: Optional[datetime] = None,
|
||||||
start_dt: Optional[datetime] = None,
|
start_dt: Optional[datetime] = None,
|
||||||
|
|
||||||
) -> tuple[np.ndarray, str]:
|
) -> tuple[np.ndarray, str]:
|
||||||
|
nonlocal max_timeout, mean, count
|
||||||
|
|
||||||
out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
|
query_start = time.time()
|
||||||
|
out, timedout = await get_bars(
|
||||||
|
proxy,
|
||||||
|
symbol,
|
||||||
|
timeframe,
|
||||||
|
end_dt=end_dt,
|
||||||
|
)
|
||||||
|
latency = time.time() - query_start
|
||||||
|
if (
|
||||||
|
not timedout
|
||||||
|
# and latency <= max_timeout
|
||||||
|
):
|
||||||
|
count += 1
|
||||||
|
mean += latency / count
|
||||||
|
print(
|
||||||
|
f'HISTORY FRAME QUERY LATENCY: {latency}\n'
|
||||||
|
f'mean: {mean}'
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: add logic here to handle tradable hours and only grab
|
|
||||||
# valid bars in the range
|
|
||||||
if out is None:
|
if out is None:
|
||||||
# could be trying to retreive bars over weekend
|
# could be trying to retreive bars over weekend
|
||||||
log.error(f"Can't grab bars starting at {end_dt}!?!?")
|
log.error(f"Can't grab bars starting at {end_dt}!?!?")
|
||||||
raise NoData(
|
raise NoData(
|
||||||
f'{end_dt}',
|
f'{end_dt}',
|
||||||
frame_size=2000,
|
# frame_size=2000,
|
||||||
)
|
)
|
||||||
|
|
||||||
bars, bars_array, first_dt, last_dt = out
|
bars, bars_array, first_dt, last_dt = out
|
||||||
|
|
@ -145,7 +175,7 @@ async def open_history_client(
|
||||||
# quite sure why.. needs some tinkering and probably
|
# quite sure why.. needs some tinkering and probably
|
||||||
# a lookthrough of the ``ib_insync`` machinery, for eg. maybe
|
# a lookthrough of the ``ib_insync`` machinery, for eg. maybe
|
||||||
# we have to do the batch queries on the `asyncio` side?
|
# we have to do the batch queries on the `asyncio` side?
|
||||||
yield get_hist, {'erlangs': 1, 'rate': 6}
|
yield get_hist, {'erlangs': 1, 'rate': 3}
|
||||||
|
|
||||||
|
|
||||||
_pacing: str = (
|
_pacing: str = (
|
||||||
|
|
@ -154,261 +184,266 @@ _pacing: str = (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def wait_on_data_reset(
|
||||||
|
proxy: MethodProxy,
|
||||||
|
reset_type: str = 'data',
|
||||||
|
timeout: float = 16,
|
||||||
|
|
||||||
|
task_status: TaskStatus[
|
||||||
|
tuple[
|
||||||
|
trio.CancelScope,
|
||||||
|
trio.Event,
|
||||||
|
]
|
||||||
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
|
) -> bool:
|
||||||
|
|
||||||
|
# TODO: we might have to put a task lock around this
|
||||||
|
# method..
|
||||||
|
hist_ev = proxy.status_event(
|
||||||
|
'HMDS data farm connection is OK:ushmds'
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX: other event messages we might want to try and
|
||||||
|
# wait for but i wasn't able to get any of this
|
||||||
|
# reliable..
|
||||||
|
# reconnect_start = proxy.status_event(
|
||||||
|
# 'Market data farm is connecting:usfuture'
|
||||||
|
# )
|
||||||
|
# live_ev = proxy.status_event(
|
||||||
|
# 'Market data farm connection is OK:usfuture'
|
||||||
|
# )
|
||||||
|
# try to wait on the reset event(s) to arrive, a timeout
|
||||||
|
# will trigger a retry up to 6 times (for now).
|
||||||
|
|
||||||
|
done = trio.Event()
|
||||||
|
with trio.move_on_after(timeout) as cs:
|
||||||
|
|
||||||
|
task_status.started((cs, done))
|
||||||
|
|
||||||
|
log.warning('Sending DATA RESET request')
|
||||||
|
res = await data_reset_hack(reset_type=reset_type)
|
||||||
|
|
||||||
|
if not res:
|
||||||
|
log.warning(
|
||||||
|
'NO VNC DETECTED!\n'
|
||||||
|
'Manually press ctrl-alt-f on your IB java app'
|
||||||
|
)
|
||||||
|
done.set()
|
||||||
|
return False
|
||||||
|
|
||||||
|
# TODO: not sure if waiting on other events
|
||||||
|
# is all that useful here or not.
|
||||||
|
# - in theory you could wait on one of the ones above first
|
||||||
|
# to verify the reset request was sent?
|
||||||
|
# - we need the same for real-time quote feeds which can
|
||||||
|
# sometimes flake out and stop delivering..
|
||||||
|
for name, ev in [
|
||||||
|
('history', hist_ev),
|
||||||
|
]:
|
||||||
|
await ev.wait()
|
||||||
|
log.info(f"{name} DATA RESET")
|
||||||
|
done.set()
|
||||||
|
return True
|
||||||
|
|
||||||
|
if cs.cancel_called:
|
||||||
|
log.warning(
|
||||||
|
'Data reset task canceled?'
|
||||||
|
)
|
||||||
|
|
||||||
|
done.set()
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
_data_resetter_task: trio.Task | None = None
|
||||||
|
|
||||||
|
|
||||||
async def get_bars(
|
async def get_bars(
|
||||||
|
|
||||||
proxy: MethodProxy,
|
proxy: MethodProxy,
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
|
timeframe: int,
|
||||||
|
|
||||||
# blank to start which tells ib to look up the latest datum
|
# blank to start which tells ib to look up the latest datum
|
||||||
end_dt: str = '',
|
end_dt: str = '',
|
||||||
|
|
||||||
|
# TODO: make this more dynamic based on measured frame rx latency..
|
||||||
|
timeout: float = 3, # how long before we trigger a feed reset
|
||||||
|
|
||||||
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> (dict, np.ndarray):
|
) -> (dict, np.ndarray):
|
||||||
'''
|
'''
|
||||||
Retrieve historical data from a ``trio``-side task using
|
Retrieve historical data from a ``trio``-side task using
|
||||||
a ``MethoProxy``.
|
a ``MethoProxy``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
fails = 0
|
global _data_resetter_task
|
||||||
bars: Optional[list] = None
|
|
||||||
first_dt: datetime = None
|
|
||||||
last_dt: datetime = None
|
|
||||||
|
|
||||||
if end_dt:
|
data_cs: trio.CancelScope | None = None
|
||||||
last_dt = pendulum.from_timestamp(end_dt.timestamp())
|
result: tuple[
|
||||||
|
ibis.objects.BarDataList,
|
||||||
|
np.ndarray,
|
||||||
|
datetime,
|
||||||
|
datetime,
|
||||||
|
] | None = None
|
||||||
|
result_ready = trio.Event()
|
||||||
|
|
||||||
for _ in range(10):
|
async def query():
|
||||||
try:
|
nonlocal result, data_cs, end_dt
|
||||||
out = await proxy.bars(
|
while True:
|
||||||
fqsn=fqsn,
|
try:
|
||||||
end_dt=end_dt,
|
out = await proxy.bars(
|
||||||
)
|
fqsn=fqsn,
|
||||||
if out:
|
end_dt=end_dt,
|
||||||
bars, bars_array = out
|
sample_period_s=timeframe,
|
||||||
|
|
||||||
else:
|
# ideally we cancel the request just before we
|
||||||
await tractor.breakpoint()
|
# cancel on the ``trio``-side and trigger a data
|
||||||
|
# reset hack.. the problem is there's no way (with
|
||||||
if bars_array is None:
|
# current impl) to detect a cancel case.
|
||||||
raise SymbolNotFound(fqsn)
|
# timeout=timeout,
|
||||||
|
|
||||||
first_dt = pendulum.from_timestamp(
|
|
||||||
bars[0].date.timestamp())
|
|
||||||
|
|
||||||
last_dt = pendulum.from_timestamp(
|
|
||||||
bars[-1].date.timestamp())
|
|
||||||
|
|
||||||
time = bars_array['time']
|
|
||||||
assert time[-1] == last_dt.timestamp()
|
|
||||||
assert time[0] == first_dt.timestamp()
|
|
||||||
log.info(
|
|
||||||
f'{len(bars)} bars retreived for {first_dt} -> {last_dt}'
|
|
||||||
)
|
|
||||||
|
|
||||||
return (bars, bars_array, first_dt, last_dt), fails
|
|
||||||
|
|
||||||
except RequestError as err:
|
|
||||||
msg = err.message
|
|
||||||
|
|
||||||
if 'No market data permissions for' in msg:
|
|
||||||
# TODO: signalling for no permissions searches
|
|
||||||
raise NoData(
|
|
||||||
f'Symbol: {fqsn}',
|
|
||||||
)
|
)
|
||||||
|
|
||||||
elif (
|
|
||||||
err.code == 162 and
|
|
||||||
'HMDS query returned no data' in err.message
|
|
||||||
):
|
|
||||||
# XXX: this is now done in the storage mgmt layer
|
|
||||||
# and we shouldn't implicitly decrement the frame dt
|
|
||||||
# index since the upper layer may be doing so
|
|
||||||
# concurrently and we don't want to be delivering frames
|
|
||||||
# that weren't asked for.
|
|
||||||
log.warning(
|
|
||||||
f'NO DATA found ending @ {end_dt}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# try to decrement start point and look further back
|
|
||||||
# end_dt = last_dt = last_dt.subtract(seconds=2000)
|
|
||||||
|
|
||||||
raise NoData(
|
|
||||||
f'Symbol: {fqsn}',
|
|
||||||
frame_size=2000,
|
|
||||||
)
|
|
||||||
|
|
||||||
# elif (
|
|
||||||
# err.code == 162 and
|
|
||||||
# 'Trading TWS session is connected from a different IP
|
|
||||||
# address' in err.message
|
|
||||||
# ):
|
|
||||||
# log.warning("ignoring ip address warning")
|
|
||||||
# continue
|
|
||||||
|
|
||||||
elif _pacing in msg:
|
|
||||||
|
|
||||||
log.warning(
|
|
||||||
'History throttle rate reached!\n'
|
|
||||||
'Resetting farms with `ctrl-alt-f` hack\n'
|
|
||||||
)
|
|
||||||
# TODO: we might have to put a task lock around this
|
|
||||||
# method..
|
|
||||||
hist_ev = proxy.status_event(
|
|
||||||
'HMDS data farm connection is OK:ushmds'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX: other event messages we might want to try and
|
|
||||||
# wait for but i wasn't able to get any of this
|
|
||||||
# reliable..
|
|
||||||
# reconnect_start = proxy.status_event(
|
|
||||||
# 'Market data farm is connecting:usfuture'
|
|
||||||
# )
|
|
||||||
# live_ev = proxy.status_event(
|
|
||||||
# 'Market data farm connection is OK:usfuture'
|
|
||||||
# )
|
|
||||||
|
|
||||||
# try to wait on the reset event(s) to arrive, a timeout
|
|
||||||
# will trigger a retry up to 6 times (for now).
|
|
||||||
tries: int = 2
|
|
||||||
timeout: float = 10
|
|
||||||
|
|
||||||
# try 3 time with a data reset then fail over to
|
|
||||||
# a connection reset.
|
|
||||||
for i in range(1, tries):
|
|
||||||
|
|
||||||
log.warning('Sending DATA RESET request')
|
|
||||||
await data_reset_hack(reset_type='data')
|
|
||||||
|
|
||||||
with trio.move_on_after(timeout) as cs:
|
|
||||||
for name, ev in [
|
|
||||||
# TODO: not sure if waiting on other events
|
|
||||||
# is all that useful here or not. in theory
|
|
||||||
# you could wait on one of the ones above
|
|
||||||
# first to verify the reset request was
|
|
||||||
# sent?
|
|
||||||
('history', hist_ev),
|
|
||||||
]:
|
|
||||||
await ev.wait()
|
|
||||||
log.info(f"{name} DATA RESET")
|
|
||||||
break
|
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
|
||||||
fails += 1
|
|
||||||
log.warning(
|
|
||||||
f'Data reset {name} timeout, retrying {i}.'
|
|
||||||
)
|
|
||||||
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
|
|
||||||
log.warning('Sending CONNECTION RESET')
|
|
||||||
res = await data_reset_hack(reset_type='connection')
|
|
||||||
if not res:
|
|
||||||
log.warning(
|
|
||||||
'NO VNC DETECTED!\n'
|
|
||||||
'Manually press ctrl-alt-f on your IB java app'
|
|
||||||
)
|
|
||||||
# break
|
|
||||||
|
|
||||||
with trio.move_on_after(timeout) as cs:
|
|
||||||
for name, ev in [
|
|
||||||
# TODO: not sure if waiting on other events
|
|
||||||
# is all that useful here or not. in theory
|
|
||||||
# you could wait on one of the ones above
|
|
||||||
# first to verify the reset request was
|
|
||||||
# sent?
|
|
||||||
('history', hist_ev),
|
|
||||||
]:
|
|
||||||
await ev.wait()
|
|
||||||
log.info(f"{name} DATA RESET")
|
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
|
||||||
fails += 1
|
|
||||||
log.warning('Data CONNECTION RESET timeout!?')
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
return None, None
|
|
||||||
# else: # throttle wasn't fixed so error out immediately
|
|
||||||
# raise _err
|
|
||||||
|
|
||||||
|
|
||||||
async def backfill_bars(
|
|
||||||
|
|
||||||
fqsn: str,
|
|
||||||
shm: ShmArray, # type: ignore # noqa
|
|
||||||
|
|
||||||
# TODO: we want to avoid overrunning the underlying shm array buffer
|
|
||||||
# and we should probably calc the number of calls to make depending
|
|
||||||
# on that until we have the `marketstore` daemon in place in which
|
|
||||||
# case the shm size will be driven by user config and available sys
|
|
||||||
# memory.
|
|
||||||
count: int = 16,
|
|
||||||
|
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
'''
|
|
||||||
Fill historical bars into shared mem / storage afap.
|
|
||||||
|
|
||||||
TODO: avoid pacing constraints:
|
|
||||||
https://github.com/pikers/piker/issues/128
|
|
||||||
|
|
||||||
'''
|
|
||||||
# last_dt1 = None
|
|
||||||
last_dt = None
|
|
||||||
|
|
||||||
with trio.CancelScope() as cs:
|
|
||||||
|
|
||||||
async with open_data_client() as proxy:
|
|
||||||
|
|
||||||
out, fails = await get_bars(proxy, fqsn)
|
|
||||||
|
|
||||||
if out is None:
|
|
||||||
raise RuntimeError("Could not pull currrent history?!")
|
|
||||||
|
|
||||||
(first_bars, bars_array, first_dt, last_dt) = out
|
|
||||||
vlm = bars_array['volume']
|
|
||||||
vlm[vlm < 0] = 0
|
|
||||||
last_dt = first_dt
|
|
||||||
|
|
||||||
# write historical data to buffer
|
|
||||||
shm.push(bars_array)
|
|
||||||
|
|
||||||
task_status.started(cs)
|
|
||||||
|
|
||||||
i = 0
|
|
||||||
while i < count:
|
|
||||||
|
|
||||||
out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
|
|
||||||
|
|
||||||
if out is None:
|
if out is None:
|
||||||
# could be trying to retreive bars over weekend
|
raise NoData(f'{end_dt}')
|
||||||
# TODO: add logic here to handle tradable hours and
|
|
||||||
# only grab valid bars in the range
|
|
||||||
log.error(f"Can't grab bars starting at {first_dt}!?!?")
|
|
||||||
|
|
||||||
# XXX: get_bars() should internally decrement dt by
|
bars, bars_array, dt_duration = out
|
||||||
# 2k seconds and try again.
|
|
||||||
|
if not bars:
|
||||||
|
log.warning(
|
||||||
|
f'History is blank for {dt_duration} from {end_dt}'
|
||||||
|
)
|
||||||
|
end_dt -= dt_duration
|
||||||
continue
|
continue
|
||||||
|
|
||||||
(first_bars, bars_array, first_dt, last_dt) = out
|
if bars_array is None:
|
||||||
# last_dt1 = last_dt
|
raise SymbolNotFound(fqsn)
|
||||||
# last_dt = first_dt
|
|
||||||
|
|
||||||
# volume cleaning since there's -ve entries,
|
first_dt = pendulum.from_timestamp(
|
||||||
# wood luv to know what crookery that is..
|
bars[0].date.timestamp())
|
||||||
vlm = bars_array['volume']
|
|
||||||
vlm[vlm < 0] = 0
|
|
||||||
|
|
||||||
# TODO we should probably dig into forums to see what peeps
|
last_dt = pendulum.from_timestamp(
|
||||||
# think this data "means" and then use it as an indicator of
|
bars[-1].date.timestamp())
|
||||||
# sorts? dinkus has mentioned that $vlms for the day dont'
|
|
||||||
# match other platforms nor the summary stat tws shows in
|
|
||||||
# the monitor - it's probably worth investigating.
|
|
||||||
|
|
||||||
shm.push(bars_array, prepend=True)
|
time = bars_array['time']
|
||||||
i += 1
|
assert time[-1] == last_dt.timestamp()
|
||||||
|
assert time[0] == first_dt.timestamp()
|
||||||
|
log.info(
|
||||||
|
f'{len(bars)} bars retreived {first_dt} -> {last_dt}'
|
||||||
|
)
|
||||||
|
|
||||||
|
if data_cs:
|
||||||
|
data_cs.cancel()
|
||||||
|
|
||||||
|
result = (bars, bars_array, first_dt, last_dt)
|
||||||
|
|
||||||
|
# signal data reset loop parent task
|
||||||
|
result_ready.set()
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except RequestError as err:
|
||||||
|
msg = err.message
|
||||||
|
|
||||||
|
if 'No market data permissions for' in msg:
|
||||||
|
# TODO: signalling for no permissions searches
|
||||||
|
raise NoData(
|
||||||
|
f'Symbol: {fqsn}',
|
||||||
|
)
|
||||||
|
|
||||||
|
elif err.code == 162:
|
||||||
|
if 'HMDS query returned no data' in err.message:
|
||||||
|
# XXX: this is now done in the storage mgmt
|
||||||
|
# layer and we shouldn't implicitly decrement
|
||||||
|
# the frame dt index since the upper layer may
|
||||||
|
# be doing so concurrently and we don't want to
|
||||||
|
# be delivering frames that weren't asked for.
|
||||||
|
log.warning(
|
||||||
|
f'NO DATA found ending @ {end_dt}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# try to decrement start point and look further back
|
||||||
|
# end_dt = end_dt.subtract(seconds=2000)
|
||||||
|
end_dt = end_dt.subtract(days=1)
|
||||||
|
print("SUBTRACTING DAY")
|
||||||
|
continue
|
||||||
|
|
||||||
|
elif 'API historical data query cancelled' in err.message:
|
||||||
|
log.warning(
|
||||||
|
'Query cancelled by IB (:eyeroll:):\n'
|
||||||
|
f'{err.message}'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
elif (
|
||||||
|
'Trading TWS session is connected from a different IP'
|
||||||
|
in err.message
|
||||||
|
):
|
||||||
|
log.warning("ignoring ip address warning")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# XXX: more or less same as above timeout case
|
||||||
|
elif _pacing in msg:
|
||||||
|
log.warning(
|
||||||
|
'History throttle rate reached!\n'
|
||||||
|
'Resetting farms with `ctrl-alt-f` hack\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# cancel any existing reset task
|
||||||
|
if data_cs:
|
||||||
|
data_cs.cancel()
|
||||||
|
|
||||||
|
# spawn new data reset task
|
||||||
|
data_cs, reset_done = await nurse.start(
|
||||||
|
partial(
|
||||||
|
wait_on_data_reset,
|
||||||
|
proxy,
|
||||||
|
timeout=float('inf'),
|
||||||
|
reset_type='connection'
|
||||||
|
)
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# TODO: make this global across all history task/requests
|
||||||
|
# such that simultaneous symbol queries don't try data resettingn
|
||||||
|
# too fast..
|
||||||
|
unset_resetter: bool = False
|
||||||
|
async with trio.open_nursery() as nurse:
|
||||||
|
|
||||||
|
# start history request that we allow
|
||||||
|
# to run indefinitely until a result is acquired
|
||||||
|
nurse.start_soon(query)
|
||||||
|
|
||||||
|
# start history reset loop which waits up to the timeout
|
||||||
|
# for a result before triggering a data feed reset.
|
||||||
|
while not result_ready.is_set():
|
||||||
|
|
||||||
|
with trio.move_on_after(timeout):
|
||||||
|
await result_ready.wait()
|
||||||
|
break
|
||||||
|
|
||||||
|
if _data_resetter_task:
|
||||||
|
# don't double invoke the reset hack if another
|
||||||
|
# requester task already has it covered.
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
_data_resetter_task = trio.lowlevel.current_task()
|
||||||
|
unset_resetter = True
|
||||||
|
|
||||||
|
# spawn new data reset task
|
||||||
|
data_cs, reset_done = await nurse.start(
|
||||||
|
partial(
|
||||||
|
wait_on_data_reset,
|
||||||
|
proxy,
|
||||||
|
timeout=float('inf'),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# sync wait on reset to complete
|
||||||
|
await reset_done.wait()
|
||||||
|
|
||||||
|
_data_resetter_task = None if unset_resetter else _data_resetter_task
|
||||||
|
return result, data_cs is not None
|
||||||
|
|
||||||
|
|
||||||
asset_type_map = {
|
asset_type_map = {
|
||||||
|
|
@ -466,7 +501,9 @@ async def _setup_quote_stream(
|
||||||
|
|
||||||
to_trio.send_nowait(None)
|
to_trio.send_nowait(None)
|
||||||
|
|
||||||
async with load_aio_clients() as accts2clients:
|
async with load_aio_clients(
|
||||||
|
disconnect_on_exit=False,
|
||||||
|
) as accts2clients:
|
||||||
caccount_name, client = get_preferred_data_client(accts2clients)
|
caccount_name, client = get_preferred_data_client(accts2clients)
|
||||||
contract = contract or (await client.find_contract(symbol))
|
contract = contract or (await client.find_contract(symbol))
|
||||||
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
|
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
|
||||||
|
|
@ -512,10 +549,11 @@ async def _setup_quote_stream(
|
||||||
# Manually do the dereg ourselves.
|
# Manually do the dereg ourselves.
|
||||||
teardown()
|
teardown()
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
log.warning(
|
# log.warning(
|
||||||
f'channel is blocking symbol feed for {symbol}?'
|
# f'channel is blocking symbol feed for {symbol}?'
|
||||||
f'\n{to_trio.statistics}'
|
# f'\n{to_trio.statistics}'
|
||||||
)
|
# )
|
||||||
|
pass
|
||||||
|
|
||||||
# except trio.WouldBlock:
|
# except trio.WouldBlock:
|
||||||
# # for slow debugging purposes to avoid clobbering prompt
|
# # for slow debugging purposes to avoid clobbering prompt
|
||||||
|
|
@ -545,7 +583,8 @@ async def open_aio_quote_stream(
|
||||||
from_aio = _quote_streams.get(symbol)
|
from_aio = _quote_streams.get(symbol)
|
||||||
if from_aio:
|
if from_aio:
|
||||||
|
|
||||||
# if we already have a cached feed deliver a rx side clone to consumer
|
# if we already have a cached feed deliver a rx side clone
|
||||||
|
# to consumer
|
||||||
async with broadcast_receiver(
|
async with broadcast_receiver(
|
||||||
from_aio,
|
from_aio,
|
||||||
2**6,
|
2**6,
|
||||||
|
|
@ -736,67 +775,97 @@ async def stream_quotes(
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
return # we never expect feed to come up?
|
return # we never expect feed to come up?
|
||||||
|
|
||||||
async with open_aio_quote_stream(
|
cs: Optional[trio.CancelScope] = None
|
||||||
symbol=sym,
|
startup: bool = True
|
||||||
contract=con,
|
while (
|
||||||
) as stream:
|
startup
|
||||||
|
or cs.cancel_called
|
||||||
# ugh, clear ticks since we've consumed them
|
):
|
||||||
# (ahem, ib_insync is stateful trash)
|
with trio.CancelScope() as cs:
|
||||||
first_ticker.ticks = []
|
async with (
|
||||||
|
trio.open_nursery() as nurse,
|
||||||
task_status.started((init_msgs, first_quote))
|
open_aio_quote_stream(
|
||||||
|
symbol=sym,
|
||||||
async with aclosing(stream):
|
contract=con,
|
||||||
if syminfo.get('no_vlm', False):
|
) as stream,
|
||||||
|
):
|
||||||
# generally speaking these feeds don't
|
|
||||||
# include vlm data.
|
|
||||||
atype = syminfo['asset_type']
|
|
||||||
log.info(
|
|
||||||
f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# wait for real volume on feed (trading might be closed)
|
|
||||||
while True:
|
|
||||||
ticker = await stream.receive()
|
|
||||||
|
|
||||||
# for a real volume contract we rait for the first
|
|
||||||
# "real" trade to take place
|
|
||||||
if (
|
|
||||||
# not calc_price
|
|
||||||
# and not ticker.rtTime
|
|
||||||
not ticker.rtTime
|
|
||||||
):
|
|
||||||
# spin consuming tickers until we get a real
|
|
||||||
# market datum
|
|
||||||
log.debug(f"New unsent ticker: {ticker}")
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
log.debug("Received first real volume tick")
|
|
||||||
# ugh, clear ticks since we've consumed them
|
|
||||||
# (ahem, ib_insync is truly stateful trash)
|
|
||||||
ticker.ticks = []
|
|
||||||
|
|
||||||
# XXX: this works because we don't use
|
|
||||||
# ``aclosing()`` above?
|
|
||||||
break
|
|
||||||
|
|
||||||
quote = normalize(ticker)
|
|
||||||
log.debug(f"First ticker received {quote}")
|
|
||||||
|
|
||||||
# tell caller quotes are now coming in live
|
|
||||||
feed_is_live.set()
|
|
||||||
|
|
||||||
# last = time.time()
|
|
||||||
async for ticker in stream:
|
|
||||||
quote = normalize(ticker)
|
|
||||||
await send_chan.send({quote['fqsn']: quote})
|
|
||||||
|
|
||||||
# ugh, clear ticks since we've consumed them
|
# ugh, clear ticks since we've consumed them
|
||||||
ticker.ticks = []
|
# (ahem, ib_insync is stateful trash)
|
||||||
# last = time.time()
|
first_ticker.ticks = []
|
||||||
|
|
||||||
|
# only on first entry at feed boot up
|
||||||
|
if startup:
|
||||||
|
startup = False
|
||||||
|
task_status.started((init_msgs, first_quote))
|
||||||
|
|
||||||
|
# start a stream restarter task which monitors the
|
||||||
|
# data feed event.
|
||||||
|
async def reset_on_feed():
|
||||||
|
|
||||||
|
# TODO: this seems to be surpressed from the
|
||||||
|
# traceback in ``tractor``?
|
||||||
|
# assert 0
|
||||||
|
|
||||||
|
rt_ev = proxy.status_event(
|
||||||
|
'Market data farm connection is OK:usfarm'
|
||||||
|
)
|
||||||
|
await rt_ev.wait()
|
||||||
|
cs.cancel() # cancel called should now be set
|
||||||
|
|
||||||
|
nurse.start_soon(reset_on_feed)
|
||||||
|
|
||||||
|
async with aclosing(stream):
|
||||||
|
if syminfo.get('no_vlm', False):
|
||||||
|
|
||||||
|
# generally speaking these feeds don't
|
||||||
|
# include vlm data.
|
||||||
|
atype = syminfo['asset_type']
|
||||||
|
log.info(
|
||||||
|
f'No-vlm {sym}@{atype}, skipping quote poll'
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# wait for real volume on feed (trading might be
|
||||||
|
# closed)
|
||||||
|
while True:
|
||||||
|
ticker = await stream.receive()
|
||||||
|
|
||||||
|
# for a real volume contract we rait for
|
||||||
|
# the first "real" trade to take place
|
||||||
|
if (
|
||||||
|
# not calc_price
|
||||||
|
# and not ticker.rtTime
|
||||||
|
not ticker.rtTime
|
||||||
|
):
|
||||||
|
# spin consuming tickers until we
|
||||||
|
# get a real market datum
|
||||||
|
log.debug(f"New unsent ticker: {ticker}")
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
log.debug("Received first volume tick")
|
||||||
|
# ugh, clear ticks since we've
|
||||||
|
# consumed them (ahem, ib_insync is
|
||||||
|
# truly stateful trash)
|
||||||
|
ticker.ticks = []
|
||||||
|
|
||||||
|
# XXX: this works because we don't use
|
||||||
|
# ``aclosing()`` above?
|
||||||
|
break
|
||||||
|
|
||||||
|
quote = normalize(ticker)
|
||||||
|
log.debug(f"First ticker received {quote}")
|
||||||
|
|
||||||
|
# tell caller quotes are now coming in live
|
||||||
|
feed_is_live.set()
|
||||||
|
|
||||||
|
# last = time.time()
|
||||||
|
async for ticker in stream:
|
||||||
|
quote = normalize(ticker)
|
||||||
|
await send_chan.send({quote['fqsn']: quote})
|
||||||
|
|
||||||
|
# ugh, clear ticks since we've consumed them
|
||||||
|
ticker.ticks = []
|
||||||
|
# last = time.time()
|
||||||
|
|
||||||
|
|
||||||
async def data_reset_hack(
|
async def data_reset_hack(
|
||||||
|
|
@ -904,7 +973,14 @@ async def open_symbol_search(
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if not pattern or pattern.isspace():
|
if (
|
||||||
|
not pattern
|
||||||
|
or pattern.isspace()
|
||||||
|
|
||||||
|
# XXX: not sure if this is a bad assumption but it
|
||||||
|
# seems to make search snappier?
|
||||||
|
or len(pattern) < 1
|
||||||
|
):
|
||||||
log.warning('empty pattern received, skipping..')
|
log.warning('empty pattern received, skipping..')
|
||||||
|
|
||||||
# TODO: *BUG* if nothing is returned here the client
|
# TODO: *BUG* if nothing is returned here the client
|
||||||
|
|
|
||||||
|
|
@ -258,6 +258,7 @@ async def open_history_client(
|
||||||
queries: int = 0
|
queries: int = 0
|
||||||
|
|
||||||
async def get_ohlc(
|
async def get_ohlc(
|
||||||
|
timeframe: float,
|
||||||
end_dt: Optional[datetime] = None,
|
end_dt: Optional[datetime] = None,
|
||||||
start_dt: Optional[datetime] = None,
|
start_dt: Optional[datetime] = None,
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -138,25 +138,26 @@ def cli(ctx, brokers, loglevel, tl, configdir):
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def services(config, tl, names):
|
def services(config, tl, names):
|
||||||
|
|
||||||
async def list_services():
|
from .._daemon import open_piker_runtime
|
||||||
|
|
||||||
async with tractor.get_arbiter(
|
async def list_services():
|
||||||
*_tractor_kwargs['arbiter_addr']
|
async with (
|
||||||
) as portal:
|
open_piker_runtime(
|
||||||
|
name='service_query',
|
||||||
|
loglevel=config['loglevel'] if tl else None,
|
||||||
|
),
|
||||||
|
tractor.get_arbiter(
|
||||||
|
*_tractor_kwargs['arbiter_addr']
|
||||||
|
) as portal
|
||||||
|
):
|
||||||
registry = await portal.run_from_ns('self', 'get_registry')
|
registry = await portal.run_from_ns('self', 'get_registry')
|
||||||
json_d = {}
|
json_d = {}
|
||||||
for key, socket in registry.items():
|
for key, socket in registry.items():
|
||||||
# name, uuid = uid
|
|
||||||
host, port = socket
|
host, port = socket
|
||||||
json_d[key] = f'{host}:{port}'
|
json_d[key] = f'{host}:{port}'
|
||||||
click.echo(f"{colorize_json(json_d)}")
|
click.echo(f"{colorize_json(json_d)}")
|
||||||
|
|
||||||
tractor.run(
|
trio.run(list_services)
|
||||||
list_services,
|
|
||||||
name='service_query',
|
|
||||||
loglevel=config['loglevel'] if tl else None,
|
|
||||||
arbiter_addr=_tractor_kwargs['arbiter_addr'],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _load_clis() -> None:
|
def _load_clis() -> None:
|
||||||
|
|
|
||||||
|
|
@ -21,16 +21,19 @@ This module is enabled for ``brokerd`` daemons.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from datetime import datetime
|
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
from dataclasses import (
|
||||||
|
dataclass,
|
||||||
|
field,
|
||||||
|
)
|
||||||
|
from datetime import datetime
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from pprint import pformat
|
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncIterator, Optional,
|
AsyncIterator,
|
||||||
Generator,
|
Callable,
|
||||||
|
Optional,
|
||||||
Awaitable,
|
Awaitable,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Union,
|
Union,
|
||||||
|
|
@ -39,7 +42,6 @@ from typing import (
|
||||||
import trio
|
import trio
|
||||||
from trio.abc import ReceiveChannel
|
from trio.abc import ReceiveChannel
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import trimeter
|
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.trionics import maybe_open_context
|
from tractor.trionics import maybe_open_context
|
||||||
import pendulum
|
import pendulum
|
||||||
|
|
@ -252,6 +254,7 @@ async def start_backfill(
|
||||||
mod: ModuleType,
|
mod: ModuleType,
|
||||||
bfqsn: str,
|
bfqsn: str,
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
|
timeframe: float,
|
||||||
|
|
||||||
last_tsdb_dt: Optional[datetime] = None,
|
last_tsdb_dt: Optional[datetime] = None,
|
||||||
storage: Optional[Storage] = None,
|
storage: Optional[Storage] = None,
|
||||||
|
|
@ -262,11 +265,19 @@ async def start_backfill(
|
||||||
|
|
||||||
) -> int:
|
) -> int:
|
||||||
|
|
||||||
|
hist: Callable[
|
||||||
|
[int, datetime, datetime],
|
||||||
|
tuple[np.ndarray, str]
|
||||||
|
]
|
||||||
|
config: dict[str, int]
|
||||||
async with mod.open_history_client(bfqsn) as (hist, config):
|
async with mod.open_history_client(bfqsn) as (hist, config):
|
||||||
|
|
||||||
# get latest query's worth of history all the way
|
# get latest query's worth of history all the way
|
||||||
# back to what is recorded in the tsdb
|
# back to what is recorded in the tsdb
|
||||||
array, start_dt, end_dt = await hist(end_dt=None)
|
array, start_dt, end_dt = await hist(
|
||||||
|
timeframe,
|
||||||
|
end_dt=None,
|
||||||
|
)
|
||||||
|
|
||||||
times = array['time']
|
times = array['time']
|
||||||
|
|
||||||
|
|
@ -289,6 +300,9 @@ async def start_backfill(
|
||||||
log.info(f'Pushing {to_push.size} to shm!')
|
log.info(f'Pushing {to_push.size} to shm!')
|
||||||
shm.push(to_push)
|
shm.push(to_push)
|
||||||
|
|
||||||
|
# TODO: *** THIS IS A BUG ***
|
||||||
|
# we need to only broadcast to subscribers for this fqsn..
|
||||||
|
# otherwise all fsps get reset on every chart..
|
||||||
for delay_s in sampler.subscribers:
|
for delay_s in sampler.subscribers:
|
||||||
await broadcast(delay_s)
|
await broadcast(delay_s)
|
||||||
|
|
||||||
|
|
@ -304,8 +318,8 @@ async def start_backfill(
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
'`piker` only needs to support 1m and 1s sampling '
|
'`piker` only needs to support 1m and 1s sampling '
|
||||||
'but ur api is trying to deliver a longer '
|
'but ur api is trying to deliver a longer '
|
||||||
f'timeframe of {step_size_s} ' 'seconds.. so ye, dun '
|
f'timeframe of {step_size_s} seconds..\n'
|
||||||
'do dat brudder.'
|
'So yuh.. dun do dat brudder.'
|
||||||
)
|
)
|
||||||
|
|
||||||
# when no tsdb "last datum" is provided, we just load
|
# when no tsdb "last datum" is provided, we just load
|
||||||
|
|
@ -319,96 +333,60 @@ async def start_backfill(
|
||||||
# do a decently sized backfill and load it into storage.
|
# do a decently sized backfill and load it into storage.
|
||||||
periods = {
|
periods = {
|
||||||
1: {'days': 6},
|
1: {'days': 6},
|
||||||
60: {'years': 2},
|
60: {'years': 6},
|
||||||
}
|
}
|
||||||
|
|
||||||
kwargs = periods[step_size_s]
|
kwargs = periods[step_size_s]
|
||||||
last_tsdb_dt = start_dt.subtract(**kwargs)
|
last_tsdb_dt = start_dt.subtract(**kwargs)
|
||||||
|
|
||||||
# configure async query throttling
|
# configure async query throttling
|
||||||
erlangs = config.get('erlangs', 1)
|
# rate = config.get('rate', 1)
|
||||||
rate = config.get('rate', 1)
|
# XXX: legacy from ``trimeter`` code but unsupported now.
|
||||||
frames = {}
|
# erlangs = config.get('erlangs', 1)
|
||||||
|
|
||||||
def iter_dts(start: datetime):
|
|
||||||
|
|
||||||
while True:
|
|
||||||
|
|
||||||
hist_period = pendulum.period(
|
|
||||||
start,
|
|
||||||
last_tsdb_dt,
|
|
||||||
)
|
|
||||||
dtrange = list(hist_period.range('seconds', frame_size_s))
|
|
||||||
log.debug(f'New datetime index:\n{pformat(dtrange)}')
|
|
||||||
|
|
||||||
for end_dt in dtrange:
|
|
||||||
log.info(f'Yielding next frame start {end_dt}')
|
|
||||||
start = yield end_dt
|
|
||||||
|
|
||||||
# if caller sends a new start date, reset to that
|
|
||||||
if start is not None:
|
|
||||||
log.warning(f'Resetting date range: {start}')
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
# from while
|
|
||||||
return
|
|
||||||
|
|
||||||
# pull new history frames until we hit latest
|
|
||||||
# already in the tsdb or a max count.
|
|
||||||
count = 0
|
|
||||||
|
|
||||||
# NOTE: when gaps are detected in the retreived history (by
|
|
||||||
# comparisor of the end - start versus the expected "frame size"
|
|
||||||
# in seconds) we need a way to alert the async request code not
|
|
||||||
# to continue to query for data "within the gap". This var is
|
|
||||||
# set in such cases such that further requests in that period
|
|
||||||
# are discarded and further we reset the "datetimem query frame
|
|
||||||
# index" in such cases to avoid needless noop requests.
|
|
||||||
earliest_end_dt: Optional[datetime] = start_dt
|
|
||||||
|
|
||||||
async def get_ohlc_frame(
|
|
||||||
input_end_dt: datetime,
|
|
||||||
iter_dts_gen: Generator[datetime],
|
|
||||||
|
|
||||||
) -> np.ndarray:
|
|
||||||
|
|
||||||
nonlocal count, frames, earliest_end_dt, frame_size_s
|
|
||||||
count += 1
|
|
||||||
|
|
||||||
if input_end_dt > earliest_end_dt:
|
|
||||||
# if a request comes in for an inter-gap frame we
|
|
||||||
# discard it since likely this request is still
|
|
||||||
# lingering from before the reset of ``iter_dts()`` via
|
|
||||||
# ``.send()`` below.
|
|
||||||
log.info(f'Discarding request history ending @ {input_end_dt}')
|
|
||||||
|
|
||||||
# signals to ``trimeter`` loop to discard and
|
|
||||||
# ``continue`` in it's schedule loop.
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
# inline sequential loop where we simply pass the
|
||||||
|
# last retrieved start dt to the next request as
|
||||||
|
# it's end dt.
|
||||||
|
starts: set[datetime] = set()
|
||||||
|
while start_dt > last_tsdb_dt:
|
||||||
try:
|
try:
|
||||||
log.info(
|
log.info(
|
||||||
f'Requesting {step_size_s}s frame ending in {input_end_dt}'
|
f'Requesting {step_size_s}s frame ending in {start_dt}'
|
||||||
)
|
)
|
||||||
array, start_dt, end_dt = await hist(end_dt=input_end_dt)
|
array, next_start_dt, end_dt = await hist(
|
||||||
|
timeframe,
|
||||||
|
end_dt=start_dt,
|
||||||
|
)
|
||||||
|
|
||||||
|
if next_start_dt in starts:
|
||||||
|
start_dt = min(starts)
|
||||||
|
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# only update new start point if new
|
||||||
|
start_dt = next_start_dt
|
||||||
|
starts.add(start_dt)
|
||||||
|
|
||||||
assert array['time'][0] == start_dt.timestamp()
|
assert array['time'][0] == start_dt.timestamp()
|
||||||
|
|
||||||
except NoData:
|
except NoData:
|
||||||
|
# XXX: unhandled history gap (shouldn't happen?)
|
||||||
log.warning(
|
log.warning(
|
||||||
f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?'
|
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
|
||||||
)
|
)
|
||||||
return None # discard signal
|
await tractor.breakpoint()
|
||||||
|
|
||||||
except DataUnavailable as duerr:
|
except DataUnavailable: # as duerr:
|
||||||
# broker is being a bish and we can't pull
|
# broker is being a bish and we can't pull any more..
|
||||||
# any more..
|
log.warning(
|
||||||
log.warning('backend halted on data deliver !?!?')
|
f'NO-MORE-DATA: backend {mod.name} halted history!?'
|
||||||
|
)
|
||||||
|
|
||||||
# ugh, what's a better way?
|
# ugh, what's a better way?
|
||||||
# TODO: fwiw, we probably want a way to signal a throttle
|
# TODO: fwiw, we probably want a way to signal a throttle
|
||||||
# condition (eg. with ib) so that we can halt the
|
# condition (eg. with ib) so that we can halt the
|
||||||
# request loop until the condition is resolved?
|
# request loop until the condition is resolved?
|
||||||
return duerr
|
return
|
||||||
|
|
||||||
diff = end_dt - start_dt
|
diff = end_dt - start_dt
|
||||||
frame_time_diff_s = diff.seconds
|
frame_time_diff_s = diff.seconds
|
||||||
|
|
@ -419,42 +397,12 @@ async def start_backfill(
|
||||||
# XXX: query result includes a start point prior to our
|
# XXX: query result includes a start point prior to our
|
||||||
# expected "frame size" and thus is likely some kind of
|
# expected "frame size" and thus is likely some kind of
|
||||||
# history gap (eg. market closed period, outage, etc.)
|
# history gap (eg. market closed period, outage, etc.)
|
||||||
# so indicate to the request loop that this gap is
|
# so just report it to console for now.
|
||||||
# expected by both,
|
|
||||||
# - resetting the ``iter_dts()`` generator to start at
|
|
||||||
# the new start point delivered in this result
|
|
||||||
# - setting the non-locally scoped ``earliest_end_dt``
|
|
||||||
# to this new value so that the request loop doesn't
|
|
||||||
# get tripped up thinking there's an out of order
|
|
||||||
# request-result condition.
|
|
||||||
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f'History frame ending @ {end_dt} appears to have a gap:\n'
|
f'History frame ending @ {end_dt} appears to have a gap:\n'
|
||||||
f'{diff} ~= {frame_time_diff_s} seconds'
|
f'{diff} ~= {frame_time_diff_s} seconds'
|
||||||
)
|
)
|
||||||
|
|
||||||
# reset dtrange gen to new start point
|
|
||||||
try:
|
|
||||||
next_end = iter_dts_gen.send(start_dt)
|
|
||||||
log.info(
|
|
||||||
f'Reset frame index to start at {start_dt}\n'
|
|
||||||
f'Was at {next_end}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# NOTE: manually set "earliest end datetime" index-value
|
|
||||||
# to avoid the request loop getting confused about
|
|
||||||
# new frames that are earlier in history - i.e. this
|
|
||||||
# **is not** the case of out-of-order frames from
|
|
||||||
# an async batch request.
|
|
||||||
earliest_end_dt = start_dt
|
|
||||||
|
|
||||||
except StopIteration:
|
|
||||||
# gen already terminated meaning we probably already
|
|
||||||
# exhausted it via frame requests.
|
|
||||||
log.info(
|
|
||||||
"Datetime index already exhausted, can't reset.."
|
|
||||||
)
|
|
||||||
|
|
||||||
to_push = diff_history(
|
to_push = diff_history(
|
||||||
array,
|
array,
|
||||||
start_dt,
|
start_dt,
|
||||||
|
|
@ -464,194 +412,53 @@ async def start_backfill(
|
||||||
ln = len(to_push)
|
ln = len(to_push)
|
||||||
if ln:
|
if ln:
|
||||||
log.info(f'{ln} bars for {start_dt} -> {end_dt}')
|
log.info(f'{ln} bars for {start_dt} -> {end_dt}')
|
||||||
frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt)
|
|
||||||
return to_push, start_dt, end_dt
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
|
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
|
||||||
)
|
)
|
||||||
return None
|
|
||||||
|
|
||||||
# initial dt index starts at the start of the first query result
|
# bail gracefully on shm allocation overrun/full condition
|
||||||
idts = iter_dts(start_dt)
|
try:
|
||||||
|
shm.push(to_push, prepend=True)
|
||||||
|
except ValueError:
|
||||||
|
log.info(
|
||||||
|
f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
async with trimeter.amap(
|
log.info(
|
||||||
partial(
|
f'Shm pushed {ln} frame:\n'
|
||||||
get_ohlc_frame,
|
f'{start_dt} -> {end_dt}'
|
||||||
# we close in the ``iter_dt()`` gen in so we can send
|
)
|
||||||
# reset signals as needed for gap dection in the
|
|
||||||
# history.
|
|
||||||
iter_dts_gen=idts,
|
|
||||||
),
|
|
||||||
idts,
|
|
||||||
|
|
||||||
capture_outcome=True,
|
if (
|
||||||
include_value=True,
|
storage is not None
|
||||||
|
and write_tsdb
|
||||||
|
):
|
||||||
|
log.info(
|
||||||
|
f'Writing {ln} frame to storage:\n'
|
||||||
|
f'{start_dt} -> {end_dt}'
|
||||||
|
)
|
||||||
|
await storage.write_ohlcv(
|
||||||
|
f'{bfqsn}.{mod.name}', # lul..
|
||||||
|
to_push,
|
||||||
|
timeframe,
|
||||||
|
)
|
||||||
|
|
||||||
# better technical names bruv...
|
# TODO: can we only trigger this if the respective
|
||||||
max_at_once=erlangs,
|
# history in "in view"?!?
|
||||||
max_per_second=rate,
|
|
||||||
|
|
||||||
) as outcomes:
|
# XXX: extremely important, there can be no checkpoints
|
||||||
|
# in the block above to avoid entering new ``frames``
|
||||||
# Then iterate over the return values, as they become available
|
# values while we're pipelining the current ones to
|
||||||
# (i.e., not necessarily in the original order)
|
# memory...
|
||||||
async for input_end_dt, outcome in outcomes:
|
for delay_s in sampler.subscribers:
|
||||||
|
await broadcast(delay_s)
|
||||||
try:
|
|
||||||
out = outcome.unwrap()
|
|
||||||
|
|
||||||
if out is None:
|
|
||||||
# skip signal
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif isinstance(out, DataUnavailable):
|
|
||||||
# no data available case signal.. so just kill
|
|
||||||
# further requests and basically just stop
|
|
||||||
# trying...
|
|
||||||
break
|
|
||||||
|
|
||||||
except Exception:
|
|
||||||
log.exception('uhh trimeter bail')
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
to_push, start_dt, end_dt = out
|
|
||||||
|
|
||||||
if not len(to_push):
|
|
||||||
# diff returned no new data (i.e. we probablyl hit
|
|
||||||
# the ``last_tsdb_dt`` point).
|
|
||||||
# TODO: raise instead?
|
|
||||||
log.warning(f'No history for range {start_dt} -> {end_dt}')
|
|
||||||
continue
|
|
||||||
|
|
||||||
# pipeline-style pull frames until we need to wait for
|
|
||||||
# the next in order to arrive.
|
|
||||||
# i = end_dts.index(input_end_dt)
|
|
||||||
# print(f'latest end_dt {end_dt} found at index {i}')
|
|
||||||
|
|
||||||
epochs = list(reversed(sorted(frames)))
|
|
||||||
for epoch in epochs:
|
|
||||||
|
|
||||||
start = shm.array['time'][0]
|
|
||||||
last_shm_prepend_dt = pendulum.from_timestamp(start)
|
|
||||||
earliest_frame_queue_dt = pendulum.from_timestamp(epoch)
|
|
||||||
|
|
||||||
diff = start - epoch
|
|
||||||
|
|
||||||
if diff < 0:
|
|
||||||
log.warning(
|
|
||||||
'Discarding out of order frame:\n'
|
|
||||||
f'{earliest_frame_queue_dt}'
|
|
||||||
)
|
|
||||||
frames.pop(epoch)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if diff > step_size_s:
|
|
||||||
|
|
||||||
if earliest_end_dt < earliest_frame_queue_dt:
|
|
||||||
# XXX: an expected gap was encountered (see
|
|
||||||
# logic in ``get_ohlc_frame()``, so allow
|
|
||||||
# this frame through to the storage layer.
|
|
||||||
log.warning(
|
|
||||||
f'Expected history gap of {diff}s:\n'
|
|
||||||
f'{earliest_frame_queue_dt} <- '
|
|
||||||
f'{earliest_end_dt}'
|
|
||||||
)
|
|
||||||
|
|
||||||
elif (
|
|
||||||
erlangs > 1
|
|
||||||
):
|
|
||||||
# we don't yet have the next frame to push
|
|
||||||
# so break back to the async request loop
|
|
||||||
# while we wait for more async frame-results
|
|
||||||
# to arrive.
|
|
||||||
if len(frames) >= erlangs:
|
|
||||||
log.warning(
|
|
||||||
'Frame count in async-queue is greater '
|
|
||||||
'then erlangs?\n'
|
|
||||||
'There seems to be a gap between:\n'
|
|
||||||
f'{earliest_frame_queue_dt} <- '
|
|
||||||
f'{last_shm_prepend_dt}\n'
|
|
||||||
'Conducting manual call for frame ending: '
|
|
||||||
f'{last_shm_prepend_dt}'
|
|
||||||
)
|
|
||||||
(
|
|
||||||
to_push,
|
|
||||||
start_dt,
|
|
||||||
end_dt,
|
|
||||||
) = await get_ohlc_frame(
|
|
||||||
input_end_dt=last_shm_prepend_dt,
|
|
||||||
iter_dts_gen=idts,
|
|
||||||
)
|
|
||||||
last_epoch = to_push['time'][-1]
|
|
||||||
diff = start - last_epoch
|
|
||||||
|
|
||||||
if diff > step_size_s:
|
|
||||||
await tractor.breakpoint()
|
|
||||||
raise DataUnavailable(
|
|
||||||
'An awkward frame was found:\n'
|
|
||||||
f'{start_dt} -> {end_dt}:\n{to_push}'
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
|
||||||
frames[last_epoch] = (
|
|
||||||
to_push, start_dt, end_dt)
|
|
||||||
break
|
|
||||||
|
|
||||||
expect_end = pendulum.from_timestamp(start)
|
|
||||||
expect_start = expect_end.subtract(
|
|
||||||
seconds=frame_size_s)
|
|
||||||
log.warning(
|
|
||||||
'waiting on out-of-order history frame:\n'
|
|
||||||
f'{expect_end - expect_start}'
|
|
||||||
)
|
|
||||||
break
|
|
||||||
|
|
||||||
to_push, start_dt, end_dt = frames.pop(epoch)
|
|
||||||
ln = len(to_push)
|
|
||||||
|
|
||||||
# bail gracefully on shm allocation overrun/full condition
|
|
||||||
try:
|
|
||||||
shm.push(to_push, prepend=True)
|
|
||||||
except ValueError:
|
|
||||||
log.info(
|
|
||||||
f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
|
|
||||||
)
|
|
||||||
break
|
|
||||||
|
|
||||||
log.info(
|
|
||||||
f'Shm pushed {ln} frame:\n'
|
|
||||||
f'{start_dt} -> {end_dt}'
|
|
||||||
)
|
|
||||||
# keep track of most recent "prepended" ``start_dt``
|
|
||||||
# both for detecting gaps and ensuring async
|
|
||||||
# frame-result order.
|
|
||||||
earliest_end_dt = start_dt
|
|
||||||
|
|
||||||
if (
|
|
||||||
storage is not None
|
|
||||||
and write_tsdb
|
|
||||||
):
|
|
||||||
log.info(
|
|
||||||
f'Writing {ln} frame to storage:\n'
|
|
||||||
f'{start_dt} -> {end_dt}'
|
|
||||||
)
|
|
||||||
await storage.write_ohlcv(
|
|
||||||
f'{bfqsn}.{mod.name}', # lul..
|
|
||||||
to_push,
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: can we only trigger this if the respective
|
|
||||||
# history in "in view"?!?
|
|
||||||
# XXX: extremely important, there can be no checkpoints
|
|
||||||
# in the block above to avoid entering new ``frames``
|
|
||||||
# values while we're pipelining the current ones to
|
|
||||||
# memory...
|
|
||||||
for delay_s in sampler.subscribers:
|
|
||||||
await broadcast(delay_s)
|
|
||||||
|
|
||||||
|
# short-circuit (for now)
|
||||||
bf_done.set()
|
bf_done.set()
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
async def manage_history(
|
async def manage_history(
|
||||||
|
|
@ -660,6 +467,7 @@ async def manage_history(
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
some_data_ready: trio.Event,
|
some_data_ready: trio.Event,
|
||||||
feed_is_live: trio.Event,
|
feed_is_live: trio.Event,
|
||||||
|
timeframe: float = 60, # in seconds
|
||||||
|
|
||||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
|
@ -683,10 +491,10 @@ async def manage_history(
|
||||||
readonly=False,
|
readonly=False,
|
||||||
)
|
)
|
||||||
# TODO: history validation
|
# TODO: history validation
|
||||||
if not opened:
|
# if not opened:
|
||||||
raise RuntimeError(
|
# raise RuntimeError(
|
||||||
"Persistent shm for sym was already open?!"
|
# "Persistent shm for sym was already open?!"
|
||||||
)
|
# )
|
||||||
|
|
||||||
rt_shm, opened = maybe_open_shm_array(
|
rt_shm, opened = maybe_open_shm_array(
|
||||||
key=f'{fqsn}_rt',
|
key=f'{fqsn}_rt',
|
||||||
|
|
@ -698,10 +506,10 @@ async def manage_history(
|
||||||
readonly=False,
|
readonly=False,
|
||||||
size=3*_secs_in_day,
|
size=3*_secs_in_day,
|
||||||
)
|
)
|
||||||
if not opened:
|
# if not opened:
|
||||||
raise RuntimeError(
|
# raise RuntimeError(
|
||||||
"Persistent shm for sym was already open?!"
|
# "Persistent shm for sym was already open?!"
|
||||||
)
|
# )
|
||||||
|
|
||||||
log.info('Scanning for existing `marketstored`')
|
log.info('Scanning for existing `marketstored`')
|
||||||
|
|
||||||
|
|
@ -726,7 +534,10 @@ async def manage_history(
|
||||||
# shm backfiller approach below.
|
# shm backfiller approach below.
|
||||||
|
|
||||||
# start history anal and load missing new data via backend.
|
# start history anal and load missing new data via backend.
|
||||||
series, _, last_tsdb_dt = await storage.load(fqsn)
|
series, _, last_tsdb_dt = await storage.load(
|
||||||
|
fqsn,
|
||||||
|
timeframe=timeframe,
|
||||||
|
)
|
||||||
|
|
||||||
broker, symbol, expiry = unpack_fqsn(fqsn)
|
broker, symbol, expiry = unpack_fqsn(fqsn)
|
||||||
(
|
(
|
||||||
|
|
@ -739,6 +550,7 @@ async def manage_history(
|
||||||
mod,
|
mod,
|
||||||
bfqsn,
|
bfqsn,
|
||||||
hist_shm,
|
hist_shm,
|
||||||
|
timeframe=timeframe,
|
||||||
last_tsdb_dt=last_tsdb_dt,
|
last_tsdb_dt=last_tsdb_dt,
|
||||||
tsdb_is_up=True,
|
tsdb_is_up=True,
|
||||||
storage=storage,
|
storage=storage,
|
||||||
|
|
@ -769,7 +581,6 @@ async def manage_history(
|
||||||
else:
|
else:
|
||||||
dt_diff_s = 0
|
dt_diff_s = 0
|
||||||
|
|
||||||
# await trio.sleep_forever()
|
|
||||||
# TODO: see if there's faster multi-field reads:
|
# TODO: see if there's faster multi-field reads:
|
||||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||||
# re-index with a `time` and index field
|
# re-index with a `time` and index field
|
||||||
|
|
@ -804,6 +615,7 @@ async def manage_history(
|
||||||
series = await storage.read_ohlcv(
|
series = await storage.read_ohlcv(
|
||||||
fqsn,
|
fqsn,
|
||||||
end=end,
|
end=end,
|
||||||
|
timeframe=timeframe,
|
||||||
)
|
)
|
||||||
history = list(series.values())
|
history = list(series.values())
|
||||||
fastest = history[0]
|
fastest = history[0]
|
||||||
|
|
@ -856,6 +668,7 @@ async def manage_history(
|
||||||
mod,
|
mod,
|
||||||
bfqsn,
|
bfqsn,
|
||||||
hist_shm,
|
hist_shm,
|
||||||
|
timeframe=timeframe,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -387,6 +387,7 @@ class Storage:
|
||||||
async def load(
|
async def load(
|
||||||
self,
|
self,
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
|
timeframe: int,
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
dict[int, np.ndarray], # timeframe (in secs) to series
|
dict[int, np.ndarray], # timeframe (in secs) to series
|
||||||
|
|
@ -400,12 +401,16 @@ class Storage:
|
||||||
# on first load we don't need to pull the max
|
# on first load we don't need to pull the max
|
||||||
# history per request size worth.
|
# history per request size worth.
|
||||||
limit=3000,
|
limit=3000,
|
||||||
|
timeframe=timeframe,
|
||||||
)
|
)
|
||||||
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
||||||
|
|
||||||
if tsdb_arrays:
|
if len(tsdb_arrays):
|
||||||
fastest = list(tsdb_arrays.values())[0]
|
# fastest = list(tsdb_arrays.values())[0]
|
||||||
times = fastest['Epoch']
|
# slowest = list(tsdb_arrays.values())[-1]
|
||||||
|
hist = tsdb_arrays[timeframe]
|
||||||
|
|
||||||
|
times = hist['Epoch']
|
||||||
first, last = times[0], times[-1]
|
first, last = times[0], times[-1]
|
||||||
first_tsdb_dt, last_tsdb_dt = map(
|
first_tsdb_dt, last_tsdb_dt = map(
|
||||||
pendulum.from_timestamp, [first, last]
|
pendulum.from_timestamp, [first, last]
|
||||||
|
|
@ -420,9 +425,9 @@ class Storage:
|
||||||
end: Optional[int] = None,
|
end: Optional[int] = None,
|
||||||
limit: int = int(800e3),
|
limit: int = int(800e3),
|
||||||
|
|
||||||
) -> tuple[
|
) -> dict[
|
||||||
MarketstoreClient,
|
int,
|
||||||
Union[dict, np.ndarray]
|
Union[dict, np.ndarray],
|
||||||
]:
|
]:
|
||||||
client = self.client
|
client = self.client
|
||||||
syms = await client.list_symbols()
|
syms = await client.list_symbols()
|
||||||
|
|
@ -430,7 +435,8 @@ class Storage:
|
||||||
if fqsn not in syms:
|
if fqsn not in syms:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
tfstr = tf_in_1s[1]
|
# use the provided timeframe or 1s by default
|
||||||
|
tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
|
||||||
|
|
||||||
params = Params(
|
params = Params(
|
||||||
symbols=fqsn,
|
symbols=fqsn,
|
||||||
|
|
@ -463,39 +469,52 @@ class Storage:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
else:
|
else:
|
||||||
result = await client.query(params)
|
params.set('timeframe', tfstr)
|
||||||
|
try:
|
||||||
|
result = await client.query(params)
|
||||||
|
except purerpc.grpclib.exceptions.UnknownError:
|
||||||
|
# indicate there is no history for this timeframe
|
||||||
|
return {}
|
||||||
|
|
||||||
|
# Fill out a `numpy` array-results map keyed by timeframe
|
||||||
|
arrays = {}
|
||||||
|
|
||||||
# TODO: it turns out column access on recarrays is actually slower:
|
# TODO: it turns out column access on recarrays is actually slower:
|
||||||
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
|
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
|
||||||
# it might make sense to make these structured arrays?
|
# it might make sense to make these structured arrays?
|
||||||
# Fill out a `numpy` array-results map
|
|
||||||
arrays = {}
|
|
||||||
for fqsn, data_set in result.by_symbols().items():
|
for fqsn, data_set in result.by_symbols().items():
|
||||||
arrays.setdefault(fqsn, {})[
|
arrays.setdefault(fqsn, {})[
|
||||||
tf_in_1s.inverse[data_set.timeframe]
|
tf_in_1s.inverse[data_set.timeframe]
|
||||||
] = data_set.array
|
] = data_set.array
|
||||||
|
|
||||||
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
|
return arrays[fqsn]
|
||||||
|
|
||||||
async def delete_ts(
|
async def delete_ts(
|
||||||
self,
|
self,
|
||||||
key: str,
|
key: str,
|
||||||
timeframe: Optional[Union[int, str]] = None,
|
timeframe: Optional[Union[int, str]] = None,
|
||||||
|
fmt: str = 'OHLCV',
|
||||||
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
|
||||||
client = self.client
|
client = self.client
|
||||||
syms = await client.list_symbols()
|
syms = await client.list_symbols()
|
||||||
print(syms)
|
print(syms)
|
||||||
# if key not in syms:
|
if key not in syms:
|
||||||
# raise KeyError(f'`{fqsn}` table key not found?')
|
raise KeyError(f'`{key}` table key not found in\n{syms}?')
|
||||||
|
|
||||||
return await client.destroy(tbk=key)
|
tbk = mk_tbk((
|
||||||
|
key,
|
||||||
|
tf_in_1s.get(timeframe, tf_in_1s[60]),
|
||||||
|
fmt,
|
||||||
|
))
|
||||||
|
return await client.destroy(tbk=tbk)
|
||||||
|
|
||||||
async def write_ohlcv(
|
async def write_ohlcv(
|
||||||
self,
|
self,
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
ohlcv: np.ndarray,
|
ohlcv: np.ndarray,
|
||||||
|
timeframe: int,
|
||||||
append_and_duplicate: bool = True,
|
append_and_duplicate: bool = True,
|
||||||
limit: int = int(800e3),
|
limit: int = int(800e3),
|
||||||
|
|
||||||
|
|
@ -525,7 +544,7 @@ class Storage:
|
||||||
# write to db
|
# write to db
|
||||||
resp = await self.client.write(
|
resp = await self.client.write(
|
||||||
to_push,
|
to_push,
|
||||||
tbk=f'{fqsn}/1Sec/OHLCV',
|
tbk=f'{fqsn}/{tf_in_1s[timeframe]}/OHLCV',
|
||||||
|
|
||||||
# NOTE: will will append duplicates
|
# NOTE: will will append duplicates
|
||||||
# for the same timestamp-index.
|
# for the same timestamp-index.
|
||||||
|
|
@ -577,6 +596,7 @@ class Storage:
|
||||||
# def delete_range(self, start_dt, end_dt) -> None:
|
# def delete_range(self, start_dt, end_dt) -> None:
|
||||||
# ...
|
# ...
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_storage_client(
|
async def open_storage_client(
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
|
|
@ -642,7 +662,7 @@ async def tsdb_history_update(
|
||||||
):
|
):
|
||||||
profiler(f'opened feed for {fqsn}')
|
profiler(f'opened feed for {fqsn}')
|
||||||
|
|
||||||
to_append = feed.shm.array
|
to_append = feed.hist_shm.array
|
||||||
to_prepend = None
|
to_prepend = None
|
||||||
|
|
||||||
if fqsn:
|
if fqsn:
|
||||||
|
|
@ -651,7 +671,7 @@ async def tsdb_history_update(
|
||||||
fqsn = symbol.front_fqsn()
|
fqsn = symbol.front_fqsn()
|
||||||
|
|
||||||
# diff db history with shm and only write the missing portions
|
# diff db history with shm and only write the missing portions
|
||||||
ohlcv = feed.shm.array
|
ohlcv = feed.hist_shm.array
|
||||||
|
|
||||||
# TODO: use pg profiler
|
# TODO: use pg profiler
|
||||||
tsdb_arrays = await storage.read_ohlcv(fqsn)
|
tsdb_arrays = await storage.read_ohlcv(fqsn)
|
||||||
|
|
|
||||||
71
piker/pp.py
71
piker/pp.py
|
|
@ -20,6 +20,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
|
||||||
(looking at you `ib` and dirt-bird friends)
|
(looking at you `ib` and dirt-bird friends)
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
from contextlib import contextmanager as cm
|
from contextlib import contextmanager as cm
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import os
|
import os
|
||||||
|
|
@ -138,13 +139,31 @@ class Position(Struct):
|
||||||
|
|
||||||
# ordered record of known constituent trade messages
|
# ordered record of known constituent trade messages
|
||||||
clears: dict[
|
clears: dict[
|
||||||
Union[str, int, Status], # trade id
|
str | int, # trade id
|
||||||
dict[str, Any], # transaction history summaries
|
dict[str, Any], # transaction history summaries
|
||||||
] = {}
|
] = {}
|
||||||
first_clear_dt: Optional[datetime] = None
|
first_clear_dt: Optional[datetime] = None
|
||||||
|
|
||||||
expiry: Optional[datetime] = None
|
expiry: Optional[datetime] = None
|
||||||
|
|
||||||
|
# @property
|
||||||
|
# def clears(self) -> dict[
|
||||||
|
# Union[str, int, Status], # trade id
|
||||||
|
# dict[str, Any], # transaction history summaries
|
||||||
|
# ]:
|
||||||
|
# '''
|
||||||
|
# Datetime sorted reference to internal clears table.
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# # self._clears = {}
|
||||||
|
# self._clears = dict(sorted(
|
||||||
|
# self._clears.items(),
|
||||||
|
# key=lambda entry: entry[1]['dt'],
|
||||||
|
# ))
|
||||||
|
# # self._clears[k] = v
|
||||||
|
|
||||||
|
# return self._clears
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
def to_dict(self) -> dict:
|
||||||
return {
|
return {
|
||||||
f: getattr(self, f)
|
f: getattr(self, f)
|
||||||
|
|
@ -219,6 +238,10 @@ class Position(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
clears = list(self.clears.values())
|
clears = list(self.clears.values())
|
||||||
|
if not clears:
|
||||||
|
log.warning(f'No clears table for {self.symbol}!?')
|
||||||
|
return
|
||||||
|
|
||||||
self.first_clear_dt = min(list(entry['dt'] for entry in clears))
|
self.first_clear_dt = min(list(entry['dt'] for entry in clears))
|
||||||
last_clear = clears[-1]
|
last_clear = clears[-1]
|
||||||
|
|
||||||
|
|
@ -623,6 +646,7 @@ class PpTable(Struct):
|
||||||
|
|
||||||
def to_toml(
|
def to_toml(
|
||||||
self,
|
self,
|
||||||
|
min_clears: bool = True,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
|
|
||||||
active, closed = self.dump_active()
|
active, closed = self.dump_active()
|
||||||
|
|
@ -635,7 +659,9 @@ class PpTable(Struct):
|
||||||
|
|
||||||
# keep the minimal amount of clears that make up this
|
# keep the minimal amount of clears that make up this
|
||||||
# position since the last net-zero state.
|
# position since the last net-zero state.
|
||||||
pos.minimize_clears()
|
if min_clears:
|
||||||
|
pos.minimize_clears()
|
||||||
|
|
||||||
pos.ensure_state()
|
pos.ensure_state()
|
||||||
|
|
||||||
# serialize to pre-toml form
|
# serialize to pre-toml form
|
||||||
|
|
@ -682,6 +708,8 @@ def load_pps_from_ledger(
|
||||||
brokername: str,
|
brokername: str,
|
||||||
acctname: str,
|
acctname: str,
|
||||||
|
|
||||||
|
table: Optional[PpTable] = None,
|
||||||
|
|
||||||
# post normalization filter on ledger entries to be processed
|
# post normalization filter on ledger entries to be processed
|
||||||
filter_by: Optional[list[dict]] = None,
|
filter_by: Optional[list[dict]] = None,
|
||||||
|
|
||||||
|
|
@ -698,7 +726,6 @@ def load_pps_from_ledger(
|
||||||
'''
|
'''
|
||||||
with (
|
with (
|
||||||
open_trade_ledger(brokername, acctname) as ledger,
|
open_trade_ledger(brokername, acctname) as ledger,
|
||||||
open_pps(brokername, acctname) as table,
|
|
||||||
):
|
):
|
||||||
if not ledger:
|
if not ledger:
|
||||||
# null case, no ledger file with content
|
# null case, no ledger file with content
|
||||||
|
|
@ -716,7 +743,11 @@ def load_pps_from_ledger(
|
||||||
else:
|
else:
|
||||||
records = src_records
|
records = src_records
|
||||||
|
|
||||||
updated = table.update_from_trans(records)
|
if table is None:
|
||||||
|
with open_pps(brokername, acctname) as table:
|
||||||
|
updated = table.update_from_trans(records)
|
||||||
|
else:
|
||||||
|
updated = table.update_from_trans(records)
|
||||||
|
|
||||||
return records, updated
|
return records, updated
|
||||||
|
|
||||||
|
|
@ -886,15 +917,27 @@ def open_pps(
|
||||||
conf=conf,
|
conf=conf,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# first pass populate all missing clears record tables
|
||||||
|
# for fqsn, entry in pps.items():
|
||||||
|
# # convert clears sub-tables (only in this form
|
||||||
|
# # for toml re-presentation) back into a master table.
|
||||||
|
# clears_list = entry.get('clears', [])
|
||||||
|
|
||||||
|
# # attempt to reload from ledger
|
||||||
|
# if not clears_list:
|
||||||
|
# trans, pos = load_pps_from_ledger(
|
||||||
|
# brokername,
|
||||||
|
# acctid,
|
||||||
|
# filter_by=[entry['bsuid']],
|
||||||
|
# table=table,
|
||||||
|
# )
|
||||||
|
# # breakpoint()
|
||||||
|
|
||||||
# unmarshal/load ``pps.toml`` config entries into object form
|
# unmarshal/load ``pps.toml`` config entries into object form
|
||||||
# and update `PpTable` obj entries.
|
# and update `PpTable` obj entries.
|
||||||
for fqsn, entry in pps.items():
|
for fqsn, entry in pps.items():
|
||||||
bsuid = entry['bsuid']
|
bsuid = entry['bsuid']
|
||||||
|
|
||||||
# convert clears sub-tables (only in this form
|
|
||||||
# for toml re-presentation) back into a master table.
|
|
||||||
clears_list = entry['clears']
|
|
||||||
|
|
||||||
# index clears entries in "object" form by tid in a top
|
# index clears entries in "object" form by tid in a top
|
||||||
# level dict instead of a list (as is presented in our
|
# level dict instead of a list (as is presented in our
|
||||||
# ``pps.toml``).
|
# ``pps.toml``).
|
||||||
|
|
@ -906,6 +949,18 @@ def open_pps(
|
||||||
# processing of new clear events.
|
# processing of new clear events.
|
||||||
trans: list[Transaction] = []
|
trans: list[Transaction] = []
|
||||||
|
|
||||||
|
# convert clears sub-tables (only in this form
|
||||||
|
# for toml re-presentation) back into a master table.
|
||||||
|
clears_list = entry['clears']
|
||||||
|
|
||||||
|
# # attempt to reload from ledger
|
||||||
|
# if not clears_list:
|
||||||
|
# trans, pos = load_pps_from_ledger(
|
||||||
|
# brokername,
|
||||||
|
# acctid,
|
||||||
|
# table=table,
|
||||||
|
# )
|
||||||
|
|
||||||
for clears_table in clears_list:
|
for clears_table in clears_list:
|
||||||
tid = clears_table.pop('tid')
|
tid = clears_table.pop('tid')
|
||||||
dtstr = clears_table['dt']
|
dtstr = clears_table['dt']
|
||||||
|
|
|
||||||
|
|
@ -249,14 +249,14 @@ async def graphics_update_loop(
|
||||||
linked: LinkedSplits = godwidget.rt_linked
|
linked: LinkedSplits = godwidget.rt_linked
|
||||||
display_rate = godwidget.window.current_screen().refreshRate()
|
display_rate = godwidget.window.current_screen().refreshRate()
|
||||||
|
|
||||||
chart = linked.chart
|
fast_chart = linked.chart
|
||||||
hist_chart = godwidget.hist_linked.chart
|
hist_chart = godwidget.hist_linked.chart
|
||||||
|
|
||||||
ohlcv = feed.rt_shm
|
ohlcv = feed.rt_shm
|
||||||
hist_ohlcv = feed.hist_shm
|
hist_ohlcv = feed.hist_shm
|
||||||
|
|
||||||
# update last price sticky
|
# update last price sticky
|
||||||
last_price_sticky = chart._ysticks[chart.name]
|
last_price_sticky = fast_chart._ysticks[fast_chart.name]
|
||||||
last_price_sticky.update_from_data(
|
last_price_sticky.update_from_data(
|
||||||
*ohlcv.array[-1][['index', 'close']]
|
*ohlcv.array[-1][['index', 'close']]
|
||||||
)
|
)
|
||||||
|
|
@ -268,7 +268,7 @@ async def graphics_update_loop(
|
||||||
|
|
||||||
maxmin = partial(
|
maxmin = partial(
|
||||||
chart_maxmin,
|
chart_maxmin,
|
||||||
chart,
|
fast_chart,
|
||||||
ohlcv,
|
ohlcv,
|
||||||
vlm_chart,
|
vlm_chart,
|
||||||
)
|
)
|
||||||
|
|
@ -282,15 +282,15 @@ async def graphics_update_loop(
|
||||||
|
|
||||||
last, volume = ohlcv.array[-1][['close', 'volume']]
|
last, volume = ohlcv.array[-1][['close', 'volume']]
|
||||||
|
|
||||||
symbol = chart.linked.symbol
|
symbol = fast_chart.linked.symbol
|
||||||
|
|
||||||
l1 = L1Labels(
|
l1 = L1Labels(
|
||||||
chart,
|
fast_chart,
|
||||||
# determine precision/decimal lengths
|
# determine precision/decimal lengths
|
||||||
digits=symbol.tick_size_digits,
|
digits=symbol.tick_size_digits,
|
||||||
size_digits=symbol.lot_size_digits,
|
size_digits=symbol.lot_size_digits,
|
||||||
)
|
)
|
||||||
chart._l1_labels = l1
|
fast_chart._l1_labels = l1
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
# - in theory we should be able to read buffer data faster
|
# - in theory we should be able to read buffer data faster
|
||||||
|
|
@ -300,10 +300,10 @@ async def graphics_update_loop(
|
||||||
# levels this might be dark volume we need to
|
# levels this might be dark volume we need to
|
||||||
# present differently -> likely dark vlm
|
# present differently -> likely dark vlm
|
||||||
|
|
||||||
tick_size = chart.linked.symbol.tick_size
|
tick_size = fast_chart.linked.symbol.tick_size
|
||||||
tick_margin = 3 * tick_size
|
tick_margin = 3 * tick_size
|
||||||
|
|
||||||
chart.show()
|
fast_chart.show()
|
||||||
last_quote = time.time()
|
last_quote = time.time()
|
||||||
i_last = ohlcv.index
|
i_last = ohlcv.index
|
||||||
|
|
||||||
|
|
@ -313,7 +313,7 @@ async def graphics_update_loop(
|
||||||
'maxmin': maxmin,
|
'maxmin': maxmin,
|
||||||
'ohlcv': ohlcv,
|
'ohlcv': ohlcv,
|
||||||
'hist_ohlcv': hist_ohlcv,
|
'hist_ohlcv': hist_ohlcv,
|
||||||
'chart': chart,
|
'chart': fast_chart,
|
||||||
'last_price_sticky': last_price_sticky,
|
'last_price_sticky': last_price_sticky,
|
||||||
'hist_last_price_sticky': hist_last_price_sticky,
|
'hist_last_price_sticky': hist_last_price_sticky,
|
||||||
'l1': l1,
|
'l1': l1,
|
||||||
|
|
@ -333,7 +333,7 @@ async def graphics_update_loop(
|
||||||
ds.vlm_chart = vlm_chart
|
ds.vlm_chart = vlm_chart
|
||||||
ds.vlm_sticky = vlm_sticky
|
ds.vlm_sticky = vlm_sticky
|
||||||
|
|
||||||
chart.default_view()
|
fast_chart.default_view()
|
||||||
|
|
||||||
# TODO: probably factor this into some kinda `DisplayState`
|
# TODO: probably factor this into some kinda `DisplayState`
|
||||||
# API that can be reused at least in terms of pulling view
|
# API that can be reused at least in terms of pulling view
|
||||||
|
|
@ -410,16 +410,16 @@ async def graphics_update_loop(
|
||||||
last_quote = time.time()
|
last_quote = time.time()
|
||||||
|
|
||||||
# chart isn't active/shown so skip render cycle and pause feed(s)
|
# chart isn't active/shown so skip render cycle and pause feed(s)
|
||||||
if chart.linked.isHidden():
|
if fast_chart.linked.isHidden():
|
||||||
# print('skipping update')
|
# print('skipping update')
|
||||||
chart.pause_all_feeds()
|
fast_chart.pause_all_feeds()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
ic = chart.view._ic
|
# ic = fast_chart.view._ic
|
||||||
if ic:
|
# if ic:
|
||||||
chart.pause_all_feeds()
|
# fast_chart.pause_all_feeds()
|
||||||
await ic.wait()
|
# await ic.wait()
|
||||||
chart.resume_all_feeds()
|
# fast_chart.resume_all_feeds()
|
||||||
|
|
||||||
# sync call to update all graphics/UX components.
|
# sync call to update all graphics/UX components.
|
||||||
graphics_update_cycle(ds)
|
graphics_update_cycle(ds)
|
||||||
|
|
@ -502,6 +502,7 @@ def graphics_update_cycle(
|
||||||
or trigger_all
|
or trigger_all
|
||||||
):
|
):
|
||||||
chart.increment_view(steps=i_diff)
|
chart.increment_view(steps=i_diff)
|
||||||
|
chart.view._set_yrange(yrange=(mn, mx))
|
||||||
|
|
||||||
if vlm_chart:
|
if vlm_chart:
|
||||||
vlm_chart.increment_view(steps=i_diff)
|
vlm_chart.increment_view(steps=i_diff)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue