""" Questrade broker testing """ import time import trio from trio.testing import trio_test from piker.brokers import questrade as qt import pytest import tractor from tractor.testing import tractor_test import piker from piker.brokers import get_brokermod from piker.brokers.data import DataFeed log = piker.log.get_logger('tests') pytestmark = pytest.mark.skipif( True, reason="questrade tests can only be run locally with an API key", ) # stock quote _ex_quotes = { 'stock': { "VWAP": 7.383792, "askPrice": 7.56, "askSize": 2, "bidPrice": 6.1, "bidSize": 2, "delay": 0, "high52w": 9.68, "highPrice": 8, "key": "EMH.VN", "isHalted": 'false', "lastTradePrice": 6.96, "lastTradePriceTrHrs": 6.97, "lastTradeSize": 2000, "lastTradeTick": "Down", "lastTradeTime": "2018-02-07T15:59:59.259000-05:00", "low52w": 1.03, "lowPrice": 6.88, "openPrice": 7.64, "symbol": "EMH.VN", "symbolId": 10164524, "tier": "", "volume": 5357805 }, 'option': { 'VWAP': 0, 'askPrice': None, 'askSize': 0, 'bidPrice': None, 'bidSize': 0, 'contract_type': 'call', 'delay': 0, 'delta': -0.212857, "expiry": "2021-01-15T00:00:00.000000-05:00", 'gamma': 0.003524, 'highPrice': 0, 'isHalted': False, "key": ["WEED.TO", '2018-10-23T00:00:00.000000-04:00'], 'lastTradePrice': 22, 'lastTradePriceTrHrs': None, 'lastTradeSize': 0, 'lastTradeTick': 'Equal', 'lastTradeTime': '2018-10-23T00:00:00.000000-04:00', 'lowPrice': 0, 'openInterest': 1, 'openPrice': 0, 'rho': -0.891868, "strike": 8, 'symbol': 'WEED15Jan21P54.00.MX', 'symbolId': 22739148, 'theta': -0.012911, 'underlying': 'WEED.TO', 'underlyingId': 16529510, 'vega': 0.220885, 'volatility': 75.514171, 'volume': 0 } } def match_packet(symbols, quotes, feed_type='stock'): """Verify target ``symbols`` match keys in ``quotes`` packet. """ assert len(quotes) == len(symbols) # for ticker in symbols: for quote in quotes.copy(): assert quote['key'] in symbols quotes.remove(quote) # verify the quote packet format hasn't changed for key in _ex_quotes[feed_type]: quote.pop(key) # no additional fields either assert not quote # not more quotes then in target set assert not quotes @tractor_test async def test_concurrent_tokens_refresh(us_symbols, loglevel): """Verify that concurrent requests from mulitple tasks work alongside random token refreshing which simulates an access token expiry + refresh scenario. The API does not support concurrent requests when refreshing tokens (i.e. when hitting the auth endpoint). This tests ensures that when multiple tasks use the same client concurrency works and access token expiry will result in a reliable token set update. """ async with qt.get_client() as client: # async with tractor.open_nursery() as n: # await n.run_in_actor('other', intermittently_refresh_tokens) async with trio.open_nursery() as n: quoter = await qt.stock_quoter(client, us_symbols) async def get_quotes(): for tries in range(15): log.info(f"{tries}: GETTING QUOTES!") quotes = await quoter(us_symbols) assert quotes await trio.sleep(0.2) async def intermittently_refresh_tokens(client): while True: try: await client.ensure_access( force_refresh=True, ask_user=False) log.info(f"last token data is {client.access_data}") await trio.sleep(1) except Exception: log.exception("Token refresh failed") n.start_soon(intermittently_refresh_tokens, client) # run 2 quote polling tasks n.start_soon(get_quotes) await get_quotes() # shutdown # await n.cancel() n.cancel_scope.cancel() @trio_test async def test_batched_stock_quote(us_symbols): """Use the client stock quote api and verify quote response format. """ async with qt.get_client() as client: quotes = await client.quote(us_symbols) assert len(quotes) == len(us_symbols) match_packet(us_symbols, quotes) @trio_test async def test_stock_quoter_context(us_symbols): """Test that a quoter "context" used by the data feed daemon. """ async with qt.get_client() as client: quoter = await qt.stock_quoter(client, us_symbols) quotes = await quoter(us_symbols) match_packet(us_symbols, quotes) @trio_test async def test_option_contracts(tmx_symbols): """Verify we can retrieve contracts by expiry. """ async with qt.get_client() as client: for symbol in tmx_symbols: contracts = await client.symbol2contracts(symbol) key, byroot = next(iter(contracts.items())) assert isinstance(key.id, int) assert isinstance(byroot, dict) for key in contracts: # check that datetime is same as reported in contract assert key.expiry.isoformat( timespec='microseconds') == contracts[key]['expiryDate'] @trio_test async def test_option_chain(tmx_symbols): """Verify we can retrieve all option chains for a list of symbols. """ async with qt.get_client() as client: # contract lookup - should be cached contracts = await client.get_all_contracts([tmx_symbols[0]]) # chains quote for all symbols quotes = await client.option_chains(contracts) # verify contents match what we expect for quote in quotes: underlying = quote['underlying'] # XXX: sometimes it's '' for old expiries? if underlying: assert underlying in tmx_symbols for key in _ex_quotes['option']: quote.pop(key) assert not quote @trio_test async def test_option_quote_latency(tmx_symbols): """Audit option quote latencies. """ async with qt.get_client() as client: # all contracts lookup - should be cached contracts = await client.get_all_contracts(['WEED.TO']) # build single expriry contract id, by_expiry = next(iter(contracts.items())) dt, by_strike = next(iter(by_expiry.items())) single = {id: {dt: None}} for expected_latency, contract in [ # NOTE: request latency is usually 2x faster that these (5, contracts), (0.5, single) ]: for _ in range(3): # chains quote for all symbols start = time.time() await client.option_chains(contract) took = time.time() - start print(f"Request took {took}") assert took <= expected_latency await trio.sleep(0.1) async def stream_option_chain(feed, symbols): """Start up an option quote stream. ``symbols`` arg is ignored here. """ symbol = symbols[0] contracts = await feed.call_client( 'get_all_contracts', symbols=[symbol]) contractkey = next(iter(contracts)) subs_keys = list( # map(lambda item: (item.symbol, item.expiry), contracts)) map(lambda item: (item[0], item[2]), contracts)) sub = subs_keys[0] # latency arithmetic loops = 8 period = 1/3. # 3 rps timeout = float('inf') #loops / period try: # it'd sure be nice to have an asyncitertools here... with trio.fail_after(timeout): stream, first_quotes = await feed.open_stream( [sub], 'option', rate=4, diff_cached=False, ) count = 0 async for quotes in stream: # print(f'got quotes for {quotes.keys()}') # we should receive all calls and puts assert len(quotes) == len(contracts[contractkey]) * 2 for symbol, quote in quotes.items(): assert quote['key'] == sub for key in _ex_quotes['option']: quote.pop(key) assert not quote count += 1 if count == loops: break # switch the subscription and make sure # stream is still working sub = subs_keys[1] with trio.fail_after(timeout): stream, first_quotes = await feed.open_stream( [sub], 'option', rate=4, diff_cached=False, ) count = 0 async for quotes in stream: for symbol, quote in quotes.items(): assert quote['key'] == sub count += 1 if count == loops: break finally: # unsub await stream.aclose() async def stream_stocks(feed, symbols): """Start up a stock quote stream. """ stream, first_quotes = await feed.open_stream( symbols, 'stock', rate=3, diff_cached=False) # latency arithmetic loops = 8 period = 1/3. # 3 rps timeout = loops / period try: # it'd sure be nice to have an asyncitertools here... count = 0 async for quotes in stream: assert quotes for key in quotes: assert key in symbols count += 1 if count == loops: break finally: # unsub await stream.aclose() @pytest.mark.parametrize( 'stream_what', [ (stream_stocks,), (stream_option_chain,), (stream_stocks, stream_option_chain), (stream_stocks, stream_stocks), (stream_option_chain, stream_option_chain), ], ids=[ 'stocks', 'options', 'stocks_and_options', 'stocks_and_stocks', 'options_and_options', ], ) @tractor_test async def test_quote_streaming(tmx_symbols, loglevel, stream_what): """Set up option streaming using the broker daemon. """ brokermod = get_brokermod('questrade') async with tractor.find_actor('brokerd') as portal: async with tractor.open_nursery() as nursery: # only one per host address, spawns an actor if None if not portal: # no brokerd actor found portal = await nursery.start_actor( 'data_feed', rpc_module_paths=[ 'piker.brokers.data', 'piker.brokers.core' ], ) feed = DataFeed(portal, brokermod) if len(stream_what) > 1: # stream disparate symbol sets per task first, *tail = tmx_symbols symbols = ([first], tail) else: symbols = [tmx_symbols] async with trio.open_nursery() as n: for syms, func in zip(symbols, stream_what): n.start_soon(func, feed, syms) # stop all spawned subactors await nursery.cancel()