Rename fqsn -> fqme in feeds tests

pre_overruns_ctxcancelled
Tyler Goodlet 2023-04-11 23:59:50 -04:00
parent 1bd4fd80f8
commit f30d866710
1 changed files with 20 additions and 19 deletions

View File

@@ -13,13 +13,14 @@ from piker.data import (
ShmArray,
open_feed,
)
from piker.data.flows import Flume
from piker.accounting._mktinfo import (
unpack_fqsn,
)
@pytest.mark.parametrize(
'fqsns',
'fqmes',
[
# binance
(100, {'btcusdt.binance', 'ethusdt.binance'}, False),
@@ -30,20 +31,20 @@ from piker.accounting._mktinfo import (
# binance + kraken
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
],
ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}',
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
)
def test_multi_fqsn_feed(
open_test_pikerd: AsyncContextManager,
fqsns: set[str],
fqmes: set[str],
loglevel: str,
ci_env: bool
):
'''
Start a real-time data feed for provided fqsn and pull
Start a real-time data feed for provided fqme and pull
a few quotes then simply shut down.
'''
max_quotes, fqsns, run_in_ci = fqsns
max_quotes, fqmes, run_in_ci = fqmes
if (
ci_env
@@ -52,15 +53,15 @@ def test_multi_fqsn_feed(
pytest.skip('Skipping CI disabled test due to feed restrictions')
brokers = set()
for fqsn in fqsns:
brokername, key, suffix = unpack_fqsn(fqsn)
for fqme in fqmes:
brokername, key, suffix = unpack_fqsn(fqme)
brokers.add(brokername)
async def main():
async with (
open_test_pikerd(),
open_feed(
fqsns,
fqmes,
loglevel=loglevel,
# TODO: ensure throttle rate is applied
@@ -71,20 +72,20 @@ def test_multi_fqsn_feed(
) as feed
):
# verify shm buffers exist
for fqin in fqsns:
for fqin in fqmes:
flume = feed.flumes[fqin]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
async with feed.open_multi_stream(brokers) as stream:
# pull the first startup quotes, one for each fqsn, and
# pull the first startup quotes, one for each fqme, and
# ensure they match each flume's startup quote value.
fqsns_copy = fqsns.copy()
fqsns_copy = fqmes.copy()
with trio.fail_after(0.5):
for _ in range(1):
first_quotes = await stream.receive()
for fqsn, quote in first_quotes.items():
for fqme, quote in first_quotes.items():
# XXX: TODO: WTF apparently this error will get
# supressed and only show up in the teardown
@@ -92,18 +93,18 @@ def test_multi_fqsn_feed(
# <tractorbugurl>
# assert 0
fqsns_copy.remove(fqsn)
flume = feed.flumes[fqsn]
fqsns_copy.remove(fqme)
flume: Flume = feed.flumes[fqme]
assert quote['last'] == flume.first_quote['last']
cntr = Counter()
with trio.fail_after(6):
async for quotes in stream:
for fqsn, quote in quotes.items():
cntr[fqsn] += 1
for fqme, quote in quotes.items():
cntr[fqme] += 1
# await tractor.breakpoint()
flume = feed.flumes[fqsn]
flume = feed.flumes[fqme]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
@@ -116,7 +117,7 @@ def test_multi_fqsn_feed(
# assert last == rt_row['close']
# assert last == hist_row['close']
pprint(
f'{fqsn}: {quote}\n'
f'{fqme}: {quote}\n'
f'rt_ohlc: {rt_row}\n'
f'hist_ohlc: {hist_row}\n'
)
@@ -124,6 +125,6 @@ def test_multi_fqsn_feed(
if cntr.total() >= max_quotes:
break
assert set(cntr.keys()) == fqsns
assert set(cntr.keys()) == fqmes
trio.run(main)