Async load data history, allow "offline" feed use

Break up real-time quote feed and history loading into 2 separate tasks
and deliver a client side `data.Feed` as soon as history is loaded
(instead of waiting for a rt quote - the previous logic). If
a symbol doesn't have history then likely the feed shouldn't be loaded
(since presumably client code will need at least "some" datums history
to do anything) and waiting on a real-time quote is dumb, since it'll
hang if the market isn't open XD. If a symbol doesn't have history we
can always write a zero/null array when we run into that case. This also
greatly speeds up feed loading when both history and quotes are available.

TL;DR summary:
- add a `_Feedsbus.start_task()` one-cancel-scope-per-task method for
  assisting with (re-)starting and stopping long running persistent
  feeds (basically a "one cancels one" style nursery API).
- add a `manage_history()` task which does all history loading (and
  eventually real-time writing) which has an independent signal and
  start it in a separate task.
- drop the "sample rate per symbol" stuff since client code doesn't really
  care when it can just inspect shm indexing/time-steps itself.
- run throttle tasks in the bus nursery thus avoiding cancelling the
  underlying sampler task on feed client disconnects.
- don't store a repeated ref the bus nursery's cancel scope..
async_hist_loading
Tyler Goodlet 2022-02-22 14:42:31 -05:00
parent 1d3ed6c333
commit bf3b58e861
1 changed files with 210 additions and 89 deletions

View File

