Compare commits
2 Commits
bac8317a4a
...
f73b981173
| Author | SHA1 | Date |
|---|---|---|
|
|
f73b981173 | |
|
|
d5edd3484f |
|
|
@ -80,20 +80,20 @@ class Sampler:
|
||||||
This non-instantiated type is meant to be a singleton within
|
This non-instantiated type is meant to be a singleton within
|
||||||
a `samplerd` actor-service spawned once by the user wishing to
|
a `samplerd` actor-service spawned once by the user wishing to
|
||||||
time-step-sample (real-time) quote feeds, see
|
time-step-sample (real-time) quote feeds, see
|
||||||
``.service.maybe_open_samplerd()`` and the below
|
`.service.maybe_open_samplerd()` and the below
|
||||||
``register_with_sampler()``.
|
`register_with_sampler()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
service_nursery: None | trio.Nursery = None
|
service_nursery: None|trio.Nursery = None
|
||||||
|
|
||||||
# TODO: we could stick these in a composed type to avoid
|
# TODO: we could stick these in a composed type to avoid angering
|
||||||
# angering the "i hate module scoped variables crowd" (yawn).
|
# the "i hate module scoped variables crowd" (yawn).
|
||||||
ohlcv_shms: dict[float, list[ShmArray]] = {}
|
ohlcv_shms: dict[float, list[ShmArray]] = {}
|
||||||
|
|
||||||
# holds one-task-per-sample-period tasks which are spawned as-needed by
|
# holds one-task-per-sample-period tasks which are spawned as-needed by
|
||||||
# data feed requests with a given detected time step usually from
|
# data feed requests with a given detected time step usually from
|
||||||
# history loading.
|
# history loading.
|
||||||
incr_task_cs: trio.CancelScope | None = None
|
incr_task_cs: trio.CancelScope|None = None
|
||||||
|
|
||||||
bcast_errors: tuple[Exception] = (
|
bcast_errors: tuple[Exception] = (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
|
@ -248,8 +248,8 @@ class Sampler:
|
||||||
async def broadcast(
|
async def broadcast(
|
||||||
self,
|
self,
|
||||||
period_s: float,
|
period_s: float,
|
||||||
time_stamp: float | None = None,
|
time_stamp: float|None = None,
|
||||||
info: dict | None = None,
|
info: dict|None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
|
@ -313,7 +313,7 @@ class Sampler:
|
||||||
@classmethod
|
@classmethod
|
||||||
async def broadcast_all(
|
async def broadcast_all(
|
||||||
self,
|
self,
|
||||||
info: dict | None = None,
|
info: dict|None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# NOTE: take a copy of subs since removals can happen
|
# NOTE: take a copy of subs since removals can happen
|
||||||
|
|
@ -330,12 +330,12 @@ class Sampler:
|
||||||
async def register_with_sampler(
|
async def register_with_sampler(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
period_s: float,
|
period_s: float,
|
||||||
shms_by_period: dict[float, dict] | None = None,
|
shms_by_period: dict[float, dict]|None = None,
|
||||||
|
|
||||||
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
|
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
|
||||||
sub_for_broadcasts: bool = True, # sampler side to send step updates?
|
sub_for_broadcasts: bool = True, # sampler side to send step updates?
|
||||||
|
|
||||||
) -> None:
|
) -> set[int]:
|
||||||
|
|
||||||
get_console_log(tractor.current_actor().loglevel)
|
get_console_log(tractor.current_actor().loglevel)
|
||||||
incr_was_started: bool = False
|
incr_was_started: bool = False
|
||||||
|
|
@ -362,7 +362,12 @@ async def register_with_sampler(
|
||||||
|
|
||||||
# insert the base 1s period (for OHLC style sampling) into
|
# insert the base 1s period (for OHLC style sampling) into
|
||||||
# the increment buffer set to update and shift every second.
|
# the increment buffer set to update and shift every second.
|
||||||
if shms_by_period is not None:
|
if (
|
||||||
|
shms_by_period is not None
|
||||||
|
# and
|
||||||
|
# feed_is_live.is_set()
|
||||||
|
# ^TODO? pass it in instead?
|
||||||
|
):
|
||||||
from ._sharedmem import (
|
from ._sharedmem import (
|
||||||
attach_shm_array,
|
attach_shm_array,
|
||||||
_Token,
|
_Token,
|
||||||
|
|
@ -376,12 +381,17 @@ async def register_with_sampler(
|
||||||
readonly=False,
|
readonly=False,
|
||||||
)
|
)
|
||||||
shms_by_period[period] = shm
|
shms_by_period[period] = shm
|
||||||
Sampler.ohlcv_shms.setdefault(period, []).append(shm)
|
Sampler.ohlcv_shms.setdefault(
|
||||||
|
period,
|
||||||
|
[],
|
||||||
|
).append(shm)
|
||||||
|
|
||||||
assert Sampler.ohlcv_shms
|
assert Sampler.ohlcv_shms
|
||||||
|
|
||||||
# unblock caller
|
# unblock caller
|
||||||
await ctx.started(set(Sampler.ohlcv_shms.keys()))
|
await ctx.started(
|
||||||
|
set(Sampler.ohlcv_shms.keys())
|
||||||
|
)
|
||||||
|
|
||||||
if open_index_stream:
|
if open_index_stream:
|
||||||
try:
|
try:
|
||||||
|
|
@ -427,7 +437,7 @@ async def register_with_sampler(
|
||||||
|
|
||||||
async def spawn_samplerd(
|
async def spawn_samplerd(
|
||||||
|
|
||||||
loglevel: str | None = None,
|
loglevel: str|None = None,
|
||||||
**extra_tractor_kwargs
|
**extra_tractor_kwargs
|
||||||
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
|
@ -473,7 +483,7 @@ async def spawn_samplerd(
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_samplerd(
|
async def maybe_open_samplerd(
|
||||||
|
|
||||||
loglevel: str | None = None,
|
loglevel: str|None = None,
|
||||||
**pikerd_kwargs,
|
**pikerd_kwargs,
|
||||||
|
|
||||||
) -> tractor.Portal: # noqa
|
) -> tractor.Portal: # noqa
|
||||||
|
|
@ -498,11 +508,11 @@ async def maybe_open_samplerd(
|
||||||
@acm
|
@acm
|
||||||
async def open_sample_stream(
|
async def open_sample_stream(
|
||||||
period_s: float,
|
period_s: float,
|
||||||
shms_by_period: dict[float, dict] | None = None,
|
shms_by_period: dict[float, dict]|None = None,
|
||||||
open_index_stream: bool = True,
|
open_index_stream: bool = True,
|
||||||
sub_for_broadcasts: bool = True,
|
sub_for_broadcasts: bool = True,
|
||||||
|
|
||||||
cache_key: str | None = None,
|
cache_key: str|None = None,
|
||||||
allow_new_sampler: bool = True,
|
allow_new_sampler: bool = True,
|
||||||
|
|
||||||
ensure_is_active: bool = False,
|
ensure_is_active: bool = False,
|
||||||
|
|
@ -533,6 +543,8 @@ async def open_sample_stream(
|
||||||
# yield bistream
|
# yield bistream
|
||||||
# else:
|
# else:
|
||||||
|
|
||||||
|
ctx: tractor.Context
|
||||||
|
shm_periods: set[int] # in `int`-seconds
|
||||||
async with (
|
async with (
|
||||||
# XXX: this should be singleton on a host,
|
# XXX: this should be singleton on a host,
|
||||||
# a lone broker-daemon per provider should be
|
# a lone broker-daemon per provider should be
|
||||||
|
|
@ -547,10 +559,10 @@ async def open_sample_stream(
|
||||||
'open_index_stream': open_index_stream,
|
'open_index_stream': open_index_stream,
|
||||||
'sub_for_broadcasts': sub_for_broadcasts,
|
'sub_for_broadcasts': sub_for_broadcasts,
|
||||||
},
|
},
|
||||||
) as (ctx, first)
|
) as (ctx, shm_periods)
|
||||||
):
|
):
|
||||||
if ensure_is_active:
|
if ensure_is_active:
|
||||||
assert len(first) > 1
|
assert len(shm_periods) > 1
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
ctx.open_stream(
|
ctx.open_stream(
|
||||||
|
|
|
||||||
|
|
@ -247,6 +247,11 @@ async def maybe_fill_null_segments(
|
||||||
from_timestamp(array['time'][0])
|
from_timestamp(array['time'][0])
|
||||||
) < backfill_until_dt
|
) < backfill_until_dt
|
||||||
):
|
):
|
||||||
|
log.error(
|
||||||
|
f'Invalid frame_start !?\n'
|
||||||
|
f'frame_start_dt: {frame_start_dt!r}\n'
|
||||||
|
f'backfill_until_dt: {backfill_until_dt!r}\n'
|
||||||
|
)
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
|
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
|
||||||
|
|
@ -1284,6 +1289,7 @@ async def manage_history(
|
||||||
some_data_ready: trio.Event,
|
some_data_ready: trio.Event,
|
||||||
feed_is_live: trio.Event,
|
feed_is_live: trio.Event,
|
||||||
timeframe: float = 60, # in seconds
|
timeframe: float = 60, # in seconds
|
||||||
|
wait_for_live_timeout: float = 0.5,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
tuple[ShmArray, ShmArray]
|
tuple[ShmArray, ShmArray]
|
||||||
|
|
@ -1432,12 +1438,26 @@ async def manage_history(
|
||||||
1: rt_shm,
|
1: rt_shm,
|
||||||
60: hist_shm,
|
60: hist_shm,
|
||||||
}
|
}
|
||||||
async with open_sample_stream(
|
|
||||||
period_s=1.,
|
shms_by_period: dict|None = None
|
||||||
shms_by_period={
|
with trio.move_on_after(wait_for_live_timeout) as cs:
|
||||||
|
await feed_is_live.wait()
|
||||||
|
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
log.warning(
|
||||||
|
f'No live feed within {wait_for_live_timeout!r}s\n'
|
||||||
|
f'fqme: {mkt.fqme!r}\n'
|
||||||
|
f'NOT activating shm-buffer-sampler!!\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
if feed_is_live.is_set():
|
||||||
|
shms_by_period: dict[int, dict] = {
|
||||||
1.: rt_shm.token,
|
1.: rt_shm.token,
|
||||||
60.: hist_shm.token,
|
60.: hist_shm.token,
|
||||||
},
|
}
|
||||||
|
async with open_sample_stream(
|
||||||
|
period_s=1.,
|
||||||
|
shms_by_period=shms_by_period,
|
||||||
|
|
||||||
# NOTE: we want to only open a stream for doing
|
# NOTE: we want to only open a stream for doing
|
||||||
# broadcasts on backfill operations, not receive the
|
# broadcasts on backfill operations, not receive the
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue