diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 1c081935..1cfb60dd 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -2,14 +2,16 @@ Questrade broker testing """ import time -import logging import trio from trio.testing import trio_test -import tractor -from tractor.testing import tractor_test from piker.brokers import questrade as qt import pytest +import tractor +from tractor.testing import tractor_test + +from piker.brokers import get_brokermod +from piker.brokers.data import DataFeed log = tractor.get_logger('tests') @@ -129,6 +131,7 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel): for tries in range(30): log.info(f"{tries}: GETTING QUOTES!") quotes = await quoter(us_symbols) + assert quotes await trio.sleep(0.1) async def intermittently_refresh_tokens(client): @@ -197,7 +200,10 @@ async def test_option_chain(tmx_symbols): quotes = await client.option_chains(contracts) # verify contents match what we expect for quote in quotes: - assert quote['underlying'] in tmx_symbols + underlying = quote['underlying'] + # XXX: sometimes it's '' for old expiries? + if underlying: + assert underlying in tmx_symbols for key in _ex_quotes['option']: quote.pop(key) assert not quote @@ -230,43 +236,34 @@ async def test_option_quote_latency(tmx_symbols): await trio.sleep(0.1) -async def stream_option_chain(portal, symbols): +async def stream_option_chain(feed, symbols): """Start up an option quote stream. ``symbols`` arg is ignored here. """ symbol = symbols[0] - async with qt.get_client() as client: - contracts = await client.get_all_contracts([symbol]) + contracts = await feed.call_client( + 'get_all_contracts', symbols=[symbol]) contractkey = next(iter(contracts)) subs_keys = list( - map(lambda item: (item.symbol, item.expiry), contracts)) + # map(lambda item: (item.symbol, item.expiry), contracts)) + map(lambda item: (item[0], item[2]), contracts)) sub = subs_keys[0] - agen = await portal.run( - 'piker.brokers.data', - 'start_quote_stream', - broker='questrade', - symbols=[sub], - feed_type='option', - rate=3, - diff_cached=False, - ) # latency arithmetic loops = 8 period = 1/3. # 3 rps - timeout = loops / period + timeout = float('inf') #loops / period try: - # wait on the data streamer to actually start - # delivering - await agen.__anext__() - # it'd sure be nice to have an asyncitertools here... with trio.fail_after(timeout): + stream, first_quotes = await feed.open_stream( + [sub], 'option', rate=4, diff_cached=False, + ) count = 0 - async for quotes in agen: + async for quotes in stream: # print(f'got quotes for {quotes.keys()}') # we should receive all calls and puts assert len(quotes) == len(contracts[contractkey]) * 2 @@ -282,21 +279,12 @@ async def stream_option_chain(portal, symbols): # switch the subscription and make sure # stream is still working sub = subs_keys[1] - await agen.aclose() - agen = await portal.run( - 'piker.brokers.data', - 'start_quote_stream', - broker='questrade', - symbols=[sub], - feed_type='option', - rate=4, - diff_cached=False, - ) - - await agen.__anext__() with trio.fail_after(timeout): + stream, first_quotes = await feed.open_stream( + [sub], 'option', rate=4, diff_cached=False, + ) count = 0 - async for quotes in agen: + async for quotes in stream: for symbol, quote in quotes.items(): assert quote['key'] == sub count += 1 @@ -304,29 +292,32 @@ async def stream_option_chain(portal, symbols): break finally: # unsub - await agen.aclose() + await stream.aclose() -async def stream_stocks(portal, symbols): +async def stream_stocks(feed, symbols): """Start up a stock quote stream. """ - agen = await portal.run( - 'piker.brokers.data', - 'start_quote_stream', - broker='questrade', - symbols=symbols, - diff_cached=False, - ) + stream, first_quotes = await feed.open_stream( + symbols, 'stock', rate=3, diff_cached=False) + # latency arithmetic + loops = 8 + period = 1/3. # 3 rps + timeout = loops / period + try: # it'd sure be nice to have an asyncitertools here... - async for quotes in agen: + count = 0 + async for quotes in stream: assert quotes for key in quotes: assert key in symbols - break + count += 1 + if count == loops: + break finally: # unsub - await agen.aclose() + await stream.aclose() @pytest.mark.parametrize( @@ -339,8 +330,10 @@ async def stream_stocks(portal, symbols): (stream_option_chain, stream_option_chain), ], ids=[ - 'stocks', 'options', - 'stocks_and_options', 'stocks_and_stocks', + 'stocks', + 'options', + 'stocks_and_options', + 'stocks_and_stocks', 'options_and_options', ], ) @@ -348,6 +341,7 @@ async def stream_stocks(portal, symbols): async def test_quote_streaming(tmx_symbols, loglevel, stream_what): """Set up option streaming using the broker daemon. """ + brokermod = get_brokermod('questrade') async with tractor.find_actor('brokerd') as portal: async with tractor.open_nursery() as nursery: # only one per host address, spawns an actor if None @@ -360,6 +354,8 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what): 'piker.brokers.core' ], ) + feed = DataFeed(portal, brokermod) + if len(stream_what) > 1: # stream disparate symbol sets per task first, *tail = tmx_symbols @@ -369,7 +365,7 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what): async with trio.open_nursery() as n: for syms, func in zip(symbols, stream_what): - n.start_soon(func, portal, syms) + n.start_soon(func, feed, syms) # stop all spawned subactors await nursery.cancel()