@ -27,6 +27,7 @@ from types import ModuleType
from typing import ( from typing import (
Any, Sequence, Any, Sequence,
AsyncIterator, Optional, AsyncIterator, Optional,
Awaitable,
) )
import trio import trio
@ -72,12 +73,24 @@ class _FeedsBus(BaseModel):
Data feeds broadcaster and persistence management. Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time This is a brokerd side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely. streams that can be allocated and left alive indefinitely. A bus is
associated one-to-one with a particular broker backend where the
"bus" refers so a multi-symbol bus where quotes are interleaved in
time.
Each "entry" in the bus includes:
- a stream used to push real time quotes (up to tick rates)
which is executed as a lone task that is cancellable via
a dedicated cancel scope.
''' '''
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
brokername: str brokername: str
nursery: trio.Nursery nursery: trio.Nursery
feeds: dict[str, tuple[trio.CancelScope, dict, dict]] = {} feeds: dict[str, tuple[dict, dict]] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
@ -91,14 +104,27 @@ class _FeedsBus(BaseModel):
list[tuple[tractor.MsgStream, Optional[float]]] list[tuple[tractor.MsgStream, Optional[float]]]
] = {} ] = {}
class Config: async def start_task(
arbitrary_types_allowed = True self,
underscore_attrs_are_private = False target: Awaitable,
*args,
) -> None:
async def cancel_all(self) -> None: async def start_with_cs(
for sym, (cs, msg, quote) in self.feeds.items(): task_status: TaskStatus[
log.debug(f'Cancelling cached feed for {self.brokername}:{sym}') trio.CancelScope] = trio.TASK_STATUS_IGNORED,
cs.cancel() ) -> None:
with trio.CancelScope() as cs:
self.nursery.start_soon(
target,
*args,
)
task_status.started(cs)
return await self.nursery.start(start_with_cs)
def cancel_task(self, task: trio.Task) -> bool:
pass
_bus: _FeedsBus = None _bus: _FeedsBus = None
@ -156,7 +182,78 @@ async def _setup_persistent_brokerd(
await trio.sleep_forever() await trio.sleep_forever()
finally: finally:
# TODO: this needs to be shielded? # TODO: this needs to be shielded?
await bus.cancel_all() bus.nursery.cancel_scope.cancel()
async def manage_history(
mod: ModuleType,
shm: ShmArray,
bus: _FeedsBus,
symbol: str,
we_opened_shm: bool,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
) -> None:
'''
Load and manage historical data including the loading of any
available series from `marketstore` as well as conducting real-time
update of both that existing db and the allocated shared memory
buffer.
'''
# TODO:
# history retreival, see if we can pull from an existing
# ``marketstored`` daemon
# log.info('Scanning for existing `marketstored`')
# from .marketstore import load_history
# arrays = await load_history(symbol)
arrays = {}
opened = we_opened_shm
# TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
if opened:
if arrays:
# push to shm
# set data ready
# some_data_ready.set()
raise ValueError('this should never execute yet')
else:
# ask broker backend for new history
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
cs = await bus.nursery.start(mod.backfill_bars, symbol, shm)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# detect sample step size for sampled historical data
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# begin real-time updates of shm and tsb once the feed
# goes live.
await feed_is_live.wait()
if opened:
_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne.
if _incrementers.get(delay_s) is None:
cs = await bus.start_task(increment_ohlc_buffer, delay_s)
await trio.sleep_forever()
cs.cancel()
async def allocate_persistent_feed( async def allocate_persistent_feed(
@ -168,17 +265,30 @@ async def allocate_persistent_feed(
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
'''
Create and maintain a "feed bus" which allocates tasks for real-time
streaming and optional historical data storage per broker/data provider
backend; this normally task runs *in* a `brokerd` actor.
If none exists, this allocates a ``_FeedsBus`` which manages the
lifetimes of streaming tasks created for each requested symbol.
2 tasks are created:
- a real-time streaming task which connec
'''
try: try:
mod = get_brokermod(brokername) mod = get_brokermod(brokername)
except ImportError: except ImportError:
mod = get_ingestormod(brokername) mod = get_ingestormod(brokername)
# allocate shm array for this broker/symbol fqsn = mk_fqsn(brokername, symbol)
# XXX: we should get an error here if one already exists
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array( shm, opened = maybe_open_shm_array(
key=sym_to_shm_key(brokername, symbol), key=fqsn,
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@ -187,69 +297,73 @@ async def allocate_persistent_feed(
readonly=False, readonly=False,
) )
# do history validation? # mem chan handed to broker backend so it can push real-time
# assert opened, f'Persistent shm for {symbol} was already open?!' # quotes to this task for sampling and history storage (see below).
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
send, quote_stream = trio.open_memory_channel(10) send, quote_stream = trio.open_memory_channel(10)
# data sync signals for both history loading and market quotes
some_data_ready = trio.Event()
feed_is_live = trio.Event() feed_is_live = trio.Event()
# establish broker backend quote stream # run 2 tasks:
# ``stream_quotes()`` is a required backend func # - a history loader / maintainer
# - a real-time streamer which consumers and sends new data to any
# consumers as well as writes to storage backends (as configured).
bus.nursery.start_soon(
manage_history,
mod,
shm,
bus,
symbol,
opened,
some_data_ready,
feed_is_live,
)
# establish broker backend quote stream by calling
# ``stream_quotes()``, which is a required broker backend endpoint.
init_msg, first_quotes = await bus.nursery.start( init_msg, first_quotes = await bus.nursery.start(
partial( partial(
mod.stream_quotes, mod.stream_quotes,
send_chan=send, send_chan=send,
feed_is_live=feed_is_live, feed_is_live=feed_is_live,
symbols=[symbol], symbols=[symbol],
shm=shm,
loglevel=loglevel, loglevel=loglevel,
) )
) )
# we hand an IPC-msg compatible shm token to the caller so it
# can read directly from the memory which will be written by
# this task.
init_msg[symbol]['shm_token'] = shm.token init_msg[symbol]['shm_token'] = shm.token
cs = bus.nursery.cancel_scope
# TODO: make this into a composed type which also
# contains the backfiller cs for individual super-based
# resspawns when needed.
# XXX: the ``symbol`` here is put into our native piker format (i.e.
# lower case).
bus.feeds[symbol.lower()] = (cs, init_msg, first_quotes)
if opened:
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
await bus.nursery.start(mod.backfill_bars, symbol, shm)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
# for most purposes.
# pass OHLC sample rate in seconds (be sure to use python int type) # pass OHLC sample rate in seconds (be sure to use python int type)
init_msg[symbol]['sample_rate'] = int(delay_s) # init_msg[symbol]['sample_rate'] = 1 #int(delay_s)
# yield back control to starting nursery # yield back control to starting nursery once we receive either
# some history or a real-time quote.
await some_data_ready.wait()
bus.feeds[symbol.lower()] = (init_msg, first_quotes)
task_status.started((init_msg, first_quotes)) task_status.started((init_msg, first_quotes))
# backend will indicate when real-time quotes have begun.
await feed_is_live.wait() await feed_is_live.wait()
if opened:
_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling
if _incrementers.get(delay_s) is None:
cs = await bus.nursery.start(increment_ohlc_buffer, delay_s)
sum_tick_vlm: bool = init_msg.get( sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {} 'shm_write_opts', {}
).get('sum_tick_vlm', True) ).get('sum_tick_vlm', True)
# start sample loop # start sample loop
try: try:
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) await sample_and_broadcast(
bus,
shm,
quote_stream,
sum_tick_vlm
)
finally: finally:
log.warning(f'{symbol}@{brokername} feed task terminated') log.warning(f'{symbol}@{brokername} feed task terminated')
@ -265,36 +379,43 @@ async def open_feed_bus(
start_stream: bool = True, start_stream: bool = True,
) -> None: ) -> None:
'''
Open a data feed "bus": an actor-persistent per-broker task-oriented
data feed registry which allows managing real-time quote streams per
symbol.
'''
if loglevel is None: if loglevel is None:
loglevel = tractor.current_actor().loglevel loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
# local state sanity checks
# TODO: check for any stale shm entries for this symbol
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are # ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername) bus = get_feed_bus(brokername)
bus._subscribers.setdefault(symbol, [])
fqsn = mk_fqsn(brokername, symbol)
entry = bus.feeds.get(symbol) entry = bus.feeds.get(symbol)
bus._subscribers.setdefault(symbol, [])
fs = mk_fqsn(symbol, brokername)
# if no cached feed for this symbol has been created for this # if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in # brokerd yet, start persistent stream and shm writer task in
# service nursery # service nursery
async with bus.task_lock:
if entry is None: if entry is None:
if not start_stream: if not start_stream:
raise RuntimeError( raise RuntimeError(
f'No stream feed exists for {fs}?\n' f'No stream feed exists for {fqsn}?\n'
f'You may need a `brokerd` started first.' f'You may need a `brokerd` started first.'
) )
# allocate a new actor-local stream bus which will persist for
# this `brokerd`.
async with bus.task_lock:
init_msg, first_quotes = await bus.nursery.start( init_msg, first_quotes = await bus.nursery.start(
partial( partial(
allocate_persistent_feed, allocate_persistent_feed,
@ -310,25 +431,25 @@ async def open_feed_bus(
loglevel=loglevel, loglevel=loglevel,
) )
) )
# TODO: we can remove this?
assert isinstance(bus.feeds[symbol], tuple) assert isinstance(bus.feeds[symbol], tuple)
# XXX: ``first_quotes`` may be outdated here if this is secondary # XXX: ``first_quotes`` may be outdated here if this is secondary
# subscriber # subscriber
cs, init_msg, first_quotes = bus.feeds[symbol] init_msg, first_quotes = bus.feeds[symbol]
# send this even to subscribers to existing feed? # send this even to subscribers to existing feed?
# deliver initial info message a first quote asap # deliver initial info message a first quote asap
await ctx.started((init_msg, first_quotes)) await ctx.started((init_msg, first_quotes))
if not start_stream: if not start_stream:
log.warning(f'Not opening real-time stream for {fs}') log.warning(f'Not opening real-time stream for {fqsn}')
await trio.sleep_forever() await trio.sleep_forever()
# real-time stream loop
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
trio.open_nursery() as n,
): ):
if tick_throttle: if tick_throttle:
# open a bg task which receives quotes over a mem chan # open a bg task which receives quotes over a mem chan
@ -336,7 +457,7 @@ async def open_feed_bus(
# a max ``tick_throttle`` instantaneous rate. # a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10) send, recv = trio.open_memory_channel(2**10)
n.start_soon( cs = await bus.start_task(
uniform_rate_send, uniform_rate_send,
tick_throttle, tick_throttle,
recv, recv,
@ -358,21 +479,24 @@ async def open_feed_bus(
if msg == 'pause': if msg == 'pause':
if sub in subs: if sub in subs:
log.info( log.info(
f'Pausing {fs} feed for {uid}') f'Pausing {fqsn} feed for {uid}')
subs.remove(sub) subs.remove(sub)
elif msg == 'resume': elif msg == 'resume':
if sub not in subs: if sub not in subs:
log.info( log.info(
f'Resuming {fs} feed for {uid}') f'Resuming {fqsn} feed for {uid}')
subs.append(sub) subs.append(sub)
else: else:
raise ValueError(msg) raise ValueError(msg)
finally: finally:
log.info( log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle: if tick_throttle:
n.cancel_scope.cancel() # TODO: a one-cancels-one nursery
# n.cancel_scope.cancel()
cs.cancel()
try: try:
bus._subscribers[symbol].remove(sub) bus._subscribers[symbol].remove(sub)
except ValueError: except ValueError:
@ -385,6 +509,7 @@ async def open_sample_step_stream(
delay_s: int, delay_s: int,
) -> tractor.ReceiveMsgStream: ) -> tractor.ReceiveMsgStream:
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
@ -407,13 +532,15 @@ async def open_sample_step_stream(
@dataclass @dataclass
class Feed: class Feed:
"""A data feed for client-side interaction with far-process# }}} '''
real-time data sources. A data feed for client-side interaction with far-process real-time
data sources.
This is an thin abstraction on top of ``tractor``'s portals for This is an thin abstraction on top of ``tractor``'s portals for
interacting with IPC streams and conducting automatic interacting with IPC streams and storage APIs (shm and time-series
memory buffer orchestration. db).
"""
'''
name: str name: str
shm: ShmArray shm: ShmArray
mod: ModuleType mod: ModuleType
@ -425,7 +552,7 @@ class Feed:
throttle_rate: Optional[int] = None throttle_rate: Optional[int] = None
_trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
_max_sample_rate: int = 0 _max_sample_rate: int = 1
# cache of symbol info messages received as first message when # cache of symbol info messages received as first message when
# a stream startsc. # a stream startsc.
@ -460,13 +587,6 @@ class Feed:
await self.stream.send('resume') await self.stream.send('resume')
def sym_to_shm_key(
broker: str,
symbol: str,
) -> str:
return f'{broker}.{symbol}'
@asynccontextmanager @asynccontextmanager
async def install_brokerd_search( async def install_brokerd_search(
@ -527,13 +647,15 @@ async def open_feed(
# no feed for broker exists so maybe spawn a data brokerd # no feed for broker exists so maybe spawn a data brokerd
async with ( async with (
# if no `brokerd` for this backend exists yet we spawn
# and actor for one.
maybe_spawn_brokerd( maybe_spawn_brokerd(
brokername, brokername,
loglevel=loglevel loglevel=loglevel
) as portal, ) as portal,
# (allocate and) connect to any feed bus for this broker
portal.open_context( portal.open_context(
open_feed_bus, open_feed_bus,
brokername=brokername, brokername=brokername,
symbol=sym, symbol=sym,
@ -566,12 +688,10 @@ async def open_feed(
_portal=portal, _portal=portal,
throttle_rate=tick_throttle, throttle_rate=tick_throttle,
) )
ohlc_sample_rates = []
for sym, data in init_msg.items(): for sym, data in init_msg.items():
si = data['symbol_info'] si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
symbol = mk_symbol( symbol = mk_symbol(
key=sym, key=sym,
@ -592,9 +712,8 @@ async def open_feed(
assert shm_token == shm.token # sanity assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = 1
# yield feed
try: try:
yield feed yield feed
finally: finally:
@ -627,14 +746,16 @@ async def maybe_open_feed(
'symbols': [sym], 'symbols': [sym],
'loglevel': loglevel, 'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'), 'tick_throttle': kwargs.get('tick_throttle'),
'backpressure': kwargs.get('backpressure'),
# XXX: super critical to have bool defaults here XD
'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True), 'start_stream': kwargs.get('start_stream', True),
}, },
key=sym, key=sym,
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
print('USING CACHED FEED') log.info(f'Using cached feed for {brokername}.{sym}')
# add a new broadcast subscription for the quote stream # add a new broadcast subscription for the quote stream
# if this feed is likely already in use # if this feed is likely already in use
async with feed.stream.subscribe() as bstream: async with feed.stream.subscribe() as bstream: