Merge pull request #401 from pikers/ib_1m_hist

Ib 1m hist
goodboy 2022-10-29 13:14:53 -04:00 committed by GitHub
commit 11ecf9cb09
12 changed files with 1334 additions and 1072 deletions


@@ -195,9 +195,8 @@ async def open_piker_runtime(
 ) -> Optional[tractor._portal.Portal]:
     '''
-    Start a piker actor who's runtime will automatically
-    sync with existing piker actors in local network
-    based on configuration.
+    Start a piker actor who's runtime will automatically sync with
+    existing piker actors on the local link based on configuration.

     '''
     global _services


@@ -36,7 +36,11 @@ import tractor
 import wsproto

 from .._cacheables import open_cached_client
-from ._util import resproc, SymbolNotFound
+from ._util import (
+    resproc,
+    SymbolNotFound,
+    DataUnavailable,
+)
 from ..log import get_logger, get_console_log
 from ..data import ShmArray
 from ..data.types import Struct
@@ -388,6 +392,7 @@ async def open_history_client(
     async with open_cached_client('binance') as client:

         async def get_ohlc(
+            timeframe: float,
             end_dt: Optional[datetime] = None,
             start_dt: Optional[datetime] = None,
@@ -396,6 +401,8 @@ async def open_history_client(
             datetime,  # start
             datetime,  # end
         ]:
+            if timeframe != 60:
+                raise DataUnavailable('Only 1m bars are supported')

             array = await client.bars(
                 symbol,
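Editor's note: the guard added above is the general pattern this PR uses for backends that only serve one bar period. A minimal, hypothetical sketch of that hook shape (the `DataUnavailable` exception and the stubbed body are stand-ins, not the actual piker internals):

# hedged sketch: reject unsupported bar periods up-front in a
# history hook; names here are illustrative stand-ins.
from datetime import datetime
from typing import Optional


class DataUnavailable(Exception):
    '''Raised when a backend cannot serve the requested history.'''


async def get_ohlc(
    timeframe: float,          # bar period in seconds
    end_dt: Optional[datetime] = None,
    start_dt: Optional[datetime] = None,
):
    # this PR standardizes on seconds-per-bar; only 1m is queried here
    if timeframe != 60:
        raise DataUnavailable('Only 1m bars are supported')
    ...  # actual frame request would go here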


@@ -43,6 +43,7 @@ from bidict import bidict
 import trio
 import tractor
 from tractor import to_asyncio
+import pendulum
 import ib_insync as ibis
 from ib_insync.contract import (
     Contract,
@@ -52,6 +53,7 @@ from ib_insync.contract import (
 from ib_insync.order import Order
 from ib_insync.ticker import Ticker
 from ib_insync.objects import (
+    BarDataList,
     Position,
     Fill,
     Execution,
@@ -78,26 +80,11 @@ _time_units = {
     'h': ' hours',
 }

-_time_frames = {
-    '1s': '1 Sec',
-    '5s': '5 Sec',
-    '30s': '30 Sec',
-    '1m': 'OneMinute',
-    '2m': 'TwoMinutes',
-    '3m': 'ThreeMinutes',
-    '4m': 'FourMinutes',
-    '5m': 'FiveMinutes',
-    '10m': 'TenMinutes',
-    '15m': 'FifteenMinutes',
-    '20m': 'TwentyMinutes',
-    '30m': 'HalfHour',
-    '1h': 'OneHour',
-    '2h': 'TwoHours',
-    '4h': 'FourHours',
-    'D': 'OneDay',
-    'W': 'OneWeek',
-    'M': 'OneMonth',
-    'Y': 'OneYear',
+_bar_sizes = {
+    1: '1 Sec',
+    60: '1 min',
+    60*60: '1 hour',
+    24*60*60: '1 day',
 }

 _show_wap_in_history: bool = False
@@ -199,7 +186,8 @@ _adhoc_futes_set = {
     'lb.nymex',  # random len lumber

     # metals
-    'xauusd.cmdty',  # gold spot
+    # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
+    'xauusd.cmdty',  # london gold spot ^
     'gc.nymex',
     'mgc.nymex',  # micro
@@ -257,14 +245,12 @@ _exch_skip_list = {
     'PSE',
 }

-# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
-
 _enters = 0


 def bars_to_np(bars: list) -> np.ndarray:
     '''
-    Convert a "bars list thing" (``BarsList`` type from ibis)
+    Convert a "bars list thing" (``BarDataList`` type from ibis)
     into a numpy struct array.

     '''
@@ -284,6 +270,27 @@ def bars_to_np(bars: list) -> np.ndarray:
     return nparr

# NOTE: pacing violations exist for higher sample rates:
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
# Also see note on duration limits being lifted on 1m+ periods,
# but they say "use with discretion":
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
_samplings: dict[int, tuple[str, str]] = {
1: (
'1 secs',
f'{int(2e3)} S',
pendulum.duration(seconds=2e3),
),
# TODO: benchmark >1 D duration on query to see if
# throughput can be made faster during backfilling.
60: (
'1 min',
'1 D',
pendulum.duration(days=1),
),
}
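Editor's note: the tuple layout above drives the actual IB query parameters: the first element feeds ``barSizeSetting``, the second is the ``durationStr`` (IB's ``integer{SPACE}unit (S|D|W|M|Y)`` format) and the third is the frame length used to walk ``end_dt`` backwards. A standalone sketch of that lookup (the table is re-declared locally so the snippet runs on its own):

# hedged sketch of how a sample period in seconds selects the IB
# query parameters; mirrors ``_samplings`` above.
import pendulum

samplings: dict[int, tuple[str, str, pendulum.Duration]] = {
    1: ('1 secs', f'{int(2e3)} S', pendulum.duration(seconds=2e3)),
    60: ('1 min', '1 D', pendulum.duration(days=1)),
}


def ib_query_params(sample_period_s: int) -> tuple[str, str, pendulum.Duration]:
    # raises KeyError for unsupported periods, like the lookup done
    # at the top of ``Client.bars()``
    bar_size, duration, dt_duration = samplings[sample_period_s]
    return bar_size, duration, dt_duration


# eg. a 60s period maps to 1-minute bars over a 1 day frame:
assert ib_query_params(60) == ('1 min', '1 D', pendulum.duration(days=1))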
 class Client:
     '''
     IB wrapped for our broker backend API.
@@ -338,19 +345,32 @@ class Client:
         start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
         end_dt: Union[datetime, str] = "",

-        sample_period_s: str = 1,  # ohlc sample period
-        period_count: int = int(2e3),  # <- max per 1s sample query
+        # ohlc sample period in seconds
+        sample_period_s: int = 1,

-    ) -> list[dict[str, Any]]:
+        # optional "duration of time" equal to the
+        # length of the returned history frame.
+        duration: Optional[str] = None,
+
+        **kwargs,
+
+    ) -> tuple[BarDataList, np.ndarray, pendulum.Duration]:
         '''
         Retreive OHLCV bars for a fqsn over a range to the present.

         '''
+        # See API docs here:
+        # https://interactivebrokers.github.io/tws-api/historical_data.html
         bars_kwargs = {'whatToShow': 'TRADES'}
+        bars_kwargs.update(kwargs)
+        bar_size, duration, dt_duration = _samplings[sample_period_s]

         global _enters
         # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
-        print(f'REQUESTING BARS {_enters} @ end={end_dt}')
+        print(
+            f"REQUESTING {duration}'s worth {bar_size} BARS\n"
+            f'{_enters} @ end={end_dt}"'
+        )

         if not end_dt:
             end_dt = ''
@@ -360,30 +380,20 @@ class Client:
         contract = (await self.find_contracts(fqsn))[0]
         bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))

-        # _min = min(2000*100, count)
         bars = await self.ib.reqHistoricalDataAsync(
             contract,
             endDateTime=end_dt,
             formatDate=2,

-            # time history length values format:
-            # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
-
             # OHLC sampling values:
             # 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
             # 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
             # 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
-            # barSizeSetting='1 secs',
+            barSizeSetting=bar_size,

-            # durationStr='{count} S'.format(count=15000 * 5),
-            # durationStr='{count} D'.format(count=1),
-            # barSizeSetting='5 secs',
-
-            durationStr='{count} S'.format(count=period_count),
-            # barSizeSetting='5 secs',
-            barSizeSetting='1 secs',
-            # barSizeSetting='1 min',
+            # time history length values format:
+            # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
+            durationStr=duration,

             # always use extended hours
             useRTH=False,
@@ -394,11 +404,21 @@ class Client:
             # whatToShow='TRADES',
         )
         if not bars:
-            # TODO: raise underlying error here
-            raise ValueError(f"No bars retreived for {fqsn}?")
+            # NOTE: there's 2 cases here to handle (and this should be
+            # read alongside the implementation of
+            # ``.reqHistoricalDataAsync()``):
+            # - no data is returned for the period likely due to
+            #   a weekend, holiday or other non-trading period prior to
+            #   ``end_dt`` which exceeds the ``duration``,
+            # - a timeout occurred in which case insync internals return
+            #   an empty list thing with bars.clear()...
+            return [], np.empty(0), dt_duration

+        # TODO: we could maybe raise ``NoData`` instead if we
+        # rewrite the method in the first case? right now there's no
+        # way to detect a timeout.
         nparr = bars_to_np(bars)
-        return bars, nparr
+        return bars, nparr, dt_duration
     async def con_deats(
         self,
@@ -463,7 +483,7 @@ class Client:
         self,
         pattern: str,
         # how many contracts to search "up to"
-        upto: int = 6,
+        upto: int = 16,
         asdicts: bool = True,

     ) -> dict[str, ContractDetails]:
@@ -498,6 +518,16 @@ class Client:
                 exch = tract.exchange
                 if exch not in _exch_skip_list:
+
+                    # try to lookup any contracts from our adhoc set
+                    # since often the exchange/venue is named slightly
+                    # different (eg. BRR.CMECRYPTO` instead of just
+                    # `.CME`).
+                    info = _adhoc_symbol_map.get(sym)
+                    if info:
+                        con_kwargs, bars_kwargs = info
+                        exch = con_kwargs['exchange']
+
                     # try get all possible contracts for symbol as per,
                     # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
                     con = ibis.Future(
@@ -748,11 +778,14 @@ class Client:

     async def get_head_time(
         self,
-        contract: Contract,
-    ) -> datetime:
-        """Return the first datetime stamp for ``contract``.
-        """
+        fqsn: str,
+
+    ) -> datetime:
+        '''
+        Return the first datetime stamp for ``contract``.
+
+        '''
+        contract = (await self.find_contracts(fqsn))[0]
         return await self.ib.reqHeadTimeStampAsync(
             contract,
             whatToShow='TRADES',
@@ -822,9 +855,7 @@ class Client:
     # async to be consistent for the client proxy, and cuz why not.
     def submit_limit(
         self,
-        # ignored since ib doesn't support defining your
-        # own order id
-        oid: str,
+        oid: str,  # ignored since doesn't support defining your own
         symbol: str,
         price: float,
         action: str,
@@ -840,6 +871,9 @@ class Client:
         '''
         Place an order and return integer request id provided by client.

+        Relevant docs:
+        - https://interactivebrokers.github.io/tws-api/order_limitations.html
+
         '''
         try:
             contract = self._contracts[symbol]
@@ -865,6 +899,9 @@ class Client:
                     optOutSmartRouting=True,
                     routeMarketableToBbo=True,
                     designatedLocation='SMART',
+                    # TODO: make all orders GTC?
+                    # https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba
+                    # goodTillDate=f"yyyyMMdd-HH:mm:ss",
                 ),
             )
         except AssertionError:  # errrg insync..
@@ -1066,6 +1103,7 @@ async def load_aio_clients(
     # retry a few times to get the client going..
     connect_retries: int = 3,
     connect_timeout: float = 0.5,
+    disconnect_on_exit: bool = True,

 ) -> dict[str, Client]:
     '''
@@ -1207,6 +1245,7 @@ async def load_aio_clients(
     finally:
         # TODO: for re-scans we'll want to not teardown clients which
         # are up and stable right?
+        if disconnect_on_exit:
             for acct, client in _accounts2clients.items():
                 log.info(f'Disconnecting {acct}@{client}')
                 client.ib.disconnect()


@@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
         entry['listingExchange'] = pexch

     conf = get_config()
-    entries = trades_to_ledger_entries(
+    entries = api_trades_to_ledger_entries(
         conf['accounts'].inverse,
         trade_entries,
     )
@@ -362,7 +362,7 @@ async def update_and_audit_msgs(
             # if ib reports a lesser pp it's not as bad since we can
             # presume we're at least not more in the shit then we
             # thought.
-            if diff:
+            if diff and pikersize:
                 reverse_split_ratio = pikersize / ibsize
                 split_ratio = 1/reverse_split_ratio
@@ -372,6 +372,7 @@ async def update_and_audit_msgs(
                     entry = f'split_ratio = 1/{int(reverse_split_ratio)}'

                 raise ValueError(
+                # log.error(
                     f'POSITION MISMATCH ib <-> piker ledger:\n'
                     f'ib: {ibppmsg}\n'
                     f'piker: {msg}\n'
@@ -1122,18 +1123,16 @@ def norm_trade_records(
             continue

         # timestamping is way different in API records
+        dtstr = record.get('datetime')
         date = record.get('date')
-        if not date:
-            # probably a flex record with a wonky non-std timestamp..
-            date, ts = record['dateTime'].split(';')
-            dt = pendulum.parse(date)
-            ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
-            tsdt = pendulum.parse(ts)
-            dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
+        flex_dtstr = record.get('dateTime')

-        else:
-            # epoch_dt = pendulum.from_timestamp(record.get('time'))
-            dt = pendulum.parse(date)
+        if dtstr or date:
+            dt = pendulum.parse(dtstr or date)
+
+        elif flex_dtstr:
+            # probably a flex record with a wonky non-std timestamp..
+            dt = parse_flex_dt(record['dateTime'])

         # special handling of symbol extraction from
         # flex records using some ad-hoc schema parsing.
@@ -1182,41 +1181,29 @@ def norm_trade_records(
     return {r.tid: r for r in records}


-def trades_to_ledger_entries(
+def parse_flex_dt(
+    record: str,
+) -> pendulum.datetime:
+    date, ts = record.split(';')
+    dt = pendulum.parse(date)
+    ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
+    tsdt = pendulum.parse(ts)
+    return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
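Editor's note: the flex timestamp format being parsed here is ``YYYYMMDD;HHMMSS`` (date and wall-clock time joined by a semicolon). A usage sketch of the helper above, restated locally so it runs standalone; the input string is a made-up example of that format:

# hedged usage sketch for the ``parse_flex_dt()`` helper added above
import pendulum


def parse_flex_dt(record: str):
    date, ts = record.split(';')
    dt = pendulum.parse(date)
    ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
    tsdt = pendulum.parse(ts)
    return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)


dt = parse_flex_dt('20221026;093015')
assert (dt.year, dt.month, dt.day) == (2022, 10, 26)
assert (dt.hour, dt.minute, dt.second) == (9, 30, 15)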
+def api_trades_to_ledger_entries(
     accounts: bidict,
     trade_entries: list[object],
-    source_type: str = 'api',

 ) -> dict:
     '''
-    Convert either of API execution objects or flex report
-    entry objects into ``dict`` form, pretty much straight up
-    without modification.
+    Convert API execution objects entry objects into ``dict`` form,
+    pretty much straight up without modification except add
+    a `pydatetime` field from the parsed timestamp.

     '''
     trades_by_account = {}
     for t in trade_entries:
-        if source_type == 'flex':
-            entry = t.__dict__
-
-            # XXX: LOL apparently ``toml`` has a bug
-            # where a section key error will show up in the write
-            # if you leave a table key as an `int`? So i guess
-            # cast to strs for all keys..
-
-            # oddly for some so-called "BookTrade" entries
-            # this field seems to be blank, no cuckin clue.
-            # trade['ibExecID']
-            tid = str(entry.get('ibExecID') or entry['tradeID'])
-            # date = str(entry['tradeDate'])
-
-            # XXX: is it going to cause problems if a account name
-            # get's lost? The user should be able to find it based
-            # on the actual exec history right?
-            acctid = accounts[str(entry['accountId'])]
-
-        elif source_type == 'api':
         # NOTE: example of schema we pull from the API client.
         # {
         #     'commissionReport': CommissionReport(...
@@ -1243,7 +1230,8 @@ def trades_to_ledger_entries(
         tid = str(entry['execId'])
         dt = pendulum.from_timestamp(entry['time'])
         # TODO: why isn't this showing seconds in the str?
-        entry['date'] = str(dt)
+        entry['pydatetime'] = dt
+        entry['datetime'] = str(dt)
         acctid = accounts[entry['acctNumber']]

         if not tid:
@@ -1262,6 +1250,73 @@ def trades_to_ledger_entries(
             acctid, {}
         )[tid] = entry

# sort entries in output by python based datetime
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
return trades_by_account
def flex_records_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
) -> dict:
'''
Convert flex report entry objects into ``dict`` form, pretty much
straight up without modification except add a `pydatetime` field
from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
# probably a flex record with a wonky non-std timestamp..
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
entry['datetime'] = str(dt)
if not tid:
# this is likely some kind of internal adjustment
# transaction, likely one of the following:
# - an expiry event that will show a "book trade" indicating
# some adjustment to cash balances: zeroing or itm settle.
# - a manual cash balance position adjustment likely done by
# the user from the accounts window in TWS where they can
# manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
continue
trades_by_account.setdefault(
acctid, {}
)[tid] = entry
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1]['pydatetime'],
))
     return trades_by_account

@@ -1308,15 +1363,16 @@ def load_flex_trades(
     ln = len(trade_entries)
     log.info(f'Loaded {ln} trades from flex query')

-    trades_by_account = trades_to_ledger_entries(
-        # get reverse map to user account names
-        conf['accounts'].inverse,
+    trades_by_account = flex_records_to_ledger_entries(
+        conf['accounts'].inverse,  # reverse map to user account names
         trade_entries,
-        source_type='flex',
     )

+    ledger_dict: Optional[dict] = None
     for acctid in trades_by_account:
         trades_by_id = trades_by_account[acctid]
         with open_trade_ledger('ib', acctid) as ledger_dict:
             tid_delta = set(trades_by_id) - set(ledger_dict)
             log.info(
@@ -1324,9 +1380,11 @@ def load_flex_trades(
                 f'{pformat(tid_delta)}'
             )
             if tid_delta:
-                ledger_dict.update(
-                    {tid: trades_by_id[tid] for tid in tid_delta}
-                )
+                sorted_delta = dict(sorted(
+                    {tid: trades_by_id[tid] for tid in tid_delta}.items(),
+                    key=lambda entry: entry[1].pop('pydatetime'),
+                ))
+                ledger_dict.update(sorted_delta)

     return ledger_dict
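Editor's note: the delta-merge at the end of ``load_flex_trades()`` only inserts trade ids not already present in the on-disk ledger, sorted by the parsed timestamp, and pops the ``pydatetime`` helper field so it never reaches the toml file. A standalone sketch of that merge with made-up entries:

# hedged sketch of the "merge only new trade ids, sorted by time"
# logic above; the records are illustrative stand-ins.
from datetime import datetime

ledger = {
    'exec-1': {'datetime': '2022-10-24T10:00:00'},
}
incoming = {
    'exec-1': {'pydatetime': datetime(2022, 10, 24, 10), 'datetime': '2022-10-24T10:00:00'},
    'exec-3': {'pydatetime': datetime(2022, 10, 26, 9), 'datetime': '2022-10-26T09:00:00'},
    'exec-2': {'pydatetime': datetime(2022, 10, 25, 15), 'datetime': '2022-10-25T15:00:00'},
}

tid_delta = set(incoming) - set(ledger)   # only ids not already on disk
sorted_delta = dict(sorted(
    {tid: incoming[tid] for tid in tid_delta}.items(),
    # pop the helper field so it never hits the serialized ledger
    key=lambda entry: entry[1].pop('pydatetime'),
))
ledger.update(sorted_delta)

assert list(sorted_delta) == ['exec-2', 'exec-3']
assert 'pydatetime' not in ledger['exec-3']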


@@ -22,6 +22,7 @@ import asyncio
 from contextlib import asynccontextmanager as acm
 from dataclasses import asdict
 from datetime import datetime
+from functools import partial
 from math import isnan
 import time
 from typing import (
@@ -38,8 +39,11 @@ import tractor
 import trio
 from trio_typing import TaskStatus

-from piker.data._sharedmem import ShmArray
-from .._util import SymbolNotFound, NoData
+from .._util import (
+    NoData,
+    DataUnavailable,
+    SymbolNotFound,
+)
 from .api import (
     # _adhoc_futes_set,
     con2fqsn,
@@ -103,7 +107,7 @@ async def open_data_client() -> MethodProxy:
 @acm
 async def open_history_client(
-    symbol: str,
+    fqsn: str,

 ) -> tuple[Callable, int]:
     '''
@@ -111,26 +115,65 @@ async def open_history_client(
     that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.

     '''
+    # TODO:
+    # - add logic to handle tradable hours and only grab
+    #   valid bars in the range?
+    # - we want to avoid overrunning the underlying shm array buffer and
+    #   we should probably calc the number of calls to make depending on
+    #   that until we have the `marketstore` daemon in place in which case
+    #   the shm size will be driven by user config and available sys
+    #   memory.
     async with open_data_client() as proxy:

+        max_timeout: float = 2.
+        mean: float = 0
+        count: int = 0
+
+        head_dt = await proxy.get_head_time(fqsn=fqsn)
+
         async def get_hist(
+            timeframe: float,
             end_dt: Optional[datetime] = None,
             start_dt: Optional[datetime] = None,

         ) -> tuple[np.ndarray, str]:
+            nonlocal max_timeout, mean, count

-            out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
+            query_start = time.time()
+            out, timedout = await get_bars(
+                proxy,
+                fqsn,
+                timeframe,
+                end_dt=end_dt,
+            )
+            latency = time.time() - query_start
+            if (
+                not timedout
+                # and latency <= max_timeout
+            ):
+                count += 1
+                mean += latency / count
+                print(
+                    f'HISTORY FRAME QUERY LATENCY: {latency}\n'
+                    f'mean: {mean}'
+                )

-            # TODO: add logic here to handle tradable hours and only grab
-            # valid bars in the range
-            if out is None:
+            if (
+                out is None
+            ):
                 # could be trying to retreive bars over weekend
                 log.error(f"Can't grab bars starting at {end_dt}!?!?")
                 raise NoData(
                     f'{end_dt}',
-                    frame_size=2000,
+                    # frame_size=2000,
                 )

+            if (
+                end_dt and end_dt <= head_dt
+            ):
+                raise DataUnavailable(f'First timestamp is {head_dt}')
+
             bars, bars_array, first_dt, last_dt = out

             # volume cleaning since there's -ve entries,
@@ -145,7 +188,7 @@ async def open_history_client(
     # quite sure why.. needs some tinkering and probably
     # a lookthrough of the ``ib_insync`` machinery, for eg. maybe
     # we have to do the batch queries on the `asyncio` side?
-    yield get_hist, {'erlangs': 1, 'rate': 6}
+    yield get_hist, {'erlangs': 1, 'rate': 3}
 _pacing: str = (
@@ -154,102 +197,19 @@ _pacing: str = (
 )


-async def get_bars(
+async def wait_on_data_reset(
     proxy: MethodProxy,
-    fqsn: str,
+    reset_type: str = 'data',
+    timeout: float = 16,

-    # blank to start which tells ib to look up the latest datum
-    end_dt: str = '',
+    task_status: TaskStatus[
+        tuple[
+            trio.CancelScope,
+            trio.Event,
+        ]
+    ] = trio.TASK_STATUS_IGNORED,
+) -> bool:
) -> (dict, np.ndarray):
'''
Retrieve historical data from a ``trio``-side task using
a ``MethoProxy``.
'''
fails = 0
bars: Optional[list] = None
first_dt: datetime = None
last_dt: datetime = None
if end_dt:
last_dt = pendulum.from_timestamp(end_dt.timestamp())
for _ in range(10):
try:
out = await proxy.bars(
fqsn=fqsn,
end_dt=end_dt,
)
if out:
bars, bars_array = out
else:
await tractor.breakpoint()
if bars_array is None:
raise SymbolNotFound(fqsn)
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
last_dt = pendulum.from_timestamp(
bars[-1].date.timestamp())
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(
f'{len(bars)} bars retreived for {first_dt} -> {last_dt}'
)
return (bars, bars_array, first_dt, last_dt), fails
except RequestError as err:
msg = err.message
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(
f'Symbol: {fqsn}',
)
elif (
err.code == 162 and
'HMDS query returned no data' in err.message
):
# XXX: this is now done in the storage mgmt layer
# and we shouldn't implicitly decrement the frame dt
# index since the upper layer may be doing so
# concurrently and we don't want to be delivering frames
# that weren't asked for.
log.warning(
f'NO DATA found ending @ {end_dt}\n'
)
# try to decrement start point and look further back
# end_dt = last_dt = last_dt.subtract(seconds=2000)
raise NoData(
f'Symbol: {fqsn}',
frame_size=2000,
)
# elif (
# err.code == 162 and
# 'Trading TWS session is connected from a different IP
# address' in err.message
# ):
# log.warning("ignoring ip address warning")
# continue
elif _pacing in msg:
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
     # TODO: we might have to put a task lock around this
     # method..
     hist_ev = proxy.status_event(
@@ -265,150 +225,259 @@ async def get_bars(
     # live_ev = proxy.status_event(
     #     'Market data farm connection is OK:usfuture'
     # )
     # try to wait on the reset event(s) to arrive, a timeout
     # will trigger a retry up to 6 times (for now).
-    tries: int = 2
-    timeout: float = 10
-
-    # try 3 time with a data reset then fail over to
-    # a connection reset.
-    for i in range(1, tries):
-
-        log.warning('Sending DATA RESET request')
-        await data_reset_hack(reset_type='data')
-
-        with trio.move_on_after(timeout) as cs:
-            for name, ev in [
-                # TODO: not sure if waiting on other events
-                # is all that useful here or not. in theory
-                # you could wait on one of the ones above
-                # first to verify the reset request was
-                # sent?
-                ('history', hist_ev),
-            ]:
-                await ev.wait()
-                log.info(f"{name} DATA RESET")
-                break
-
-        if cs.cancelled_caught:
-            fails += 1
-            log.warning(
-                f'Data reset {name} timeout, retrying {i}.'
-            )
-            continue
-    else:
-
-        log.warning('Sending CONNECTION RESET')
-        res = await data_reset_hack(reset_type='connection')
-        if not res:
-            log.warning(
-                'NO VNC DETECTED!\n'
-                'Manually press ctrl-alt-f on your IB java app'
-            )
-            # break
-
-        with trio.move_on_after(timeout) as cs:
-            for name, ev in [
-                # TODO: not sure if waiting on other events
-                # is all that useful here or not. in theory
-                # you could wait on one of the ones above
-                # first to verify the reset request was
-                # sent?
-                ('history', hist_ev),
-            ]:
-                await ev.wait()
-                log.info(f"{name} DATA RESET")
-
-        if cs.cancelled_caught:
-            fails += 1
-            log.warning('Data CONNECTION RESET timeout!?')
+    done = trio.Event()
+    with trio.move_on_after(timeout) as cs:
+
+        task_status.started((cs, done))
+
+        log.warning('Sending DATA RESET request')
+        res = await data_reset_hack(reset_type=reset_type)
+
+        if not res:
+            log.warning(
+                'NO VNC DETECTED!\n'
+                'Manually press ctrl-alt-f on your IB java app'
+            )
+            done.set()
+            return False
+
+        # TODO: not sure if waiting on other events
+        # is all that useful here or not.
+        # - in theory you could wait on one of the ones above first
+        #   to verify the reset request was sent?
+        # - we need the same for real-time quote feeds which can
+        #   sometimes flake out and stop delivering..
+        for name, ev in [
+            ('history', hist_ev),
+        ]:
+            await ev.wait()
+            log.info(f"{name} DATA RESET")
+            done.set()
+            return True

+    if cs.cancel_called:
+        log.warning(
+            'Data reset task canceled?'
+        )
+
+    done.set()
+    return False
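Editor's note: ``wait_on_data_reset()`` above follows a reusable trio pattern: the watchdog task hands its ``(CancelScope, Event)`` pair back through ``task_status.started()`` so the spawning task can both cancel it and sync on its completion, while ``move_on_after`` bounds the wait. A self-contained sketch of that pattern with a plain event standing in for the IB "farm OK" signal:

# hedged sketch of the watchdog pattern used by ``wait_on_data_reset()``;
# the confirmation event below simulates the real IB status event.
import trio


async def watchdog(
    confirmation: trio.Event,
    timeout: float = 3,
    task_status=trio.TASK_STATUS_IGNORED,
) -> bool:
    done = trio.Event()
    with trio.move_on_after(timeout) as cs:
        # report our cancel scope + completion event to the parent
        task_status.started((cs, done))
        # in the real code this is where the reset hack is sent
        await confirmation.wait()
        done.set()
        return True

    # timed out (or was cancelled from the parent side)
    done.set()
    return False


async def main():
    confirmation = trio.Event()
    async with trio.open_nursery() as nurse:
        cs, done = await nurse.start(watchdog, confirmation)
        confirmation.set()     # pretend the data farm came back online
        await done.wait()      # sync on the watchdog finishing


trio.run(main)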
_data_resetter_task: trio.Task | None = None
async def get_bars(
proxy: MethodProxy,
fqsn: str,
timeframe: int,
# blank to start which tells ib to look up the latest datum
end_dt: str = '',
# TODO: make this more dynamic based on measured frame rx latency?
# how long before we trigger a feed reset (seconds)
feed_reset_timeout: float = 3,
# how many days to subtract before giving up on further
# history queries for instrument, presuming that most don't
# not trade for a week XD
max_nodatas: int = 6,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> (dict, np.ndarray):
'''
Retrieve historical data from a ``trio``-side task using
a ``MethoProxy``.
'''
global _data_resetter_task
nodatas_count: int = 0
data_cs: trio.CancelScope | None = None
result: tuple[
ibis.objects.BarDataList,
np.ndarray,
datetime,
datetime,
] | None = None
result_ready = trio.Event()
async def query():
nonlocal result, data_cs, end_dt, nodatas_count
while True:
try:
out = await proxy.bars(
fqsn=fqsn,
end_dt=end_dt,
sample_period_s=timeframe,
# ideally we cancel the request just before we
# cancel on the ``trio``-side and trigger a data
# reset hack.. the problem is there's no way (with
# current impl) to detect a cancel case.
# timeout=timeout,
)
if out is None:
raise NoData(f'{end_dt}')
bars, bars_array, dt_duration = out
if not bars:
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
)
end_dt -= dt_duration
continue
if bars_array is None:
raise SymbolNotFound(fqsn)
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
last_dt = pendulum.from_timestamp(
bars[-1].date.timestamp())
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(
f'{len(bars)} bars retreived {first_dt} -> {last_dt}'
)
if data_cs:
data_cs.cancel()
result = (bars, bars_array, first_dt, last_dt)
# signal data reset loop parent task
result_ready.set()
return result
except RequestError as err:
msg = err.message
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(
f'Symbol: {fqsn}',
)
elif err.code == 162:
if (
'HMDS query returned no data' in msg
):
# XXX: this is now done in the storage mgmt
# layer and we shouldn't implicitly decrement
# the frame dt index since the upper layer may
# be doing so concurrently and we don't want to
# be delivering frames that weren't asked for.
# try to decrement start point and look further back
# end_dt = end_dt.subtract(seconds=2000)
logmsg = "SUBTRACTING DAY from DT index"
if end_dt is not None:
end_dt = end_dt.subtract(days=1)
elif end_dt is None:
end_dt = pendulum.now().subtract(days=1)
log.warning(
f'NO DATA found ending @ {end_dt}\n'
+ logmsg
)
if nodatas_count >= max_nodatas:
raise DataUnavailable(
f'Presuming {fqsn} has no further history '
f'after {max_nodatas} tries..'
)
nodatas_count += 1
continue
elif 'API historical data query cancelled' in err.message:
log.warning(
'Query cancelled by IB (:eyeroll:):\n'
f'{err.message}'
)
continue
elif (
'Trading TWS session is connected from a different IP'
in err.message
):
log.warning("ignoring ip address warning")
continue
# XXX: more or less same as above timeout case
elif _pacing in msg:
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
# cancel any existing reset task
if data_cs:
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
timeout=float('inf'),
reset_type='connection'
)
)
continue
                 else:
                     raise

+    # TODO: make this global across all history task/requests
+    # such that simultaneous symbol queries don't try data resettingn
+    # too fast..
+    unset_resetter: bool = False
+    async with trio.open_nursery() as nurse:
+
+        # start history request that we allow
+        # to run indefinitely until a result is acquired
+        nurse.start_soon(query)
+
+        # start history reset loop which waits up to the timeout
+        # for a result before triggering a data feed reset.
+        while not result_ready.is_set():
+
+            with trio.move_on_after(feed_reset_timeout):
+                await result_ready.wait()
+                break
+
+            if _data_resetter_task:
+                # don't double invoke the reset hack if another
+                # requester task already has it covered.
+                continue
+            else:
+                _data_resetter_task = trio.lowlevel.current_task()
+                unset_resetter = True
+
+            # spawn new data reset task
+            data_cs, reset_done = await nurse.start(
+                partial(
+                    wait_on_data_reset,
+                    proxy,
+                    timeout=float('inf'),
+                )
+            )
+            # sync wait on reset to complete
+            await reset_done.wait()
+
+    _data_resetter_task = None if unset_resetter else _data_resetter_task
+    return result, data_cs is not None

-            return None, None
-        # else:  # throttle wasn't fixed so error out immediately
-        # raise _err
-
-
-async def backfill_bars(
-
-    fqsn: str,
-    shm: ShmArray,  # type: ignore # noqa
-
-    # TODO: we want to avoid overrunning the underlying shm array buffer
-    # and we should probably calc the number of calls to make depending
-    # on that until we have the `marketstore` daemon in place in which
-    # case the shm size will be driven by user config and available sys
-    # memory.
-    count: int = 16,
-
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
-    '''
-    Fill historical bars into shared mem / storage afap.
-
-    TODO: avoid pacing constraints:
-    https://github.com/pikers/piker/issues/128
-
-    '''
-    # last_dt1 = None
-    last_dt = None
-
-    with trio.CancelScope() as cs:
-
-        async with open_data_client() as proxy:
-
-            out, fails = await get_bars(proxy, fqsn)
-
-            if out is None:
-                raise RuntimeError("Could not pull currrent history?!")
-
-            (first_bars, bars_array, first_dt, last_dt) = out
-            vlm = bars_array['volume']
-            vlm[vlm < 0] = 0
-            last_dt = first_dt
-
-            # write historical data to buffer
-            shm.push(bars_array)
-
-            task_status.started(cs)
-
-            i = 0
-            while i < count:
-
-                out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
-
-                if out is None:
-                    # could be trying to retreive bars over weekend
-                    # TODO: add logic here to handle tradable hours and
-                    # only grab valid bars in the range
-                    log.error(f"Can't grab bars starting at {first_dt}!?!?")
-
-                    # XXX: get_bars() should internally decrement dt by
-                    # 2k seconds and try again.
-                    continue
-
-                (first_bars, bars_array, first_dt, last_dt) = out
-                # last_dt1 = last_dt
-                # last_dt = first_dt
-
-                # volume cleaning since there's -ve entries,
-                # wood luv to know what crookery that is..
-                vlm = bars_array['volume']
-                vlm[vlm < 0] = 0
-
-                # TODO we should probably dig into forums to see what peeps
-                # think this data "means" and then use it as an indicator of
-                # sorts? dinkus has mentioned that $vlms for the day dont'
-                # match other platforms nor the summary stat tws shows in
-                # the monitor - it's probably worth investigating.
-
-                shm.push(bars_array, prepend=True)
-                i += 1
 asset_type_map = {
@@ -466,7 +535,9 @@ async def _setup_quote_stream(
     to_trio.send_nowait(None)

-    async with load_aio_clients() as accts2clients:
+    async with load_aio_clients(
+        disconnect_on_exit=False,
+    ) as accts2clients:
         caccount_name, client = get_preferred_data_client(accts2clients)
         contract = contract or (await client.find_contract(symbol))
         ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@@ -512,10 +583,11 @@ async def _setup_quote_stream(
             # Manually do the dereg ourselves.
             teardown()
         except trio.WouldBlock:
-            log.warning(
-                f'channel is blocking symbol feed for {symbol}?'
-                f'\n{to_trio.statistics}'
-            )
+            # log.warning(
+            #     f'channel is blocking symbol feed for {symbol}?'
+            #     f'\n{to_trio.statistics}'
+            # )
+            pass

         # except trio.WouldBlock:
         #     # for slow debugging purposes to avoid clobbering prompt
@@ -545,7 +617,8 @@ async def open_aio_quote_stream(
     from_aio = _quote_streams.get(symbol)
     if from_aio:

-        # if we already have a cached feed deliver a rx side clone to consumer
+        # if we already have a cached feed deliver a rx side clone
+        # to consumer
        async with broadcast_receiver(
             from_aio,
             2**6,
@@ -736,17 +809,45 @@ async def stream_quotes(
         await trio.sleep_forever()
         return  # we never expect feed to come up?

-    async with open_aio_quote_stream(
-        symbol=sym,
-        contract=con,
-    ) as stream:
+    cs: Optional[trio.CancelScope] = None
+    startup: bool = True
+    while (
+        startup
+        or cs.cancel_called
+    ):
+        with trio.CancelScope() as cs:
+            async with (
+                trio.open_nursery() as nurse,
+                open_aio_quote_stream(
+                    symbol=sym,
+                    contract=con,
+                ) as stream,
+            ):

                 # ugh, clear ticks since we've consumed them
                 # (ahem, ib_insync is stateful trash)
                 first_ticker.ticks = []

+                # only on first entry at feed boot up
+                if startup:
+                    startup = False
                     task_status.started((init_msgs, first_quote))

+                # start a stream restarter task which monitors the
+                # data feed event.
+                async def reset_on_feed():
+
+                    # TODO: this seems to be surpressed from the
+                    # traceback in ``tractor``?
+                    # assert 0
+
+                    rt_ev = proxy.status_event(
+                        'Market data farm connection is OK:usfarm'
+                    )
+                    await rt_ev.wait()
+                    cs.cancel()  # cancel called should now be set
+
+                nurse.start_soon(reset_on_feed)
+
                 async with aclosing(stream):

                     if syminfo.get('no_vlm', False):
@@ -754,29 +855,31 @@ async def stream_quotes(
                         # include vlm data.
                         atype = syminfo['asset_type']
                         log.info(
-                            f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
+                            f'No-vlm {sym}@{atype}, skipping quote poll'
                         )
                     else:
-                        # wait for real volume on feed (trading might be closed)
+                        # wait for real volume on feed (trading might be
+                        # closed)
                         while True:
                             ticker = await stream.receive()

-                            # for a real volume contract we rait for the first
-                            # "real" trade to take place
+                            # for a real volume contract we rait for
+                            # the first "real" trade to take place
                             if (
                                 # not calc_price
                                 # and not ticker.rtTime
                                 not ticker.rtTime
                             ):
-                                # spin consuming tickers until we get a real
-                                # market datum
+                                # spin consuming tickers until we
+                                # get a real market datum
                                 log.debug(f"New unsent ticker: {ticker}")
                                 continue
                             else:
-                                log.debug("Received first real volume tick")
+                                log.debug("Received first volume tick")

-                                # ugh, clear ticks since we've consumed them
-                                # (ahem, ib_insync is truly stateful trash)
+                                # ugh, clear ticks since we've
+                                # consumed them (ahem, ib_insync is
+                                # truly stateful trash)
                                 ticker.ticks = []

                                 # XXX: this works because we don't use
@@ -904,7 +1007,14 @@ async def open_symbol_search(
                 except trio.WouldBlock:
                     pass

-                if not pattern or pattern.isspace():
+                if (
+                    not pattern
+                    or pattern.isspace()
+                    # XXX: not sure if this is a bad assumption but it
+                    # seems to make search snappier?
+                    or len(pattern) < 1
+                ):
                     log.warning('empty pattern received, skipping..')

                     # TODO: *BUG* if nothing is returned here the client
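Editor's note: the ``while startup or cs.cancel_called`` wrapper added to ``stream_quotes()`` above re-enters the whole quote-stream block whenever the monitor task cancels the enclosing scope. A self-contained sketch of that restart-on-cancel idiom (the feed event is simulated with a counter and sleeps):

# hedged sketch of the restart-on-cancel loop used by ``stream_quotes()``
import trio


async def main():
    restarts = 0
    startup = True
    cs: trio.CancelScope | None = None

    while startup or cs.cancel_called:
        with trio.CancelScope() as cs:
            async with trio.open_nursery() as nurse:
                if startup:
                    startup = False

                async def reset_on_feed():
                    # stand-in for waiting on the IB "farm is OK" event
                    await trio.sleep(0.01)
                    if restarts < 2:
                        cs.cancel()

                nurse.start_soon(reset_on_feed)
                # stand-in for the actual quote relay loop
                await trio.sleep(1)

        if cs.cancel_called:
            restarts += 1

    print(f'feed restarted {restarts} times')


trio.run(main)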


@@ -259,6 +259,7 @@ async def open_history_client(
     queries: int = 0

     async def get_ohlc(
+        timeframe: float,
         end_dt: Optional[datetime] = None,
         start_dt: Optional[datetime] = None,


@@ -138,25 +138,26 @@ def cli(ctx, brokers, loglevel, tl, configdir):
 @click.pass_obj
 def services(config, tl, names):

-    async def list_services():
+    from .._daemon import open_piker_runtime

-        async with tractor.get_arbiter(
-            *_tractor_kwargs['arbiter_addr']
-        ) as portal:
+    async def list_services():
+        async with (
+            open_piker_runtime(
+                name='service_query',
+                loglevel=config['loglevel'] if tl else None,
+            ),
+            tractor.get_arbiter(
+                *_tractor_kwargs['arbiter_addr']
+            ) as portal
+        ):
             registry = await portal.run_from_ns('self', 'get_registry')
             json_d = {}
             for key, socket in registry.items():
-                # name, uuid = uid
                 host, port = socket
                 json_d[key] = f'{host}:{port}'
             click.echo(f"{colorize_json(json_d)}")

-    tractor.run(
-        list_services,
-        name='service_query',
-        loglevel=config['loglevel'] if tl else None,
-        arbiter_addr=_tractor_kwargs['arbiter_addr'],
-    )
+    trio.run(list_services)

File diff suppressed because it is too large


@@ -387,50 +387,57 @@ class Storage:
     async def load(
         self,
         fqsn: str,
+        timeframe: int,

     ) -> tuple[
-        dict[int, np.ndarray],  # timeframe (in secs) to series
+        np.ndarray,  # timeframe sampled array-series
         Optional[datetime],  # first dt
         Optional[datetime],  # last dt
     ]:

         first_tsdb_dt, last_tsdb_dt = None, None
-        tsdb_arrays = await self.read_ohlcv(
+        hist = await self.read_ohlcv(
             fqsn,
             # on first load we don't need to pull the max
             # history per request size worth.
             limit=3000,
+            timeframe=timeframe,
         )
-        log.info(f'Loaded tsdb history {tsdb_arrays}')
+        log.info(f'Loaded tsdb history {hist}')

-        if tsdb_arrays:
-            fastest = list(tsdb_arrays.values())[0]
-            times = fastest['Epoch']
+        if len(hist):
+            times = hist['Epoch']
             first, last = times[0], times[-1]
             first_tsdb_dt, last_tsdb_dt = map(
                 pendulum.from_timestamp, [first, last]
             )

-        return tsdb_arrays, first_tsdb_dt, last_tsdb_dt
+        return (
+            hist,  # array-data
+            first_tsdb_dt,  # start of query-frame
+            last_tsdb_dt,  # most recent
+        )

     async def read_ohlcv(
         self,
         fqsn: str,
-        timeframe: Optional[Union[int, str]] = None,
+        timeframe: int | str,
         end: Optional[int] = None,
         limit: int = int(800e3),

-    ) -> tuple[
-        MarketstoreClient,
-        Union[dict, np.ndarray]
+    ) -> dict[
+        int,
+        Union[dict, np.ndarray],
     ]:
         client = self.client
         syms = await client.list_symbols()

         if fqsn not in syms:
             return {}

-        tfstr = tf_in_1s[1]
+        # use the provided timeframe or 1s by default
+        tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])

         params = Params(
             symbols=fqsn,
@@ -444,58 +451,68 @@ class Storage:
             limit=limit,
         )

-        if timeframe is None:
-            log.info(f'starting {fqsn} tsdb granularity scan..')
-            # loop through and try to find highest granularity
-            for tfstr in tf_in_1s.values():
-                try:
-                    log.info(f'querying for {tfstr}@{fqsn}')
-                    params.set('timeframe', tfstr)
-                    result = await client.query(params)
-                    break
-
-                except purerpc.grpclib.exceptions.UnknownError:
-                    # XXX: this is already logged by the container and
-                    # thus shows up through `marketstored` logs relay.
-                    # log.warning(f'{tfstr}@{fqsn} not found')
-                    continue
-            else:
-                return {}
-
-        else:
-            result = await client.query(params)
+        try:
+            result = await client.query(params)
+        except purerpc.grpclib.exceptions.UnknownError:
+            # indicate there is no history for this timeframe
+            return {}

         # TODO: it turns out column access on recarrays is actually slower:
         # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
         # it might make sense to make these structured arrays?

-        # Fill out a `numpy` array-results map
-        arrays = {}
-        for fqsn, data_set in result.by_symbols().items():
-            arrays.setdefault(fqsn, {})[
-                tf_in_1s.inverse[data_set.timeframe]
-            ] = data_set.array
-
-        return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
+        data_set = result.by_symbols()[fqsn]
+        array = data_set.array
+
+        # XXX: ensure sample rate is as expected
+        time = data_set.array['Epoch']
+        if len(time) > 1:
+            time_step = time[-1] - time[-2]
+            ts = tf_in_1s.inverse[data_set.timeframe]
+
+            if time_step != ts:
+                log.warning(
+                    f'MKTS BUG: wrong timeframe loaded: {time_step}'
+                    'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG'
+                    f'WIPING HISTORY FOR {ts}s'
+                )
+                await self.delete_ts(fqsn, timeframe)
+
+                # try reading again..
+                return await self.read_ohlcv(
+                    fqsn,
+                    timeframe,
+                    end,
+                    limit,
+                )
+
+        return array

     async def delete_ts(
         self,
         key: str,
         timeframe: Optional[Union[int, str]] = None,
+        fmt: str = 'OHLCV',

     ) -> bool:

         client = self.client
         syms = await client.list_symbols()
         print(syms)
-        # if key not in syms:
-        #     raise KeyError(f'`{fqsn}` table key not found?')
+        if key not in syms:
+            raise KeyError(f'`{key}` table key not found in\n{syms}?')

-        return await client.destroy(tbk=key)
+        tbk = mk_tbk((
+            key,
+            tf_in_1s.get(timeframe, tf_in_1s[60]),
+            fmt,
+        ))
+        return await client.destroy(tbk=tbk)
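Editor's note: marketstore addresses series by "time bucket keys" of the form ``{symbol}/{timeframe}/{attribute-group}``; the delete and write paths above now derive the middle component from the seconds-based timeframe. A standalone sketch of that key construction, with an assumed subset of what ``tf_in_1s`` contains (the real ``mk_tbk()`` helper takes a single tuple argument; the function below is a simplified stand-in):

# hedged sketch of marketstore tbk construction; the mapping values
# are assumptions about ``tf_in_1s`` content.
from bidict import bidict

tf_in_1s = bidict({
    1: '1Sec',
    60: '1Min',
    60 * 60: '1H',
})


def make_tbk(fqsn: str, timeframe: int, group: str = 'OHLCV') -> str:
    # fall back to 1m when the timeframe isn't mapped, matching the
    # delete path above
    tfkey = tf_in_1s.get(timeframe, tf_in_1s[60])
    return f'{fqsn}/{tfkey}/{group}'


assert make_tbk('btcusdt.binance', 60) == 'btcusdt.binance/1Min/OHLCV'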
     async def write_ohlcv(
         self,
         fqsn: str,
         ohlcv: np.ndarray,
+        timeframe: int,
         append_and_duplicate: bool = True,
         limit: int = int(800e3),
@@ -519,17 +536,18 @@ class Storage:
         m, r = divmod(len(mkts_array), limit)

+        tfkey = tf_in_1s[timeframe]
         for i in range(m, 1):
             to_push = mkts_array[i-1:i*limit]

             # write to db
             resp = await self.client.write(
                 to_push,
-                tbk=f'{fqsn}/1Sec/OHLCV',
+                tbk=f'{fqsn}/{tfkey}/OHLCV',

                 # NOTE: will will append duplicates
                 # for the same timestamp-index.
-                # TODO: pre deduplicate?
+                # TODO: pre-deduplicate?
                 isvariablelength=append_and_duplicate,
             )
@@ -548,7 +566,7 @@ class Storage:
             # write to db
             resp = await self.client.write(
                 to_push,
-                tbk=f'{fqsn}/1Sec/OHLCV',
+                tbk=f'{fqsn}/{tfkey}/OHLCV',

                 # NOTE: will will append duplicates
                 # for the same timestamp-index.
@@ -577,6 +595,7 @@ class Storage:
     # def delete_range(self, start_dt, end_dt) -> None:
     #     ...

 @acm
 async def open_storage_client(
     fqsn: str,
@@ -642,8 +661,8 @@ async def tsdb_history_update(
     ):
         profiler(f'opened feed for {fqsn}')

-        to_append = feed.shm.array
-        to_prepend = None
+        # to_append = feed.hist_shm.array
+        # to_prepend = None

         if fqsn:
             symbol = feed.symbols.get(fqsn)
@@ -651,21 +670,21 @@ async def tsdb_history_update(
             fqsn = symbol.front_fqsn()

         # diff db history with shm and only write the missing portions
-        ohlcv = feed.shm.array
+        # ohlcv = feed.hist_shm.array

         # TODO: use pg profiler
-        tsdb_arrays = await storage.read_ohlcv(fqsn)
-        # hist diffing
-        if tsdb_arrays:
-            for secs in (1, 60):
-                ts = tsdb_arrays.get(secs)
-                if ts is not None and len(ts):
-                    # these aren't currently used but can be referenced from
-                    # within the embedded ipython shell below.
-                    to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]]
-                    to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]]
-
-        profiler('Finished db arrays diffs')
+        # for secs in (1, 60):
+        #     tsdb_array = await storage.read_ohlcv(
+        #         fqsn,
+        #         timeframe=timeframe,
+        #     )
+        #     # hist diffing:
+        #     # these aren't currently used but can be referenced from
+        #     # within the embedded ipython shell below.
+        #     to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]]
+        #     to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]]
+
+        # profiler('Finished db arrays diffs')

         syms = await storage.client.list_symbols()
         log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')


@@ -49,7 +49,9 @@ from ._fsp import (
     has_vlm,
     open_vlm_displays,
 )
-from ..data._sharedmem import ShmArray
+from ..data._sharedmem import (
+    ShmArray,
+)
 from ..data._source import tf_in_1s
 from ._forms import (
     FieldsForm,
@@ -249,14 +251,14 @@ async def graphics_update_loop(
     linked: LinkedSplits = godwidget.rt_linked
     display_rate = godwidget.window.current_screen().refreshRate()

-    chart = linked.chart
+    fast_chart = linked.chart
     hist_chart = godwidget.hist_linked.chart

     ohlcv = feed.rt_shm
     hist_ohlcv = feed.hist_shm

     # update last price sticky
-    last_price_sticky = chart._ysticks[chart.name]
+    last_price_sticky = fast_chart._ysticks[fast_chart.name]
     last_price_sticky.update_from_data(
         *ohlcv.array[-1][['index', 'close']]
     )
@@ -268,7 +270,7 @@ async def graphics_update_loop(
     maxmin = partial(
         chart_maxmin,
-        chart,
+        fast_chart,
         ohlcv,
         vlm_chart,
     )
@@ -282,15 +284,15 @@ async def graphics_update_loop(
     last, volume = ohlcv.array[-1][['close', 'volume']]

-    symbol = chart.linked.symbol
+    symbol = fast_chart.linked.symbol

     l1 = L1Labels(
-        chart,
+        fast_chart,
         # determine precision/decimal lengths
         digits=symbol.tick_size_digits,
         size_digits=symbol.lot_size_digits,
     )
-    chart._l1_labels = l1
+    fast_chart._l1_labels = l1

     # TODO:
     # - in theory we should be able to read buffer data faster
@@ -300,10 +302,10 @@ async def graphics_update_loop(
     # levels this might be dark volume we need to
     # present differently -> likely dark vlm

-    tick_size = chart.linked.symbol.tick_size
+    tick_size = fast_chart.linked.symbol.tick_size
     tick_margin = 3 * tick_size

-    chart.show()
+    fast_chart.show()
     last_quote = time.time()
     i_last = ohlcv.index
@@ -313,7 +315,7 @@ async def graphics_update_loop(
         'maxmin': maxmin,
         'ohlcv': ohlcv,
         'hist_ohlcv': hist_ohlcv,
-        'chart': chart,
+        'chart': fast_chart,
         'last_price_sticky': last_price_sticky,
         'hist_last_price_sticky': hist_last_price_sticky,
         'l1': l1,
@@ -333,7 +335,7 @@ async def graphics_update_loop(
         ds.vlm_chart = vlm_chart
         ds.vlm_sticky = vlm_sticky

-    chart.default_view()
+    fast_chart.default_view()

     # TODO: probably factor this into some kinda `DisplayState`
     # API that can be reused at least in terms of pulling view
@@ -410,16 +412,16 @@ async def graphics_update_loop(
             last_quote = time.time()

         # chart isn't active/shown so skip render cycle and pause feed(s)
-        if chart.linked.isHidden():
+        if fast_chart.linked.isHidden():
             # print('skipping update')
-            chart.pause_all_feeds()
+            fast_chart.pause_all_feeds()
             continue

-        ic = chart.view._ic
-        if ic:
-            chart.pause_all_feeds()
-            await ic.wait()
-            chart.resume_all_feeds()
+        # ic = fast_chart.view._ic
+        # if ic:
+        #     fast_chart.pause_all_feeds()
+        #     await ic.wait()
+        #     fast_chart.resume_all_feeds()

         # sync call to update all graphics/UX components.
         graphics_update_cycle(ds)
@@ -502,6 +504,7 @@ def graphics_update_cycle(
         or trigger_all
     ):
         chart.increment_view(steps=i_diff)
+        chart.view._set_yrange(yrange=(mn, mx))

         if vlm_chart:
             vlm_chart.increment_view(steps=i_diff)
@@ -806,6 +809,140 @@ def graphics_update_cycle(
             flow.draw_last(array_key=curve_name)

async def link_views_with_region(
rt_chart: ChartPlotWidget,
hist_chart: ChartPlotWidget,
feed: Feed,
) -> None:
# these value are be only pulled once during shm init/startup
izero_hist = feed.izero_hist
izero_rt = feed.izero_rt
# Add the LinearRegionItem to the ViewBox, but tell the ViewBox
# to exclude this item when doing auto-range calculations.
rt_pi = rt_chart.plotItem
hist_pi = hist_chart.plotItem
region = pg.LinearRegionItem(
movable=False,
# color scheme that matches sidepane styling
pen=pg.mkPen(hcolor('gunmetal')),
brush=pg.mkBrush(hcolor('default_darkest')),
)
region.setZValue(10) # put linear region "in front" in layer terms
hist_pi.addItem(region, ignoreBounds=True)
flow = rt_chart._flows[hist_chart.name]
assert flow
# XXX: no idea why this doesn't work but it's causing
# a weird placement of the region on the way-far-left..
# region.setClipItem(flow.graphics)
# poll for datums load and timestep detection
for _ in range(100):
try:
_, _, ratio = feed.get_ds_info()
break
except IndexError:
await trio.sleep(0.01)
continue
else:
raise RuntimeError(
'Failed to detect sampling periods from shm!?')
# sampling rate transform math:
# -----------------------------
# define the fast chart to slow chart as a linear mapping
# over the fast index domain `i` to the slow index domain
# `j` as:
#
# j = i - i_offset
# ------------ + j_offset
# j/i
#
# conversely the inverse function is:
#
# i = j/i * (j - j_offset) + i_offset
#
# Where `j_offset` is our ``izero_hist`` and `i_offset` is our
# `izero_rt`, the ``ShmArray`` offsets which correspond to the
# indexes in each array where the "current" time is indexed at init.
# AKA the index where new data is "appended to" and historical data
# if "prepended from".
#
# more practically (and by default) `i` is normally an index
# into 1s samples and `j` is an index into 60s samples (aka 1m).
# in the below handlers ``ratio`` is the `j/i` and ``mn``/``mx``
# are the low and high index input from the source index domain.
def update_region_from_pi(
window,
viewRange: tuple[tuple, tuple],
is_manual: bool = True,
) -> None:
# put linear region "in front" in layer terms
region.setZValue(10)
# set the region on the history chart
# to the range currently viewed in the
# HFT/real-time chart.
mn, mx = viewRange[0]
ds_mn = (mn - izero_rt)/ratio
ds_mx = (mx - izero_rt)/ratio
lhmn = ds_mn + izero_hist
lhmx = ds_mx + izero_hist
# print(
# f'rt_view_range: {(mn, mx)}\n'
# f'ds_mn, ds_mx: {(ds_mn, ds_mx)}\n'
# f'lhmn, lhmx: {(lhmn, lhmx)}\n'
# )
region.setRegion((
lhmn,
lhmx,
))
# TODO: if we want to have the slow chart adjust range to
# match the fast chart's selection -> results in the
# linear region expansion never can go "outside of view".
# hmn, hmx = hvr = hist_chart.view.state['viewRange'][0]
# print((hmn, hmx))
# if (
# hvr
# and (lhmn < hmn or lhmx > hmx)
# ):
# hist_pi.setXRange(
# lhmn,
# lhmx,
# padding=0,
# )
# hist_linked.graphics_cycle()
# connect region to be updated on plotitem interaction.
rt_pi.sigRangeChanged.connect(update_region_from_pi)
def update_pi_from_region():
region.setZValue(10)
mn, mx = region.getRegion()
# print(f'region_x: {(mn, mx)}')
rt_pi.setXRange(
((mn - izero_hist) * ratio) + izero_rt,
((mx - izero_hist) * ratio) + izero_rt,
padding=0,
)
# TODO BUG XXX: seems to cause a real perf hit and a recursion error
# (but used to work before generalizing for 1s ohlc offset?)..
# something to do with the label callback handlers?
# region.sigRegionChanged.connect(update_pi_from_region)
# region.sigRegionChangeFinished.connect(update_pi_from_region)
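Editor's note: the sampling-rate transform documented inside ``link_views_with_region()`` above is just an affine map between the fast (1s) and slow (1m) shm index domains, with ``izero_rt``/``izero_hist`` as the "current time" offsets and ``ratio`` as fast-samples-per-slow-sample. A small worked example with made-up offsets, showing the forward and inverse maps are consistent:

# hedged worked example of the fast<->slow index mapping above;
# the offset values are illustrative only.
izero_rt = 5000      # index of "now" in the 1s array
izero_hist = 3000    # index of "now" in the 1m array
ratio = 60           # 60 fast (1s) samples per slow (1m) sample


def rt_to_hist(i: float) -> float:
    # j = (i - i_offset) / ratio + j_offset
    return (i - izero_rt) / ratio + izero_hist


def hist_to_rt(j: float) -> float:
    # i = ratio * (j - j_offset) + i_offset
    return ratio * (j - izero_hist) + izero_rt


# 600 fast samples (10 minutes) back from "now" lands 10 slow samples
# back on the history chart:
assert rt_to_hist(izero_rt - 600) == izero_hist - 10
assert hist_to_rt(rt_to_hist(izero_rt - 120)) == izero_rt - 120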
 async def display_symbol_data(
     godwidget: GodWidget,
     provider: str,
@@ -850,10 +987,6 @@ async def display_symbol_data(
     ohlcv: ShmArray = feed.rt_shm
     hist_ohlcv: ShmArray = feed.hist_shm

-    # this value needs to be pulled once and only once during
-    # startup
-    end_index = feed.startup_hist_index
-
     symbol = feed.symbols[sym]
     fqsn = symbol.front_fqsn()
@@ -917,91 +1050,6 @@ async def display_symbol_data(
     #     add_label=False,
     # )
# Add the LinearRegionItem to the ViewBox, but tell the ViewBox
# to exclude this item when doing auto-range calculations.
rt_pi = chart.plotItem
hist_pi = hist_chart.plotItem
region = pg.LinearRegionItem(
# color scheme that matches sidepane styling
pen=pg.mkPen(hcolor('gunmetal')),
brush=pg.mkBrush(hcolor('default_darkest')),
)
region.setZValue(10) # put linear region "in front" in layer terms
hist_pi.addItem(region, ignoreBounds=True)
flow = chart._flows[hist_chart.name]
assert flow
# XXX: no idea why this doesn't work but it's causing
# a weird placement of the region on the way-far-left..
# region.setClipItem(flow.graphics)
# poll for datums load and timestep detection
for _ in range(100):
try:
_, _, ratio = feed.get_ds_info()
break
except IndexError:
await trio.sleep(0.01)
continue
else:
raise RuntimeError(
'Failed to detect sampling periods from shm!?')
def update_pi_from_region():
region.setZValue(10)
mn, mx = region.getRegion()
# print(f'region_x: {(mn, mx)}')
# XXX: seems to cause a real perf hit?
rt_pi.setXRange(
(mn - end_index) * ratio,
(mx - end_index) * ratio,
padding=0,
)
region.sigRegionChanged.connect(update_pi_from_region)
def update_region_from_pi(
window,
viewRange: tuple[tuple, tuple],
is_manual: bool = True,
) -> None:
# set the region on the history chart
# to the range currently viewed in the
# HFT/real-time chart.
mn, mx = viewRange[0]
ds_mn = mn/ratio
ds_mx = mx/ratio
# print(
# f'rt_view_range: {(mn, mx)}\n'
# f'ds_mn, ds_mx: {(ds_mn, ds_mx)}\n'
# )
lhmn = ds_mn + end_index
lhmx = ds_mx + end_index
region.setRegion((
lhmn,
lhmx,
))
# TODO: if we want to have the slow chart adjust range to
# match the fast chart's selection -> results in the
# linear region expansion never can go "outside of view".
# hmn, hmx = hvr = hist_chart.view.state['viewRange'][0]
# print((hmn, hmx))
# if (
# hvr
# and (lhmn < hmn or lhmx > hmx)
# ):
# hist_pi.setXRange(
# lhmn,
# lhmx,
# padding=0,
# )
# hist_linked.graphics_cycle()
# connect region to be updated on plotitem interaction.
rt_pi.sigRangeChanged.connect(update_region_from_pi)
     # NOTE: we must immediately tell Qt to show the OHLC chart
     # to avoid a race where the subplots get added/shown to
     # the linked set *before* the main price chart!
@@ -1069,6 +1117,12 @@ async def display_symbol_data(

     godwidget.resize_all()

+    await link_views_with_region(
+        chart,
+        hist_chart,
+        feed,
+    )
+
     mode: OrderMode
     async with (
         open_order_mode(


@@ -58,8 +58,11 @@ async def notify_from_ems_status_msg(
     if is_subproc:
         global _dbus_uid
-        if not _dbus_uid:
-            su = os.environ['SUDO_USER']
+        su = os.environ.get('SUDO_USER')
+        if (
+            not _dbus_uid
+            and su
+        ):

             # TODO: use `trio` but we need to use nursery.start()
             # to use pipes?


@@ -517,7 +517,9 @@ class OrderMode:
         _, _, ratio = self.feed.get_ds_info()
         for i, chart in [
             (arrow_index, self.chart),
-            (self.feed.startup_hist_index + round(arrow_index/ratio),
+            (self.feed.izero_hist
+             +
+             round((arrow_index - self.feed.izero_rt)/ratio),
              self.hist_chart)
         ]:
             self.arrows.add(