Compare commits


No commits in common. "1f9a4976378f863193572a0b64a5dba5e4aaf672" and "f274c3db3b5cc00abee8cec84ef98bd564e437d2" have entirely different histories.

10 changed files with 472 additions and 929 deletions


@@ -401,15 +401,7 @@ class Client:
         # => we recursively call this method until we get at least
         # as many bars such that they sum in aggregate to the the
         # desired total time (duration) at most.
-        # XXX XXX XXX
-        # WHY DID WE EVEN NEED THIS ORIGINALLY!?
-        # XXX XXX XXX
-        # - if you query over a gap and get no data
-        #   that may short circuit the history
-        if (
-            end_dt
-            and False
-        ):
+        if end_dt:
             nparr: np.ndarray = bars_to_np(bars)
             times: np.ndarray = nparr['time']
             first: float = times[0]
@@ -418,7 +410,6 @@ class Client:
             if (
                 # len(bars) * sample_period_s) < dt_duration.in_seconds()
                 tdiff < dt_duration.in_seconds()
-                # and False
             ):
                 end_dt: DateTime = from_timestamp(first)
                 log.warning(
@@ -868,9 +859,6 @@ class Client:
                 timeout=timeout,
             )
         except TimeoutError:
-            import pdbp
-            pdbp.set_trace()
             if raise_on_timeout:
                 raise
             return None
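For orientation, a minimal sketch (not part of the changeset; the helper name and signature are hypothetical) of the duration-coverage check that the `tdiff < dt_duration.in_seconds()` branch above performs:

    import numpy as np

    def frame_covers_duration(
        times: np.ndarray,    # epoch seconds of the returned bars, oldest first
        duration_s: float,    # total requested span in seconds
    ) -> bool:
        # same idea as the hunk above: when the span actually returned is
        # shorter than requested, the caller should re-query with an
        # earlier `end_dt`.
        tdiff: float = float(times[-1] - times[0])
        return tdiff >= duration_s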


@@ -174,15 +174,8 @@ async def open_history_client(
         start_dt: datetime | None = None,

     ) -> tuple[np.ndarray, str]:
         nonlocal max_timeout, mean, count

-        if (
-            start_dt
-            and start_dt.timestamp() == 0
-        ):
-            await tractor.pause()
-
         query_start = time.time()
         out, timedout = await get_bars(
             proxy,
@@ -410,54 +403,34 @@ async def get_bars(
             bars, bars_array, dt_duration = out

-            if bars_array is None:
-                raise SymbolNotFound(fqme)
-
             # not enough bars signal, likely due to venue
             # operational gaps.
-            # too_little: bool = False
-            if end_dt:
-                if not bars:
-                    # no data returned?
-                    log.warning(
-                        'History frame is blank?\n'
-                        f'start_dt: {start_dt}\n'
-                        f'end_dt: {end_dt}\n'
-                        f'duration: {dt_duration}\n'
-                    )
-                    raise NoData(f'{end_dt}')
-
-                else:
-                    dur_s: float = len(bars) * timeframe
-                    bars_dur = pendulum.Duration(seconds=dur_s)
-                    dt_dur_s: float = dt_duration.in_seconds()
-                    if dur_s < dt_dur_s:
-                        log.warning(
-                            'History frame is shorter then expected?\n'
-                            f'start_dt: {start_dt}\n'
-                            f'end_dt: {end_dt}\n'
-                            f'duration: {dt_dur_s}\n'
-                            f'frame duration seconds: {dur_s}\n'
-                            f'dur diff: {dt_duration - bars_dur}\n'
-                        )
-                        # NOTE: we used to try to get a minimal
-                        # set of bars by recursing but this ran
-                        # into possible infinite query loops
-                        # when logic in the `Client.bars()` dt
-                        # diffing went bad. So instead for now
-                        # we just return the
-                        # shorter-then-expected history with
-                        # a warning.
-                        # TODO: in the future it prolly makes
-                        # the most send to do venue operating
-                        # hours lookup and
-                        # timestamp-in-operating-range set
-                        # checking to know for sure if we can
-                        # safely and quickly ignore non-uniform history
-                        # frame timestamp gaps..
-                        # end_dt -= dt_duration
-                        # continue
-                        # await tractor.pause()
+            too_little: bool = False
+            if (
+                end_dt
+                and (
+                    not bars
+                    or (too_little :=
+                        start_dt
+                        and (len(bars) * timeframe)
+                        < dt_duration.in_seconds()
+                    )
+                )
+            ):
+                if (
+                    end_dt
+                    or too_little
+                ):
+                    log.warning(
+                        f'History is blank for {dt_duration} from {end_dt}'
+                    )
+                    end_dt -= dt_duration
+                    continue
+
+                raise NoData(f'{end_dt}')
+
+            if bars_array is None:
+                raise SymbolNotFound(fqme)

             first_dt = pendulum.from_timestamp(
                 bars[0].date.timestamp())
@@ -881,13 +854,7 @@ async def stream_quotes(
         init_msgs.append(init_msg)

         con: Contract = details.contract
-        first_ticker: Ticker | None = None
-        with trio.move_on_after(1):
-            first_ticker: Ticker = await proxy.get_quote(
-                contract=con,
-                raise_on_timeout=False,
-            )
+        first_ticker: Ticker = await proxy.get_quote(contract=con)

         if first_ticker:
             first_quote: dict = normalize(first_ticker)
             log.info(
@@ -895,6 +862,18 @@ async def stream_quotes(
                 f'{pformat(first_quote)}'
             )

+        # TODO: we should instead spawn a task that waits on a feed
+        # to start and let it wait indefinitely..instead of this
+        # hard coded stuff.
+        # async def wait_for_first_quote():
+        #     with trio.CancelScope() as cs:
+        with trio.move_on_after(1):
+            first_ticker = await proxy.get_quote(
+                contract=con,
+                raise_on_timeout=True,
+            )
+
         # NOTE: it might be outside regular trading hours for
         # assets with "standard venue operating hours" so we
         # only "pretend the feed is live" when the dst asset
@@ -905,8 +884,6 @@ async def stream_quotes(
         # (equitiies, futes, bonds etc.) we at least try to
         # grab the OHLC history.
         if (
-            first_ticker
-            and
             isnan(first_ticker.last)
             # SO, if the last quote price value is NaN we ONLY
             # "pretend to do" `feed_is_live.set()` if it's a known
@@ -930,19 +907,6 @@ async def stream_quotes(
             await trio.sleep_forever()
             return  # we never expect feed to come up?

-        # TODO: we should instead spawn a task that waits on a feed
-        # to start and let it wait indefinitely..instead of this
-        # hard coded stuff.
-        # async def wait_for_first_quote():
-        #     with trio.CancelScope() as cs:
-
-        # XXX: MUST acquire a ticker + first quote before starting
-        # the live quotes loop!
-        # with trio.move_on_after(1):
-        first_ticker = await proxy.get_quote(
-            contract=con,
-            raise_on_timeout=True,
-        )
         cs: trio.CancelScope | None = None
         startup: bool = True
         while (
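As context for the `trio.move_on_after(1)` / `raise_on_timeout` shuffling above, a hedged sketch (not part of the diff; `fetch` is a stand-in for something like `proxy.get_quote()`) of the standard trio pattern for bounding a single await and detecting whether it timed out:

    import trio

    async def first_quote_or_none(fetch, timeout: float = 1.0):
        # run `fetch()` under a deadline; if the deadline expires the scope
        # is cancelled and we return None, mirroring the "maybe no first
        # quote outside market hours" handling in the hunks above
        result = None
        with trio.move_on_after(timeout) as cs:
            result = await fetch()
        if cs.cancelled_caught:
            return None
        return result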


@@ -41,6 +41,7 @@ from typing import (
 import wsproto
 from uuid import uuid4

+from rapidfuzz import process as fuzzy
 from trio_typing import TaskStatus
 import asks
 from bidict import bidict
@@ -415,7 +416,8 @@ class Client:
             await self.get_mkt_pairs()
         assert self._pairs, '`Client.get_mkt_pairs()` was never called!?'

-        matches: dict[str, KucoinMktPair] = match_from_pairs(
+
+        matches: dict[str, Pair] = match_from_pairs(
             pairs=self._pairs,
             # query=pattern.upper(),
             query=pattern.upper(),


@@ -31,8 +31,6 @@ from pathlib import Path
 from pprint import pformat
 from typing import (
     Any,
-    Sequence,
-    Hashable,
     TYPE_CHECKING,
 )
 from types import ModuleType
@@ -130,8 +128,8 @@ class SymbologyCache(Struct):
     - `.get_mkt_pairs()`: returning a table of pair-`Struct`
       types, custom defined by the particular backend.

-    AND, the required `.get_mkt_info()` module-level endpoint
-    which maps `fqme: str` -> `MktPair`s.
+    AND, the required `.get_mkt_info()` module-level endpoint which
+    maps `fqme: str` -> `MktPair`s.

     These tables are then used to fill out the `.assets`, `.pairs` and
     `.mktmaps` tables on this cache instance, respectively.
@@ -502,7 +500,7 @@ def match_from_pairs(
     )

     # pop and repack pairs in output dict
-    matched_pairs: dict[str, Struct] = {}
+    matched_pairs: dict[str, Pair] = {}
     for item in matches:
         pair_key: str = item[0]
         matched_pairs[pair_key] = pairs[pair_key]


@@ -16,26 +16,19 @@
 # <https://www.gnu.org/licenses/>.

 '''
-Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic for,
-
-- hi-level biz logics using the `.storage` subpkg APIs for (I/O)
-  orchestration and mgmt of tsdb data sets.
-- core data-provider history backfilling middleware (as task-funcs) via
-  (what will eventually be `datad`, but are rn is the) `.brokers` backend
-  APIs.
-- various data set cleaning, repairing and issue-detection/analysis
-  routines to ensure consistent series whether in shm or when
-  stored offline (in a tsdb).
+Historical data business logic for load, backfill and tsdb storage.

 '''
 from __future__ import annotations
+# from collections import (
+#     Counter,
+# )
 from datetime import datetime
 from functools import partial
-from pathlib import Path
+# import time
 from types import ModuleType
 from typing import (
     Callable,
-    Generator,
     TYPE_CHECKING,
 )
@@ -43,7 +36,6 @@ import trio
 from trio_typing import TaskStatus
 import tractor
 from pendulum import (
-    DateTime,
     Duration,
     from_timestamp,
 )
@@ -64,14 +56,7 @@ from ._source import def_iohlcv_fields
 from ._sampling import (
     open_sample_stream,
 )
-from .tsp import (
-    dedupe,
-    get_null_segs,
-    iter_null_segs,
-    sort_diff,
-    Frame,
-    # Seq,
-)
+from . import tsp
 from ..brokers._util import (
     DataUnavailable,
 )
@@ -125,7 +110,6 @@ def diff_history(
     return array[times >= prepend_until_dt.timestamp()]


-# TODO: can't we just make this a sync func now?
 async def shm_push_in_between(
     shm: ShmArray,
     to_push: np.ndarray,
@@ -134,10 +118,6 @@ async def shm_push_in_between(
     update_start_on_prepend: bool = False,

 ) -> int:
-    # XXX: extremely important, there can be no checkpoints
-    # in the body of this func to avoid entering new ``frames``
-    # values while we're pipelining the current ones to
-    # memory...
     shm.push(
         to_push,
         prepend=True,
@@ -158,102 +138,10 @@ async def shm_push_in_between(
             else None
         ),
     )
-
-
-async def maybe_fill_null_segments(
-    shm: ShmArray,
-    timeframe: float,
-    get_hist: Callable,
-    sampler_stream: tractor.MsgStream,
-    mkt: MktPair,
-
-    task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
-
-) -> list[Frame]:
-
-    null_segs_detected = trio.Event()
-    task_status.started(null_segs_detected)
-
-    frame: Frame = shm.array
-
-    null_segs: tuple | None = get_null_segs(
-        frame,
-        period=timeframe,
-    )
-    for (
-        absi_start, absi_end,
-        fi_start, fi_end,
-        start_t, end_t,
-        start_dt, end_dt,
-    ) in iter_null_segs(
-        null_segs=null_segs,
-        frame=frame,
-        timeframe=timeframe,
-    ):
-
-        # XXX NOTE: ?if we get a badly ordered timestamp
-        # pair, immediately stop backfilling?
-        if (
-            start_dt
-            and end_dt < start_dt
-        ):
-            await tractor.pause()
-            break
-
-        (
-            array,
-            next_start_dt,
-            next_end_dt,
-        ) = await get_hist(
-            timeframe,
-            start_dt=start_dt,
-            end_dt=end_dt,
-        )
-
-        # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
-        # and mnq.cme.ib this causes a Qt crash XXDDD
-
-        # make sure we don't overrun the buffer start
-        len_to_push: int = min(absi_end, array.size)
-        to_push: np.ndarray = array[-len_to_push:]
-
-        await shm_push_in_between(
-            shm,
-            to_push,
-            prepend_index=absi_end,
-            update_start_on_prepend=False,
-        )
-
-        # TODO: UI side needs IPC event to update..
-        # - make sure the UI actually always handles
-        #   this update!
-        # - remember that in the display side, only refersh this
-        #   if the respective history is actually "in view".
-        # loop
-        await sampler_stream.send({
-            'broadcast_all': {
-
-                # XXX NOTE XXX: see the
-                # `.ui._display.increment_history_view()` if block
-                # that looks for this info to FORCE a hard viz
-                # redraw!
-                'backfilling': (mkt.fqme, timeframe),
-            },
-        })
-
-    null_segs_detected.set()
-    # RECHECK for more null-gaps
-    frame: Frame = shm.array
-    null_segs: tuple | None = get_null_segs(
-        frame,
-        period=timeframe,
-    )
-    if (
-        null_segs
-        and
-        len(null_segs[-1])
-    ):
-        await tractor.pause()
+    # XXX: extremely important, there can be no checkpoints
+    # in the block above to avoid entering new ``frames``
+    # values while we're pipelining the current ones to
+    # memory...

     array = shm.array
     zeros = array[array['low'] == 0]
@@ -267,24 +155,10 @@ async def maybe_fill_null_segments(
         'low',
         'close',
     ]] = shm._array[zeros['index'][0] - 1]['close']

-    # await tractor.pause()
-    # TODO: interatively step through any remaining
-    # time-gaps/null-segments and spawn piecewise backfiller
-    # tasks in a nursery?
-    # -[ ] not sure that's going to work so well on the ib
-    #      backend but worth a shot?
-    #      -[ ] mk new history connections to make it properly
-    #           parallel possible no matter the backend?
-    # -[ ] fill algo: do queries in alternating "latest, then
-    #      earliest, then latest.. etc?"
-    # if (
-    #     next_end_dt not in frame[
-    # ):
-    #     pass

 async def start_backfill(
-    tn: trio.Nursery,
     get_hist,
     mod: ModuleType,
     mkt: MktPair,
@@ -339,20 +213,27 @@ async def start_backfill(
         backfill_until_dt = backfill_from_dt.subtract(**period_duration)

-        # STAGE NOTE: "backward history gap filling":
-        # - we push to the shm buffer until we have history back
-        #   until the latest entry loaded from the tsdb's table B)
-        # - after this loop continue to check for other gaps in the
-        #   (tsdb) history and (at least report) maybe fill them
-        #   from new frame queries to the backend?
+        # TODO: can we drop this? without conc i don't think this
+        # is necessary any more?
+        # configure async query throttling
+        # rate = config.get('rate', 1)
+        # XXX: legacy from ``trimeter`` code but unsupported now.
+        # erlangs = config.get('erlangs', 1)
+
+        # avoid duplicate history frames with a set of datetime frame
+        # starts and associated counts of how many duplicates we see
+        # per time stamp.
+        # starts: Counter[datetime] = Counter()
+
+        # conduct "backward history gap filling" where we push to
+        # the shm buffer until we have history back until the
+        # latest entry loaded from the tsdb's table B)
         last_start_dt: datetime = backfill_from_dt
         next_prepend_index: int = backfill_from_shm_index

         while last_start_dt > backfill_until_dt:
-            log.info(
-                f'Requesting {timeframe}s frame:\n'
-                f'backfill_until_dt: {backfill_until_dt}\n'
-                f'last_start_dt: {last_start_dt}\n'
+
+            log.debug(
+                f'Requesting {timeframe}s frame ending in {last_start_dt}'
             )

             try:
@@ -380,18 +261,36 @@ async def start_backfill(
                 await tractor.pause()
                 return

-            assert (
-                array['time'][0]
-                ==
-                next_start_dt.timestamp()
-            )
+            # TODO: drop this? see todo above..
+            # if (
+            #     next_start_dt in starts
+            #     and starts[next_start_dt] <= 6
+            # ):
+            #     start_dt = min(starts)
+            #     log.warning(
+            #         f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
+            #     )
+            #     starts[start_dt] += 1
+            #     await tractor.pause()
+            #     continue
+
+            # elif starts[next_start_dt] > 6:
+            #     log.warning(
+            #         f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
+            #     )
+            #     return
+
+            # # only update new start point if not-yet-seen
+            # starts[next_start_dt] += 1
+
+            assert array['time'][0] == next_start_dt.timestamp()

             diff = last_start_dt - next_start_dt
             frame_time_diff_s = diff.seconds

             # frame's worth of sample-period-steps, in seconds
-            frame_size_s: float = len(array) * timeframe
-            expected_frame_size_s: float = frame_size_s + timeframe
+            frame_size_s = len(array) * timeframe
+            expected_frame_size_s = frame_size_s + timeframe
             if frame_time_diff_s > expected_frame_size_s:

                 # XXX: query result includes a start point prior to our
@@ -399,10 +298,8 @@ async def start_backfill(
                 # history gap (eg. market closed period, outage, etc.)
                 # so just report it to console for now.
                 log.warning(
-                    'GAP DETECTED:\n'
-                    f'last_start_dt: {last_start_dt}\n'
-                    f'diff: {diff}\n'
-                    f'frame_time_diff_s: {frame_time_diff_s}\n'
+                    f'History frame ending @ {last_start_dt} appears to have a gap:\n'
+                    f'{diff} ~= {frame_time_diff_s} seconds'
                 )

                 to_push = diff_history(
@@ -518,15 +415,74 @@ async def start_backfill(
                         gaps,
                         deduped,
                         diff,
-                    ) = dedupe(df)
+                    ) = tsp.dedupe(df)
                     if diff:
-                        sort_diff(df)
+                        tsp.sort_diff(df)

                 else:
                     # finally filled gap
                     log.info(
                         f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
                     )

+    # conduct tsdb timestamp gap detection and backfill any
+    # seemingly missing sequence segments..
+    # TODO: ideally these never exist but somehow it seems
+    # sometimes we're writing zero-ed segments on certain
+    # (teardown) cases?
+    from .tsp import detect_null_time_gap
+    gap_indices: tuple | None = detect_null_time_gap(shm)
+    while gap_indices:
+        (
+            istart,
+            start,
+            end,
+            iend,
+        ) = gap_indices
+
+        start_dt = from_timestamp(start)
+        end_dt = from_timestamp(end)
+
+        # if we get a baddly ordered timestamp
+        # pair, imeeditately stop backfilling.
+        if end_dt < start_dt:
+            break
+
+        (
+            array,
+            next_start_dt,
+            next_end_dt,
+        ) = await get_hist(
+            timeframe,
+            start_dt=start_dt,
+            end_dt=end_dt,
+        )
+
+        # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
+        # and mnq.cme.ib this causes a Qt crash XXDDD
+
+        # make sure we don't overrun the buffer start
+        len_to_push: int = min(iend, array.size)
+        to_push: np.ndarray = array[-len_to_push:]
+        await shm_push_in_between(
+            shm,
+            to_push,
+            prepend_index=iend,
+            update_start_on_prepend=False,
+        )
+
+        # TODO: UI side needs IPC event to update..
+        # - make sure the UI actually always handles
+        #   this update!
+        # - remember that in the display side, only refersh this
+        #   if the respective history is actually "in view".
+        # loop
+        await sampler_stream.send({
+            'broadcast_all': {
+                'backfilling': (mkt.fqme, timeframe),
+            },
+        })
+        gap_indices: tuple | None = detect_null_time_gap(shm)
+
     # XXX: extremely important, there can be no checkpoints
     # in the block above to avoid entering new ``frames``
@@ -538,12 +494,6 @@ async def start_backfill(
     bf_done.set()

-    # NOTE: originally this was used to cope with a tsdb (marketstore)
-    # which could not delivery very large frames of history over gRPC
-    # (thanks goolag) due to corruption issues. NOW, using apache
-    # parquet (by default in the local filesys) we don't have this
-    # requirement since the files can be loaded very quickly in
-    # entirety to memory via
-
 async def back_load_from_tsdb(
     storemod: ModuleType,
     storage: StorageClient,
@@ -681,94 +631,10 @@ async def back_load_from_tsdb(
         # await sampler_stream.send('broadcast_all')


-async def push_latest_frame(
-    dt_eps: list[DateTime, DateTime],
-    shm: ShmArray,
-    get_hist: Callable[
-        [int, datetime, datetime],
-        tuple[np.ndarray, str]
-    ],
-    timeframe: float,
-    config: dict,
-
-    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
-):
-    # get latest query's worth of history all the way
-    # back to what is recorded in the tsdb
-    try:
-        (
-            array,
-            mr_start_dt,
-            mr_end_dt,
-        ) = await get_hist(
-            timeframe,
-            end_dt=None,
-        )
-        # so caller can access these ep values
-        dt_eps.extend([
-            mr_start_dt,
-            mr_end_dt,
-        ])
-
-    # XXX: timeframe not supported for backend (since
-    # above exception type), terminate immediately since
-    # there's no backfilling possible.
-    except DataUnavailable:
-        task_status.started()
-
-        if timeframe > 1:
-            await tractor.pause()
-
-        return
-
-    # NOTE: on the first history, most recent history
-    # frame we PREPEND from the current shm ._last index
-    # and thus a gap between the earliest datum loaded here
-    # and the latest loaded from the tsdb may exist!
-    log.info(f'Pushing {array.size} to shm!')
-    shm.push(
-        array,
-        prepend=True,  # append on first frame
-    )
-
-
-async def load_tsdb_hist(
-    storage: StorageClient,
-    mkt: MktPair,
-    timeframe: float,
-
-) -> tuple[
-    np.ndarray,
-    DateTime,
-    DateTime,
-] | None:
-    # loads a (large) frame of data from the tsdb depending
-    # on the db's query size limit; our "nativedb" (using
-    # parquet) generally can load the entire history into mem
-    # but if not then below the remaining history can be lazy
-    # loaded?
-    fqme: str = mkt.fqme
-    tsdb_entry: tuple[
-        np.ndarray,
-        DateTime,
-        DateTime,
-    ]
-    try:
-        tsdb_entry: tuple | None = await storage.load(
-            fqme,
-            timeframe=timeframe,
-        )
-        return tsdb_entry
-
-    except TimeseriesNotFound:
-        log.warning(
-            f'No timeseries yet for {timeframe}@{fqme}'
-        )
-        return None
-
-
 async def tsdb_backfill(
     mod: ModuleType,
     storemod: ModuleType,
-    tn: trio.Nursery,
     storage: StorageClient,
     mkt: MktPair,
@@ -783,193 +649,192 @@ async def tsdb_backfill(

 ) -> None:

-    if timeframe not in (1, 60):
-        raise ValueError(
-            '`piker` only needs to support 1m and 1s sampling '
-            'but ur api is trying to deliver a longer '
-            f'timeframe of {timeframe} seconds..\n'
-            'So yuh.. dun do dat brudder.'
-        )
-
     get_hist: Callable[
         [int, datetime, datetime],
         tuple[np.ndarray, str]
     ]
     config: dict[str, int]
-    async with (
-        mod.open_history_client(
-            mkt,
-        ) as (get_hist, config),
-
-        # NOTE: this sub-nursery splits to tasks for the given
-        # sampling rate to concurrently load offline tsdb
-        # timeseries as well as new data from the venue backend!
-    ):
+    async with mod.open_history_client(
+        mkt,
+    ) as (get_hist, config):
         log.info(
             f'`{mod}` history client returned backfill config:\n'
             f'{config}\n'
         )

-        dt_eps: list[DateTime, DateTime] = []
-        async with trio.open_nursery() as tn:
-            tn.start_soon(
-                push_latest_frame,
-                dt_eps,
-                shm,
-                get_hist,
-                timeframe,
-                config,
-            )
-
-            # tell parent task to continue
-            # TODO: really we'd want this the other way with the
-            # tsdb load happening asap and the since the latest
-            # frame query will normally be the main source of
-            # latency?
-            task_status.started()
-
-        tsdb_entry: tuple = await load_tsdb_hist(
-            storage,
-            mkt,
-            timeframe,
-        )
-
-        # NOTE: iabs to start backfilling from, reverse chronological,
-        # ONLY AFTER the first history frame has been pushed to
-        # mem!
+        # get latest query's worth of history all the way
+        # back to what is recorded in the tsdb
+        try:
+            (
+                array,
+                mr_start_dt,
+                mr_end_dt,
+            ) = await get_hist(
+                timeframe,
+                end_dt=None,
+            )
+
+        # XXX: timeframe not supported for backend (since
+        # above exception type), terminate immediately since
+        # there's no backfilling possible.
+        except DataUnavailable:
+            task_status.started()
+            await tractor.pause()
+            return
+
+        # TODO: fill in non-zero epoch time values ALWAYS!
+        # hist_shm._array['time'] = np.arange(
+        #     start=
+
+        # NOTE: removed for now since it'll always break
+        # on the first 60s of the venue open..
+        # times: np.ndarray = array['time']
+        # # sample period step size in seconds
+        # step_size_s = (
+        #     from_timestamp(times[-1])
+        #     - from_timestamp(times[-2])
+        # ).seconds
+        # if step_size_s not in (1, 60):
+        #     log.error(f'Last 2 sample period is off!? -> {step_size_s}')
+        #     step_size_s = (
+        #         from_timestamp(times[-2])
+        #         - from_timestamp(times[-3])
+        #     ).seconds
+
+        # NOTE: on the first history, most recent history
+        # frame we PREPEND from the current shm ._last index
+        # and thus a gap between the earliest datum loaded here
+        # and the latest loaded from the tsdb may exist!
+        log.info(f'Pushing {array.size} to shm!')
+        shm.push(
+            array,
+            prepend=True,  # append on first frame
+        )

         backfill_gap_from_shm_index: int = shm._first.value + 1

-        (
-            mr_start_dt,
-            mr_end_dt,
-        ) = dt_eps
-
-        async with trio.open_nursery() as tn:
-
-            # Prepend any tsdb history to the shm buffer which should
-            # now be full of the most recent history pulled from the
-            # backend's last frame.
-            if tsdb_entry:
-                (
-                    tsdb_history,
-                    first_tsdb_dt,
-                    last_tsdb_dt,
-                ) = tsdb_entry
-
-                # if there is a gap to backfill from the first
-                # history frame until the last datum loaded from the tsdb
-                # continue that now in the background
-                bf_done = await tn.start(
-                    partial(
-                        start_backfill,
-                        tn=tn,
-                        get_hist=get_hist,
-                        mod=mod,
-                        mkt=mkt,
-                        shm=shm,
-                        timeframe=timeframe,
-
-                        backfill_from_shm_index=backfill_gap_from_shm_index,
-                        backfill_from_dt=mr_start_dt,
-
-                        sampler_stream=sampler_stream,
-                        backfill_until_dt=last_tsdb_dt,
-                        storage=storage,
-                        write_tsdb=True,
-                    )
-                )
-
-                # calc the index from which the tsdb data should be
-                # prepended, presuming there is a gap between the
-                # latest frame (loaded/read above) and the latest
-                # sample loaded from the tsdb.
-                backfill_diff: Duration = mr_start_dt - last_tsdb_dt
-                offset_s: float = backfill_diff.in_seconds()
-
-                # XXX EDGE CASEs: the most recent frame overlaps with
-                # prior tsdb history!!
-                # - so the latest frame's start time is earlier then
-                #   the tsdb's latest sample.
-                # - alternatively this may also more generally occur
-                #   when the venue was closed (say over the weeknd)
-                #   causing a timeseries gap, AND the query frames size
-                #   (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
-                #   GREATER THAN the current venue-market's operating
-                #   session (time) we will receive datums from BEFORE THE
-                #   CLOSURE GAP and thus the `offset_s` value will be
-                #   NEGATIVE! In this case we need to ensure we don't try
-                #   to push datums that have already been recorded in the
-                #   tsdb. In this case we instead only retreive and push
-                #   the series portion missing from the db's data set.
-                # if offset_s < 0:
-                #     non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
-                #     non_overlap_offset_s: float = backfill_diff.in_seconds()
-
-                offset_samples: int = round(offset_s / timeframe)
-
-                # TODO: see if there's faster multi-field reads:
-                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
-                # re-index  with a `time` and index field
-                if offset_s > 0:
-                    # NOTE XXX: ONLY when there is an actual gap
-                    # between the earliest sample in the latest history
-                    # frame do we want to NOT stick the latest tsdb
-                    # history adjacent to that latest frame!
-                    prepend_start = shm._first.value - offset_samples + 1
-                    to_push = tsdb_history[-prepend_start:]
-                else:
-                    # when there is overlap we want to remove the
-                    # overlapping samples from the tsdb portion (taking
-                    # instead the latest frame's values since THEY
-                    # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
-                    # to the latest frame!
-                    # TODO: assert the overlap segment array contains
-                    # the same values!?!
-                    prepend_start = shm._first.value
-                    to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
-
-                # tsdb history is so far in the past we can't fit it in
-                # shm buffer space so simply don't load it!
-                if prepend_start > 0:
-                    shm.push(
-                        to_push,
-
-                        # insert the history pre a "days worth" of samples
-                        # to leave some real-time buffer space at the end.
-                        prepend=True,
-                        # update_first=False,
-                        start=prepend_start,
-                        field_map=storemod.ohlc_key_map,
-                    )
-                    log.info(f'Loaded {to_push.shape} datums from storage')
-
-            # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
-            # seemingly missing (null-time) segments..
-            # TODO: ideally these can never exist!
-            # -[ ] somehow it seems sometimes we're writing zero-ed
-            #      segments to tsdbs during teardown?
-            # -[ ] can we ensure that the backcfiller tasks do this
-            #      work PREVENTAVELY instead?
-            # -[ ] fill in non-zero epoch time values ALWAYS!
-            # await maybe_fill_null_segments(
-            nulls_detected: trio.Event = await tn.start(partial(
-                maybe_fill_null_segments,
-
-                shm=shm,
-                timeframe=timeframe,
-                get_hist=get_hist,
-                sampler_stream=sampler_stream,
-                mkt=mkt,
-            ))
-
-            # TODO: who would want to?
-            await nulls_detected.wait()
-            await bf_done.wait()
+        # tell parent task to continue
+        task_status.started()
+
+        # loads a (large) frame of data from the tsdb depending
+        # on the db's query size limit; our "nativedb" (using
+        # parquet) generally can load the entire history into mem
+        # but if not then below the remaining history can be lazy
+        # loaded?
+        fqme: str = mkt.fqme
+        last_tsdb_dt: datetime | None = None
+        try:
+            tsdb_entry: tuple | None = await storage.load(
+                fqme,
+                timeframe=timeframe,
+            )
+        except TimeseriesNotFound:
+            log.warning(
+                f'No timeseries yet for {timeframe}@{fqme}'
+            )
+        else:
+            (
+                tsdb_history,
+                first_tsdb_dt,
+                last_tsdb_dt,
+            ) = tsdb_entry
+
+            # calc the index from which the tsdb data should be
+            # prepended, presuming there is a gap between the
+            # latest frame (loaded/read above) and the latest
+            # sample loaded from the tsdb.
+            backfill_diff: Duration = mr_start_dt - last_tsdb_dt
+            offset_s: float = backfill_diff.in_seconds()
+
+            # XXX EDGE CASEs: the most recent frame overlaps with
+            # prior tsdb history!!
+            # - so the latest frame's start time is earlier then
+            #   the tsdb's latest sample.
+            # - alternatively this may also more generally occur
+            #   when the venue was closed (say over the weeknd)
+            #   causing a timeseries gap, AND the query frames size
+            #   (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
+            #   GREATER THAN the current venue-market's operating
+            #   session (time) we will receive datums from BEFORE THE
+            #   CLOSURE GAP and thus the `offset_s` value will be
+            #   NEGATIVE! In this case we need to ensure we don't try
+            #   to push datums that have already been recorded in the
+            #   tsdb. In this case we instead only retreive and push
+            #   the series portion missing from the db's data set.
+            # if offset_s < 0:
+            #     non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
+            #     non_overlap_offset_s: float = backfill_diff.in_seconds()
+
+            offset_samples: int = round(offset_s / timeframe)
+
+            # TODO: see if there's faster multi-field reads:
+            # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+            # re-index  with a `time` and index field
+            if offset_s > 0:
+                # NOTE XXX: ONLY when there is an actual gap
+                # between the earliest sample in the latest history
+                # frame do we want to NOT stick the latest tsdb
+                # history adjacent to that latest frame!
+                prepend_start = shm._first.value - offset_samples + 1
+                to_push = tsdb_history[-prepend_start:]
+            else:
+                # when there is overlap we want to remove the
+                # overlapping samples from the tsdb portion (taking
+                # instead the latest frame's values since THEY
+                # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
+                # to the latest frame!
+                # TODO: assert the overlap segment array contains
+                # the same values!?!
+                prepend_start = shm._first.value
+                to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
+
+            # tsdb history is so far in the past we can't fit it in
+            # shm buffer space so simply don't load it!
+            if prepend_start > 0:
+                shm.push(
+                    to_push,
+
+                    # insert the history pre a "days worth" of samples
+                    # to leave some real-time buffer space at the end.
+                    prepend=True,
+                    # update_first=False,
+                    start=prepend_start,
+                    field_map=storemod.ohlc_key_map,
+                )
+                log.info(f'Loaded {to_push.shape} datums from storage')

         # TODO: maybe start history anal and load missing "history
         # gaps" via backend..
+        if timeframe not in (1, 60):
+            raise ValueError(
+                '`piker` only needs to support 1m and 1s sampling '
+                'but ur api is trying to deliver a longer '
+                f'timeframe of {timeframe} seconds..\n'
+                'So yuh.. dun do dat brudder.'
+            )
+
+        # if there is a gap to backfill from the first
+        # history frame until the last datum loaded from the tsdb
+        # continue that now in the background
+        bf_done = await tn.start(
+            partial(
+                start_backfill,
+                get_hist=get_hist,
+                mod=mod,
+                mkt=mkt,
+                shm=shm,
+                timeframe=timeframe,
+
+                backfill_from_shm_index=backfill_gap_from_shm_index,
+                backfill_from_dt=mr_start_dt,
+
+                sampler_stream=sampler_stream,
+                backfill_until_dt=last_tsdb_dt,
+                storage=storage,
+                write_tsdb=True,
+            )
+        )

         # if len(hist_shm.array) < 2:
         # TODO: there's an edge case here to solve where if the last
         # frame before market close (at least on ib) was pushed and
@@ -983,29 +848,32 @@ async def tsdb_backfill(
         # backload any further data from tsdb (concurrently per
         # timeframe) if not all data was able to be loaded (in memory)
         # from the ``StorageClient.load()`` call above.
-        await trio.sleep_forever()
+        try:
+            await trio.sleep_forever()
+        finally:
+            return

         # XXX NOTE: this is legacy from when we were using
         # marketstore and we needed to continue backloading
         # incrementally from the tsdb client.. (bc it couldn't
         # handle a single large query with gRPC for some
         # reason.. classic goolag pos)
-        # tn.start_soon(
-        #     back_load_from_tsdb,
-
-        #     storemod,
-        #     storage,
-        #     fqme,
-
-        #     tsdb_history,
-        #     last_tsdb_dt,
-        #     mr_start_dt,
-        #     mr_end_dt,
-        #     bf_done,
-
-        #     timeframe,
-        #     shm,
-        # )
+        tn.start_soon(
+            back_load_from_tsdb,
+
+            storemod,
+            storage,
+            fqme,
+
+            tsdb_history,
+            last_tsdb_dt,
+            mr_start_dt,
+            mr_end_dt,
+            bf_done,
+
+            timeframe,
+            shm,
+        )
@@ -1113,11 +981,6 @@ async def manage_history(
     async with (
         storage.open_storage_client() as (storemod, client),

-        # NOTE: this nursery spawns a task per "timeframe" (aka
-        # sampling period) data set since normally differently
-        # sampled timeseries can be loaded / process independently
-        # ;)
         trio.open_nursery() as tn,
     ):
         log.info(
@@ -1167,6 +1030,7 @@ async def manage_history(
                 tsdb_backfill,
                 mod=mod,
                 storemod=storemod,
+                tn=tn,
                 # bus,
                 storage=client,
                 mkt=mkt,
@@ -1195,70 +1059,3 @@ async def manage_history(
     # and thus a small RPC-prot for remotely controllinlg
     # what data is loaded for viewing.
     await trio.sleep_forever()
-
-
-def iter_dfs_from_shms(
-    fqme: str
-) -> Generator[
-    tuple[Path, ShmArray, pl.DataFrame],
-    None,
-    None,
-]:
-    # shm buffer size table based on known sample rates
-    sizes: dict[str, int] = {
-        'hist': _default_hist_size,
-        'rt': _default_rt_size,
-    }
-
-    # load all detected shm buffer files which have the
-    # passed FQME pattern in the file name.
-    shmfiles: list[Path] = []
-    shmdir = Path('/dev/shm/')
-
-    for shmfile in shmdir.glob(f'*{fqme}*'):
-        filename: str = shmfile.name
-
-        # skip index files
-        if (
-            '_first' in filename
-            or '_last' in filename
-        ):
-            continue
-
-        assert shmfile.is_file()
-        log.debug(f'Found matching shm buffer file: {filename}')
-        shmfiles.append(shmfile)
-
-    for shmfile in shmfiles:
-
-        # lookup array buffer size based on file suffix
-        # being either .rt or .hist
-        key: str = shmfile.name.rsplit('.')[-1]
-
-        # skip FSP buffers for now..
-        if key not in sizes:
-            continue
-
-        size: int = sizes[key]
-
-        # attach to any shm buffer, load array into polars df,
-        # write to local parquet file.
-        shm, opened = maybe_open_shm_array(
-            key=shmfile.name,
-            size=size,
-            dtype=def_iohlcv_fields,
-            readonly=True,
-        )
-        assert not opened
-        ohlcv = shm.array
-
-        from ..data import tsp
-        df: pl.DataFrame = tsp.np2pl(ohlcv)
-        yield (
-            shmfile,
-            shm,
-            df,
-        )
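A small worked example (not part of the diff; all concrete values are hypothetical) of the `offset_s` / `offset_samples` / `prepend_start` arithmetic used above to decide where tsdb history gets prepended relative to the shm buffer:

    import pendulum

    # 1m sampling; the newest backend frame starts 3 minutes after the
    # last sample recorded in the tsdb (a real gap, so offset_s > 0)
    timeframe: float = 60.0
    last_tsdb_dt = pendulum.datetime(2023, 12, 1, 9, 30)
    mr_start_dt = pendulum.datetime(2023, 12, 1, 9, 33)

    offset_s = (mr_start_dt - last_tsdb_dt).in_seconds()   # 180
    offset_samples: int = round(offset_s / timeframe)      # 3

    # with a (made-up) shm first-index of 10_000 the tsdb history is
    # prepended `offset_samples` slots behind the latest frame
    shm_first_index: int = 10_000
    prepend_start: int = shm_first_index - offset_samples + 1   # 9998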


@@ -29,19 +29,12 @@ from math import (
     floor,
 )
 import time
-from typing import (
-    Literal,
-    # AsyncGenerator,
-    Generator,
-)
+from typing import Literal

 import numpy as np
 import polars as pl
-from pendulum import (
-    DateTime,
-    from_timestamp,
-)

+from ._sharedmem import ShmArray
 from ..toolz.profile import (
     Profiler,
     pg_profile_enabled,
@@ -60,14 +53,6 @@ get_console_log = partial(
     name=subsys,
 )

-# NOTE: union type-defs to handle generic `numpy` and `polars` types
-# side-by-side Bo
-# |_ TODO: schema spec typing?
-#   -[ ] nptyping!
-#   -[ ] wtv we can with polars?
-Frame = pl.DataFrame | np.ndarray
-Seq = pl.Series | np.ndarray
-

 def slice_from_time(
     arr: np.ndarray,
@@ -224,282 +209,51 @@ def slice_from_time(
     return read_slc


-def get_null_segs(
-    frame: Frame,
-    period: float,  # sampling step in seconds
+def detect_null_time_gap(
+    shm: ShmArray,
     imargin: int = 1,
-    col: str = 'time',

-) -> tuple[
-    # Seq,  # TODO: can we make it an array-type instead?
-    list[
-        list[int, int],
-    ],
-    Seq,
-    Frame
-] | None:
+) -> tuple[float, float] | None:
     '''
-    Detect if there are any zero(-epoch stamped) valued
-    rows in for the provided `col: str` column; by default
-    presume the 'time' field/column.
-
-    Filter to all such zero (time) segments and return
-    the corresponding frame zeroed segment's,
-
-    - gap absolute (in buffer terms) indices-endpoints as
-      `absi_zsegs`
-    - abs indices of all rows with zeroed `col` values as `absi_zeros`
-    - the corresponding frame's row-entries (view) which are
-      zeroed for the `col` as `zero_t`
+    Detect if there are any zero-epoch stamped rows in
+    the presumed 'time' field-column.
+
+    Filter to the gap and return a surrounding index range.
+
+    NOTE: for now presumes only ONE gap XD

     '''
-    times: Seq = frame['time']
-    zero_pred: Seq = (times == 0)
-
-    if isinstance(frame, np.ndarray):
-        tis_zeros: int = zero_pred.any()
-    else:
-        tis_zeros: int = zero_pred.any()
-
-    if not tis_zeros:
-        return None
-
-    # TODO: use ndarray for this?!
-    absi_zsegs: list[list[int, int]] = []
-
-    if isinstance(frame, np.ndarray):
-        # view of ONLY the zero segments as one continuous chunk
-        zero_t: np.ndarray = frame[zero_pred]
-        # abs indices of said zeroed rows
-        absi_zeros = zero_t['index']
-        # diff of abs index steps between each zeroed row
-        absi_zdiff: np.ndarray = np.diff(absi_zeros)
-
-        # scan for all frame-indices where the
-        # zeroed-row-abs-index-step-diff is greater then the
-        # expected increment of 1.
-        # data  1st zero seg    data  zeros
-        # ----  ------------    ----  ----- ------  ----
-        # ||||..000000000000..||||..00000..||||||..0000
-        # ----  ------------    ----  ----- ------  ----
-        #       ^zero_t[0]                     ^zero_t[-1]
-        #       ^fi_zgaps[0]        ^fi_zgaps[1]
-        #       ^absi_zsegs[0][0]   ^---^ => absi_zsegs[1]: tuple
-        #                 absi_zsegs[0][1]^
-        #
-        # NOTE: the first entry in `fi_zgaps` is where
-        # the first (absolute) index step diff is > 1.
-        # and it is a frame-relative index into `zero_t`.
-        fi_zgaps = np.argwhere(
-            absi_zdiff > 1
-            # NOTE: +1 here is ensure we index to the "start" of each
-            # segment (if we didn't the below loop needs to be
-            # re-written to expect `fi_end_rows`!
-        ) + 1
-
-        # the rows from the contiguous zeroed segments which have
-        # abs-index steps >1 compared to the previous zero row
-        # (indicating an end of zeroed segment).
-        fi_zseg_start_rows = zero_t[fi_zgaps]
-
-    # TODO: equiv for pl.DataFrame case!
-    else:
-        izeros: pl.Series = zero_pred.arg_true()
-        zero_t: pl.DataFrame = frame[izeros]
-
-        absi_zeros = zero_t['index']
-        absi_zdiff: pl.Series = absi_zeros.diff()
-        fi_zgaps = (absi_zdiff > 1).arg_true()
-
-    # XXX: our goal (in this func) is to select out slice index
-    # pairs (zseg0_start, zseg_end) in abs index units for each
-    # null-segment portion detected throughout entire input frame.
-
-    # only up to one null-segment in entire frame?
-    num_gaps: int = fi_zgaps.size + 1
-    if num_gaps < 1:
-        if absi_zeros.size > 1:
-            absi_zsegs = [[
-                # see `get_hist()` in backend, should ALWAYS be
-                # able to handle a `start_dt=None`!
-                # None,
-                max(
-                    absi_zeros[0] - 1,
-                    0,
-                ),
-                # NOTE: need the + 1 to guarantee we index "up to"
-                # the next non-null row-datum.
-                min(
-                    absi_zeros[-1] + 1,
-                    frame['index'][-1],
-                ),
-            ]]
-        else:
-            # XXX EDGE CASE: only one null-datum found so
-            # mark the start abs index as None to trigger
-            # a full frame-len query to the respective backend?
-            absi_zsegs = [[
-                # see `get_hist()` in backend, should ALWAYS be
-                # able to handle a `start_dt=None`!
-                # None,
-                None,
-                absi_zeros[0] + 1,
-            ]]
-
-    # XXX NOTE XXX: if >= 2 zeroed segments are found, there should
-    # ALWAYS be more then one zero-segment-abs-index-step-diff row
-    # in `absi_zdiff`, so loop through all such
-    # abs-index-step-diffs >1 (i.e. the entries of `absi_zdiff`)
-    # and add them as the "end index" entries for each segment.
-    # Then, iif NOT iterating the first such segment end, look back
-    # for the prior segments zero-segment start indext by relative
-    # indexing the `zero_t` frame by -1 and grabbing the abs index
-    # of what should be the prior zero-segment abs start index.
-    else:
-        # NOTE: since `absi_zdiff` will never have a row
-        # corresponding to the first zero-segment's row, we add it
-        # manually here.
-        absi_zsegs.append([
-            absi_zeros[0] - 1,
-            None,
-        ])
-
-        # TODO: can we do it with vec ops?
-        for i, (
-            fi,  # frame index of zero-seg start
-            zseg_start_row,  # full row for ^
-        ) in enumerate(zip(
-            fi_zgaps,
-            fi_zseg_start_rows,
-        )):
-            assert (zseg_start_row == zero_t[fi]).all()
-            iabs: int = zseg_start_row['index'][0]
-            absi_zsegs.append([
-                iabs - 1,
-                None,  # backfilled on next iter
-            ])
-
-            # row = zero_t[fi]
-            # absi_pre_zseg = row['index'][0] - 1
-            # absi_pre_zseg = absi - 1
-
-            # final iter case, backfill FINAL end iabs!
-            if (i + 1) == fi_zgaps.size:
-                absi_zsegs[-1][1] = absi_zeros[-1] + 1
-
-            # NOTE: only after the first segment (due to `.diff()`
-            # usage above) can we do a lookback to the prior
-            # segment's end row and determine it's abs index to
-            # retroactively insert to the prior
-            # `absi_zsegs[i-1][1]` entry Bo
-            last_end: int = absi_zsegs[i][1]
-            if last_end is None:
-                prev_zseg_row = zero_t[fi - 1]
-                absi_post_zseg = prev_zseg_row['index'][0] + 1
-                # XXX: MUST BACKFILL previous end iabs!
-                absi_zsegs[i][1] = absi_post_zseg
-
-        else:
-            if 0 < num_gaps < 2:
-                absi_zsegs[-1][1] = absi_zeros[-1] + 1
-
-    iabs_first: int = frame['index'][0]
-    for start, end in absi_zsegs:
-
-        ts_start: float = times[start - iabs_first]
-        ts_end: float = times[end - iabs_first]
-        if (
-            ts_start == 0
-            or
-            ts_end == 0
-        ):
-            import pdbp
-            pdbp.set_trace()
-
-        assert end
-        assert start < end
-
-    log.warning(
-        f'Frame has {len(absi_zsegs)} NULL GAPS!?\n'
-        f'period: {period}\n'
-        f'total null samples: {len(zero_t)}\n'
-    )
-
-    return (
-        absi_zsegs,  # [start, end] abs slice indices of seg
-        absi_zeros,  # all abs indices within all null-segs
-        zero_t,  # sliced-view of all null-segment rows-datums
-    )
-
-
-def iter_null_segs(
-    timeframe: float,
-    frame: Frame | None = None,
-    null_segs: tuple | None = None,
-
-) -> Generator[
-    tuple[
-        int, int,
-        int, int,
-        float, float,
-        float, float,
-
-        # Seq,  # TODO: can we make it an array-type instead?
-        # list[
-        #     list[int, int],
-        # ],
-        # Seq,
-        # Frame
-    ],
-    None,
-]:
-    if null_segs is None:
-        null_segs: tuple = get_null_segs(
-            frame,
-            period=timeframe,
-        )
-
-    absi_pairs_zsegs: list[list[float, float]]
-    izeros: Seq
-    zero_t: Frame
-    (
-        absi_pairs_zsegs,
-        izeros,
-        zero_t,
-    ) = null_segs
-
-    absi_first: int = frame[0]['index']
-    for (
-        absi_start,
-        absi_end,
-    ) in absi_pairs_zsegs:
-
-        fi_end: int = absi_end - absi_first
-        end_row: Seq = frame[fi_end]
-        end_t: float = end_row['time']
-        end_dt: DateTime = from_timestamp(end_t)
-
-        fi_start = None
-        start_row = None
-        start_t = None
-        start_dt = None
-        if (
-            absi_start is not None
-            and start_t != 0
-        ):
-            fi_start: int = absi_start - absi_first
-            start_row: Seq = frame[fi_start]
-            start_t: float = start_row['time']
-            start_dt: DateTime = from_timestamp(start_t)
-
-        if absi_start < 0:
-            import pdbp
-            pdbp.set_trace()
-
-        yield (
-            absi_start, absi_end,  # abs indices
-            fi_start, fi_end,  # relative "frame" indices
-            start_t, end_t,
-            start_dt, end_dt,
-        )
+    # ensure we read buffer state only once so that ShmArray rt
+    # circular-buffer updates don't cause a indexing/size mismatch.
+    array: np.ndarray = shm.array
+
+    zero_pred: np.ndarray = array['time'] == 0
+    zero_t: np.ndarray = array[zero_pred]
+
+    if zero_t.size:
+        istart, iend = zero_t['index'][[0, -1]]
+        start, end = shm._array['time'][
+            [istart - imargin, iend + imargin]
+        ]
+        return (
+            istart - imargin,
+            start,
+            end,
+            iend + imargin,
+        )
+
+    return None
+
+
+t_unit: Literal = Literal[
+    'days',
+    'hours',
+    'minutes',
+    'seconds',
+    'miliseconds',
+    'microseconds',
+    'nanoseconds',
+]


 def with_dts(
@@ -538,17 +292,6 @@ def dedup_dt(
     )


-t_unit: Literal = Literal[
-    'days',
-    'hours',
-    'minutes',
-    'seconds',
-    'miliseconds',
-    'microseconds',
-    'nanoseconds',
-]
-
-
 def detect_time_gaps(
     df: pl.DataFrame,
@@ -663,6 +406,10 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
             f'Gaps found:\n{gaps}\n'
             f'deduped Gaps found:\n{deduped_gaps}'
         )
+
+        # TODO: rewrite this in polars and/or convert to
+        # ndarray to detect and remove?
+        # null_gaps = detect_null_time_gap()

     return (
         df,
         gaps,
@@ -681,19 +428,14 @@ def sort_diff(
     list[int],  # indices of segments that are out-of-order
 ]:
     ser: pl.Series = src_df[col]
-
-    sortd: pl.DataFrame = ser.sort()
-    diff: pl.Series = ser.diff()
-
+    diff: pl.Series = ser.diff()
+    sortd: pl.DataFrame = ser.sort()
     sortd_diff: pl.Series = sortd.diff()
     i_step_diff = (diff != sortd_diff).arg_true()
-    frame_reorders: int = i_step_diff.len()
-    if frame_reorders:
-        log.warn(
-            f'Resorted frame on col: {col}\n'
-            f'{frame_reorders}'
-        )
-        # import pdbp; pdbp.set_trace()
+    if i_step_diff.len():
+        import pdbp
+        pdbp.set_trace()


 # NOTE: thanks to this SO answer for the below conversion routines
 # to go from numpy struct-arrays to polars dataframes and back:
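A self-contained sketch (not part of the diff; the helper name is hypothetical) of the zero-epoch detection idea behind `detect_null_time_gap()` above, operating on a plain numpy struct array with 'index' and 'time' fields instead of a live `ShmArray`:

    import numpy as np

    def null_time_bounds(
        arr: np.ndarray,     # struct array with 'index' and 'time' fields
        imargin: int = 1,
    ) -> tuple[int, int] | None:
        # absolute-index bounds (padded by `imargin`) around any run of
        # zero-epoch stamped rows; None when the series has no null samples
        zeros: np.ndarray = arr[arr['time'] == 0]
        if not zeros.size:
            return None
        istart, iend = zeros['index'][[0, -1]]
        return int(istart) - imargin, int(iend) + imargin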


@@ -21,6 +21,8 @@ Storage middle-ware CLIs.
 from __future__ import annotations
 from pathlib import Path
 import time
+from typing import Generator
+# from typing import TYPE_CHECKING

 import polars as pl
 import numpy as np
@@ -35,11 +37,14 @@ from piker.service import open_piker_runtime
 from piker.cli import cli
 from piker.config import get_conf_dir
 from piker.data import (
+    maybe_open_shm_array,
+    def_iohlcv_fields,
     ShmArray,
     tsp,
 )
 from piker.data.history import (
-    iter_dfs_from_shms,
+    _default_hist_size,
+    _default_rt_size,
 )
 from . import (
     log,
@@ -185,13 +190,6 @@ def anal(
         )
         assert first_dt < last_dt

-        null_segs: tuple = tsp.get_null_segs(
-            frame=history,
-            period=period,
-        )
-        if null_segs:
-            await tractor.pause()
-
         shm_df: pl.DataFrame = await client.as_df(
             fqme,
             period,
@@ -206,7 +204,6 @@ def anal(
             diff,
         ) = tsp.dedupe(shm_df)
-
+        if diff:
             await client.write_ohlcv(
                 fqme,
@@ -222,6 +219,69 @@ def anal(
     trio.run(main)


+def iter_dfs_from_shms(fqme: str) -> Generator[
+    tuple[Path, ShmArray, pl.DataFrame],
+    None,
+    None,
+]:
+    # shm buffer size table based on known sample rates
+    sizes: dict[str, int] = {
+        'hist': _default_hist_size,
+        'rt': _default_rt_size,
+    }
+
+    # load all detected shm buffer files which have the
+    # passed FQME pattern in the file name.
+    shmfiles: list[Path] = []
+    shmdir = Path('/dev/shm/')
+
+    for shmfile in shmdir.glob(f'*{fqme}*'):
+        filename: str = shmfile.name
+
+        # skip index files
+        if (
+            '_first' in filename
+            or '_last' in filename
+        ):
+            continue
+
+        assert shmfile.is_file()
+        log.debug(f'Found matching shm buffer file: {filename}')
+        shmfiles.append(shmfile)
+
+    for shmfile in shmfiles:
+
+        # lookup array buffer size based on file suffix
+        # being either .rt or .hist
+        key: str = shmfile.name.rsplit('.')[-1]
+
+        # skip FSP buffers for now..
+        if key not in sizes:
+            continue
+
+        size: int = sizes[key]
+
+        # attach to any shm buffer, load array into polars df,
+        # write to local parquet file.
+        shm, opened = maybe_open_shm_array(
+            key=shmfile.name,
+            size=size,
+            dtype=def_iohlcv_fields,
+            readonly=True,
+        )
+        assert not opened
+        ohlcv = shm.array
+
+        from ..data import tsp
+        df: pl.DataFrame = tsp.np2pl(ohlcv)
+        yield (
+            shmfile,
+            shm,
+            df,
+        )
+
+
 @store.command()
 def ldshm(
     fqme: str,
@@ -247,8 +307,8 @@ def ldshm(

             # compute ohlc properties for naming
             times: np.ndarray = shm.array['time']
-            period_s: float = times[-1] - times[-2]
-            if period_s < 1.:
+            secs: float = times[-1] - times[-2]
+            if secs < 1.:
                 raise ValueError(
                     f'Something is wrong with time period for {shm}:\n{times}'
                 )
@@ -263,22 +323,17 @@ def ldshm(
                 diff,
             ) = tsp.dedupe(shm_df)

-            null_segs: tuple = tsp.get_null_segs(
-                frame=shm.array,
-                period=period_s,
-            )
-
             # TODO: maybe only optionally enter this depending
             # on some CLI flags and/or gap detection?
-            if not gaps.is_empty():
-                await tractor.pause()
-
-            if null_segs:
+            if (
+                not gaps.is_empty()
+                or secs > 2
+            ):
                 await tractor.pause()

             # write to parquet file?
             if write_parquet:
-                timeframe: str = f'{period_s}s'
+                timeframe: str = f'{secs}s'

                 datadir: Path = get_conf_dir() / 'nativedb'
                 if not datadir.is_dir():


@@ -342,6 +342,7 @@ class NativeStorageClient:
         )
         return path

+
     async def write_ohlcv(
         self,
         fqme: str,


@@ -207,10 +207,9 @@ class ContentsLabel(pg.LabelItem):
         # this being "html" is the dumbest shit :eyeroll:
         self.setText(
-            "<b>i_arr</b>:{index}<br/>"
+            "<b>i</b>:{index}<br/>"
             # NB: these fields must be indexed in the correct order via
             # the slice syntax below.
-            "<b>i_shm</b>:{}<br/>"
             "<b>epoch</b>:{}<br/>"
             "<b>O</b>:{}<br/>"
             "<b>H</b>:{}<br/>"
@@ -220,7 +219,6 @@ class ContentsLabel(pg.LabelItem):
             # "<b>wap</b>:{}".format(
             *array[ix][
                 [
-                    'index',
                     'time',
                     'open',
                     'high',


@@ -248,27 +248,25 @@ async def increment_history_view(
             # - samplerd could emit the actual update range via
             #   tuple and then we only enter the below block if that
             #   range is detected as in-view?
-            match msg:
-                case {
-                    'backfilling': (viz_name, timeframe),
-                } if (
-                    viz_name == name
-                ):
-                    log.warning(
-                        f'Forcing HARD REDRAW:\n'
-                        f'name: {name}\n'
-                        f'timeframe: {timeframe}\n'
-                    )
+            if (
+                (bf_wut := msg.get('backfilling', False))
+            ):
+                viz_name, timeframe = bf_wut
+                if (
+                    viz_name == name
+
                     # TODO: only allow this when the data is IN VIEW!
                     # also, we probably can do this more efficiently
                     # / smarter by only redrawing the portion of the
                     # path necessary?
-                    {
-                        60: hist_viz,
-                        1: viz,
-                    }[timeframe].update_graphics(
-                        force_redraw=True
-                    )
+                    and False
+                ):
+                    log.info(f'Forcing hard redraw -> {name}@{timeframe}')
+                    match timeframe:
+                        case 60:
+                            hist_viz.update_graphics(force_redraw=True)
+                        case 1:
+                            viz.update_graphics(force_redraw=True)

             # check if slow chart needs an x-domain shift and/or
             # y-range resize.