From 1d1be9dd77f4b5aaa3fd58a10817f6d2d119469a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Dec 2018 17:10:36 -0500 Subject: [PATCH] Include option stream subscription change in test --- tests/test_questrade.py | 49 +++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 4c3ac059..39b64cc7 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -51,6 +51,7 @@ _ex_quotes = { 'askSize': 0, 'bidPrice': None, 'bidSize': 0, + 'contract_type': 'call', 'delay': 0, 'delta': -0.212857, "expiry": "2021-01-15T00:00:00.000000-05:00", @@ -200,6 +201,7 @@ async def stream_option_chain(portal, symbols): broker='questrade', symbols=[sub], feed_type='option', + rate=4, diff_cached=False, ) try: @@ -223,15 +225,34 @@ async def stream_option_chain(portal, symbols): count += 1 if count == loops: break + + # switch the subscription and make sure + # stream is still working + sub = subs_keys[1] + await agen.aclose() + agen = await portal.run( + 'piker.brokers.data', + 'start_quote_stream', + broker='questrade', + symbols=[sub], + feed_type='option', + rate=4, + diff_cached=False, + ) + + await agen.__anext__() + with trio.fail_after(2.1): + loops = 8 + count = 0 + async for quotes in agen: + for symbol, quote in quotes.items(): + assert quote['key'] == sub + count += 1 + if count == loops: + break finally: # unsub - await portal.run( - 'piker.brokers.data', - 'modify_quote_stream', - broker='questrade', - feed_type='option', - symbols=[], - ) + await agen.aclose() async def stream_stocks(portal, symbols): @@ -252,13 +273,7 @@ async def stream_stocks(portal, symbols): break finally: # unsub - await portal.run( - 'piker.brokers.data', - 'modify_quote_stream', - broker='questrade', - feed_type='stock', - symbols=[], - ) + await agen.aclose() @pytest.mark.parametrize( @@ -286,9 +301,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what): 'piker.brokers.core' ], ) - async with trio.open_nursery() as n: - for func in stream_what: - n.start_soon(func, portal, tmx_symbols) + async with trio.open_nursery() as n: + for func in stream_what: + n.start_soon(func, portal, tmx_symbols) # stop all spawned subactors await nursery.cancel()