Compare commits

..

No commits in common. "f73b9811734e2072a5486b790f0b1232dc81ae25" and "bac8317a4ad488fbbb218a1ffc11655b02dec2d0" have entirely different histories.

2 changed files with 24 additions and 56 deletions

View File

@ -80,14 +80,14 @@ class Sampler:
This non-instantiated type is meant to be a singleton within This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see time-step-sample (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below ``.service.maybe_open_samplerd()`` and the below
`register_with_sampler()`. ``register_with_sampler()``.
''' '''
service_nursery: None | trio.Nursery = None service_nursery: None | trio.Nursery = None
# TODO: we could stick these in a composed type to avoid angering # TODO: we could stick these in a composed type to avoid
# the "i hate module scoped variables crowd" (yawn). # angering the "i hate module scoped variables crowd" (yawn).
ohlcv_shms: dict[float, list[ShmArray]] = {} ohlcv_shms: dict[float, list[ShmArray]] = {}
# holds one-task-per-sample-period tasks which are spawned as-needed by # holds one-task-per-sample-period tasks which are spawned as-needed by
@ -335,7 +335,7 @@ async def register_with_sampler(
open_index_stream: bool = True, # open a 2way stream for sample step msgs? open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates? sub_for_broadcasts: bool = True, # sampler side to send step updates?
) -> set[int]: ) -> None:
get_console_log(tractor.current_actor().loglevel) get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False incr_was_started: bool = False
@ -362,12 +362,7 @@ async def register_with_sampler(
# insert the base 1s period (for OHLC style sampling) into # insert the base 1s period (for OHLC style sampling) into
# the increment buffer set to update and shift every second. # the increment buffer set to update and shift every second.
if ( if shms_by_period is not None:
shms_by_period is not None
# and
# feed_is_live.is_set()
# ^TODO? pass it in instead?
):
from ._sharedmem import ( from ._sharedmem import (
attach_shm_array, attach_shm_array,
_Token, _Token,
@ -381,17 +376,12 @@ async def register_with_sampler(
readonly=False, readonly=False,
) )
shms_by_period[period] = shm shms_by_period[period] = shm
Sampler.ohlcv_shms.setdefault( Sampler.ohlcv_shms.setdefault(period, []).append(shm)
period,
[],
).append(shm)
assert Sampler.ohlcv_shms assert Sampler.ohlcv_shms
# unblock caller # unblock caller
await ctx.started( await ctx.started(set(Sampler.ohlcv_shms.keys()))
set(Sampler.ohlcv_shms.keys())
)
if open_index_stream: if open_index_stream:
try: try:
@ -543,8 +533,6 @@ async def open_sample_stream(
# yield bistream # yield bistream
# else: # else:
ctx: tractor.Context
shm_periods: set[int] # in `int`-seconds
async with ( async with (
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
@ -559,10 +547,10 @@ async def open_sample_stream(
'open_index_stream': open_index_stream, 'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts, 'sub_for_broadcasts': sub_for_broadcasts,
}, },
) as (ctx, shm_periods) ) as (ctx, first)
): ):
if ensure_is_active: if ensure_is_active:
assert len(shm_periods) > 1 assert len(first) > 1
async with ( async with (
ctx.open_stream( ctx.open_stream(

View File

@ -247,11 +247,6 @@ async def maybe_fill_null_segments(
from_timestamp(array['time'][0]) from_timestamp(array['time'][0])
) < backfill_until_dt ) < backfill_until_dt
): ):
log.error(
f'Invalid frame_start !?\n'
f'frame_start_dt: {frame_start_dt!r}\n'
f'backfill_until_dt: {backfill_until_dt!r}\n'
)
await tractor.pause() await tractor.pause()
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
@ -1289,7 +1284,6 @@ async def manage_history(
some_data_ready: trio.Event, some_data_ready: trio.Event,
feed_is_live: trio.Event, feed_is_live: trio.Event,
timeframe: float = 60, # in seconds timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5,
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ShmArray, ShmArray] tuple[ShmArray, ShmArray]
@ -1438,26 +1432,12 @@ async def manage_history(
1: rt_shm, 1: rt_shm,
60: hist_shm, 60: hist_shm,
} }
shms_by_period: dict|None = None
with trio.move_on_after(wait_for_live_timeout) as cs:
await feed_is_live.wait()
if cs.cancelled_caught:
log.warning(
f'No live feed within {wait_for_live_timeout!r}s\n'
f'fqme: {mkt.fqme!r}\n'
f'NOT activating shm-buffer-sampler!!\n'
)
if feed_is_live.is_set():
shms_by_period: dict[int, dict] = {
1.: rt_shm.token,
60.: hist_shm.token,
}
async with open_sample_stream( async with open_sample_stream(
period_s=1., period_s=1.,
shms_by_period=shms_by_period, shms_by_period={
1.: rt_shm.token,
60.: hist_shm.token,
},
# NOTE: we want to only open a stream for doing # NOTE: we want to only open a stream for doing
# broadcasts on backfill operations, not receive the # broadcasts on backfill operations, not receive the