Begin formalizing `Sampler` singleton API

We're moving toward a single actor managing sampler work and distributed
independently of `brokerd` services such that a user can run samplers on
different hosts then real-time data feed infra. Most of the
implementation details include aggregating `.data._sampling` routines
into a new `Sampler` singleton type.

Move the following methods to class methods:
- `.increment_ohlc_buffer()` to allow a single task to increment all
  registered shm buffers.
- `.broadcast()` for IPC relay to all registered clients/shms.

Further add a new `maybe_open_global_sampler()` which allocates
a service nursery and assigns it to the `Sampler.service_nursery`; this
is prep for putting the step incrementer in a singleton service task
higher up the data-layer actor tree.
epoch_index_backup
Tyler Goodlet 2023-01-03 11:54:18 -05:00
parent 141f4cf018
commit 33e7e204d8
2 changed files with 174 additions and 143 deletions

View File

@ -33,7 +33,10 @@ import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from ..log import get_logger from ..log import (
get_logger,
get_console_log,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ._sharedmem import ShmArray from ._sharedmem import ShmArray
@ -45,7 +48,7 @@ log = get_logger(__name__)
_default_delay_s: float = 1.0 _default_delay_s: float = 1.0
class sampler: class Sampler:
''' '''
Global sampling engine registry. Global sampling engine registry.
@ -53,6 +56,8 @@ class sampler:
sample period logic. sample period logic.
''' '''
service_nursery: None | trio.Nursery = None
# TODO: we could stick these in a composed type to avoid # TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn). # angering the "i hate module scoped variables crowd" (yawn).
ohlcv_shms: dict[int, list[ShmArray]] = {} ohlcv_shms: dict[int, list[ShmArray]] = {}
@ -67,165 +72,196 @@ class sampler:
# notified on a step. # notified on a step.
subscribers: dict[int, tractor.Context] = {} subscribers: dict[int, tractor.Context] = {}
@classmethod
async def increment_ohlc_buffer(
self,
delay_s: int,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
):
'''
Task which inserts new bars into the provide shared memory array
every ``delay_s`` seconds.
async def increment_ohlc_buffer( This task fulfills 2 purposes:
delay_s: int, - it takes the subscribed set of shm arrays and increments them
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, on a common time period
): - broadcast of this increment "signal" message to other actor
''' subscribers
Task which inserts new bars into the provide shared memory array
every ``delay_s`` seconds.
This task fulfills 2 purposes: Note that if **no** actor has initiated this task then **none** of
- it takes the subscribed set of shm arrays and increments them the underlying buffers will actually be incremented.
on a common time period
- broadcast of this increment "signal" message to other actor
subscribers
Note that if **no** actor has initiated this task then **none** of '''
the underlying buffers will actually be incremented. # # wait for brokerd to signal we should start sampling
# await shm_incrementing(shm_token['shm_name']).wait()
''' # TODO: right now we'll spin printing bars if the last time stamp is
# # wait for brokerd to signal we should start sampling # before a large period of no market activity. Likely the best way
# await shm_incrementing(shm_token['shm_name']).wait() # to solve this is to make this task aware of the instrument's
# tradable hours?
# TODO: right now we'll spin printing bars if the last time stamp is # adjust delay to compensate for trio processing time
# before a large period of no market activity. Likely the best way ad = min(self.ohlcv_shms.keys()) - 0.001
# to solve this is to make this task aware of the instrument's
# tradable hours?
# adjust delay to compensate for trio processing time total_s = 0 # total seconds counted
ad = min(sampler.ohlcv_shms.keys()) - 0.001 lowest = min(self.ohlcv_shms.keys())
lowest_shm = self.ohlcv_shms[lowest][0]
ad = lowest - 0.001
total_s = 0 # total seconds counted with trio.CancelScope() as cs:
lowest = min(sampler.ohlcv_shms.keys())
lowest_shm = sampler.ohlcv_shms[lowest][0]
ad = lowest - 0.001
with trio.CancelScope() as cs: # register this time period step as active
self.incrementers[delay_s] = cs
task_status.started(cs)
# register this time period step as active while True:
sampler.incrementers[delay_s] = cs # TODO: do we want to support dynamically
task_status.started(cs) # adding a "lower" lowest increment period?
await trio.sleep(ad)
total_s += delay_s
while True: # increment all subscribed shm arrays
# TODO: do we want to support dynamically # TODO:
# adding a "lower" lowest increment period? # - this in ``numba``
await trio.sleep(ad) # - just lookup shms for this step instead of iterating?
total_s += delay_s for this_delay_s, shms in self.ohlcv_shms.items():
# increment all subscribed shm arrays # short-circuit on any not-ready because slower sample
# TODO: # rate consuming shm buffers.
# - this in ``numba`` if total_s % this_delay_s != 0:
# - just lookup shms for this step instead of iterating? # print(f'skipping `{this_delay_s}s` sample update')
for this_delay_s, shms in sampler.ohlcv_shms.items(): continue
# short-circuit on any not-ready because slower sample # TODO: ``numba`` this!
# rate consuming shm buffers. for shm in shms:
if total_s % this_delay_s != 0: # append new entry to buffer thus "incrementing"
# print(f'skipping `{this_delay_s}s` sample update') # the bar
continue array = shm.array
last = array[-1:][shm._write_fields].copy()
# TODO: ``numba`` this! (t, close) = last[0][[
for shm in shms: 'time',
# append new entry to buffer thus "incrementing" the bar 'close',
array = shm.array ]]
last = array[-1:][shm._write_fields].copy()
(t, close) = last[0][[ next_t = t + this_delay_s
'time', i_epoch = round(time.time())
'close',
]]
next_t = t + this_delay_s if this_delay_s <= 1:
i_epoch = round(time.time()) next_t = i_epoch
if this_delay_s <= 1: # print(f'epoch {shm.token["shm_name"]}: {next_t}')
next_t = i_epoch
# print(f'epoch {shm.token["shm_name"]}: {next_t}') # this copies non-std fields (eg. vwap) from the
# last datum
last[[
'time',
# this copies non-std fields (eg. vwap) from the last datum 'open',
last[[ 'high',
'time', 'low',
'close',
'open', 'volume',
'high', ]][0] = (
'low', # epoch timestamp
'close', next_t,
'volume', # OHLC
]][0] = ( close,
# epoch timestamp close,
next_t, close,
close,
# OHLC 0, # vlm
close, )
close,
close,
close,
0, # vlm # TODO: in theory we could make this faster by
# copying the "last" readable value into the
# underlying larger buffer's next value and then
# incrementing the counter instead of using
# ``.push()``?
# write to the buffer
shm.push(last)
await self.broadcast(delay_s, shm=lowest_shm)
@classmethod
async def broadcast(
self,
delay_s: int,
shm: ShmArray | None = None,
) -> None:
'''
Broadcast the given ``shm: ShmArray``'s buffer index step to any
subscribers for a given sample period.
The sent msg will include the first and last index which slice into
the buffer's non-empty data.
'''
subs = self.subscribers.get(delay_s, ())
first = last = -1
if shm is None:
periods = self.ohlcv_shms.keys()
# if this is an update triggered by a history update there
# might not actually be any sampling bus setup since there's
# no "live feed" active yet.
if periods:
lowest = min(periods)
shm = self.ohlcv_shms[lowest][0]
first = shm._first.value
last = shm._last.value
for stream in subs:
try:
await stream.send({
'first': first,
'last': last,
'index': last,
})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
try:
subs.remove(stream)
except ValueError:
log.warning(
f'{stream._ctx.chan.uid} sub already removed!?'
) )
# TODO: in theory we could make this faster by copying the @classmethod
# "last" readable value into the underlying larger buffer's async def broadcast_all(self) -> None:
# next value and then incrementing the counter instead of for delay_s in self.subscribers:
# using ``.push()``? await self.broadcast(delay_s)
# write to the buffer
shm.push(last)
await broadcast(delay_s, shm=lowest_shm)
async def broadcast( @tractor.context
delay_s: int, async def maybe_open_global_sampler(
shm: ShmArray | None = None, ctx: tractor.Context,
brokername: str,
) -> None: ) -> None:
''' get_console_log(tractor.current_actor().loglevel)
Broadcast the given ``shm: ShmArray``'s buffer index step to any
subscribers for a given sample period.
The sent msg will include the first and last index which slice into global Sampler
the buffer's non-empty data.
''' async with trio.open_nursery() as service_nursery:
subs = sampler.subscribers.get(delay_s, ()) Sampler.service_nursery = service_nursery
first = last = -1
if shm is None: # unblock caller
periods = sampler.ohlcv_shms.keys() await ctx.started()
# if this is an update triggered by a history update there
# might not actually be any sampling bus setup since there's
# no "live feed" active yet.
if periods:
lowest = min(periods)
shm = sampler.ohlcv_shms[lowest][0]
first = shm._first.value
last = shm._last.value
for stream in subs: # we pin this task to keep the feeds manager active until the
try: # parent actor decides to tear it down
await stream.send({ await trio.sleep_forever()
'first': first,
'last': last,
'index': last,
})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
try:
subs.remove(stream)
except ValueError:
log.warning(
f'{stream._ctx.chan.uid} sub already removed!?'
)
@tractor.context @tractor.context
@ -241,7 +277,7 @@ async def iter_ohlc_periods(
''' '''
# add our subscription # add our subscription
subs = sampler.subscribers.setdefault(delay_s, []) subs = Sampler.subscribers.setdefault(delay_s, [])
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
subs.append(stream) subs.append(stream)

View File

@ -74,9 +74,7 @@ from ._source import (
) )
from ..ui import _search from ..ui import _search
from ._sampling import ( from ._sampling import (
sampler, Sampler,
broadcast,
increment_ohlc_buffer,
sample_and_broadcast, sample_and_broadcast,
uniform_rate_send, uniform_rate_send,
_default_delay_s, _default_delay_s,
@ -327,8 +325,7 @@ async def start_backfill(
# TODO: *** THIS IS A BUG *** # TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqsn.. # we need to only broadcast to subscribers for this fqsn..
# otherwise all fsps get reset on every chart.. # otherwise all fsps get reset on every chart..
for delay_s in sampler.subscribers: await Sampler.broadcast_all()
await broadcast(delay_s)
# signal that backfilling to tsdb's end datum is complete # signal that backfilling to tsdb's end datum is complete
bf_done = trio.Event() bf_done = trio.Event()
@ -496,8 +493,7 @@ async def start_backfill(
# in the block above to avoid entering new ``frames`` # in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to # values while we're pipelining the current ones to
# memory... # memory...
for delay_s in sampler.subscribers: await Sampler.broadcast_all()
await broadcast(delay_s)
# short-circuit (for now) # short-circuit (for now)
bf_done.set() bf_done.set()
@ -738,8 +734,7 @@ async def tsdb_backfill(
# (usually a chart showing graphics for said fsp) # (usually a chart showing graphics for said fsp)
# which tells the chart to conduct a manual full # which tells the chart to conduct a manual full
# graphics loop cycle. # graphics loop cycle.
for delay_s in sampler.subscribers: await Sampler.broadcast_all()
await broadcast(delay_s)
# TODO: write new data to tsdb to be ready to for next read. # TODO: write new data to tsdb to be ready to for next read.
@ -1036,7 +1031,7 @@ async def allocate_persistent_feed(
# insert 1s ohlc into the increment buffer set # insert 1s ohlc into the increment buffer set
# to update and shift every second # to update and shift every second
sampler.ohlcv_shms.setdefault( Sampler.ohlcv_shms.setdefault(
1, 1,
[] []
).append(rt_shm) ).append(rt_shm)
@ -1052,13 +1047,13 @@ async def allocate_persistent_feed(
# insert 1m ohlc into the increment buffer set # insert 1m ohlc into the increment buffer set
# to shift every 60s. # to shift every 60s.
sampler.ohlcv_shms.setdefault(60, []).append(hist_shm) Sampler.ohlcv_shms.setdefault(60, []).append(hist_shm)
# create buffer a single incrementer task broker backend # create buffer a single incrementer task broker backend
# (aka `brokerd`) using the lowest sampler period. # (aka `brokerd`) using the lowest sampler period.
if sampler.incrementers.get(_default_delay_s) is None: if Sampler.incrementers.get(_default_delay_s) is None:
await bus.start_task( await bus.start_task(
increment_ohlc_buffer, Sampler.increment_ohlc_buffer,
_default_delay_s, _default_delay_s,
) )