diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 7a5170b4..482c1e37 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -178,8 +178,98 @@ async def test_option_quote_latency(tmx_symbols): await trio.sleep(0.1) +async def stream_option_chain(portal, symbols): + """Start up an option quote stream. + + ``symbols`` arg is ignored here. + """ + symbol = 'APHA.TO' # your fave greenhouse LP + async with qt.get_client() as client: + contracts = await client.get_all_contracts([symbol]) + + contractkey = next(iter(contracts)) + subs_keys = list( + map(lambda item: (item.symbol, item.expiry), contracts)) + sub = subs_keys[0] + + agen = await portal.run( + 'piker.brokers.data', + 'start_quote_stream', + broker='questrade', + symbols=[sub], + feed_type='option', + diff_cached=False, + ) + try: + # wait on the data streamer to actually start + # delivering + await agen.__anext__() + + # it'd sure be nice to have an asyncitertools here... + with trio.fail_after(2.1): + loops = 8 + count = 0 + async for quotes in agen: + # print(f'got quotes for {quotes.keys()}') + # we should receive all calls and puts + assert len(quotes) == len(contracts[contractkey]) * 2 + for symbol, quote in quotes.items(): + assert quote['key'] == sub + for key in _ex_quotes['option']: + quote.pop(key) + assert not quote + count += 1 + if count == loops: + break + finally: + # unsub + await portal.run( + 'piker.brokers.data', + 'modify_quote_stream', + broker='questrade', + feed_type='option', + symbols=[], + ) + + +async def stream_stocks(portal, symbols): + """Start up a stock quote stream. + """ + agen = await portal.run( + 'piker.brokers.data', + 'start_quote_stream', + broker='questrade', + symbols=symbols, + ) + try: + # it'd sure be nice to have an asyncitertools here... + async for quotes in agen: + assert quotes + for key in quotes: + assert key in symbols + break + finally: + # unsub + await portal.run( + 'piker.brokers.data', + 'modify_quote_stream', + broker='questrade', + feed_type='stock', + symbols=[], + ) + + +@pytest.mark.parametrize( + 'stream_what', + [ + (stream_stocks,), + (stream_option_chain,), + (stream_stocks, stream_option_chain), + ], + ids=['stocks', 'options', 'stocks_and_options'], +) @tractor_test -async def test_option_streaming(tmx_symbols, loglevel): +async def test_quote_streaming(tmx_symbols, loglevel, stream_what): """Set up option streaming using the broker daemon. """ async with tractor.find_actor('brokerd') as portal: @@ -194,53 +284,9 @@ async def test_option_streaming(tmx_symbols, loglevel): 'piker.brokers.core' ], ) + async with trio.open_nursery() as n: + for func in stream_what: + n.start_soon(func, portal, tmx_symbols) - symbol = 'APHA.TO' # your fave greenhouse LP - async with qt.get_client() as client: - contracts = await client.get_all_contracts([symbol]) - - contractkey = next(iter(contracts)) - subs_keys = list( - map(lambda item: (item.symbol, item.expiry), contracts)) - sub = subs_keys[0] - - agen = await portal.run( - 'piker.brokers.data', - 'start_quote_stream', - broker='questrade', - symbols=[sub], - feed_type='option', - diff_cached=False, - ) - try: - # wait on the data streamer to actually start - # delivering - await agen.__anext__() - - # it'd sure be nice to have an asyncitertools here... - with trio.fail_after(2.1): - loops = 8 - count = 0 - async for quotes in agen: - # print(f'got quotes for {quotes.keys()}') - # we should receive all calls and puts - assert len(quotes) == len(contracts[contractkey]) * 2 - for symbol, quote in quotes.items(): - assert quote['key'] == sub - for key in _ex_quotes['option']: - quote.pop(key) - assert not quote - count += 1 - if count == loops: - break - finally: - # unsub - await portal.run( - 'piker.brokers.data', - 'modify_quote_stream', - broker='questrade', - feed_type='option', - tickers=[], - ) - # stop all spawned subactors - await nursery.cancel() + # stop all spawned subactors + await nursery.cancel() diff --git a/tests/test_tractor.py b/tests/test_tractor.py deleted file mode 100644 index 7afc5d77..00000000 --- a/tests/test_tractor.py +++ /dev/null @@ -1,56 +0,0 @@ -""" -Actor model API testing -""" -import pytest -import tractor - - -async def rx_price_quotes_from_brokerd(us_symbols): - """Verify we can spawn a daemon actor and retrieve streamed price data. - """ - async with tractor.find_actor('brokerd') as portals: - if not portals: - # only one per host address, spawns an actor if None - async with tractor.open_nursery() as nursery: - # no brokerd actor found - portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=['piker.brokers.data'], - ) - - # gotta expose in a broker agnostic way... - # retrieve initial symbol data - # sd = await portal.run( - # 'piker.brokers.data', 'symbol_data', symbols=us_symbols) - # assert list(sd.keys()) == us_symbols - - gen = await portal.run( - 'piker.brokers.data', - 'start_quote_stream', - broker='robinhood', - tickers=us_symbols, - ) - # it'd sure be nice to have an asyncitertools here... - async for quotes in gen: - assert quotes - for key in quotes: - assert key in us_symbols - break - # terminate far-end async-gen - # await gen.asend(None) - # break - - # stop all spawned subactors - await nursery.cancel() - - # arbitter is cancelled here due to `find_actors()` internals - # (which internally uses `get_arbiter` which kills its channel - # server scope on exit) - - -def test_rx_price_quotes_from_brokerd(us_symbols): - tractor.run( - rx_price_quotes_from_brokerd, - us_symbols, - name='arbiter', - )