Add an options streaming test

kivy_mainline_and_py3.8
Tyler Goodlet 2018-11-30 08:18:54 -05:00
parent 48a9c389c5
commit 15dec65ba1
2 changed files with 149 additions and 81 deletions

View File

@ -4,7 +4,9 @@ Questrade broker testing
import time import time
import trio import trio
import tractor
from trio.testing import trio_test from trio.testing import trio_test
from tractor.testing import tractor_test
from piker.brokers import questrade as qt from piker.brokers import questrade as qt
import pytest import pytest
@ -18,7 +20,8 @@ def check_qt_conf_section(brokerconf):
# stock quote # stock quote
_ex_quote = { _ex_quotes = {
'stock': {
"VWAP": 7.383792, "VWAP": 7.383792,
"askPrice": 7.56, "askPrice": 7.56,
"askSize": 2, "askSize": 2,
@ -27,6 +30,7 @@ _ex_quote = {
"delay": 0, "delay": 0,
"high52w": 9.68, "high52w": 9.68,
"highPrice": 8, "highPrice": 8,
"key": "EMH.VN",
"isHalted": 'false', "isHalted": 'false',
"lastTradePrice": 6.96, "lastTradePrice": 6.96,
"lastTradePriceTrHrs": 6.97, "lastTradePriceTrHrs": 6.97,
@ -40,11 +44,8 @@ _ex_quote = {
"symbolId": 10164524, "symbolId": 10164524,
"tier": "", "tier": "",
"volume": 5357805 "volume": 5357805
} },
'option': {
# option quote
_ex_contract = {
'VWAP': 0, 'VWAP': 0,
'askPrice': None, 'askPrice': None,
'askSize': 0, 'askSize': 0,
@ -55,6 +56,7 @@ _ex_contract = {
'gamma': 0.003524, 'gamma': 0.003524,
'highPrice': 0, 'highPrice': 0,
'isHalted': False, 'isHalted': False,
"key": ["WEED.TO", '2018-10-23T00:00:00.000000-04:00'],
'lastTradePrice': 22, 'lastTradePrice': 22,
'lastTradePriceTrHrs': None, 'lastTradePriceTrHrs': None,
'lastTradeSize': 0, 'lastTradeSize': 0,
@ -72,18 +74,21 @@ _ex_contract = {
'vega': 0.220885, 'vega': 0.220885,
'volatility': 75.514171, 'volatility': 75.514171,
'volume': 0 'volume': 0
}
} }
def match_packet(symbols, quotes): def match_packet(symbols, quotes, feed_type='stock'):
"""Verify target ``symbols`` match keys in ``quotes`` packet. """Verify target ``symbols`` match keys in ``quotes`` packet.
""" """
assert len(quotes) == len(symbols) assert len(quotes) == len(symbols)
for ticker in symbols: # for ticker in symbols:
quote = quotes.pop(ticker) for quote in quotes.copy():
assert quote['key'] in symbols
quotes.remove(quote)
# verify the quote packet format hasn't changed # verify the quote packet format hasn't changed
for key in _ex_quote: for key in _ex_quotes[feed_type]:
quote.pop(key) quote.pop(key)
# no additional fields either # no additional fields either
@ -104,11 +109,11 @@ async def test_batched_stock_quote(us_symbols):
@trio_test @trio_test
async def test_quoter_context(us_symbols): async def test_stock_quoter_context(us_symbols):
"""Test that a quoter "context" used by the data feed daemon. """Test that a quoter "context" used by the data feed daemon.
""" """
async with qt.get_client() as client: async with qt.get_client() as client:
quoter = await qt.quoter(client, us_symbols) quoter = await qt.stock_quoter(client, us_symbols)
quotes = await quoter(us_symbols) quotes = await quoter(us_symbols)
match_packet(us_symbols, quotes) match_packet(us_symbols, quotes)
@ -119,12 +124,14 @@ async def test_option_contracts(tmx_symbols):
""" """
async with qt.get_client() as client: async with qt.get_client() as client:
for symbol in tmx_symbols: for symbol in tmx_symbols:
id, contracts = await client.symbol2contracts(symbol) contracts = await client.symbol2contracts(symbol)
assert isinstance(id, int) key, byroot = next(iter(contracts.items()))
assert isinstance(contracts, dict) assert isinstance(key.id, int)
for dt in contracts: assert isinstance(byroot, dict)
assert dt.isoformat( for key in contracts:
timespec='microseconds') == contracts[dt]['expiryDate'] # check that datetime is same as reported in contract
assert key.expiry.isoformat(
timespec='microseconds') == contracts[key]['expiryDate']
@trio_test @trio_test
@ -133,17 +140,15 @@ async def test_option_chain(tmx_symbols):
""" """
async with qt.get_client() as client: async with qt.get_client() as client:
# contract lookup - should be cached # contract lookup - should be cached
contracts = await client.get_all_contracts(tmx_symbols) contracts = await client.get_all_contracts([tmx_symbols[0]])
# chains quote for all symbols # chains quote for all symbols
quotes = await client.option_chains(contracts) quotes = await client.option_chains(contracts)
for key in tmx_symbols: # verify contents match what we expect
contracts = quotes.pop(key) for quote in quotes:
for key, quote in contracts.items(): assert quote['underlying'] in tmx_symbols
for key in _ex_contract: for key in _ex_quotes['option']:
quote.pop(key) quote.pop(key)
assert not quote assert not quote
# chains for each symbol were retreived
assert not quotes
@trio_test @trio_test
@ -163,7 +168,7 @@ async def test_option_quote_latency(tmx_symbols):
# NOTE: request latency is usually 2x faster that these # NOTE: request latency is usually 2x faster that these
(5, contracts), (0.5, single) (5, contracts), (0.5, single)
]: ]:
for _ in range(10): for _ in range(3):
# chains quote for all symbols # chains quote for all symbols
start = time.time() start = time.time()
await client.option_chains(contract) await client.option_chains(contract)
@ -171,3 +176,71 @@ async def test_option_quote_latency(tmx_symbols):
print(f"Request took {took}") print(f"Request took {took}")
assert took <= expected_latency assert took <= expected_latency
await trio.sleep(0.1) await trio.sleep(0.1)
@tractor_test
async def test_option_streaming(tmx_symbols, loglevel):
"""Set up option streaming using the broker daemon.
"""
async with tractor.find_actor('brokerd') as portal:
async with tractor.open_nursery() as nursery:
# only one per host address, spawns an actor if None
if not portal:
# no brokerd actor found
portal = await nursery.start_actor(
'data_feed',
rpc_module_paths=[
'piker.brokers.data',
'piker.brokers.core'
],
)
symbol = 'APHA.TO' # your fave greenhouse LP
async with qt.get_client() as client:
contracts = await client.get_all_contracts([symbol])
contractkey = next(iter(contracts))
subs_keys = list(
map(lambda item: (item.symbol, item.expiry), contracts))
sub = subs_keys[0]
agen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='questrade',
symbols=[sub],
feed_type='option',
diff_cached=False,
)
try:
# wait on the data streamer to actually start
# delivering
await agen.__anext__()
# it'd sure be nice to have an asyncitertools here...
with trio.fail_after(2.1):
loops = 8
count = 0
async for quotes in agen:
# print(f'got quotes for {quotes.keys()}')
# we should receive all calls and puts
assert len(quotes) == len(contracts[contractkey]) * 2
for symbol, quote in quotes.items():
assert quote['key'] == sub
for key in _ex_quotes['option']:
quote.pop(key)
assert not quote
count += 1
if count == loops:
break
finally:
# unsub
await portal.run(
'piker.brokers.data',
'modify_quote_stream',
broker='questrade',
feed_type='option',
tickers=[],
)
# stop all spawned subactors
await nursery.cancel()

View File

@ -16,11 +16,6 @@ async def rx_price_quotes_from_brokerd(us_symbols):
portal = await nursery.start_actor( portal = await nursery.start_actor(
'brokerd', 'brokerd',
rpc_module_paths=['piker.brokers.data'], rpc_module_paths=['piker.brokers.data'],
statespace={
'broker2tickersubs': {},
'clients': {},
'dtasks': set()
},
) )
# gotta expose in a broker agnostic way... # gotta expose in a broker agnostic way...