piker/tests/test_feeds.py

129 lines
4.0 KiB
Python

'''
Data feed layer APIs, performance, msg throttling.
'''
from collections import Counter
from pprint import pprint
import pytest
# import tractor
import trio
from piker import (
open_piker_runtime,
open_feed,
)
from piker.data import ShmArray
from piker.data._source import (
unpack_fqsn,
)
@pytest.mark.parametrize(
'fqsns',
[
# binance
(100, {'btcusdt.binance', 'ethusdt.binance'}),
# kraken
(20, {'xbteur.kraken', 'xbtusd.kraken'}),
# binance + kraken
(200, {'btcusdt.binance', 'xbtusd.kraken'}),
],
ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}',
)
def test_multi_fqsn_feed(
fqsns: set[str],
):
'''
Start a real-time data feed for provided fqsn and pull
a few quotes then simply shut down.
'''
max_quotes, fqsns = fqsns
brokers = set()
for fqsn in fqsns:
brokername, key, suffix = unpack_fqsn(fqsn)
brokers.add(brokername)
async def main():
async with (
open_piker_runtime(
'test_basic_rt_feed',
# XXX tractor BUG: this doesn't translate through to the
# ``tractor._state._runtimevars``...
# registry_addr=('127.0.0.1', 6666),
debug_mode=True,
),
open_feed(
fqsns,
loglevel='info',
# TODO: ensure throttle rate is applied
# limit to at least display's FPS
# avoiding needless Qt-in-guest-mode context switches
# tick_throttle=_quote_throttle_rate,
) as feed
):
# verify shm buffers exist
for fqin in fqsns:
flume = feed.flumes[fqin]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
async with feed.open_multi_stream(brokers) as stream:
# pull the first startup quotes, one for each fqsn, and
# ensure they match each flume's startup quote value.
fqsns_copy = fqsns.copy()
with trio.fail_after(0.5):
for _ in range(1):
first_quotes = await stream.receive()
for fqsn, quote in first_quotes.items():
# XXX: TODO: WTF apparently this error will get
# supressed and only show up in the teardown
# excgroup if we don't have the fix from
# <tractorbugurl>
# assert 0
fqsns_copy.remove(fqsn)
flume = feed.flumes[fqsn]
assert quote['last'] == flume.first_quote['last']
cntr = Counter()
with trio.fail_after(3):
async for quotes in stream:
for fqsn, quote in quotes.items():
cntr[fqsn] += 1
# await tractor.breakpoint()
flume = feed.flumes[fqsn]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
# print quote msg, rt and history
# buffer values on console.
rt_row = ohlcv.array[-1]
hist_row = hist_ohlcv.array[-1]
# last = quote['last']
# assert last == rt_row['close']
# assert last == hist_row['close']
pprint(
f'{fqsn}: {quote}\n'
f'rt_ohlc: {rt_row}\n'
f'hist_ohlc: {hist_row}\n'
)
if cntr.total() >= max_quotes:
break
assert set(cntr.keys()) == fqsns
trio.run(main)