Further feed syncing fixes wrt `Flumes`

Sync the per-symbol sampler loop start to subscription registration such
that the loop can't start until the consumer's stream subscription has
been added; the task sync uses a `trio.Event`. This patch also drops a
ton of commented cruft.
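
A minimal sketch of that handshake pattern (the function and task names
here are illustrative, not the exact internals):

    import trio

    async def sampler_loop(sub_registered: trio.Event) -> None:
        # block until the parent task has appended the consumer's
        # send-stream entry to the bus' subscriber table.
        await sub_registered.wait()
        # ... the sample_and_broadcast() equivalent runs here ...

    async def feed_bus_task() -> None:
        sub_registered = trio.Event()
        async with trio.open_nursery() as n:
            n.start_soon(sampler_loop, sub_registered)
            # ... register the subscriber's stream entry ...
            sub_registered.set()  # unblock the sampler loop

    trio.run(feed_bus_task)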

Further adjustments needed to get parity with prior functionality:
- pass the init msg's 'symbol_info' field to the `Symbol.broker_info: dict`.
- ensure the `_FeedsBus._subscribers` table uses the broker-specific
  fqsn (without the brokername suffix) as keys for lookup so that the
  sampler loop doesn't have to append the brokername as a suffix.
- ensure the `open_feed_bus()` flumes-table-msg sent back via
  `tractor.Context.started()` uses the `.to_msg()` form of all flume
  structs.
- ensure `maybe_open_feed()` uses `tractor.MsgStream.subscribe()` on all
  `Flume.stream`s on cache hits using the
  `tractor.trionics.gather_contexts()` helper; see the sketch below.
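
For that last bullet, the cache-hit path looks roughly like the
following sketch (the `maybe_open_feed_cached` wrapper name is
hypothetical; `gather_contexts()` and `MsgStream.subscribe()` are the
tractor helpers used in the diff below):

    from contextlib import asynccontextmanager as acm
    from tractor.trionics import gather_contexts

    @acm
    async def maybe_open_feed_cached(feed):  # hypothetical wrapper name
        # allocate one broadcast receiver per underlying msg stream so
        # a second consumer never steals quotes from the first.
        async with gather_contexts(
            mngrs=[stream.subscribe() for stream in feed.streams.values()],
        ) as bstreams:
            for bstream, flume in zip(bstreams, feed.flumes.values()):
                flume.stream = bstream
            yield feed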
agg_feedz
Tyler Goodlet 2022-11-09 18:57:15 -05:00
parent 25bfe6f035
commit bb6452b969
2 changed files with 103 additions and 161 deletions


@@ -22,10 +22,6 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
 from contextlib import asynccontextmanager as acm
-# from dataclasses import (
-#     dataclass,
-#     field,
-# )
 from datetime import datetime
 from functools import partial
 from types import ModuleType
@@ -36,7 +32,6 @@ from typing import (
     Callable,
     Optional,
     Awaitable,
-    Sequence,
     TYPE_CHECKING,
     Union,
 )
@@ -243,7 +238,7 @@ def diff_history(
     time = array['time']
     to_push = array[time >= last_tsdb_dt.timestamp()]

-    log.info(
+    log.debug(
         f'Pushing partial frame {to_push.size} to shm'
     )
@@ -359,7 +354,7 @@ async def start_backfill(
         # last retrieved start dt to the next request as
         # it's end dt.
         while start_dt > last_tsdb_dt:
-            log.info(
+            log.debug(
                 f'Requesting {step_size_s}s frame ending in {start_dt}'
             )
@@ -721,13 +716,18 @@ async def manage_history(
     '''
-    from tractor._state import _runtime_vars
-    port = _runtime_vars['_root_mailbox'][1]
+    # TODO: is there a way to make each shm file key
+    # actor-tree-discovery-addr unique so we avoid collisions
+    # when doing tests which also allocate shms for certain instruments
+    # that may be in use on the system by some other running daemons?
+    # from tractor._state import _runtime_vars
+    # port = _runtime_vars['_root_mailbox'][1]

     # (maybe) allocate shm array for this broker/symbol which will
     # be used for fast near-term history capture and processing.
     hist_shm, opened = maybe_open_shm_array(
-        key=f'{fqsn}_hist', #_p{port}',
+        # key=f'{fqsn}_hist_p{port}',
+        key=f'{fqsn}_hist',

         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -744,7 +744,8 @@ async def manage_history(
     )

     rt_shm, opened = maybe_open_shm_array(
-        key=f'{fqsn}_rt', #_p{port}',
+        # key=f'{fqsn}_rt_p{port}',
+        key=f'{fqsn}_rt',

         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -874,6 +875,7 @@ class Flume(Struct):
     izero_hist: int = 0
     izero_rt: int = 0
     throttle_rate: int | None = None
+    feed: Feed | None = None

     @property
     def rt_shm(self) -> ShmArray:
@@ -907,12 +909,15 @@ class Flume(Struct):
     ) -> AsyncIterator[int]:

+        if not self.feed:
+            raise RuntimeError('This flume is not part of any ``Feed``?')
+
         # XXX: this should be singleton on a host,
         # a lone broker-daemon per provider should be
         # created for all practical purposes
         async with maybe_open_context(
             acm_func=partial(
-                self.portal.open_context,
+                self.feed.portal.open_context,
                 iter_ohlc_periods,
             ),
             kwargs={'delay_s': delay_s},
@@ -964,11 +969,12 @@ class Flume(Struct):
     def to_msg(self) -> dict:
         msg = self.to_dict()
         msg['symbol'] = msg['symbol'].to_dict()
-        # can't serialize the stream object, it's
-        # expected you'll have a ref to it since
-        # this msg should be rxed on a stream on
-        # whatever far end IPC..
+
+        # can't serialize the stream or feed objects, it's expected
+        # you'll have a ref to it since this msg should be rxed on
+        # a stream on whatever far end IPC..
         msg.pop('stream')
+        msg.pop('feed')
         return msg

     @classmethod
@@ -982,6 +988,7 @@
 async def allocate_persistent_feed(
     bus: _FeedsBus,
+    sub_registered: trio.Event,

     brokername: str,
     symstr: str,
@@ -1064,8 +1071,9 @@ async def allocate_persistent_feed(
     symbol = Symbol.from_fqsn(
         fqsn=fqsn,
-        info=msg,
+        info=msg['symbol_info'],
     )
+    assert symbol.type_key

     # HISTORY storage, run 2 tasks:
     # - a history loader / maintainer
@@ -1090,46 +1098,16 @@ async def allocate_persistent_feed(
         feed_is_live,
     )

-    # we hand an IPC-msg compatible shm token to the caller so it
-    # can read directly from the memory which will be written by
-    # this task.
-    # msg['hist_shm_token'] = hist_shm.token
-    # msg['izero_hist'] = izero_hist
-    # msg['izero_rt'] = izero_rt
-    # msg['rt_shm_token'] = rt_shm.token
-
-    # add a fqsn entry that includes the ``.<broker>`` suffix
-    # and an entry that includes the broker-specific fqsn (including
-    # any new suffixes or elements as injected by the backend).
-    # init_msg[fqsn] = msg
-    # init_msg[bfqsn] = msg
-
-    # TODO: pretty sure we don't need this? why not just leave 1s as
-    # the fastest "sample period" since we'll probably always want that
-    # for most purposes.
-    # pass OHLC sample rate in seconds (be sure to use python int type)
-    # init_msg[symbol]['sample_rate'] = 1 #int(delay_s)
-
     # yield back control to starting nursery once we receive either
     # some history or a real-time quote.
     log.info(f'waiting on history to load: {fqsn}')
     await some_data_ready.wait()

-    # append ``.<broker>`` suffix to each quote symbol
-    # acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
-
-    # generic_first_quotes = {
-    #     acceptable_not_fqsn_with_broker_suffix: first_quote,
-    #     fqsn: first_quote,
-    # }
-
     flume = Flume(
         symbol=symbol,
         _hist_shm_token=hist_shm.token,
         _rt_shm_token=rt_shm.token,
         first_quote=first_quote,
-        # stream=stream,
         izero_hist=izero_hist,
         izero_rt=izero_rt,
         # throttle_rate=tick_throttle,
@@ -1138,9 +1116,6 @@ async def allocate_persistent_feed(
     # for ambiguous names we simply apply the retreived
     # feed to that name (for now).
     bus.feeds[symstr] = bus.feeds[bfqsn] = flume
-    #     init_msg,
-    #     generic_first_quotes,
-    # )

     # insert 1s ohlc into the increment buffer set
     # to update and shift every second
@@ -1184,9 +1159,14 @@ async def allocate_persistent_feed(
         rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
         rt_shm.array['volume'][-2] = 0

+    # wait the spawning parent task to register its subscriber
+    # send-stream entry before we start the sample loop.
+    await sub_registered.wait()
+
     # start sample loop and shm incrementer task for OHLC style sampling
     # at the above registered step periods.
     try:
+        log.info(f'Starting sampler task for {fqsn}')
         await sample_and_broadcast(
             bus,
             rt_shm,
@@ -1235,14 +1215,16 @@ async def open_feed_bus(
     assert brokername in servicename
     bus = get_feed_bus(brokername)

+    sub_registered = trio.Event()
+
     flumes: dict[str, Flume] = {}
+
     for symbol in symbols:

         # if no cached feed for this symbol has been created for this
         # brokerd yet, start persistent stream and shm writer task in
         # service nursery
-        entry = bus.feeds.get(symbol)
-        if entry is None:
+        flume = bus.feeds.get(symbol)
+        if flume is None:
             # allocate a new actor-local stream bus which
             # will persist for this `brokerd`'s service lifetime.
             async with bus.task_lock:
@@ -1251,6 +1233,7 @@ async def open_feed_bus(
                     allocate_persistent_feed,
                     bus=bus,
+                    sub_registered=sub_registered,
                     brokername=brokername,

                     # here we pass through the selected symbol in native
                     # "format" (i.e. upper vs. lowercase depending on
@@ -1263,24 +1246,17 @@ async def open_feed_bus(
         # TODO: we can remove this?
         # assert isinstance(bus.feeds[symbol], tuple)

-        # XXX: ``first_quotes`` may be outdated here if this is secondary
+        # XXX: ``.first_quote`` may be outdated here if this is secondary
         # subscriber
-        # init_msg, first_quotes = bus.feeds[symbol]
         flume = bus.feeds[symbol]
-        # assert bus.feeds[bfqsn] is flume
+        sym = flume.symbol
+        bfqsn = sym.key
+        fqsn = sym.fqsn  # true fqsn
+        assert bfqsn in fqsn and brokername in fqsn

-        # msg = init_msg[symbol]
-        # bfqsn = msg['fqsn'].lower()
-        bfqsn = flume.symbol.key
-
-        # true fqsn
-        fqsn = '.'.join([bfqsn, brokername])
-        assert fqsn == flume.symbol.fqsn
-        # assert fqsn in first_quotes
-
-        # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
-        # bsym = symbol + f'.{brokername}'
-        # assert bsym in first_quotes
+        if sym.suffix:
+            bfqsn = fqsn.rstrip(f'.{brokername}')
+            log.warning(f'{brokername} expanded symbol {symbol} -> {bfqsn}')

         # pack for ``.started()`` sync msg
         flumes[fqsn] = flume
@@ -1290,13 +1266,12 @@ async def open_feed_bus(
     # expected to append it's own name to the fqsn, so we filter
     # on keys which *do not* include that name (e.g .ib) .
     bus._subscribers.setdefault(bfqsn, [])
-    # await tractor.breakpoint()

-    # send this even to subscribers to existing feed?
-    # deliver initial info message a first quote asap
-    await ctx.started(flumes)
-    #     init_msg,
-    #     first_quotes,
-    # ))
+    # sync feed subscribers with flume handles
+    await ctx.started(
+        {fqsn: flume.to_msg() for fqsn, flume in flumes.items()}
+    )

     if not start_stream:
         log.warning(f'Not opening real-time stream for {fqsn}')
@@ -1352,10 +1327,13 @@ async def open_feed_bus(
             # maybe use the current task-id to key the sub list that's
             # added / removed? Or maybe we can add a general
             # pause-resume by sub-key api?
+            bfqsn = fqsn.rstrip(f'.{brokername}')
             bus_subs = bus._subscribers[bfqsn]
             bus_subs.append(sub)
             local_subs.append(sub)

+        sub_registered.set()
+
         try:
             uid = ctx.chan.uid
@@ -1396,7 +1374,6 @@ async def open_feed_bus(
             log.warning(f'{sub} for {symbol} was already removed?')


-# @dataclass
 class Feed(Struct):
     '''
     A per-provider API for client-side consumption from real-time data
@@ -1410,31 +1387,17 @@ class Feed(Struct):
     similarly allocated shm arrays.

     '''
-    # name: str
-    # hist_shm: ShmArray
-    # rt_shm: ShmArray
     mod: ModuleType
     _portal: tractor.Portal
-
-    # symbol names to first quote dicts
-    # shms: dict[str, tuple[ShmArray, Shmarray]]
     flumes: dict[str, Flume] = {}
-    # first_quotes: dict[str, dict] = {}
     streams: dict[
         str,
         trio.abc.ReceiveChannel[dict[str, Any]],
     ] = {}
     status: dict[str, Any]
-
-    # izero_hist: int = 0
-    # izero_rt: int = 0
-    # throttle_rate: Optional[int] = None
     _max_sample_rate: int = 1

-    # cache of symbol info messages received as first message when
-    # a stream startsc.
-    # symbols: dict[str, Symbol] = {}
-
     @property
     def portal(self) -> tractor.Portal:
         return self._portal
@@ -1496,8 +1459,6 @@ async def open_feed(
     Open a "data feed" which provides streamed real-time quotes.

     '''
-    # fqsn = fqsns[0].lower()
-
     providers: dict[ModuleType, list[str]] = {}

     for fqsn in fqsns:
@@ -1531,7 +1492,7 @@ async def open_feed(
         brokerd_ctxs,
     ) as portals:

-        bus_ctxs = []
+        bus_ctxs: list[AsyncContextManager] = []
         for (
             portal,
             (brokermod, bfqsns),
@@ -1551,8 +1512,9 @@ async def open_feed(
                 'actor_name': feed.portal.channel.uid[0],
                 'host': host,
                 'port': port,
-                # 'shm': f'{humanize(feed.hist_shm._shm.size)}',
-                # 'throttle_rate': feed.throttle_rate,
+                'hist_shm': 'NA',
+                'rt_shm': 'NA',
+                'throttle_rate': tick_throttle,
             })
             # feed.status.update(init_msg.pop('status', {}))
@@ -1571,6 +1533,7 @@ async def open_feed(
     async with (
         gather_contexts(bus_ctxs) as ctxs,
     ):
+        remote_scopes = []
         for (
             (ctx, flumes_msg_dict),
             (brokermod, bfqsns),
@@ -1581,20 +1544,7 @@ async def open_feed(
                 flume = Flume.from_msg(flume_msg)
                 assert flume.symbol.fqsn == fqsn
                 feed.flumes[fqsn] = flume
-
-                # TODO: this is ugly but eventually we could
-                # in theory do all this "tabling" of flumes on
-                # the brokerd-side, in which case we'll likely
-                # want to make each flume IPC-msg-native?
-                # bfqsn = list(init_msgs)[0]
-                # init = init_msg[bfqsn]
-                # si = data['symbol_info']
-                # fqsn = data['fqsn'] + f'.{brokername}'
-                # symbol = Symbol.from_fqsn(
-                #     fqsn,
-                #     info=si,
-                # )
+                flume.feed = feed

                 # attach and cache shm handles
                 rt_shm = flume.rt_shm
@@ -1602,11 +1552,18 @@ async def open_feed(
                 hist_shm = flume.hist_shm
                 assert hist_shm

+                feed.status['hist_shm'] = (
+                    f'{humanize(hist_shm._shm.size)}'
+                )
+                feed.status['rt_shm'] = f'{humanize(rt_shm._shm.size)}'
+
+            remote_scopes.append(ctx)
             stream_ctxs.append(
                 ctx.open_stream(
-                    # XXX: be explicit about stream backpressure since we should
-                    # **never** overrun on feeds being too fast, which will
-                    # pretty much always happen with HFT XD
+                    # XXX: be explicit about stream backpressure
+                    # since we should **never** overrun on feeds
+                    # being too fast, which will pretty much
+                    # always happen with HFT XD
                     backpressure=backpressure,
                 )
             )
@@ -1619,49 +1576,15 @@ async def open_feed(
                 (brokermod, bfqsns),
             ) in zip(streams, providers.items()):

-                for bfqsn in bfqsns:
-                    fqsn = '.'.join((bfqsn, brokermod.name))
+                # for bfqsn in bfqsns:
+                for fqsn in flumes_msg_dict:

                     # apply common rt steam to each flume
                     # (normally one per broker)
                     feed.flumes[fqsn].stream = stream
                     feed.streams[brokermod.name] = stream

-            try:
-                yield feed
-            finally:
-                # drop the infinite stream connection
-                await ctx.cancel()
-
-            # we can only read from shm
-            # hist_shm = attach_shm_array(
-            #     token=init['hist_shm_token'],
-            #     readonly=True,
-            # )
-            # rt_shm = attach_shm_array(
-            #     token=init['rt_shm_token'],
-            #     readonly=True,
-            # )
-
-            # for sym, data in init_msg.items():
-            # symbol.broker_info[brokername] = si
-            # feed.symbols[fqsn] = symbol
-            # feed.symbols[f'{sym}.{brokername}'] = symbol
-
-            # cast shm dtype to list... can't member why we need this
-            # for shm_key, shm in [
-            #     ('rt_shm_token', rt_shm),
-            #     ('hist_shm_token', hist_shm),
-            # ]:
-            #     shm_token = flume[shm_key]
-
-            #     # XXX: msgspec won't relay through the tuples XD
-            #     shm_token['dtype_descr'] = tuple(
-            #         map(tuple, shm_token['dtype_descr']))
-
-            #     assert shm_token == shm.token  # sanity
-            # assert fqsn in first_quotes
+            yield feed


 @acm
@@ -1703,7 +1626,13 @@ async def maybe_open_feed(
         log.info(f'Using cached feed for {fqsn}')
         # add a new broadcast subscription for the quote stream
         # if this feed is likely already in use
-        async with feed.stream.subscribe() as bstream:
-            yield feed, bstream
+
+        async with gather_contexts(
+            mngrs=[stream.subscribe() for stream in feed.streams.values()]
+        ) as bstreams:
+            for bstream, flume in zip(bstreams, feed.flumes.values()):
+                flume.stream = bstream
+
+            yield feed
     else:
-        yield feed, feed.stream
+        yield feed


@@ -2,10 +2,11 @@
 Data feed layer APIs, performance, msg throttling.

 '''
+from collections import Counter
 from pprint import pprint

 import pytest
-# import tractor
+import tractor
 import trio
 from piker import (
     open_piker_runtime,
@@ -17,12 +18,12 @@ from piker.data import ShmArray
 @pytest.mark.parametrize(
     'fqsns',
     [
-        ['btcusdt.binance', 'ethusdt.binance']
+        {'btcusdt.binance', 'ethusdt.binance'}
     ],
     ids=lambda param: f'fqsns={param}',
 )
 def test_basic_rt_feed(
-    fqsns: list[str],
+    fqsns: set[str],
 ):
     '''
     Start a real-time data feed for provided fqsn and pull
@@ -33,11 +34,12 @@ def test_basic_rt_feed(
     async with (
         open_piker_runtime(
             'test_basic_rt_feed',
+
             # XXX tractor BUG: this doesn't translate through to the
             # ``tractor._state._runtimevars``...
             registry_addr=('127.0.0.1', 6666),
-            debug_mode=True,
-            loglevel='runtime',
+
+            # debug_mode=True,
         ),
         open_feed(
             fqsns,
@@ -58,20 +60,29 @@ def test_basic_rt_feed(
             # stream some ticks and ensure we see data from both symbol
             # subscriptions.
-            quote_count: int = 0
             stream = feed.streams['binance']

-            # pull the first couple startup quotes and ensure
-            # they match the history buffer last entries.
+            # pull the first startup quotes, one for each fqsn, and
+            # ensure they match each flume's startup quote value.
+            fqsns_copy = fqsns.copy()
             for _ in range(1):
                 first_quotes = await stream.receive()
                 for fqsn, quote in first_quotes.items():
+                    assert fqsn in fqsns
+
+                    # XXX: TODO: WTF apparently this error will get
+                    # supressed and only show up in the teardown
+                    # excgroup if we don't have the fix from
+                    # <tractorbugurl>
+                    # assert 0
+
+                    fqsns_copy.remove(fqsn)
                     flume = feed.flumes[fqsn]
                     assert quote['last'] == flume.first_quote['last']

+            cntr = Counter()
             async for quotes in stream:
                 for fqsn, quote in quotes.items():
+                    cntr[fqsn] += 1
                     # await tractor.breakpoint()
                     flume = feed.flumes[fqsn]
@@ -91,9 +102,11 @@ def test_basic_rt_feed(
                         f'rt_ohlc: {rt_row}\n'
                         f'hist_ohlc: {hist_row}\n'
                     )
-                    quote_count += 1

-                    if quote_count >= 100:
+                    if cntr.total() >= 100:
                         break

+            # await tractor.breakpoint()
+            assert set(cntr.keys()) == fqsns
+
     trio.run(main)