Begin formalizing `Sampler` singleton API
We're moving toward a single actor managing sampler work and distributed independently of `brokerd` services such that a user can run samplers on different hosts then real-time data feed infra. Most of the implementation details include aggregating `.data._sampling` routines into a new `Sampler` singleton type. Move the following methods to class methods: - `.increment_ohlc_buffer()` to allow a single task to increment all registered shm buffers. - `.broadcast()` for IPC relay to all registered clients/shms. Further add a new `maybe_open_global_sampler()` which allocates a service nursery and assigns it to the `Sampler.service_nursery`; this is prep for putting the step incrementer in a singleton service task higher up the data-layer actor tree.samplerd_service
parent
b5f2ff854c
commit
2c76cee928
|
@ -33,7 +33,10 @@ import tractor
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
|
||||||
from ..log import get_logger
|
from ..log import (
|
||||||
|
get_logger,
|
||||||
|
get_console_log,
|
||||||
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._sharedmem import ShmArray
|
from ._sharedmem import ShmArray
|
||||||
|
@ -45,7 +48,7 @@ log = get_logger(__name__)
|
||||||
_default_delay_s: float = 1.0
|
_default_delay_s: float = 1.0
|
||||||
|
|
||||||
|
|
||||||
class sampler:
|
class Sampler:
|
||||||
'''
|
'''
|
||||||
Global sampling engine registry.
|
Global sampling engine registry.
|
||||||
|
|
||||||
|
@ -53,6 +56,8 @@ class sampler:
|
||||||
sample period logic.
|
sample period logic.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
service_nursery: None | trio.Nursery = None
|
||||||
|
|
||||||
# TODO: we could stick these in a composed type to avoid
|
# TODO: we could stick these in a composed type to avoid
|
||||||
# angering the "i hate module scoped variables crowd" (yawn).
|
# angering the "i hate module scoped variables crowd" (yawn).
|
||||||
ohlcv_shms: dict[int, list[ShmArray]] = {}
|
ohlcv_shms: dict[int, list[ShmArray]] = {}
|
||||||
|
@ -67,11 +72,12 @@ class sampler:
|
||||||
# notified on a step.
|
# notified on a step.
|
||||||
subscribers: dict[int, tractor.Context] = {}
|
subscribers: dict[int, tractor.Context] = {}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
async def increment_ohlc_buffer(
|
async def increment_ohlc_buffer(
|
||||||
|
self,
|
||||||
delay_s: int,
|
delay_s: int,
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Task which inserts new bars into the provide shared memory array
|
Task which inserts new bars into the provide shared memory array
|
||||||
every ``delay_s`` seconds.
|
every ``delay_s`` seconds.
|
||||||
|
@ -95,17 +101,17 @@ async def increment_ohlc_buffer(
|
||||||
# tradable hours?
|
# tradable hours?
|
||||||
|
|
||||||
# adjust delay to compensate for trio processing time
|
# adjust delay to compensate for trio processing time
|
||||||
ad = min(sampler.ohlcv_shms.keys()) - 0.001
|
ad = min(self.ohlcv_shms.keys()) - 0.001
|
||||||
|
|
||||||
total_s = 0 # total seconds counted
|
total_s = 0 # total seconds counted
|
||||||
lowest = min(sampler.ohlcv_shms.keys())
|
lowest = min(self.ohlcv_shms.keys())
|
||||||
lowest_shm = sampler.ohlcv_shms[lowest][0]
|
lowest_shm = self.ohlcv_shms[lowest][0]
|
||||||
ad = lowest - 0.001
|
ad = lowest - 0.001
|
||||||
|
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
|
|
||||||
# register this time period step as active
|
# register this time period step as active
|
||||||
sampler.incrementers[delay_s] = cs
|
self.incrementers[delay_s] = cs
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
@ -118,7 +124,7 @@ async def increment_ohlc_buffer(
|
||||||
# TODO:
|
# TODO:
|
||||||
# - this in ``numba``
|
# - this in ``numba``
|
||||||
# - just lookup shms for this step instead of iterating?
|
# - just lookup shms for this step instead of iterating?
|
||||||
for this_delay_s, shms in sampler.ohlcv_shms.items():
|
for this_delay_s, shms in self.ohlcv_shms.items():
|
||||||
|
|
||||||
# short-circuit on any not-ready because slower sample
|
# short-circuit on any not-ready because slower sample
|
||||||
# rate consuming shm buffers.
|
# rate consuming shm buffers.
|
||||||
|
@ -128,7 +134,8 @@ async def increment_ohlc_buffer(
|
||||||
|
|
||||||
# TODO: ``numba`` this!
|
# TODO: ``numba`` this!
|
||||||
for shm in shms:
|
for shm in shms:
|
||||||
# append new entry to buffer thus "incrementing" the bar
|
# append new entry to buffer thus "incrementing"
|
||||||
|
# the bar
|
||||||
array = shm.array
|
array = shm.array
|
||||||
last = array[-1:][shm._write_fields].copy()
|
last = array[-1:][shm._write_fields].copy()
|
||||||
|
|
||||||
|
@ -145,7 +152,8 @@ async def increment_ohlc_buffer(
|
||||||
|
|
||||||
# print(f'epoch {shm.token["shm_name"]}: {next_t}')
|
# print(f'epoch {shm.token["shm_name"]}: {next_t}')
|
||||||
|
|
||||||
# this copies non-std fields (eg. vwap) from the last datum
|
# this copies non-std fields (eg. vwap) from the
|
||||||
|
# last datum
|
||||||
last[[
|
last[[
|
||||||
'time',
|
'time',
|
||||||
|
|
||||||
|
@ -168,22 +176,24 @@ async def increment_ohlc_buffer(
|
||||||
0, # vlm
|
0, # vlm
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: in theory we could make this faster by copying the
|
# TODO: in theory we could make this faster by
|
||||||
# "last" readable value into the underlying larger buffer's
|
# copying the "last" readable value into the
|
||||||
# next value and then incrementing the counter instead of
|
# underlying larger buffer's next value and then
|
||||||
# using ``.push()``?
|
# incrementing the counter instead of using
|
||||||
|
# ``.push()``?
|
||||||
|
|
||||||
# write to the buffer
|
# write to the buffer
|
||||||
shm.push(last)
|
shm.push(last)
|
||||||
|
|
||||||
await broadcast(delay_s, shm=lowest_shm)
|
await self.broadcast(delay_s, shm=lowest_shm)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
async def broadcast(
|
async def broadcast(
|
||||||
|
self,
|
||||||
delay_s: int,
|
delay_s: int,
|
||||||
shm: ShmArray | None = None,
|
shm: ShmArray | None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Broadcast the given ``shm: ShmArray``'s buffer index step to any
|
Broadcast the given ``shm: ShmArray``'s buffer index step to any
|
||||||
subscribers for a given sample period.
|
subscribers for a given sample period.
|
||||||
|
@ -192,17 +202,17 @@ async def broadcast(
|
||||||
the buffer's non-empty data.
|
the buffer's non-empty data.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
subs = sampler.subscribers.get(delay_s, ())
|
subs = self.subscribers.get(delay_s, ())
|
||||||
first = last = -1
|
first = last = -1
|
||||||
|
|
||||||
if shm is None:
|
if shm is None:
|
||||||
periods = sampler.ohlcv_shms.keys()
|
periods = self.ohlcv_shms.keys()
|
||||||
# if this is an update triggered by a history update there
|
# if this is an update triggered by a history update there
|
||||||
# might not actually be any sampling bus setup since there's
|
# might not actually be any sampling bus setup since there's
|
||||||
# no "live feed" active yet.
|
# no "live feed" active yet.
|
||||||
if periods:
|
if periods:
|
||||||
lowest = min(periods)
|
lowest = min(periods)
|
||||||
shm = sampler.ohlcv_shms[lowest][0]
|
shm = self.ohlcv_shms[lowest][0]
|
||||||
first = shm._first.value
|
first = shm._first.value
|
||||||
last = shm._last.value
|
last = shm._last.value
|
||||||
|
|
||||||
|
@ -227,6 +237,32 @@ async def broadcast(
|
||||||
f'{stream._ctx.chan.uid} sub already removed!?'
|
f'{stream._ctx.chan.uid} sub already removed!?'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def broadcast_all(self) -> None:
|
||||||
|
for delay_s in self.subscribers:
|
||||||
|
await self.broadcast(delay_s)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def maybe_open_global_sampler(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
brokername: str,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
get_console_log(tractor.current_actor().loglevel)
|
||||||
|
|
||||||
|
global Sampler
|
||||||
|
|
||||||
|
async with trio.open_nursery() as service_nursery:
|
||||||
|
Sampler.service_nursery = service_nursery
|
||||||
|
|
||||||
|
# unblock caller
|
||||||
|
await ctx.started()
|
||||||
|
|
||||||
|
# we pin this task to keep the feeds manager active until the
|
||||||
|
# parent actor decides to tear it down
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def iter_ohlc_periods(
|
async def iter_ohlc_periods(
|
||||||
|
@ -241,7 +277,7 @@ async def iter_ohlc_periods(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# add our subscription
|
# add our subscription
|
||||||
subs = sampler.subscribers.setdefault(delay_s, [])
|
subs = Sampler.subscribers.setdefault(delay_s, [])
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
subs.append(stream)
|
subs.append(stream)
|
||||||
|
|
|
@ -74,9 +74,7 @@ from ._source import (
|
||||||
)
|
)
|
||||||
from ..ui import _search
|
from ..ui import _search
|
||||||
from ._sampling import (
|
from ._sampling import (
|
||||||
sampler,
|
Sampler,
|
||||||
broadcast,
|
|
||||||
increment_ohlc_buffer,
|
|
||||||
sample_and_broadcast,
|
sample_and_broadcast,
|
||||||
uniform_rate_send,
|
uniform_rate_send,
|
||||||
_default_delay_s,
|
_default_delay_s,
|
||||||
|
@ -327,8 +325,7 @@ async def start_backfill(
|
||||||
# TODO: *** THIS IS A BUG ***
|
# TODO: *** THIS IS A BUG ***
|
||||||
# we need to only broadcast to subscribers for this fqsn..
|
# we need to only broadcast to subscribers for this fqsn..
|
||||||
# otherwise all fsps get reset on every chart..
|
# otherwise all fsps get reset on every chart..
|
||||||
for delay_s in sampler.subscribers:
|
await Sampler.broadcast_all()
|
||||||
await broadcast(delay_s)
|
|
||||||
|
|
||||||
# signal that backfilling to tsdb's end datum is complete
|
# signal that backfilling to tsdb's end datum is complete
|
||||||
bf_done = trio.Event()
|
bf_done = trio.Event()
|
||||||
|
@ -496,8 +493,7 @@ async def start_backfill(
|
||||||
# in the block above to avoid entering new ``frames``
|
# in the block above to avoid entering new ``frames``
|
||||||
# values while we're pipelining the current ones to
|
# values while we're pipelining the current ones to
|
||||||
# memory...
|
# memory...
|
||||||
for delay_s in sampler.subscribers:
|
await Sampler.broadcast_all()
|
||||||
await broadcast(delay_s)
|
|
||||||
|
|
||||||
# short-circuit (for now)
|
# short-circuit (for now)
|
||||||
bf_done.set()
|
bf_done.set()
|
||||||
|
@ -738,8 +734,7 @@ async def tsdb_backfill(
|
||||||
# (usually a chart showing graphics for said fsp)
|
# (usually a chart showing graphics for said fsp)
|
||||||
# which tells the chart to conduct a manual full
|
# which tells the chart to conduct a manual full
|
||||||
# graphics loop cycle.
|
# graphics loop cycle.
|
||||||
for delay_s in sampler.subscribers:
|
await Sampler.broadcast_all()
|
||||||
await broadcast(delay_s)
|
|
||||||
|
|
||||||
# TODO: write new data to tsdb to be ready to for next read.
|
# TODO: write new data to tsdb to be ready to for next read.
|
||||||
|
|
||||||
|
@ -1037,7 +1032,7 @@ async def allocate_persistent_feed(
|
||||||
|
|
||||||
# insert 1s ohlc into the increment buffer set
|
# insert 1s ohlc into the increment buffer set
|
||||||
# to update and shift every second
|
# to update and shift every second
|
||||||
sampler.ohlcv_shms.setdefault(
|
Sampler.ohlcv_shms.setdefault(
|
||||||
1,
|
1,
|
||||||
[]
|
[]
|
||||||
).append(rt_shm)
|
).append(rt_shm)
|
||||||
|
@ -1053,13 +1048,13 @@ async def allocate_persistent_feed(
|
||||||
|
|
||||||
# insert 1m ohlc into the increment buffer set
|
# insert 1m ohlc into the increment buffer set
|
||||||
# to shift every 60s.
|
# to shift every 60s.
|
||||||
sampler.ohlcv_shms.setdefault(60, []).append(hist_shm)
|
Sampler.ohlcv_shms.setdefault(60, []).append(hist_shm)
|
||||||
|
|
||||||
# create buffer a single incrementer task broker backend
|
# create buffer a single incrementer task broker backend
|
||||||
# (aka `brokerd`) using the lowest sampler period.
|
# (aka `brokerd`) using the lowest sampler period.
|
||||||
if sampler.incrementers.get(_default_delay_s) is None:
|
if Sampler.incrementers.get(_default_delay_s) is None:
|
||||||
await bus.start_task(
|
await bus.start_task(
|
||||||
increment_ohlc_buffer,
|
Sampler.increment_ohlc_buffer,
|
||||||
_default_delay_s,
|
_default_delay_s,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue