Rename fqsn -> fqme in feeds tests
							parent
							
								
									48cae3c178
								
							
						
					
					
						commit
						b810de3089
					
				| 
						 | 
					@ -13,13 +13,14 @@ from piker.data import (
 | 
				
			||||||
    ShmArray,
 | 
					    ShmArray,
 | 
				
			||||||
    open_feed,
 | 
					    open_feed,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					from piker.data.flows import Flume
 | 
				
			||||||
from piker.accounting._mktinfo import (
 | 
					from piker.accounting._mktinfo import (
 | 
				
			||||||
    unpack_fqsn,
 | 
					    unpack_fqsn,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@pytest.mark.parametrize(
 | 
					@pytest.mark.parametrize(
 | 
				
			||||||
    'fqsns',
 | 
					    'fqmes',
 | 
				
			||||||
    [
 | 
					    [
 | 
				
			||||||
        # binance
 | 
					        # binance
 | 
				
			||||||
        (100, {'btcusdt.binance', 'ethusdt.binance'}, False),
 | 
					        (100, {'btcusdt.binance', 'ethusdt.binance'}, False),
 | 
				
			||||||
| 
						 | 
					@ -30,20 +31,20 @@ from piker.accounting._mktinfo import (
 | 
				
			||||||
        # binance + kraken
 | 
					        # binance + kraken
 | 
				
			||||||
        (100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
 | 
					        (100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
 | 
				
			||||||
    ],
 | 
					    ],
 | 
				
			||||||
    ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}',
 | 
					    ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
def test_multi_fqsn_feed(
 | 
					def test_multi_fqsn_feed(
 | 
				
			||||||
    open_test_pikerd: AsyncContextManager,
 | 
					    open_test_pikerd: AsyncContextManager,
 | 
				
			||||||
    fqsns: set[str],
 | 
					    fqmes: set[str],
 | 
				
			||||||
    loglevel: str,
 | 
					    loglevel: str,
 | 
				
			||||||
    ci_env: bool
 | 
					    ci_env: bool
 | 
				
			||||||
):
 | 
					):
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Start a real-time data feed for provided fqsn and pull
 | 
					    Start a real-time data feed for provided fqme and pull
 | 
				
			||||||
    a few quotes then simply shut down.
 | 
					    a few quotes then simply shut down.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    max_quotes, fqsns, run_in_ci = fqsns
 | 
					    max_quotes, fqmes, run_in_ci = fqmes
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (
 | 
					    if (
 | 
				
			||||||
        ci_env
 | 
					        ci_env
 | 
				
			||||||
| 
						 | 
					@ -52,15 +53,15 @@ def test_multi_fqsn_feed(
 | 
				
			||||||
        pytest.skip('Skipping CI disabled test due to feed restrictions')
 | 
					        pytest.skip('Skipping CI disabled test due to feed restrictions')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    brokers = set()
 | 
					    brokers = set()
 | 
				
			||||||
    for fqsn in fqsns:
 | 
					    for fqme in fqmes:
 | 
				
			||||||
        brokername, key, suffix = unpack_fqsn(fqsn)
 | 
					        brokername, key, suffix = unpack_fqsn(fqme)
 | 
				
			||||||
        brokers.add(brokername)
 | 
					        brokers.add(brokername)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def main():
 | 
					    async def main():
 | 
				
			||||||
        async with (
 | 
					        async with (
 | 
				
			||||||
            open_test_pikerd(),
 | 
					            open_test_pikerd(),
 | 
				
			||||||
            open_feed(
 | 
					            open_feed(
 | 
				
			||||||
                fqsns,
 | 
					                fqmes,
 | 
				
			||||||
                loglevel=loglevel,
 | 
					                loglevel=loglevel,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # TODO: ensure throttle rate is applied
 | 
					                # TODO: ensure throttle rate is applied
 | 
				
			||||||
| 
						 | 
					@ -71,20 +72,20 @@ def test_multi_fqsn_feed(
 | 
				
			||||||
            ) as feed
 | 
					            ) as feed
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            # verify shm buffers exist
 | 
					            # verify shm buffers exist
 | 
				
			||||||
            for fqin in fqsns:
 | 
					            for fqin in fqmes:
 | 
				
			||||||
                flume = feed.flumes[fqin]
 | 
					                flume = feed.flumes[fqin]
 | 
				
			||||||
                ohlcv: ShmArray = flume.rt_shm
 | 
					                ohlcv: ShmArray = flume.rt_shm
 | 
				
			||||||
                hist_ohlcv: ShmArray = flume.hist_shm
 | 
					                hist_ohlcv: ShmArray = flume.hist_shm
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            async with feed.open_multi_stream(brokers) as stream:
 | 
					            async with feed.open_multi_stream(brokers) as stream:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # pull the first startup quotes, one for each fqsn, and
 | 
					                # pull the first startup quotes, one for each fqme, and
 | 
				
			||||||
                # ensure they match each flume's startup quote value.
 | 
					                # ensure they match each flume's startup quote value.
 | 
				
			||||||
                fqsns_copy = fqsns.copy()
 | 
					                fqsns_copy = fqmes.copy()
 | 
				
			||||||
                with trio.fail_after(0.5):
 | 
					                with trio.fail_after(0.5):
 | 
				
			||||||
                    for _ in range(1):
 | 
					                    for _ in range(1):
 | 
				
			||||||
                        first_quotes = await stream.receive()
 | 
					                        first_quotes = await stream.receive()
 | 
				
			||||||
                        for fqsn, quote in first_quotes.items():
 | 
					                        for fqme, quote in first_quotes.items():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # XXX: TODO: WTF apparently this error will get
 | 
					                            # XXX: TODO: WTF apparently this error will get
 | 
				
			||||||
                            # supressed and only show up in the teardown
 | 
					                            # supressed and only show up in the teardown
 | 
				
			||||||
| 
						 | 
					@ -92,18 +93,18 @@ def test_multi_fqsn_feed(
 | 
				
			||||||
                            # <tractorbugurl>
 | 
					                            # <tractorbugurl>
 | 
				
			||||||
                            # assert 0
 | 
					                            # assert 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            fqsns_copy.remove(fqsn)
 | 
					                            fqsns_copy.remove(fqme)
 | 
				
			||||||
                            flume = feed.flumes[fqsn]
 | 
					                            flume: Flume = feed.flumes[fqme]
 | 
				
			||||||
                            assert quote['last'] == flume.first_quote['last']
 | 
					                            assert quote['last'] == flume.first_quote['last']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                cntr = Counter()
 | 
					                cntr = Counter()
 | 
				
			||||||
                with trio.fail_after(6):
 | 
					                with trio.fail_after(6):
 | 
				
			||||||
                    async for quotes in stream:
 | 
					                    async for quotes in stream:
 | 
				
			||||||
                        for fqsn, quote in quotes.items():
 | 
					                        for fqme, quote in quotes.items():
 | 
				
			||||||
                            cntr[fqsn] += 1
 | 
					                            cntr[fqme] += 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # await tractor.breakpoint()
 | 
					                            # await tractor.breakpoint()
 | 
				
			||||||
                            flume = feed.flumes[fqsn]
 | 
					                            flume = feed.flumes[fqme]
 | 
				
			||||||
                            ohlcv: ShmArray = flume.rt_shm
 | 
					                            ohlcv: ShmArray = flume.rt_shm
 | 
				
			||||||
                            hist_ohlcv: ShmArray = flume.hist_shm
 | 
					                            hist_ohlcv: ShmArray = flume.hist_shm
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -116,7 +117,7 @@ def test_multi_fqsn_feed(
 | 
				
			||||||
                            # assert last == rt_row['close']
 | 
					                            # assert last == rt_row['close']
 | 
				
			||||||
                            # assert last == hist_row['close']
 | 
					                            # assert last == hist_row['close']
 | 
				
			||||||
                            pprint(
 | 
					                            pprint(
 | 
				
			||||||
                               f'{fqsn}: {quote}\n'
 | 
					                               f'{fqme}: {quote}\n'
 | 
				
			||||||
                               f'rt_ohlc: {rt_row}\n'
 | 
					                               f'rt_ohlc: {rt_row}\n'
 | 
				
			||||||
                               f'hist_ohlc: {hist_row}\n'
 | 
					                               f'hist_ohlc: {hist_row}\n'
 | 
				
			||||||
                            )
 | 
					                            )
 | 
				
			||||||
| 
						 | 
					@ -124,6 +125,6 @@ def test_multi_fqsn_feed(
 | 
				
			||||||
                        if cntr.total() >= max_quotes:
 | 
					                        if cntr.total() >= max_quotes:
 | 
				
			||||||
                            break
 | 
					                            break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            assert set(cntr.keys()) == fqsns
 | 
					            assert set(cntr.keys()) == fqmes
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    trio.run(main)
 | 
					    trio.run(main)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue