Compare commits

..

No commits in common. "f73b9811734e2072a5486b790f0b1232dc81ae25" and "bac8317a4ad488fbbb218a1ffc11655b02dec2d0" have entirely different histories.

2 changed files with 24 additions and 56 deletions

View File

@ -80,20 +80,20 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below
`register_with_sampler()`.
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
'''
service_nursery: None|trio.Nursery = None
service_nursery: None | trio.Nursery = None
# TODO: we could stick these in a composed type to avoid angering
# the "i hate module scoped variables crowd" (yawn).
# TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn).
ohlcv_shms: dict[float, list[ShmArray]] = {}
# holds one-task-per-sample-period tasks which are spawned as-needed by
# data feed requests with a given detected time step usually from
# history loading.
incr_task_cs: trio.CancelScope|None = None
incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
@ -248,8 +248,8 @@ class Sampler:
async def broadcast(
self,
period_s: float,
time_stamp: float|None = None,
info: dict|None = None,
time_stamp: float | None = None,
info: dict | None = None,
) -> None:
'''
@ -313,7 +313,7 @@ class Sampler:
@classmethod
async def broadcast_all(
self,
info: dict|None = None,
info: dict | None = None,
) -> None:
# NOTE: take a copy of subs since removals can happen
@ -330,12 +330,12 @@ class Sampler:
async def register_with_sampler(
ctx: Context,
period_s: float,
shms_by_period: dict[float, dict]|None = None,
shms_by_period: dict[float, dict] | None = None,
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates?
) -> set[int]:
) -> None:
get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False
@ -362,12 +362,7 @@ async def register_with_sampler(
# insert the base 1s period (for OHLC style sampling) into
# the increment buffer set to update and shift every second.
if (
shms_by_period is not None
# and
# feed_is_live.is_set()
# ^TODO? pass it in instead?
):
if shms_by_period is not None:
from ._sharedmem import (
attach_shm_array,
_Token,
@ -381,17 +376,12 @@ async def register_with_sampler(
readonly=False,
)
shms_by_period[period] = shm
Sampler.ohlcv_shms.setdefault(
period,
[],
).append(shm)
Sampler.ohlcv_shms.setdefault(period, []).append(shm)
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(
set(Sampler.ohlcv_shms.keys())
)
await ctx.started(set(Sampler.ohlcv_shms.keys()))
if open_index_stream:
try:
@ -437,7 +427,7 @@ async def register_with_sampler(
async def spawn_samplerd(
loglevel: str|None = None,
loglevel: str | None = None,
**extra_tractor_kwargs
) -> bool:
@ -483,7 +473,7 @@ async def spawn_samplerd(
@acm
async def maybe_open_samplerd(
loglevel: str|None = None,
loglevel: str | None = None,
**pikerd_kwargs,
) -> tractor.Portal: # noqa
@ -508,11 +498,11 @@ async def maybe_open_samplerd(
@acm
async def open_sample_stream(
period_s: float,
shms_by_period: dict[float, dict]|None = None,
shms_by_period: dict[float, dict] | None = None,
open_index_stream: bool = True,
sub_for_broadcasts: bool = True,
cache_key: str|None = None,
cache_key: str | None = None,
allow_new_sampler: bool = True,
ensure_is_active: bool = False,
@ -543,8 +533,6 @@ async def open_sample_stream(
# yield bistream
# else:
ctx: tractor.Context
shm_periods: set[int] # in `int`-seconds
async with (
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
@ -559,10 +547,10 @@ async def open_sample_stream(
'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts,
},
) as (ctx, shm_periods)
) as (ctx, first)
):
if ensure_is_active:
assert len(shm_periods) > 1
assert len(first) > 1
async with (
ctx.open_stream(

View File

@ -247,11 +247,6 @@ async def maybe_fill_null_segments(
from_timestamp(array['time'][0])
) < backfill_until_dt
):
log.error(
f'Invalid frame_start !?\n'
f'frame_start_dt: {frame_start_dt!r}\n'
f'backfill_until_dt: {backfill_until_dt!r}\n'
)
await tractor.pause()
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
@ -1289,7 +1284,6 @@ async def manage_history(
some_data_ready: trio.Event,
feed_is_live: trio.Event,
timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5,
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
@ -1438,26 +1432,12 @@ async def manage_history(
1: rt_shm,
60: hist_shm,
}
shms_by_period: dict|None = None
with trio.move_on_after(wait_for_live_timeout) as cs:
await feed_is_live.wait()
if cs.cancelled_caught:
log.warning(
f'No live feed within {wait_for_live_timeout!r}s\n'
f'fqme: {mkt.fqme!r}\n'
f'NOT activating shm-buffer-sampler!!\n'
)
if feed_is_live.is_set():
shms_by_period: dict[int, dict] = {
1.: rt_shm.token,
60.: hist_shm.token,
}
async with open_sample_stream(
period_s=1.,
shms_by_period=shms_by_period,
shms_by_period={
1.: rt_shm.token,
60.: hist_shm.token,
},
# NOTE: we want to only open a stream for doing
# broadcasts on backfill operations, not receive the