Compare commits
11 Commits
cd6bc105de
...
9ebb977731
| Author | SHA1 | Date |
|---|---|---|
|
|
9ebb977731 | |
|
|
56b69f97b3 | |
|
|
e2ff43f5c3 | |
|
|
0d76323a90 | |
|
|
176090b234 | |
|
|
f3530b2f6b | |
|
|
2d4d7cca57 | |
|
|
d147bfe8c4 | |
|
|
390382b83f | |
|
|
0a53e5cb0c | |
|
|
6e3add2f91 |
|
|
@ -108,7 +108,6 @@ class AggTrade(Struct, frozen=True):
|
||||||
m: bool # Is the buyer the market maker?
|
m: bool # Is the buyer the market maker?
|
||||||
M: bool|None = None # Ignore
|
M: bool|None = None # Ignore
|
||||||
nq: float|None = None # Normal quantity without the trades involving RPI orders
|
nq: float|None = None # Normal quantity without the trades involving RPI orders
|
||||||
# ^XXX https://developers.binance.com/docs/derivatives/change-log#2025-12-29
|
|
||||||
|
|
||||||
|
|
||||||
async def stream_messages(
|
async def stream_messages(
|
||||||
|
|
|
||||||
|
|
@ -75,6 +75,7 @@ from piker.brokers._util import (
|
||||||
)
|
)
|
||||||
from piker.storage import TimeseriesNotFound
|
from piker.storage import TimeseriesNotFound
|
||||||
from ._anal import (
|
from ._anal import (
|
||||||
|
|
||||||
dedupe,
|
dedupe,
|
||||||
get_null_segs,
|
get_null_segs,
|
||||||
iter_null_segs,
|
iter_null_segs,
|
||||||
|
|
@ -127,8 +128,7 @@ def diff_history(
|
||||||
# no diffing with tsdb dt index possible..
|
# no diffing with tsdb dt index possible..
|
||||||
if (
|
if (
|
||||||
prepend_until_dt is None
|
prepend_until_dt is None
|
||||||
and
|
and append_until_dt is None
|
||||||
append_until_dt is None
|
|
||||||
):
|
):
|
||||||
return array
|
return array
|
||||||
|
|
||||||
|
|
@ -140,26 +140,15 @@ def diff_history(
|
||||||
return array[times >= prepend_until_dt.timestamp()]
|
return array[times >= prepend_until_dt.timestamp()]
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: can't we just make this a sync func now?
|
||||||
async def shm_push_in_between(
|
async def shm_push_in_between(
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
to_push: np.ndarray,
|
to_push: np.ndarray,
|
||||||
prepend_index: int,
|
prepend_index: int,
|
||||||
backfill_until_dt: datetime,
|
|
||||||
|
|
||||||
update_start_on_prepend: bool = False,
|
update_start_on_prepend: bool = False,
|
||||||
|
|
||||||
) -> int:
|
) -> int:
|
||||||
|
|
||||||
# XXX, try to catch bad inserts by peeking at the first/last
|
|
||||||
# times and ensure we don't violate order.
|
|
||||||
f_times: np.ndarray = to_push['time']
|
|
||||||
f_start: float = f_times[0]
|
|
||||||
f_start_dt = from_timestamp(f_start)
|
|
||||||
if (
|
|
||||||
f_start_dt < backfill_until_dt
|
|
||||||
):
|
|
||||||
await tractor.pause()
|
|
||||||
|
|
||||||
# XXX: extremely important, there can be no checkpoints
|
# XXX: extremely important, there can be no checkpoints
|
||||||
# in the body of this func to avoid entering new ``frames``
|
# in the body of this func to avoid entering new ``frames``
|
||||||
# values while we're pipelining the current ones to
|
# values while we're pipelining the current ones to
|
||||||
|
|
@ -192,7 +181,6 @@ async def maybe_fill_null_segments(
|
||||||
get_hist: Callable,
|
get_hist: Callable,
|
||||||
sampler_stream: tractor.MsgStream,
|
sampler_stream: tractor.MsgStream,
|
||||||
mkt: MktPair,
|
mkt: MktPair,
|
||||||
backfill_until_dt: datetime,
|
|
||||||
|
|
||||||
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
|
@ -203,10 +191,6 @@ async def maybe_fill_null_segments(
|
||||||
|
|
||||||
frame: Frame = shm.array
|
frame: Frame = shm.array
|
||||||
|
|
||||||
# TODO, put in parent task/daemon root!
|
|
||||||
import greenback
|
|
||||||
await greenback.ensure_portal()
|
|
||||||
|
|
||||||
null_segs: tuple | None = get_null_segs(
|
null_segs: tuple | None = get_null_segs(
|
||||||
frame,
|
frame,
|
||||||
period=timeframe,
|
period=timeframe,
|
||||||
|
|
@ -253,7 +237,6 @@ async def maybe_fill_null_segments(
|
||||||
shm,
|
shm,
|
||||||
to_push,
|
to_push,
|
||||||
prepend_index=absi_end,
|
prepend_index=absi_end,
|
||||||
backfill_until_dt=backfill_until_dt,
|
|
||||||
update_start_on_prepend=False,
|
update_start_on_prepend=False,
|
||||||
)
|
)
|
||||||
# TODO: UI side needs IPC event to update..
|
# TODO: UI side needs IPC event to update..
|
||||||
|
|
@ -369,12 +352,15 @@ async def start_backfill(
|
||||||
mkt: MktPair,
|
mkt: MktPair,
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
timeframe: float,
|
timeframe: float,
|
||||||
|
|
||||||
backfill_from_shm_index: int,
|
backfill_from_shm_index: int,
|
||||||
backfill_from_dt: datetime,
|
backfill_from_dt: datetime,
|
||||||
|
|
||||||
sampler_stream: tractor.MsgStream,
|
sampler_stream: tractor.MsgStream,
|
||||||
|
|
||||||
backfill_until_dt: datetime | None = None,
|
backfill_until_dt: datetime | None = None,
|
||||||
storage: StorageClient | None = None,
|
storage: StorageClient | None = None,
|
||||||
|
|
||||||
write_tsdb: bool = True,
|
write_tsdb: bool = True,
|
||||||
|
|
||||||
task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
@ -509,14 +495,7 @@ async def start_backfill(
|
||||||
|
|
||||||
assert time[-1] == next_end_dt.timestamp()
|
assert time[-1] == next_end_dt.timestamp()
|
||||||
|
|
||||||
expected_dur: Interval = (
|
expected_dur: Interval = last_start_dt - next_start_dt
|
||||||
last_start_dt.subtract(
|
|
||||||
seconds=timeframe
|
|
||||||
# ^XXX, always "up to" the bar *before*
|
|
||||||
)
|
|
||||||
-
|
|
||||||
next_start_dt
|
|
||||||
)
|
|
||||||
|
|
||||||
# frame's worth of sample-period-steps, in seconds
|
# frame's worth of sample-period-steps, in seconds
|
||||||
frame_size_s: float = len(array) * timeframe
|
frame_size_s: float = len(array) * timeframe
|
||||||
|
|
@ -577,7 +556,6 @@ async def start_backfill(
|
||||||
shm,
|
shm,
|
||||||
to_push,
|
to_push,
|
||||||
prepend_index=next_prepend_index,
|
prepend_index=next_prepend_index,
|
||||||
backfill_until_dt=backfill_until_dt,
|
|
||||||
update_start_on_prepend=update_start_on_prepend,
|
update_start_on_prepend=update_start_on_prepend,
|
||||||
)
|
)
|
||||||
await sampler_stream.send({
|
await sampler_stream.send({
|
||||||
|
|
@ -607,7 +585,6 @@ async def start_backfill(
|
||||||
shm,
|
shm,
|
||||||
to_push,
|
to_push,
|
||||||
prepend_index=next_prepend_index,
|
prepend_index=next_prepend_index,
|
||||||
backfill_until_dt=backfill_until_dt,
|
|
||||||
update_start_on_prepend=update_start_on_prepend,
|
update_start_on_prepend=update_start_on_prepend,
|
||||||
)
|
)
|
||||||
await sampler_stream.send({
|
await sampler_stream.send({
|
||||||
|
|
@ -1079,7 +1056,7 @@ async def tsdb_backfill(
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
|
|
||||||
bf_done: trio.Event = await tn.start(
|
bf_done = await tn.start(
|
||||||
partial(
|
partial(
|
||||||
start_backfill,
|
start_backfill,
|
||||||
get_hist=get_hist,
|
get_hist=get_hist,
|
||||||
|
|
@ -1100,9 +1077,7 @@ async def tsdb_backfill(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
nulls_detected: trio.Event | None = None
|
nulls_detected: trio.Event | None = None
|
||||||
|
|
||||||
if last_tsdb_dt is not None:
|
if last_tsdb_dt is not None:
|
||||||
|
|
||||||
# calc the index from which the tsdb data should be
|
# calc the index from which the tsdb data should be
|
||||||
# prepended, presuming there is a gap between the
|
# prepended, presuming there is a gap between the
|
||||||
# latest frame (loaded/read above) and the latest
|
# latest frame (loaded/read above) and the latest
|
||||||
|
|
@ -1173,7 +1148,7 @@ async def tsdb_backfill(
|
||||||
# TODO: ideally these can never exist!
|
# TODO: ideally these can never exist!
|
||||||
# -[ ] somehow it seems sometimes we're writing zero-ed
|
# -[ ] somehow it seems sometimes we're writing zero-ed
|
||||||
# segments to tsdbs during teardown?
|
# segments to tsdbs during teardown?
|
||||||
# -[ ] can we ensure that the backfiller tasks do this
|
# -[ ] can we ensure that the backcfiller tasks do this
|
||||||
# work PREVENTAVELY instead?
|
# work PREVENTAVELY instead?
|
||||||
# -[ ] fill in non-zero epoch time values ALWAYS!
|
# -[ ] fill in non-zero epoch time values ALWAYS!
|
||||||
# await maybe_fill_null_segments(
|
# await maybe_fill_null_segments(
|
||||||
|
|
@ -1185,7 +1160,6 @@ async def tsdb_backfill(
|
||||||
get_hist=get_hist,
|
get_hist=get_hist,
|
||||||
sampler_stream=sampler_stream,
|
sampler_stream=sampler_stream,
|
||||||
mkt=mkt,
|
mkt=mkt,
|
||||||
backfill_until_dt=last_tsdb_dt,
|
|
||||||
))
|
))
|
||||||
|
|
||||||
# 2nd nursery END
|
# 2nd nursery END
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue