From 6cc8b4cc2fa89172bab5cbbd757ae6771b97f020 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 23 Dec 2018 21:27:47 -0500 Subject: [PATCH] Test duplicate feed type quoting --- tests/test_questrade.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 39b64cc7..6730884d 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -186,7 +186,7 @@ async def stream_option_chain(portal, symbols): ``symbols`` arg is ignored here. """ - symbol = 'APHA.TO' # your fave greenhouse LP + symbol = symbols[0] async with qt.get_client() as client: contracts = await client.get_all_contracts([symbol]) @@ -204,14 +204,18 @@ async def stream_option_chain(portal, symbols): rate=4, diff_cached=False, ) + # latency arithmetic + loops = 8 + rate = 1/3. # 3 rps + timeout = loops / rate + try: # wait on the data streamer to actually start # delivering await agen.__anext__() # it'd sure be nice to have an asyncitertools here... - with trio.fail_after(2.1): - loops = 8 + with trio.fail_after(timeout): count = 0 async for quotes in agen: # print(f'got quotes for {quotes.keys()}') @@ -241,8 +245,7 @@ async def stream_option_chain(portal, symbols): ) await agen.__anext__() - with trio.fail_after(2.1): - loops = 8 + with trio.fail_after(timeout): count = 0 async for quotes in agen: for symbol, quote in quotes.items(): @@ -263,6 +266,7 @@ async def stream_stocks(portal, symbols): 'start_quote_stream', broker='questrade', symbols=symbols, + diff_cached=False, ) try: # it'd sure be nice to have an asyncitertools here... @@ -282,8 +286,14 @@ async def stream_stocks(portal, symbols): (stream_stocks,), (stream_option_chain,), (stream_stocks, stream_option_chain), + (stream_stocks, stream_stocks), + (stream_option_chain, stream_option_chain), + ], + ids=[ + 'stocks', 'options', + 'stocks_and_options', 'stocks_and_stocks', + 'options_and_options', ], - ids=['stocks', 'options', 'stocks_and_options'], ) @tractor_test async def test_quote_streaming(tmx_symbols, loglevel, stream_what): @@ -301,9 +311,16 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what): 'piker.brokers.core' ], ) + if len(stream_what) > 1: + # stream disparate symbol sets per task + first, *tail = tmx_symbols + symbols = ([first], tail) + else: + symbols = [tmx_symbols] + async with trio.open_nursery() as n: - for func in stream_what: - n.start_soon(func, portal, tmx_symbols) + for syms, func in zip(symbols, stream_what): + n.start_soon(func, portal, syms) # stop all spawned subactors await nursery.cancel()