From e348968113a6b517c62afba8aaac521341f74c5d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 11 Nov 2022 17:27:02 -0500 Subject: [PATCH] Add multi-broker streaming test using both `binance` and `kraken` --- tests/test_feeds.py | 99 +++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 49 deletions(-) diff --git a/tests/test_feeds.py b/tests/test_feeds.py index 5bb9a510..b471c319 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -6,7 +6,7 @@ from collections import Counter from pprint import pprint import pytest -import tractor +# import tractor import trio from piker import ( open_piker_runtime, @@ -21,12 +21,18 @@ from piker.data._source import ( @pytest.mark.parametrize( 'fqsns', [ + # binance (100, {'btcusdt.binance', 'ethusdt.binance'}), - (50, {'xbteur.kraken', 'xbtusd.kraken'}), + + # kraken + (20, {'xbteur.kraken', 'xbtusd.kraken'}), + + # binance + kraken + (200, {'btcusdt.binance', 'xbtusd.kraken'}), ], ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}', ) -def test_basic_rt_feed( +def test_multi_fqsn_feed( fqsns: set[str], ): ''' @@ -41,10 +47,6 @@ def test_basic_rt_feed( brokername, key, suffix = unpack_fqsn(fqsn) brokers.add(brokername) - # NOTE: we only have single broker-backed multi-symbol streams - # currently. - assert len(brokers) == 1 - async def main(): async with ( open_piker_runtime( @@ -52,9 +54,9 @@ def test_basic_rt_feed( # XXX tractor BUG: this doesn't translate through to the # ``tractor._state._runtimevars``... - registry_addr=('127.0.0.1', 6666), + # registry_addr=('127.0.0.1', 6666), - # debug_mode=True, + debug_mode=True, ), open_feed( fqsns, @@ -73,55 +75,54 @@ def test_basic_rt_feed( ohlcv: ShmArray = flume.rt_shm hist_ohlcv: ShmArray = flume.hist_shm - # stream some ticks and ensure we see data from both symbol - # subscriptions. - stream = feed.streams[brokername] + async with feed.open_multi_stream(brokers) as stream: - # pull the first startup quotes, one for each fqsn, and - # ensure they match each flume's startup quote value. - fqsns_copy = fqsns.copy() - for _ in range(1): - first_quotes = await stream.receive() - for fqsn, quote in first_quotes.items(): + # pull the first startup quotes, one for each fqsn, and + # ensure they match each flume's startup quote value. + fqsns_copy = fqsns.copy() + with trio.fail_after(0.5): + for _ in range(1): + first_quotes = await stream.receive() + for fqsn, quote in first_quotes.items(): - # XXX: TODO: WTF apparently this error will get - # supressed and only show up in the teardown - # excgroup if we don't have the fix from - # - # assert 0 + # XXX: TODO: WTF apparently this error will get + # supressed and only show up in the teardown + # excgroup if we don't have the fix from + # + # assert 0 - fqsns_copy.remove(fqsn) - flume = feed.flumes[fqsn] - assert quote['last'] == flume.first_quote['last'] + fqsns_copy.remove(fqsn) + flume = feed.flumes[fqsn] + assert quote['last'] == flume.first_quote['last'] - cntr = Counter() - async for quotes in stream: - for fqsn, quote in quotes.items(): - cntr[fqsn] += 1 + cntr = Counter() + with trio.fail_after(3): + async for quotes in stream: + for fqsn, quote in quotes.items(): + cntr[fqsn] += 1 - # await tractor.breakpoint() - flume = feed.flumes[fqsn] - ohlcv: ShmArray = flume.rt_shm - hist_ohlcv: ShmArray = flume.hist_shm + # await tractor.breakpoint() + flume = feed.flumes[fqsn] + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm - # print quote msg, rt and history - # buffer values on console. - rt_row = ohlcv.array[-1] - hist_row = hist_ohlcv.array[-1] - # last = quote['last'] + # print quote msg, rt and history + # buffer values on console. + rt_row = ohlcv.array[-1] + hist_row = hist_ohlcv.array[-1] + # last = quote['last'] - # assert last == rt_row['close'] - # assert last == hist_row['close'] - pprint( - f'{fqsn}: {quote}\n' - f'rt_ohlc: {rt_row}\n' - f'hist_ohlc: {hist_row}\n' - ) + # assert last == rt_row['close'] + # assert last == hist_row['close'] + pprint( + f'{fqsn}: {quote}\n' + f'rt_ohlc: {rt_row}\n' + f'hist_ohlc: {hist_row}\n' + ) - if cntr.total() >= 100: - break + if cntr.total() >= max_quotes: + break - # await tractor.breakpoint() assert set(cntr.keys()) == fqsns trio.run(main)