Compare commits
2 Commits
bac8317a4a
...
f73b981173
| Author | SHA1 | Date |
|---|---|---|
|
|
f73b981173 | |
|
|
d5edd3484f |
|
|
@ -80,20 +80,20 @@ class Sampler:
|
|||
This non-instantiated type is meant to be a singleton within
|
||||
a `samplerd` actor-service spawned once by the user wishing to
|
||||
time-step-sample (real-time) quote feeds, see
|
||||
``.service.maybe_open_samplerd()`` and the below
|
||||
``register_with_sampler()``.
|
||||
`.service.maybe_open_samplerd()` and the below
|
||||
`register_with_sampler()`.
|
||||
|
||||
'''
|
||||
service_nursery: None | trio.Nursery = None
|
||||
service_nursery: None|trio.Nursery = None
|
||||
|
||||
# TODO: we could stick these in a composed type to avoid
|
||||
# angering the "i hate module scoped variables crowd" (yawn).
|
||||
# TODO: we could stick these in a composed type to avoid angering
|
||||
# the "i hate module scoped variables crowd" (yawn).
|
||||
ohlcv_shms: dict[float, list[ShmArray]] = {}
|
||||
|
||||
# holds one-task-per-sample-period tasks which are spawned as-needed by
|
||||
# data feed requests with a given detected time step usually from
|
||||
# history loading.
|
||||
incr_task_cs: trio.CancelScope | None = None
|
||||
incr_task_cs: trio.CancelScope|None = None
|
||||
|
||||
bcast_errors: tuple[Exception] = (
|
||||
trio.BrokenResourceError,
|
||||
|
|
@ -248,8 +248,8 @@ class Sampler:
|
|||
async def broadcast(
|
||||
self,
|
||||
period_s: float,
|
||||
time_stamp: float | None = None,
|
||||
info: dict | None = None,
|
||||
time_stamp: float|None = None,
|
||||
info: dict|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
|
@ -313,7 +313,7 @@ class Sampler:
|
|||
@classmethod
|
||||
async def broadcast_all(
|
||||
self,
|
||||
info: dict | None = None,
|
||||
info: dict|None = None,
|
||||
) -> None:
|
||||
|
||||
# NOTE: take a copy of subs since removals can happen
|
||||
|
|
@ -330,12 +330,12 @@ class Sampler:
|
|||
async def register_with_sampler(
|
||||
ctx: Context,
|
||||
period_s: float,
|
||||
shms_by_period: dict[float, dict] | None = None,
|
||||
shms_by_period: dict[float, dict]|None = None,
|
||||
|
||||
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
|
||||
sub_for_broadcasts: bool = True, # sampler side to send step updates?
|
||||
|
||||
) -> None:
|
||||
) -> set[int]:
|
||||
|
||||
get_console_log(tractor.current_actor().loglevel)
|
||||
incr_was_started: bool = False
|
||||
|
|
@ -362,7 +362,12 @@ async def register_with_sampler(
|
|||
|
||||
# insert the base 1s period (for OHLC style sampling) into
|
||||
# the increment buffer set to update and shift every second.
|
||||
if shms_by_period is not None:
|
||||
if (
|
||||
shms_by_period is not None
|
||||
# and
|
||||
# feed_is_live.is_set()
|
||||
# ^TODO? pass it in instead?
|
||||
):
|
||||
from ._sharedmem import (
|
||||
attach_shm_array,
|
||||
_Token,
|
||||
|
|
@ -376,12 +381,17 @@ async def register_with_sampler(
|
|||
readonly=False,
|
||||
)
|
||||
shms_by_period[period] = shm
|
||||
Sampler.ohlcv_shms.setdefault(period, []).append(shm)
|
||||
Sampler.ohlcv_shms.setdefault(
|
||||
period,
|
||||
[],
|
||||
).append(shm)
|
||||
|
||||
assert Sampler.ohlcv_shms
|
||||
|
||||
# unblock caller
|
||||
await ctx.started(set(Sampler.ohlcv_shms.keys()))
|
||||
await ctx.started(
|
||||
set(Sampler.ohlcv_shms.keys())
|
||||
)
|
||||
|
||||
if open_index_stream:
|
||||
try:
|
||||
|
|
@ -427,7 +437,7 @@ async def register_with_sampler(
|
|||
|
||||
async def spawn_samplerd(
|
||||
|
||||
loglevel: str | None = None,
|
||||
loglevel: str|None = None,
|
||||
**extra_tractor_kwargs
|
||||
|
||||
) -> bool:
|
||||
|
|
@ -473,7 +483,7 @@ async def spawn_samplerd(
|
|||
@acm
|
||||
async def maybe_open_samplerd(
|
||||
|
||||
loglevel: str | None = None,
|
||||
loglevel: str|None = None,
|
||||
**pikerd_kwargs,
|
||||
|
||||
) -> tractor.Portal: # noqa
|
||||
|
|
@ -498,11 +508,11 @@ async def maybe_open_samplerd(
|
|||
@acm
|
||||
async def open_sample_stream(
|
||||
period_s: float,
|
||||
shms_by_period: dict[float, dict] | None = None,
|
||||
shms_by_period: dict[float, dict]|None = None,
|
||||
open_index_stream: bool = True,
|
||||
sub_for_broadcasts: bool = True,
|
||||
|
||||
cache_key: str | None = None,
|
||||
cache_key: str|None = None,
|
||||
allow_new_sampler: bool = True,
|
||||
|
||||
ensure_is_active: bool = False,
|
||||
|
|
@ -533,6 +543,8 @@ async def open_sample_stream(
|
|||
# yield bistream
|
||||
# else:
|
||||
|
||||
ctx: tractor.Context
|
||||
shm_periods: set[int] # in `int`-seconds
|
||||
async with (
|
||||
# XXX: this should be singleton on a host,
|
||||
# a lone broker-daemon per provider should be
|
||||
|
|
@ -547,10 +559,10 @@ async def open_sample_stream(
|
|||
'open_index_stream': open_index_stream,
|
||||
'sub_for_broadcasts': sub_for_broadcasts,
|
||||
},
|
||||
) as (ctx, first)
|
||||
) as (ctx, shm_periods)
|
||||
):
|
||||
if ensure_is_active:
|
||||
assert len(first) > 1
|
||||
assert len(shm_periods) > 1
|
||||
|
||||
async with (
|
||||
ctx.open_stream(
|
||||
|
|
|
|||
|
|
@ -247,6 +247,11 @@ async def maybe_fill_null_segments(
|
|||
from_timestamp(array['time'][0])
|
||||
) < backfill_until_dt
|
||||
):
|
||||
log.error(
|
||||
f'Invalid frame_start !?\n'
|
||||
f'frame_start_dt: {frame_start_dt!r}\n'
|
||||
f'backfill_until_dt: {backfill_until_dt!r}\n'
|
||||
)
|
||||
await tractor.pause()
|
||||
|
||||
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
|
||||
|
|
@ -1284,6 +1289,7 @@ async def manage_history(
|
|||
some_data_ready: trio.Event,
|
||||
feed_is_live: trio.Event,
|
||||
timeframe: float = 60, # in seconds
|
||||
wait_for_live_timeout: float = 0.5,
|
||||
|
||||
task_status: TaskStatus[
|
||||
tuple[ShmArray, ShmArray]
|
||||
|
|
@ -1432,12 +1438,26 @@ async def manage_history(
|
|||
1: rt_shm,
|
||||
60: hist_shm,
|
||||
}
|
||||
async with open_sample_stream(
|
||||
period_s=1.,
|
||||
shms_by_period={
|
||||
|
||||
shms_by_period: dict|None = None
|
||||
with trio.move_on_after(wait_for_live_timeout) as cs:
|
||||
await feed_is_live.wait()
|
||||
|
||||
if cs.cancelled_caught:
|
||||
log.warning(
|
||||
f'No live feed within {wait_for_live_timeout!r}s\n'
|
||||
f'fqme: {mkt.fqme!r}\n'
|
||||
f'NOT activating shm-buffer-sampler!!\n'
|
||||
)
|
||||
|
||||
if feed_is_live.is_set():
|
||||
shms_by_period: dict[int, dict] = {
|
||||
1.: rt_shm.token,
|
||||
60.: hist_shm.token,
|
||||
},
|
||||
}
|
||||
async with open_sample_stream(
|
||||
period_s=1.,
|
||||
shms_by_period=shms_by_period,
|
||||
|
||||
# NOTE: we want to only open a stream for doing
|
||||
# broadcasts on backfill operations, not receive the
|
||||
|
|
|
|||
Loading…
Reference in New Issue