Compare commits

...

23 Commits

Author SHA1 Message Date
Tyler Goodlet c53071e43a WIP adding draft-commented code to try and get splits workin.. 2022-10-19 13:18:19 -04:00
Tyler Goodlet 41ffccc59e Add data-reset-task global state var
Allows keeping mutex state around data reset requests which (if more
than one are sent) can cause a throttling condition where ib's servers
will get slower and slower to conduct a reconnect. With this you can
have multiple ongoing contract requests without hitting that issue and
we can go back to having a nice 3s timeout on the history queries before
activating the hack.
2022-10-19 13:03:47 -04:00
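
A minimal sketch of the mutex-style state var this describes, assuming `trio`; `_data_resetter_task` mirrors the name used in the diff below, while `maybe_trigger_reset()` and `do_reset` are illustration-only names, not the actual implementation:

```python
import trio

# module-level handle acting as a mutex: only one task may run the
# (slow, throttle-prone) data reset hack at a time.
_data_resetter_task: trio.lowlevel.Task | None = None


async def maybe_trigger_reset(do_reset) -> bool:
    '''
    Run ``do_reset()`` only if no other task currently owns the
    resetter role; concurrent contract requests simply skip it.

    '''
    global _data_resetter_task
    if _data_resetter_task is not None:
        # another in-flight request already has a reset pending
        return False

    _data_resetter_task = trio.lowlevel.current_task()
    try:
        await do_reset()
        return True
    finally:
        _data_resetter_task = None
```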
Tyler Goodlet 55856f5e8b Add back adhoc symbol lookup support, some exchs info is off 2022-10-16 14:34:34 -04:00
Tyler Goodlet e8ab28e456 Order ledger entries by processed datetime
To make it easier to manually read/decipher long ledger files this adds
`dict` sorting based on record-type-specific (api vs. flex report)
datetime processing prior to ledger file write.

- break up parsers into separate routines for flex and api record
  processing.
- add `parse_flex_dt()` for special handling of the weird semicolon
  stamps in flex reports.
2022-10-10 09:27:30 -04:00
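
A minimal sketch of the flex-report timestamp handling and datetime-keyed sort described above; `parse_flex_dt()` mirrors the helper added in the diff below and the sample `entries` dict is made up purely for illustration:

```python
import pendulum


def parse_flex_dt(record: str) -> pendulum.DateTime:
    # flex reports use a non-std 'YYYYMMDD;HHMMSS' style stamp
    date, ts = record.split(';')
    dt = pendulum.parse(date)
    ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
    tsdt = pendulum.parse(ts)
    return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)


# order ledger entries by their parsed datetime prior to file write
entries = {
    'tid-b': {'dateTime': '20221007;093000'},
    'tid-a': {'dateTime': '20221006;120000'},
}
ordered = dict(sorted(
    entries.items(),
    key=lambda kv: parse_flex_dt(kv[1]['dateTime']),
))
print(list(ordered))  # -> ['tid-a', 'tid-b']
```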
Tyler Goodlet d2b6216994 Comment format tweak 2022-10-10 09:27:16 -04:00
Tyler Goodlet eb743759a4 Subtract duration instead of passing to `.subtract()` (facepalm) 2022-10-10 09:27:16 -04:00
Tyler Goodlet 74910ba56c Fix `piker services`; `tractor.run()` is done.. 2022-10-10 09:27:16 -04:00
Tyler Goodlet 28535fa977 Re-request quote feed on data reset events
When a network outage occurs or the data feed connection is reset, the
`ib_insync` task will often hang until some kind of (internal?) timeout
takes place or, in some (worst) cases, never re-establishes (the event
stream) and thus the backend needs a restart or the live feed will
never resume..

In order to avoid this issue once and for all, this patch implements an
additional (extremely simple) task that is started with the real-time
feed and simply waits for any market data reset events; when one is
detected it restarts the `open_aio_quote_stream()` call in a loop using
a surrounding cancel scope.

Been meaning to implement this for ages and it's finally working!
2022-10-10 09:27:16 -04:00
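
A minimal sketch of that restart pattern, assuming `trio`; `open_quote_stream` and `wait_for_reset_event` are illustration-only stand-ins for `open_aio_quote_stream()` and the market-data reset event wait:

```python
import trio


async def stream_with_restarts(open_quote_stream, wait_for_reset_event):
    '''
    Re-enter the quote stream whenever a market data reset event
    fires, by cancelling a surrounding scope from a watcher task.

    '''
    startup: bool = True
    cs: trio.CancelScope | None = None

    while startup or cs.cancel_called:
        startup = False

        with trio.CancelScope() as cs:
            async with trio.open_nursery() as nurse:

                async def reset_on_feed():
                    await wait_for_reset_event()
                    cs.cancel()  # drop back out to the while-loop

                nurse.start_soon(reset_on_feed)

                async with open_quote_stream() as stream:
                    async for quote in stream:
                        ...  # forward quotes to consumers

                # stream closed on its own: stop the watcher too
                nurse.cancel_scope.cancel()
```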
Tyler Goodlet 1d7e642dbd Support no-disconnect on `open_aio_clients()` exit
Allows for easier restarts of certain `trio`-side tasks without killing
the `asyncio`-side clients; supported via a flag.

Also fix a bug in `Client.bars()`: we need to return the duration on the
empty bars case..
2022-10-10 09:27:16 -04:00
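
A minimal sketch of the no-disconnect flag; the `connect` parameter and `open_clients()` name are illustration-only stand-ins, the real entrypoint being `load_aio_clients()` per the diff below:

```python
from contextlib import asynccontextmanager as acm


@acm
async def open_clients(connect, disconnect_on_exit: bool = True):
    '''
    Yield a mapping of account -> client; optionally skip the
    disconnect on exit so `trio`-side restarts don't kill the
    `asyncio`-side connections.

    '''
    clients: dict = await connect()
    try:
        yield clients
    finally:
        if disconnect_on_exit:
            for acct, client in clients.items():
                client.disconnect()
```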
Tyler Goodlet 69be65237f Drop duplicate frame request
Must have gotten left in during refactor from the `trimeter` version?
Drop down to 6 years for 1m sampling.
2022-10-10 09:27:16 -04:00
Tyler Goodlet 96f5a8abb8 Return history-frame duration from `.bars()`
This allows the history manager to know the decrement size for
`end_dt: datetime` on the next query if a no-data / gap case was
encountered; subtract this in `get_bars()` in such cases. Define the
expected `pendulum.Duration`s in the `.api._samplings` table.

Also add a bit of query latency profiling that we may use later to more
dynamically determine timeout driven data feed resets. Factor the `162`
error cases into a common exception handler block.
2022-10-10 09:27:16 -04:00
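
A minimal sketch of how the returned frame duration can drive the `end_dt` decrement on a no-data frame; the duration values mirror the `_samplings` table in the diff below, while `_frame_durations` and `next_end_dt()` are illustration-only names:

```python
import pendulum

# expected frame duration per OHLC sample period (in seconds),
# mirroring the `_samplings` table added in the diff below
_frame_durations: dict[int, pendulum.Duration] = {
    1: pendulum.duration(seconds=2e3),
    60: pendulum.duration(days=1),
}


def next_end_dt(end_dt: pendulum.DateTime, timeframe: int) -> pendulum.DateTime:
    # on a no-data / gap frame, step the query window back by
    # exactly one frame's worth of time before retrying
    return end_dt - _frame_durations[timeframe]


# a blank 1m frame ending 2022-10-10 steps back a full day
print(next_end_dt(pendulum.datetime(2022, 10, 10), 60))
```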
Tyler Goodlet 13e886c967 Explicit fast chart naming, auto-yrange the fast chart on increment 2022-10-10 09:27:16 -04:00
Tyler Goodlet 2c6b832e50 More correct no-data output handling
When we get a timeout or a `NoData` condition still return a tuple of
empty sequences instead of `None` from `Client.bars()`. Move the
sampling period-duration table to module level.
2022-10-10 09:27:16 -04:00
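
A caller-side sketch of the empty-sequences contract, assuming the 3-tuple return shape from the `Client.bars()` diff below; `fetch_frame()` is an illustration-only wrapper:

```python
async def fetch_frame(client, fqsn: str, end_dt):
    '''
    With the empty-sequences contract we can always unpack the
    3-tuple and branch on emptiness instead of special-casing
    a ``None`` return.

    '''
    bars, bars_array, dt_duration = await client.bars(fqsn, end_dt=end_dt)

    if not len(bars_array):
        # timeout or genuine no-data gap: back the window up by one
        # frame duration and let the caller retry
        return None, end_dt - dt_duration

    return bars_array, end_dt
```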
Tyler Goodlet d5c3124722 Drop `trimeter`-ized concurrent history querying
It doesn't seem to be any slower on our least throttled backend
(binance) and it removes a bunch of hard to get correct frame
re-ordering logic that i'm not sure really ever fully worked XD

Commented some issues we still need to resolve as well.
2022-10-10 09:27:16 -04:00
Tyler Goodlet c939e75ef9 Rework history frame request concurrency
Manual tinker-testing demonstrated that triggering data resets
completely independent of the frame request gets more throughput and
further, that repeated requests (for the same frame after cancelling on
the `trio`-side) can yield duplicate frame responses. Re-work the
dual-task structure to instead have one task wait indefinitely on the
frame response (and thus not trigger duplicate frames) and the 2nd data
reset task poll for the first task to complete in a poll loop which
terminates when the frame arrives via an event.

Dirty deatz:
- make `get_bars()` take an optional timeout (which will eventually be
  dynamically passed from the history mgmt machinery) and move request
  logic inside a new `query()` closure meant to be spawned in a task
  which sets an event on frame arrival, add data reset poll loop in the
  main/parent task, deliver result on nursery completion.
- handle frame request cancelled event case without crash.
- on no-frame result (due to real history gap) hack in a 1 day decrement
  case which we need to eventually allow the caller to control likely
  based on measured frame rx latency.
- make `wait_on_data_reset()` a predicate (with boolean output indicating
  reset success) as well as `trio.Nursery.start()` compat so that it can
  be started in a new task with the started values yielded being
  a cancel scope and completion event.
- drop the legacy `backfill_bars()`, no longer used.
2022-10-10 09:27:16 -04:00
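
A minimal sketch of the dual-task structure described above, assuming `trio`; `request_frame` and `trigger_data_reset` are illustration-only stand-ins for the frame query and the data reset hack:

```python
import trio


async def frame_with_resets(
    request_frame,       # async callable returning the history frame
    trigger_data_reset,  # async callable firing the reset hack
    timeout: float = 3,  # secs to wait before (re)triggering a reset
):
    '''
    One task waits indefinitely on the frame response (so no
    duplicate requests are ever issued) while the parent polls,
    firing data resets until the result event is set.

    '''
    result = None
    result_ready = trio.Event()

    async def query():
        nonlocal result
        result = await request_frame()
        result_ready.set()

    async with trio.open_nursery() as nurse:
        nurse.start_soon(query)

        while not result_ready.is_set():
            with trio.move_on_after(timeout):
                await result_ready.wait()
                break

            # timed out waiting: poke the data farm and keep waiting
            await trigger_data_reset()

    return result
```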
Tyler Goodlet 844f8beaa7 Add `timeframe` input to `kraken` history api 2022-10-10 09:27:16 -04:00
Tyler Goodlet ac7ba500be Pass back internal cancel scope from data reset task 2022-10-10 09:27:16 -04:00
Tyler Goodlet 3301619647 Temporarily disable error on pos size mismatch 2022-10-10 09:27:16 -04:00
Tyler Goodlet 7f498766af Pass in default history time of 1 min
Adjust all history query machinery to pass a `timeframe: int` in seconds
and set a default of 60 (aka 1m) such that history views from here forward
will be 1m sampled OHLCV. Further, when the tsdb is detected as up, load
a full 10 years of data if possible on the 1m; backends will eventually
get a config section (`brokers.toml`) that allows users to tune this.
2022-10-10 09:27:16 -04:00
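
A minimal sketch of a timeframe-keyed backfill horizon; the `periods` values mirror the backfiller diff below, while `backfill_start()` is an illustration-only helper:

```python
import pendulum

# per-timeframe backfill horizons (sample period in secs -> kwargs),
# mirroring the `periods` table in the backfiller diff below
periods: dict[int, dict] = {
    1: {'days': 6},
    60: {'years': 6},
}


def backfill_start(end_dt: pendulum.DateTime, timeframe: int = 60):
    # default to 1m (60s) sampling for all history views
    return end_dt.subtract(**periods[timeframe])


print(backfill_start(pendulum.datetime(2022, 10, 10)))
```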
Tyler Goodlet 9270391e01 Make `binance` history api accept a timeframe 2022-10-10 09:27:16 -04:00
Tyler Goodlet 0c061d8957 Make `marketstore` storage api timeframe aware
The `Store.load()`, `.read_ohlcv()`, `.write_ohlcv()` and
`.delete_ts()` methods can now take a `timeframe: Optional[float]` param
which is used to look up the appropriate sampling period table-key from
`marketstore`.
2022-10-10 09:27:16 -04:00
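
A minimal sketch of the timeframe-to-table-key lookup; the `tf_in_1s` mapping contents here are a guess at the existing table's shape and `ohlcv_tbk()` is an illustration-only helper:

```python
from bidict import bidict

# sample period (secs) <-> marketstore timeframe key; the exact
# key strings here are a guess at the existing `tf_in_1s` table
tf_in_1s = bidict({
    1: '1Sec',
    60: '1Min',
})


def ohlcv_tbk(fqsn: str, timeframe: int) -> str:
    # build the "time bucket key" used for reads/writes/deletes,
    # falling back to 1s when the timeframe isn't mapped
    return f'{fqsn}/{tf_in_1s.get(timeframe, tf_in_1s[1])}/OHLCV'


print(ohlcv_tbk('btcusdt.binance', 60))  # -> 'btcusdt.binance/1Min/OHLCV'
```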
Tyler Goodlet 87f7a03dbe Make history routines `timeframe` aware
Allow data feed sub-system to specify the timeframe (aka OHLC sample
period) to the `open_history_client()` delivered history fetching API.
Factor the data keycombo hack into a new routine to be used also from
the history backfiller code when request latency increases; there is
a first draft at trying to use the feed reset to speed up 1m frame
throttling by timing out on the history frame response, but it needs
a lot of fine tuning.
2022-10-10 09:27:16 -04:00
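
A minimal sketch of the resulting `open_history_client()` shape with the new `timeframe` argument; the body is elided and the throttle config values mirror the `ib` diff below:

```python
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Optional

import numpy as np


@acm
async def open_history_client(fqsn: str):

    async def get_ohlc(
        timeframe: float,  # OHLC sample period in seconds
        end_dt: Optional[datetime] = None,
        start_dt: Optional[datetime] = None,
    ) -> tuple[np.ndarray, datetime, datetime]:
        ...  # backend-specific frame request goes here

    # 2nd element is the per-backend query throttle config
    yield get_ohlc, {'erlangs': 1, 'rate': 3}
```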
Tyler Goodlet 1adf5fb9c0 Add 1m ohlc sample rate support to `Client.bars()`; frame query is 1 day 2022-10-10 09:27:16 -04:00
11 changed files with 830 additions and 773 deletions

View File

@ -195,9 +195,8 @@ async def open_piker_runtime(
) -> Optional[tractor._portal.Portal]:
'''
Start a piker actor who's runtime will automatically
sync with existing piker actors in local network
based on configuration.
Start a piker actor whose runtime will automatically sync with
existing piker actors on the local link based on configuration.
'''
global _services

View File

@ -388,6 +388,7 @@ async def open_history_client(
async with open_cached_client('binance') as client:
async def get_ohlc(
timeframe: float,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,

View File

@ -43,6 +43,7 @@ from bidict import bidict
import trio
import tractor
from tractor import to_asyncio
import pendulum
import ib_insync as ibis
from ib_insync.contract import (
Contract,
@ -52,6 +53,7 @@ from ib_insync.contract import (
from ib_insync.order import Order
from ib_insync.ticker import Ticker
from ib_insync.objects import (
BarDataList,
Position,
Fill,
Execution,
@ -78,26 +80,11 @@ _time_units = {
'h': ' hours',
}
_time_frames = {
'1s': '1 Sec',
'5s': '5 Sec',
'30s': '30 Sec',
'1m': 'OneMinute',
'2m': 'TwoMinutes',
'3m': 'ThreeMinutes',
'4m': 'FourMinutes',
'5m': 'FiveMinutes',
'10m': 'TenMinutes',
'15m': 'FifteenMinutes',
'20m': 'TwentyMinutes',
'30m': 'HalfHour',
'1h': 'OneHour',
'2h': 'TwoHours',
'4h': 'FourHours',
'D': 'OneDay',
'W': 'OneWeek',
'M': 'OneMonth',
'Y': 'OneYear',
_bar_sizes = {
1: '1 Sec',
60: '1 min',
60*60: '1 hour',
24*60*60: '1 day',
}
_show_wap_in_history: bool = False
@ -199,7 +186,8 @@ _adhoc_futes_set = {
'lb.nymex', # random len lumber
# metals
'xauusd.cmdty', # gold spot
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
'xauusd.cmdty', # london gold spot ^
'gc.nymex',
'mgc.nymex', # micro
@ -257,14 +245,12 @@ _exch_skip_list = {
'PSE',
}
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
_enters = 0
def bars_to_np(bars: list) -> np.ndarray:
'''
Convert a "bars list thing" (``BarsList`` type from ibis)
Convert a "bars list thing" (``BarDataList`` type from ibis)
into a numpy struct array.
'''
@ -284,6 +270,27 @@ def bars_to_np(bars: list) -> np.ndarray:
return nparr
# NOTE: pacing violations exist for higher sample rates:
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
# Also see note on duration limits being lifted on 1m+ periods,
# but they say "use with discretion":
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
_samplings: dict[int, tuple[str, str]] = {
1: (
'1 secs',
f'{int(2e3)} S',
pendulum.duration(seconds=2e3),
),
# TODO: benchmark >1 D duration on query to see if
# throughput can be made faster during backfilling.
60: (
'1 min',
'1 D',
pendulum.duration(days=1),
),
}
class Client:
'''
IB wrapped for our broker backend API.
@ -338,19 +345,32 @@ class Client:
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
end_dt: Union[datetime, str] = "",
sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
# ohlc sample period in seconds
sample_period_s: int = 1,
) -> list[dict[str, Any]]:
# optional "duration of time" equal to the
# length of the returned history frame.
duration: Optional[str] = None,
**kwargs,
) -> tuple[BarDataList, np.ndarray, pendulum.Duration]:
'''
Retrieve OHLCV bars for an fqsn over a range to the present.
'''
# See API docs here:
# https://interactivebrokers.github.io/tws-api/historical_data.html
bars_kwargs = {'whatToShow': 'TRADES'}
bars_kwargs.update(kwargs)
bar_size, duration, dt_duration = _samplings[sample_period_s]
global _enters
# log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
print(f'REQUESTING BARS {_enters} @ end={end_dt}')
print(
f"REQUESTING {duration}'s worth {bar_size} BARS\n"
f'{_enters} @ end={end_dt}'
)
if not end_dt:
end_dt = ''
@ -360,30 +380,20 @@ class Client:
contract = (await self.find_contracts(fqsn))[0]
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime=end_dt,
formatDate=2,
# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
# OHLC sampling values:
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
# barSizeSetting='1 secs',
barSizeSetting=bar_size,
# durationStr='{count} S'.format(count=15000 * 5),
# durationStr='{count} D'.format(count=1),
# barSizeSetting='5 secs',
durationStr='{count} S'.format(count=period_count),
# barSizeSetting='5 secs',
barSizeSetting='1 secs',
# barSizeSetting='1 min',
# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
durationStr=duration,
# always use extended hours
useRTH=False,
@ -394,11 +404,21 @@ class Client:
# whatToShow='TRADES',
)
if not bars:
# TODO: raise underlying error here
raise ValueError(f"No bars retreived for {fqsn}?")
# NOTE: there's 2 cases here to handle (and this should be
# read alongside the implementation of
# ``.reqHistoricalDataAsync()``):
# - no data is returned for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
return [], np.empty(0), dt_duration
# TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no
# way to detect a timeout.
nparr = bars_to_np(bars)
return bars, nparr
return bars, nparr, dt_duration
async def con_deats(
self,
@ -463,7 +483,7 @@ class Client:
self,
pattern: str,
# how many contracts to search "up to"
upto: int = 6,
upto: int = 16,
asdicts: bool = True,
) -> dict[str, ContractDetails]:
@ -498,6 +518,16 @@ class Client:
exch = tract.exchange
if exch not in _exch_skip_list:
# try to lookup any contracts from our adhoc set
# since often the exchange/venue is named slightly
# differently (eg. `BRR.CMECRYPTO` instead of just
# `.CME`).
info = _adhoc_symbol_map.get(sym)
if info:
con_kwargs, bars_kwargs = info
exch = con_kwargs['exchange']
# try get all possible contracts for symbol as per,
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
con = ibis.Future(
@ -1066,6 +1096,7 @@ async def load_aio_clients(
# retry a few times to get the client going..
connect_retries: int = 3,
connect_timeout: float = 0.5,
disconnect_on_exit: bool = True,
) -> dict[str, Client]:
'''
@ -1207,6 +1238,7 @@ async def load_aio_clients(
finally:
# TODO: for re-scans we'll want to not teardown clients which
# are up and stable right?
if disconnect_on_exit:
for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect()

View File

@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch
conf = get_config()
entries = trades_to_ledger_entries(
entries = api_trades_to_ledger_entries(
conf['accounts'].inverse,
trade_entries,
)
@ -362,7 +362,7 @@ async def update_and_audit_msgs(
# if ib reports a lesser pp it's not as bad since we can
# presume we're at least not more in the shit then we
# thought.
if diff:
if diff and pikersize:
reverse_split_ratio = pikersize / ibsize
split_ratio = 1/reverse_split_ratio
@ -372,6 +372,7 @@ async def update_and_audit_msgs(
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
raise ValueError(
# log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
@ -1122,18 +1123,16 @@ def norm_trade_records(
continue
# timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date')
if not date:
# probably a flex record with a wonky non-std timestamp..
date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
flex_dtstr = record.get('dateTime')
else:
# epoch_dt = pendulum.from_timestamp(record.get('time'))
dt = pendulum.parse(date)
if dtstr or date:
dt = pendulum.parse(dtstr or date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
@ -1182,41 +1181,29 @@ def norm_trade_records(
return {r.tid: r for r in records}
def trades_to_ledger_entries(
def parse_flex_dt(
record: str,
) -> pendulum.datetime:
date, ts = record.split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
def api_trades_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
source_type: str = 'api',
) -> dict:
'''
Convert either of API execution objects or flex report
entry objects into ``dict`` form, pretty much straight up
without modification.
Convert API execution objects into ``dict`` form,
pretty much straight up without modification except add
a `pydatetime` field from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if an account name
# gets lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
elif source_type == 'api':
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
@ -1243,7 +1230,8 @@ def trades_to_ledger_entries(
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['date'] = str(dt)
entry['pydatetime'] = dt
entry['datetime'] = str(dt)
acctid = accounts[entry['acctNumber']]
if not tid:
@ -1262,6 +1250,73 @@ def trades_to_ledger_entries(
acctid, {}
)[tid] = entry
# sort entries in output by python based datetime
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
return trades_by_account
def flex_records_to_ledger_entries(
accounts: bidict,
trade_entries: list[object],
) -> dict:
'''
Convert flex report entry objects into ``dict`` form, pretty much
straight up without modification except add a `pydatetime` field
from the parsed timestamp.
'''
trades_by_account = {}
for t in trade_entries:
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if an account name
# gets lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
# probably a flex record with a wonky non-std timestamp..
dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
entry['datetime'] = str(dt)
if not tid:
# this is likely some kind of internal adjustment
# transaction, likely one of the following:
# - an expiry event that will show a "book trade" indicating
# some adjustment to cash balances: zeroing or itm settle.
# - a manual cash balance position adjustment likely done by
# the user from the accounts window in TWS where they can
# manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
continue
trades_by_account.setdefault(
acctid, {}
)[tid] = entry
for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted(
trades_by_account[acctid].items(),
key=lambda entry: entry[1]['pydatetime'],
))
return trades_by_account
@ -1308,15 +1363,16 @@ def load_flex_trades(
ln = len(trade_entries)
log.info(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_ledger_entries(
# get reverse map to user account names
conf['accounts'].inverse,
trades_by_account = flex_records_to_ledger_entries(
conf['accounts'].inverse, # reverse map to user account names
trade_entries,
source_type='flex',
)
ledger_dict: Optional[dict] = None
for acctid in trades_by_account:
trades_by_id = trades_by_account[acctid]
with open_trade_ledger('ib', acctid) as ledger_dict:
tid_delta = set(trades_by_id) - set(ledger_dict)
log.info(
@ -1324,9 +1380,11 @@ def load_flex_trades(
f'{pformat(tid_delta)}'
)
if tid_delta:
ledger_dict.update(
{tid: trades_by_id[tid] for tid in tid_delta}
)
sorted_delta = dict(sorted(
{tid: trades_by_id[tid] for tid in tid_delta}.items(),
key=lambda entry: entry[1].pop('pydatetime'),
))
ledger_dict.update(sorted_delta)
return ledger_dict

View File

@ -22,6 +22,7 @@ import asyncio
from contextlib import asynccontextmanager as acm
from dataclasses import asdict
from datetime import datetime
from functools import partial
from math import isnan
import time
from typing import (
@ -38,7 +39,6 @@ import tractor
import trio
from trio_typing import TaskStatus
from piker.data._sharedmem import ShmArray
from .._util import SymbolNotFound, NoData
from .api import (
# _adhoc_futes_set,
@ -111,24 +111,54 @@ async def open_history_client(
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
'''
# TODO:
# - add logic to handle tradable hours and only grab
# valid bars in the range?
# - we want to avoid overrunning the underlying shm array buffer and
# we should probably calc the number of calls to make depending on
# that until we have the `marketstore` daemon in place in which case
# the shm size will be driven by user config and available sys
# memory.
async with open_data_client() as proxy:
max_timeout: float = 2.
mean: float = 0
count: int = 0
async def get_hist(
timeframe: float,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[np.ndarray, str]:
nonlocal max_timeout, mean, count
out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
query_start = time.time()
out, timedout = await get_bars(
proxy,
symbol,
timeframe,
end_dt=end_dt,
)
latency = time.time() - query_start
if (
not timedout
# and latency <= max_timeout
):
count += 1
mean += latency / count
print(
f'HISTORY FRAME QUERY LATENCY: {latency}\n'
f'mean: {mean}'
)
# TODO: add logic here to handle tradable hours and only grab
# valid bars in the range
if out is None:
# could be trying to retrieve bars over weekend
log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(
f'{end_dt}',
frame_size=2000,
# frame_size=2000,
)
bars, bars_array, first_dt, last_dt = out
@ -145,7 +175,7 @@ async def open_history_client(
# quite sure why.. needs some tinkering and probably
# a lookthrough of the ``ib_insync`` machinery, for eg. maybe
# we have to do the batch queries on the `asyncio` side?
yield get_hist, {'erlangs': 1, 'rate': 6}
yield get_hist, {'erlangs': 1, 'rate': 3}
_pacing: str = (
@ -154,102 +184,19 @@ _pacing: str = (
)
async def get_bars(
async def wait_on_data_reset(
proxy: MethodProxy,
fqsn: str,
reset_type: str = 'data',
timeout: float = 16,
# blank to start which tells ib to look up the latest datum
end_dt: str = '',
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> bool:
) -> (dict, np.ndarray):
'''
Retrieve historical data from a ``trio``-side task using
a ``MethodProxy``.
'''
fails = 0
bars: Optional[list] = None
first_dt: datetime = None
last_dt: datetime = None
if end_dt:
last_dt = pendulum.from_timestamp(end_dt.timestamp())
for _ in range(10):
try:
out = await proxy.bars(
fqsn=fqsn,
end_dt=end_dt,
)
if out:
bars, bars_array = out
else:
await tractor.breakpoint()
if bars_array is None:
raise SymbolNotFound(fqsn)
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
last_dt = pendulum.from_timestamp(
bars[-1].date.timestamp())
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(
f'{len(bars)} bars retrieved for {first_dt} -> {last_dt}'
)
return (bars, bars_array, first_dt, last_dt), fails
except RequestError as err:
msg = err.message
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(
f'Symbol: {fqsn}',
)
elif (
err.code == 162 and
'HMDS query returned no data' in err.message
):
# XXX: this is now done in the storage mgmt layer
# and we shouldn't implicitly decrement the frame dt
# index since the upper layer may be doing so
# concurrently and we don't want to be delivering frames
# that weren't asked for.
log.warning(
f'NO DATA found ending @ {end_dt}\n'
)
# try to decrement start point and look further back
# end_dt = last_dt = last_dt.subtract(seconds=2000)
raise NoData(
f'Symbol: {fqsn}',
frame_size=2000,
)
# elif (
# err.code == 162 and
# 'Trading TWS session is connected from a different IP
# address' in err.message
# ):
# log.warning("ignoring ip address warning")
# continue
elif _pacing in msg:
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
@ -265,150 +212,238 @@ async def get_bars(
# live_ev = proxy.status_event(
# 'Market data farm connection is OK:usfuture'
# )
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
tries: int = 2
timeout: float = 10
# try 3 time with a data reset then fail over to
# a connection reset.
for i in range(1, tries):
done = trio.Event()
with trio.move_on_after(timeout) as cs:
task_status.started((cs, done))
log.warning('Sending DATA RESET request')
await data_reset_hack(reset_type='data')
res = await data_reset_hack(reset_type=reset_type)
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
break
if cs.cancelled_caught:
fails += 1
log.warning(
f'Data reset {name} timeout, retrying {i}.'
)
continue
else:
log.warning('Sending CONNECTION RESET')
res = await data_reset_hack(reset_type='connection')
if not res:
log.warning(
'NO VNC DETECTED!\n'
'Manually press ctrl-alt-f on your IB java app'
)
# break
done.set()
return False
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
# is all that useful here or not.
# - in theory you could wait on one of the ones above first
# to verify the reset request was sent?
# - we need the same for real-time quote feeds which can
# sometimes flake out and stop delivering..
for name, ev in [
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
done.set()
return True
if cs.cancelled_caught:
fails += 1
log.warning('Data CONNECTION RESET timeout!?')
if cs.cancel_called:
log.warning(
'Data reset task canceled?'
)
done.set()
return False
_data_resetter_task: trio.Task | None = None
async def get_bars(
proxy: MethodProxy,
fqsn: str,
timeframe: int,
# blank to start which tells ib to look up the latest datum
end_dt: str = '',
# TODO: make this more dynamic based on measured frame rx latency..
timeout: float = 3, # how long before we trigger a feed reset
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> (dict, np.ndarray):
'''
Retrieve historical data from a ``trio``-side task using
a ``MethodProxy``.
'''
global _data_resetter_task
data_cs: trio.CancelScope | None = None
result: tuple[
ibis.objects.BarDataList,
np.ndarray,
datetime,
datetime,
] | None = None
result_ready = trio.Event()
async def query():
nonlocal result, data_cs, end_dt
while True:
try:
out = await proxy.bars(
fqsn=fqsn,
end_dt=end_dt,
sample_period_s=timeframe,
# ideally we cancel the request just before we
# cancel on the ``trio``-side and trigger a data
# reset hack.. the problem is there's no way (with
# current impl) to detect a cancel case.
# timeout=timeout,
)
if out is None:
raise NoData(f'{end_dt}')
bars, bars_array, dt_duration = out
if not bars:
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
)
end_dt -= dt_duration
continue
if bars_array is None:
raise SymbolNotFound(fqsn)
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
last_dt = pendulum.from_timestamp(
bars[-1].date.timestamp())
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(
f'{len(bars)} bars retrieved {first_dt} -> {last_dt}'
)
if data_cs:
data_cs.cancel()
result = (bars, bars_array, first_dt, last_dt)
# signal data reset loop parent task
result_ready.set()
return result
except RequestError as err:
msg = err.message
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(
f'Symbol: {fqsn}',
)
elif err.code == 162:
if 'HMDS query returned no data' in err.message:
# XXX: this is now done in the storage mgmt
# layer and we shouldn't implicitly decrement
# the frame dt index since the upper layer may
# be doing so concurrently and we don't want to
# be delivering frames that weren't asked for.
log.warning(
f'NO DATA found ending @ {end_dt}\n'
)
# try to decrement start point and look further back
# end_dt = end_dt.subtract(seconds=2000)
end_dt = end_dt.subtract(days=1)
print("SUBTRACTING DAY")
continue
elif 'API historical data query cancelled' in err.message:
log.warning(
'Query cancelled by IB (:eyeroll:):\n'
f'{err.message}'
)
continue
elif (
'Trading TWS session is connected from a different IP'
in err.message
):
log.warning("ignoring ip address warning")
continue
# XXX: more or less same as above timeout case
elif _pacing in msg:
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
# cancel any existing reset task
if data_cs:
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
timeout=float('inf'),
reset_type='connection'
)
)
continue
else:
raise
return None, None
# else: # throttle wasn't fixed so error out immediately
# raise _err
# TODO: make this global across all history task/requests
# such that simultaneous symbol queries don't try data resetting
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse:
# start history request that we allow
# to run indefinitely until a result is acquired
nurse.start_soon(query)
async def backfill_bars(
# start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset.
while not result_ready.is_set():
fqsn: str,
shm: ShmArray, # type: ignore # noqa
with trio.move_on_after(timeout):
await result_ready.wait()
break
# TODO: we want to avoid overrunning the underlying shm array buffer
# and we should probably calc the number of calls to make depending
# on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys
# memory.
count: int = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
'''
# last_dt1 = None
last_dt = None
with trio.CancelScope() as cs:
async with open_data_client() as proxy:
out, fails = await get_bars(proxy, fqsn)
if out is None:
raise RuntimeError("Could not pull currrent history?!")
(first_bars, bars_array, first_dt, last_dt) = out
vlm = bars_array['volume']
vlm[vlm < 0] = 0
last_dt = first_dt
# write historical data to buffer
shm.push(bars_array)
task_status.started(cs)
i = 0
while i < count:
out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
if out is None:
# could be trying to retrieve bars over weekend
# TODO: add logic here to handle tradable hours and
# only grab valid bars in the range
log.error(f"Can't grab bars starting at {first_dt}!?!?")
# XXX: get_bars() should internally decrement dt by
# 2k seconds and try again.
if _data_resetter_task:
# don't double invoke the reset hack if another
# requester task already has it covered.
continue
else:
_data_resetter_task = trio.lowlevel.current_task()
unset_resetter = True
(first_bars, bars_array, first_dt, last_dt) = out
# last_dt1 = last_dt
# last_dt = first_dt
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
timeout=float('inf'),
)
)
# sync wait on reset to complete
await reset_done.wait()
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# TODO we should probably dig into forums to see what peeps
# think this data "means" and then use it as an indicator of
# sorts? dinkus has mentioned that $vlms for the day don't
# match other platforms nor the summary stat tws shows in
# the monitor - it's probably worth investigating.
shm.push(bars_array, prepend=True)
i += 1
_data_resetter_task = None if unset_resetter else _data_resetter_task
return result, data_cs is not None
asset_type_map = {
@ -466,7 +501,9 @@ async def _setup_quote_stream(
to_trio.send_nowait(None)
async with load_aio_clients() as accts2clients:
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -512,10 +549,11 @@ async def _setup_quote_stream(
# Manually do the dereg ourselves.
teardown()
except trio.WouldBlock:
log.warning(
f'channel is blocking symbol feed for {symbol}?'
f'\n{to_trio.statistics}'
)
# log.warning(
# f'channel is blocking symbol feed for {symbol}?'
# f'\n{to_trio.statistics}'
# )
pass
# except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt
@ -545,7 +583,8 @@ async def open_aio_quote_stream(
from_aio = _quote_streams.get(symbol)
if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer
# if we already have a cached feed deliver a rx side clone
# to consumer
async with broadcast_receiver(
from_aio,
2**6,
@ -736,17 +775,45 @@ async def stream_quotes(
await trio.sleep_forever()
return # we never expect feed to come up?
async with open_aio_quote_stream(
cs: Optional[trio.CancelScope] = None
startup: bool = True
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream:
) as stream,
):
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
startup = False
task_status.started((init_msgs, first_quote))
# start a stream restarter task which monitors the
# data feed event.
async def reset_on_feed():
# TODO: this seems to be suppressed from the
# traceback in ``tractor``?
# assert 0
rt_ev = proxy.status_event(
'Market data farm connection is OK:usfarm'
)
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
async with aclosing(stream):
if syminfo.get('no_vlm', False):
@ -754,29 +821,31 @@ async def stream_quotes(
# include vlm data.
atype = syminfo['asset_type']
log.info(
f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
f'No-vlm {sym}@{atype}, skipping quote poll'
)
else:
# wait for real volume on feed (trading might be closed)
# wait for real volume on feed (trading might be
# closed)
while True:
ticker = await stream.receive()
# for a real volume contract we wait for the first
# "real" trade to take place
# for a real volume contract we wait for
# the first "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we get a real
# market datum
# spin consuming tickers until we
# get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
log.debug("Received first volume tick")
# ugh, clear ticks since we've
# consumed them (ahem, ib_insync is
# truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
@ -904,7 +973,14 @@ async def open_symbol_search(
except trio.WouldBlock:
pass
if not pattern or pattern.isspace():
if (
not pattern
or pattern.isspace()
# XXX: not sure if this is a bad assumption but it
# seems to make search snappier?
or len(pattern) < 1
):
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client

View File

@ -258,6 +258,7 @@ async def open_history_client(
queries: int = 0
async def get_ohlc(
timeframe: float,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,

View File

@ -138,25 +138,26 @@ def cli(ctx, brokers, loglevel, tl, configdir):
@click.pass_obj
def services(config, tl, names):
async def list_services():
from .._daemon import open_piker_runtime
async with tractor.get_arbiter(
async def list_services():
async with (
open_piker_runtime(
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
*_tractor_kwargs['arbiter_addr']
) as portal:
) as portal
):
registry = await portal.run_from_ns('self', 'get_registry')
json_d = {}
for key, socket in registry.items():
# name, uuid = uid
host, port = socket
json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}")
tractor.run(
list_services,
name='service_query',
loglevel=config['loglevel'] if tl else None,
arbiter_addr=_tractor_kwargs['arbiter_addr'],
)
trio.run(list_services)
def _load_clis() -> None:

View File

@ -21,16 +21,19 @@ This module is enabled for ``brokerd`` daemons.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from contextlib import asynccontextmanager
from dataclasses import (
dataclass,
field,
)
from datetime import datetime
from functools import partial
from pprint import pformat
from types import ModuleType
from typing import (
Any,
AsyncIterator, Optional,
Generator,
AsyncIterator,
Callable,
Optional,
Awaitable,
TYPE_CHECKING,
Union,
@ -39,7 +42,6 @@ from typing import (
import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import trimeter
import tractor
from tractor.trionics import maybe_open_context
import pendulum
@ -252,6 +254,7 @@ async def start_backfill(
mod: ModuleType,
bfqsn: str,
shm: ShmArray,
timeframe: float,
last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
@ -262,11 +265,19 @@ async def start_backfill(
) -> int:
hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
]
config: dict[str, int]
async with mod.open_history_client(bfqsn) as (hist, config):
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt=None)
array, start_dt, end_dt = await hist(
timeframe,
end_dt=None,
)
times = array['time']
@ -289,6 +300,9 @@ async def start_backfill(
log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push)
# TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqsn..
# otherwise all fsps get reset on every chart..
for delay_s in sampler.subscribers:
await broadcast(delay_s)
@ -304,8 +318,8 @@ async def start_backfill(
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {step_size_s} ' 'seconds.. so ye, dun '
'do dat brudder.'
f'timeframe of {step_size_s} seconds..\n'
'So yuh.. dun do dat brudder.'
)
# when no tsdb "last datum" is provided, we just load
@ -319,96 +333,60 @@ async def start_backfill(
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 6},
60: {'years': 2},
60: {'years': 6},
}
kwargs = periods[step_size_s]
last_tsdb_dt = start_dt.subtract(**kwargs)
# configure async query throttling
erlangs = config.get('erlangs', 1)
rate = config.get('rate', 1)
frames = {}
def iter_dts(start: datetime):
while True:
hist_period = pendulum.period(
start,
last_tsdb_dt,
)
dtrange = list(hist_period.range('seconds', frame_size_s))
log.debug(f'New datetime index:\n{pformat(dtrange)}')
for end_dt in dtrange:
log.info(f'Yielding next frame start {end_dt}')
start = yield end_dt
# if caller sends a new start date, reset to that
if start is not None:
log.warning(f'Resetting date range: {start}')
break
else:
# from while
return
# pull new history frames until we hit latest
# already in the tsdb or a max count.
count = 0
# NOTE: when gaps are detected in the retrieved history (by
# comparison of the end - start versus the expected "frame size"
# in seconds) we need a way to alert the async request code not
# to continue to query for data "within the gap". This var is
# set in such cases such that further requests in that period
# are discarded and further we reset the "datetime query frame
# index" in such cases to avoid needless noop requests.
earliest_end_dt: Optional[datetime] = start_dt
async def get_ohlc_frame(
input_end_dt: datetime,
iter_dts_gen: Generator[datetime],
) -> np.ndarray:
nonlocal count, frames, earliest_end_dt, frame_size_s
count += 1
if input_end_dt > earliest_end_dt:
# if a request comes in for an inter-gap frame we
# discard it since likely this request is still
# lingering from before the reset of ``iter_dts()`` via
# ``.send()`` below.
log.info(f'Discarding request history ending @ {input_end_dt}')
# signals to ``trimeter`` loop to discard and
# ``continue`` in it's schedule loop.
return None
# rate = config.get('rate', 1)
# XXX: legacy from ``trimeter`` code but unsupported now.
# erlangs = config.get('erlangs', 1)
# inline sequential loop where we simply pass the
# last retrieved start dt to the next request as
# it's end dt.
starts: set[datetime] = set()
while start_dt > last_tsdb_dt:
try:
log.info(
f'Requesting {step_size_s}s frame ending in {input_end_dt}'
f'Requesting {step_size_s}s frame ending in {start_dt}'
)
array, start_dt, end_dt = await hist(end_dt=input_end_dt)
array, next_start_dt, end_dt = await hist(
timeframe,
end_dt=start_dt,
)
if next_start_dt in starts:
start_dt = min(starts)
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
continue
# only update new start point if new
start_dt = next_start_dt
starts.add(start_dt)
assert array['time'][0] == start_dt.timestamp()
except NoData:
# XXX: unhandled history gap (shouldn't happen?)
log.warning(
f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?'
f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
)
return None # discard signal
await tractor.breakpoint()
except DataUnavailable as duerr:
# broker is being a bish and we can't pull
# any more..
log.warning('backend halted on data deliver !?!?')
except DataUnavailable: # as duerr:
# broker is being a bish and we can't pull any more..
log.warning(
f'NO-MORE-DATA: backend {mod.name} halted history!?'
)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
return duerr
return
diff = end_dt - start_dt
frame_time_diff_s = diff.seconds
@ -419,42 +397,12 @@ async def start_backfill(
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so indicate to the request loop that this gap is
# expected by both,
# - resetting the ``iter_dts()`` generator to start at
# the new start point delivered in this result
# - setting the non-locally scoped ``earliest_end_dt``
# to this new value so that the request loop doesn't
# get tripped up thinking there's an out of order
# request-result condition.
# so just report it to console for now.
log.warning(
f'History frame ending @ {end_dt} appears to have a gap:\n'
f'{diff} ~= {frame_time_diff_s} seconds'
)
# reset dtrange gen to new start point
try:
next_end = iter_dts_gen.send(start_dt)
log.info(
f'Reset frame index to start at {start_dt}\n'
f'Was at {next_end}'
)
# NOTE: manually set "earliest end datetime" index-value
# to avoid the request loop getting confused about
# new frames that are earlier in history - i.e. this
# **is not** the case of out-of-order frames from
# an async batch request.
earliest_end_dt = start_dt
except StopIteration:
# gen already terminated meaning we probably already
# exhausted it via frame requests.
log.info(
"Datetime index already exhausted, can't reset.."
)
to_push = diff_history(
array,
start_dt,
@ -464,152 +412,11 @@ async def start_backfill(
ln = len(to_push)
if ln:
log.info(f'{ln} bars for {start_dt} -> {end_dt}')
frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt)
return to_push, start_dt, end_dt
else:
log.warning(
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
)
return None
# initial dt index starts at the start of the first query result
idts = iter_dts(start_dt)
async with trimeter.amap(
partial(
get_ohlc_frame,
# we close in the ``iter_dt()`` gen in so we can send
# reset signals as needed for gap dection in the
# history.
iter_dts_gen=idts,
),
idts,
capture_outcome=True,
include_value=True,
# better technical names bruv...
max_at_once=erlangs,
max_per_second=rate,
) as outcomes:
# Then iterate over the return values, as they become available
# (i.e., not necessarily in the original order)
async for input_end_dt, outcome in outcomes:
try:
out = outcome.unwrap()
if out is None:
# skip signal
continue
elif isinstance(out, DataUnavailable):
# no data available case signal.. so just kill
# further requests and basically just stop
# trying...
break
except Exception:
log.exception('uhh trimeter bail')
raise
else:
to_push, start_dt, end_dt = out
if not len(to_push):
# diff returned no new data (i.e. we probably hit
# the ``last_tsdb_dt`` point).
# TODO: raise instead?
log.warning(f'No history for range {start_dt} -> {end_dt}')
continue
# pipeline-style pull frames until we need to wait for
# the next in order to arrive.
# i = end_dts.index(input_end_dt)
# print(f'latest end_dt {end_dt} found at index {i}')
epochs = list(reversed(sorted(frames)))
for epoch in epochs:
start = shm.array['time'][0]
last_shm_prepend_dt = pendulum.from_timestamp(start)
earliest_frame_queue_dt = pendulum.from_timestamp(epoch)
diff = start - epoch
if diff < 0:
log.warning(
'Discarding out of order frame:\n'
f'{earliest_frame_queue_dt}'
)
frames.pop(epoch)
continue
if diff > step_size_s:
if earliest_end_dt < earliest_frame_queue_dt:
# XXX: an expected gap was encountered (see
# logic in ``get_ohlc_frame()``, so allow
# this frame through to the storage layer.
log.warning(
f'Expected history gap of {diff}s:\n'
f'{earliest_frame_queue_dt} <- '
f'{earliest_end_dt}'
)
elif (
erlangs > 1
):
# we don't yet have the next frame to push
# so break back to the async request loop
# while we wait for more async frame-results
# to arrive.
if len(frames) >= erlangs:
log.warning(
'Frame count in async-queue is greater '
'then erlangs?\n'
'There seems to be a gap between:\n'
f'{earliest_frame_queue_dt} <- '
f'{last_shm_prepend_dt}\n'
'Conducting manual call for frame ending: '
f'{last_shm_prepend_dt}'
)
(
to_push,
start_dt,
end_dt,
) = await get_ohlc_frame(
input_end_dt=last_shm_prepend_dt,
iter_dts_gen=idts,
)
last_epoch = to_push['time'][-1]
diff = start - last_epoch
if diff > step_size_s:
await tractor.breakpoint()
raise DataUnavailable(
'An awkward frame was found:\n'
f'{start_dt} -> {end_dt}:\n{to_push}'
)
else:
frames[last_epoch] = (
to_push, start_dt, end_dt)
break
expect_end = pendulum.from_timestamp(start)
expect_start = expect_end.subtract(
seconds=frame_size_s)
log.warning(
'waiting on out-of-order history frame:\n'
f'{expect_end - expect_start}'
)
break
to_push, start_dt, end_dt = frames.pop(epoch)
ln = len(to_push)
# bail gracefully on shm allocation overrun/full condition
try:
@ -624,10 +431,6 @@ async def start_backfill(
f'Shm pushed {ln} frame:\n'
f'{start_dt} -> {end_dt}'
)
# keep track of most recent "prepended" ``start_dt``
# both for detecting gaps and ensuring async
# frame-result order.
earliest_end_dt = start_dt
if (
storage is not None
@ -640,10 +443,12 @@ async def start_backfill(
await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul..
to_push,
timeframe,
)
# TODO: can we only trigger this if the respective
# history in "in view"?!?
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
@ -651,7 +456,9 @@ async def start_backfill(
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# short-circuit (for now)
bf_done.set()
return
async def manage_history(
@ -660,6 +467,7 @@ async def manage_history(
fqsn: str,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
timeframe: float = 60, # in seconds
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
@ -683,10 +491,10 @@ async def manage_history(
readonly=False,
)
# TODO: history validation
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
# if not opened:
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
rt_shm, opened = maybe_open_shm_array(
key=f'{fqsn}_rt',
@ -698,10 +506,10 @@ async def manage_history(
readonly=False,
size=3*_secs_in_day,
)
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
# if not opened:
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
log.info('Scanning for existing `marketstored`')
@ -726,7 +534,10 @@ async def manage_history(
# shm backfiller approach below.
# start history anal and load missing new data via backend.
series, _, last_tsdb_dt = await storage.load(fqsn)
series, _, last_tsdb_dt = await storage.load(
fqsn,
timeframe=timeframe,
)
broker, symbol, expiry = unpack_fqsn(fqsn)
(
@ -739,6 +550,7 @@ async def manage_history(
mod,
bfqsn,
hist_shm,
timeframe=timeframe,
last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True,
storage=storage,
@ -769,7 +581,6 @@ async def manage_history(
else:
dt_diff_s = 0
# await trio.sleep_forever()
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
@ -804,6 +615,7 @@ async def manage_history(
series = await storage.read_ohlcv(
fqsn,
end=end,
timeframe=timeframe,
)
history = list(series.values())
fastest = history[0]
@ -856,6 +668,7 @@ async def manage_history(
mod,
bfqsn,
hist_shm,
timeframe=timeframe,
)
)

View File

@ -387,6 +387,7 @@ class Storage:
async def load(
self,
fqsn: str,
timeframe: int,
) -> tuple[
dict[int, np.ndarray], # timeframe (in secs) to series
@ -400,12 +401,16 @@ class Storage:
# on first load we don't need to pull the max
# history per request size worth.
limit=3000,
timeframe=timeframe,
)
log.info(f'Loaded tsdb history {tsdb_arrays}')
if tsdb_arrays:
fastest = list(tsdb_arrays.values())[0]
times = fastest['Epoch']
if len(tsdb_arrays):
# fastest = list(tsdb_arrays.values())[0]
# slowest = list(tsdb_arrays.values())[-1]
hist = tsdb_arrays[timeframe]
times = hist['Epoch']
first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last]
@ -420,9 +425,9 @@ class Storage:
end: Optional[int] = None,
limit: int = int(800e3),
) -> tuple[
MarketstoreClient,
Union[dict, np.ndarray]
) -> dict[
int,
Union[dict, np.ndarray],
]:
client = self.client
syms = await client.list_symbols()
@ -430,7 +435,8 @@ class Storage:
if fqsn not in syms:
return {}
tfstr = tf_in_1s[1]
# use the provided timeframe or 1s by default
tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
params = Params(
symbols=fqsn,
@ -463,39 +469,52 @@ class Storage:
return {}
else:
params.set('timeframe', tfstr)
try:
result = await client.query(params)
except purerpc.grpclib.exceptions.UnknownError:
# indicate there is no history for this timeframe
return {}
# Fill out a `numpy` array-results map keyed by timeframe
arrays = {}
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
# Fill out a `numpy` array-results map
arrays = {}
for fqsn, data_set in result.by_symbols().items():
arrays.setdefault(fqsn, {})[
tf_in_1s.inverse[data_set.timeframe]
] = data_set.array
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
return arrays[fqsn]
async def delete_ts(
self,
key: str,
timeframe: Optional[Union[int, str]] = None,
fmt: str = 'OHLCV',
) -> bool:
client = self.client
syms = await client.list_symbols()
print(syms)
# if key not in syms:
# raise KeyError(f'`{fqsn}` table key not found?')
if key not in syms:
raise KeyError(f'`{key}` table key not found in\n{syms}?')
return await client.destroy(tbk=key)
tbk = mk_tbk((
key,
tf_in_1s.get(timeframe, tf_in_1s[60]),
fmt,
))
return await client.destroy(tbk=tbk)
async def write_ohlcv(
self,
fqsn: str,
ohlcv: np.ndarray,
timeframe: int,
append_and_duplicate: bool = True,
limit: int = int(800e3),
@ -525,7 +544,7 @@ class Storage:
# write to db
resp = await self.client.write(
to_push,
tbk=f'{fqsn}/1Sec/OHLCV',
tbk=f'{fqsn}/{tf_in_1s[timeframe]}/OHLCV',
# NOTE: will will append duplicates
# for the same timestamp-index.
@ -577,6 +596,7 @@ class Storage:
# def delete_range(self, start_dt, end_dt) -> None:
# ...
@acm
async def open_storage_client(
fqsn: str,
@ -642,7 +662,7 @@ async def tsdb_history_update(
):
profiler(f'opened feed for {fqsn}')
to_append = feed.shm.array
to_append = feed.hist_shm.array
to_prepend = None
if fqsn:
@ -651,7 +671,7 @@ async def tsdb_history_update(
fqsn = symbol.front_fqsn()
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
ohlcv = feed.hist_shm.array
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)

View File

@ -20,6 +20,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends)
'''
from __future__ import annotations
from contextlib import contextmanager as cm
from pprint import pformat
import os
@ -138,13 +139,31 @@ class Position(Struct):
# ordered record of known constituent trade messages
clears: dict[
Union[str, int, Status], # trade id
str | int, # trade id
dict[str, Any], # transaction history summaries
] = {}
first_clear_dt: Optional[datetime] = None
expiry: Optional[datetime] = None
# @property
# def clears(self) -> dict[
# Union[str, int, Status], # trade id
# dict[str, Any], # transaction history summaries
# ]:
# '''
# Datetime sorted reference to internal clears table.
# '''
# # self._clears = {}
# self._clears = dict(sorted(
# self._clears.items(),
# key=lambda entry: entry[1]['dt'],
# ))
# # self._clears[k] = v
# return self._clears
def to_dict(self) -> dict:
return {
f: getattr(self, f)
@ -219,6 +238,10 @@ class Position(Struct):
'''
clears = list(self.clears.values())
if not clears:
log.warning(f'No clears table for {self.symbol}!?')
return
self.first_clear_dt = min(list(entry['dt'] for entry in clears))
last_clear = clears[-1]
@ -623,6 +646,7 @@ class PpTable(Struct):
def to_toml(
self,
min_clears: bool = True,
) -> dict[str, Any]:
active, closed = self.dump_active()
@ -635,7 +659,9 @@ class PpTable(Struct):
# keep the minimal amount of clears that make up this
# position since the last net-zero state.
if min_clears:
pos.minimize_clears()
pos.ensure_state()
# serialize to pre-toml form
@ -682,6 +708,8 @@ def load_pps_from_ledger(
brokername: str,
acctname: str,
table: Optional[PpTable] = None,
# post normalization filter on ledger entries to be processed
filter_by: Optional[list[dict]] = None,
@ -698,7 +726,6 @@ def load_pps_from_ledger(
'''
with (
open_trade_ledger(brokername, acctname) as ledger,
open_pps(brokername, acctname) as table,
):
if not ledger:
# null case, no ledger file with content
@ -716,6 +743,10 @@ def load_pps_from_ledger(
else:
records = src_records
if table is None:
with open_pps(brokername, acctname) as table:
updated = table.update_from_trans(records)
else:
updated = table.update_from_trans(records)
return records, updated
@ -886,15 +917,27 @@ def open_pps(
conf=conf,
)
# first pass populate all missing clears record tables
# for fqsn, entry in pps.items():
# # convert clears sub-tables (only in this form
# # for toml re-presentation) back into a master table.
# clears_list = entry.get('clears', [])
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# filter_by=[entry['bsuid']],
# table=table,
# )
# # breakpoint()
# unmarshal/load ``pps.toml`` config entries into object form
# and update `PpTable` obj entries.
for fqsn, entry in pps.items():
bsuid = entry['bsuid']
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# index clears entries in "object" form by tid in a top
# level dict instead of a list (as is presented in our
# ``pps.toml``).
@ -906,6 +949,18 @@ def open_pps(
# processing of new clear events.
trans: list[Transaction] = []
# convert clears sub-tables (only in this form
# for toml re-presentation) back into a master table.
clears_list = entry['clears']
# # attempt to reload from ledger
# if not clears_list:
# trans, pos = load_pps_from_ledger(
# brokername,
# acctid,
# table=table,
# )
for clears_table in clears_list:
tid = clears_table.pop('tid')
dtstr = clears_table['dt']

View File

@ -249,14 +249,14 @@ async def graphics_update_loop(
linked: LinkedSplits = godwidget.rt_linked
display_rate = godwidget.window.current_screen().refreshRate()
chart = linked.chart
fast_chart = linked.chart
hist_chart = godwidget.hist_linked.chart
ohlcv = feed.rt_shm
hist_ohlcv = feed.hist_shm
# update last price sticky
last_price_sticky = chart._ysticks[chart.name]
last_price_sticky = fast_chart._ysticks[fast_chart.name]
last_price_sticky.update_from_data(
*ohlcv.array[-1][['index', 'close']]
)
@ -268,7 +268,7 @@ async def graphics_update_loop(
maxmin = partial(
chart_maxmin,
chart,
fast_chart,
ohlcv,
vlm_chart,
)
@ -282,15 +282,15 @@ async def graphics_update_loop(
last, volume = ohlcv.array[-1][['close', 'volume']]
symbol = chart.linked.symbol
symbol = fast_chart.linked.symbol
l1 = L1Labels(
chart,
fast_chart,
# determine precision/decimal lengths
digits=symbol.tick_size_digits,
size_digits=symbol.lot_size_digits,
)
chart._l1_labels = l1
fast_chart._l1_labels = l1
# TODO:
# - in theory we should be able to read buffer data faster
@ -300,10 +300,10 @@ async def graphics_update_loop(
# levels this might be dark volume we need to
# present differently -> likely dark vlm
tick_size = chart.linked.symbol.tick_size
tick_size = fast_chart.linked.symbol.tick_size
tick_margin = 3 * tick_size
chart.show()
fast_chart.show()
last_quote = time.time()
i_last = ohlcv.index
@ -313,7 +313,7 @@ async def graphics_update_loop(
'maxmin': maxmin,
'ohlcv': ohlcv,
'hist_ohlcv': hist_ohlcv,
'chart': chart,
'chart': fast_chart,
'last_price_sticky': last_price_sticky,
'hist_last_price_sticky': hist_last_price_sticky,
'l1': l1,
@ -333,7 +333,7 @@ async def graphics_update_loop(
ds.vlm_chart = vlm_chart
ds.vlm_sticky = vlm_sticky
chart.default_view()
fast_chart.default_view()
# TODO: probably factor this into some kinda `DisplayState`
# API that can be reused at least in terms of pulling view
@ -410,16 +410,16 @@ async def graphics_update_loop(
last_quote = time.time()
# chart isn't active/shown so skip render cycle and pause feed(s)
if chart.linked.isHidden():
if fast_chart.linked.isHidden():
# print('skipping update')
chart.pause_all_feeds()
fast_chart.pause_all_feeds()
continue
ic = chart.view._ic
if ic:
chart.pause_all_feeds()
await ic.wait()
chart.resume_all_feeds()
# ic = fast_chart.view._ic
# if ic:
# fast_chart.pause_all_feeds()
# await ic.wait()
# fast_chart.resume_all_feeds()
# sync call to update all graphics/UX components.
graphics_update_cycle(ds)
@ -502,6 +502,7 @@ def graphics_update_cycle(
or trigger_all
):
chart.increment_view(steps=i_diff)
chart.view._set_yrange(yrange=(mn, mx))
if vlm_chart:
vlm_chart.increment_view(steps=i_diff)