First draft aggregate feedz support

Orient shm-flow-arrays around the new idea of a `Flume`, which provides
access to, management of and basic measurement of real-time data flow
sets (the name follows water flow management semantics).
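
As a rough illustration of the `Flume` idea and of the `to_msg()` round
trip mentioned in this message, here is a minimal sketch. It is not
piker's implementation: `FlumeSketch`, its token fields, `from_msg()`
and `startup_msg()` are hypothetical names, and a stdlib dataclass
stands in for whatever struct type the real `Flume` uses.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class FlumeSketch:
    # Hypothetical stand-in for `Flume`; field names are assumptions.
    fqsn: str
    rt_shm_token: dict    # info a peer would need to attach the real-time buffer
    hist_shm_token: dict  # same, for the history buffer

    def to_msg(self) -> dict:
        # Serialize to a plain dict that can be sent between actors.
        return asdict(self)

    @classmethod
    def from_msg(cls, msg: dict) -> 'FlumeSketch':
        # Symmetric load on the receiving side (assumed counterpart).
        return cls(**msg)


def startup_msg(flumes: list[FlumeSketch]) -> dict[str, dict]:
    # One serialized entry per fqsn, replacing a single "init message".
    return {flume.fqsn: flume.to_msg() for flume in flumes}
```

The receiving side would rebuild each entry with
`FlumeSketch.from_msg(msg[fqsn])`, so both actors hold equal
descriptions of the same flow.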

- We discard the previous idea of an "init message" which contained all
  the shm attachment info and instead send a startup message full of
  `Flume.to_msg()`s which are symmetrically loaded on the caller actor
  side.

- Create data-flow "entries" for every passed-in fqsn such that the
  consumer gets back streams and shm for each, now all wrapped in
  `Flume` types. For now we allocate `brokermod.stream_quotes()` tasks
  1-to-1 for each fqsn (instead of expecting each backend to do
  multiplexing, though we might want that eventually) as well as
  a `_FeedsBus._subscriber` entry for each. The pause/resume management
  loop is adjusted to match. Previously `Feed`s were allocated 1-to-1
  with each fqsn.

- Make `Feed` a `Struct` subtype instead of a `@dataclass` and move all
  flow specific attrs to the new `Flume`:
  - move `.index_stream()`, `.get_ds_info()` to `Flume`.
  - drop `.receive()`: feed users now need to know about the separate
    stream for each fqsn entry.
  - add multi-fqsn tables: `.flumes`, `.streams` which point to the
    appropriate per-symbol entries.

- Async load all `Flume`s from all contexts and all quote streams using
  `tractor.trionics.gather_contexts()` on the client `open_feed()` side.

- Update feeds test to include streaming 2 symbols on the same (binance)
  backend.
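
The per-fqsn allocation described above can be pictured roughly as
follows. This is only a sketch under stated assumptions: it uses stdlib
`asyncio` in place of `trio`/`tractor` so that it runs standalone, and
`FeedSketch`, `fake_stream_quotes()` and `open_feed_sketch()` are
hypothetical stand-ins, not piker APIs. The `.streams` table is keyed by
broker name here because the updated test reads
`feed.streams['binance']`.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FeedSketch:
    # Hypothetical stand-in for the multi-fqsn tables on `Feed`.
    flumes: dict = field(default_factory=dict)   # fqsn -> per-symbol entry
    streams: dict = field(default_factory=dict)  # broker name -> quote queue


async def fake_stream_quotes(fqsn: str, queue: asyncio.Queue, n: int) -> None:
    # Stand-in for one per-fqsn quote-streaming task.
    for i in range(n):
        await queue.put({fqsn: {'last': float(i)}})
        await asyncio.sleep(0)  # yield so the per-fqsn tasks interleave


async def open_feed_sketch(fqsns: list, n: int = 3) -> FeedSketch:
    feed = FeedSketch()
    tasks = []
    for fqsn in fqsns:
        brokername = fqsn.rsplit('.', 1)[-1]
        # One shared queue per broker, one entry and one task per fqsn.
        queue = feed.streams.setdefault(brokername, asyncio.Queue())
        feed.flumes[fqsn] = {'fqsn': fqsn}
        tasks.append(asyncio.create_task(fake_stream_quotes(fqsn, queue, n)))
    await asyncio.gather(*tasks)
    return feed


feed = asyncio.run(open_feed_sketch(['btcusdt.binance', 'ethusdt.binance']))
```

A consumer would then look up `feed.flumes[fqsn]` for each key of a
quotes dict pulled from `feed.streams['binance']`, much like the updated
test does.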
agg_feedz
Tyler Goodlet 2022-11-08 16:09:18 -05:00
parent 5bf3cb8e4b
commit 18dc8b08e4
2 changed files with 742 additions and 491 deletions

File diff suppressed because it is too large

@@ -5,6 +5,7 @@ Data feed layer APIs, performance, msg throttling.
 from pprint import pprint
 
 import pytest
+import tractor
 import trio
 from piker import (
     open_piker_runtime,
@@ -16,7 +17,7 @@ from piker.data import ShmArray
 @pytest.mark.parametrize(
     'fqsns',
     [
-        ['btcusdt.binance']
+        ['btcusdt.binance', 'ethusdt.binance']
     ],
     ids=lambda param: f'fqsns={param}',
 )
@@ -30,7 +31,13 @@ def test_basic_rt_feed(
     '''
     async def main():
         async with (
-            open_piker_runtime('test_basic_rt_feed'),
+            open_piker_runtime(
+                'test_basic_rt_feed',
+                # XXX tractor BUG: this doesn't translate through to the
+                # ``tractor._state._runtimevars``...
+                registry_addr=('127.0.0.1', 6666),
+                debug_mode=True,
+            ),
             open_feed(
                 fqsns,
                 loglevel='info',
@@ -42,24 +49,38 @@ def test_basic_rt_feed(
 
             ) as feed
         ):
+            # verify shm buffers exist
             for fqin in fqsns:
-                assert feed.symbols[fqin]
+                flume = feed.flumes[fqin]
+                ohlcv: ShmArray = flume.rt_shm
+                hist_ohlcv: ShmArray = flume.hist_shm
 
-            ohlcv: ShmArray = feed.rt_shm
-            hist_ohlcv: ShmArray = feed.hist_shm
+            quote_count: int = 0
+            stream = feed.streams['binance']
+            async for quotes in stream:
+                for fqsn, quote in quotes.items():
 
-            count: int = 0
-            async for quotes in feed.stream:
+                    # await tractor.breakpoint()
+                    flume = feed.flumes[fqsn]
+                    ohlcv: ShmArray = flume.rt_shm
+                    hist_ohlcv: ShmArray = flume.hist_shm
 
-                # print quote msg, rt and history
-                # buffer values on console.
-                pprint(quotes)
-                pprint(ohlcv.array[-1])
-                pprint(hist_ohlcv.array[-1])
+                    # print quote msg, rt and history
+                    # buffer values on console.
+                    rt_row = ohlcv.array[-1]
+                    hist_row = hist_ohlcv.array[-1]
+                    # last = quote['last']
 
-                if count >= 100:
+                    # assert last == rt_row['close']
+                    # assert last == hist_row['close']
+                    pprint(
+                        f'{fqsn}: {quote}\n'
+                        f'rt_ohlc: {rt_row}\n'
+                        f'hist_ohlc: {hist_row}\n'
+                    )
+                    quote_count += 1
+
+                if quote_count >= 100:
                     break
 
-                count += 1
-
     trio.run(main)