Use `DataFeed` api in streaming tests

kivy_mainline_and_py3.8
Tyler Goodlet 2019-02-21 23:10:24 -05:00
parent 7ee731faac
commit 1a0427db08
1 changed files with 47 additions and 51 deletions

View File

@ -2,14 +2,16 @@
Questrade broker testing Questrade broker testing
""" """
import time import time
import logging
import trio import trio
from trio.testing import trio_test from trio.testing import trio_test
import tractor
from tractor.testing import tractor_test
from piker.brokers import questrade as qt from piker.brokers import questrade as qt
import pytest import pytest
import tractor
from tractor.testing import tractor_test
from piker.brokers import get_brokermod
from piker.brokers.data import DataFeed
log = tractor.get_logger('tests') log = tractor.get_logger('tests')
@ -129,6 +131,7 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
for tries in range(30): for tries in range(30):
log.info(f"{tries}: GETTING QUOTES!") log.info(f"{tries}: GETTING QUOTES!")
quotes = await quoter(us_symbols) quotes = await quoter(us_symbols)
assert quotes
await trio.sleep(0.1) await trio.sleep(0.1)
async def intermittently_refresh_tokens(client): async def intermittently_refresh_tokens(client):
@ -197,7 +200,10 @@ async def test_option_chain(tmx_symbols):
quotes = await client.option_chains(contracts) quotes = await client.option_chains(contracts)
# verify contents match what we expect # verify contents match what we expect
for quote in quotes: for quote in quotes:
assert quote['underlying'] in tmx_symbols underlying = quote['underlying']
# XXX: sometimes it's '' for old expiries?
if underlying:
assert underlying in tmx_symbols
for key in _ex_quotes['option']: for key in _ex_quotes['option']:
quote.pop(key) quote.pop(key)
assert not quote assert not quote
@ -230,43 +236,34 @@ async def test_option_quote_latency(tmx_symbols):
await trio.sleep(0.1) await trio.sleep(0.1)
async def stream_option_chain(portal, symbols): async def stream_option_chain(feed, symbols):
"""Start up an option quote stream. """Start up an option quote stream.
``symbols`` arg is ignored here. ``symbols`` arg is ignored here.
""" """
symbol = symbols[0] symbol = symbols[0]
async with qt.get_client() as client: contracts = await feed.call_client(
contracts = await client.get_all_contracts([symbol]) 'get_all_contracts', symbols=[symbol])
contractkey = next(iter(contracts)) contractkey = next(iter(contracts))
subs_keys = list( subs_keys = list(
map(lambda item: (item.symbol, item.expiry), contracts)) # map(lambda item: (item.symbol, item.expiry), contracts))
map(lambda item: (item[0], item[2]), contracts))
sub = subs_keys[0] sub = subs_keys[0]
agen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='questrade',
symbols=[sub],
feed_type='option',
rate=3,
diff_cached=False,
)
# latency arithmetic # latency arithmetic
loops = 8 loops = 8
period = 1/3. # 3 rps period = 1/3. # 3 rps
timeout = loops / period timeout = float('inf') #loops / period
try: try:
# wait on the data streamer to actually start
# delivering
await agen.__anext__()
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...
with trio.fail_after(timeout): with trio.fail_after(timeout):
stream, first_quotes = await feed.open_stream(
[sub], 'option', rate=4, diff_cached=False,
)
count = 0 count = 0
async for quotes in agen: async for quotes in stream:
# print(f'got quotes for {quotes.keys()}') # print(f'got quotes for {quotes.keys()}')
# we should receive all calls and puts # we should receive all calls and puts
assert len(quotes) == len(contracts[contractkey]) * 2 assert len(quotes) == len(contracts[contractkey]) * 2
@ -282,21 +279,12 @@ async def stream_option_chain(portal, symbols):
# switch the subscription and make sure # switch the subscription and make sure
# stream is still working # stream is still working
sub = subs_keys[1] sub = subs_keys[1]
await agen.aclose()
agen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='questrade',
symbols=[sub],
feed_type='option',
rate=4,
diff_cached=False,
)
await agen.__anext__()
with trio.fail_after(timeout): with trio.fail_after(timeout):
stream, first_quotes = await feed.open_stream(
[sub], 'option', rate=4, diff_cached=False,
)
count = 0 count = 0
async for quotes in agen: async for quotes in stream:
for symbol, quote in quotes.items(): for symbol, quote in quotes.items():
assert quote['key'] == sub assert quote['key'] == sub
count += 1 count += 1
@ -304,29 +292,32 @@ async def stream_option_chain(portal, symbols):
break break
finally: finally:
# unsub # unsub
await agen.aclose() await stream.aclose()
async def stream_stocks(portal, symbols): async def stream_stocks(feed, symbols):
"""Start up a stock quote stream. """Start up a stock quote stream.
""" """
agen = await portal.run( stream, first_quotes = await feed.open_stream(
'piker.brokers.data', symbols, 'stock', rate=3, diff_cached=False)
'start_quote_stream', # latency arithmetic
broker='questrade', loops = 8
symbols=symbols, period = 1/3. # 3 rps
diff_cached=False, timeout = loops / period
)
try: try:
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...
async for quotes in agen: count = 0
async for quotes in stream:
assert quotes assert quotes
for key in quotes: for key in quotes:
assert key in symbols assert key in symbols
break count += 1
if count == loops:
break
finally: finally:
# unsub # unsub
await agen.aclose() await stream.aclose()
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -339,8 +330,10 @@ async def stream_stocks(portal, symbols):
(stream_option_chain, stream_option_chain), (stream_option_chain, stream_option_chain),
], ],
ids=[ ids=[
'stocks', 'options', 'stocks',
'stocks_and_options', 'stocks_and_stocks', 'options',
'stocks_and_options',
'stocks_and_stocks',
'options_and_options', 'options_and_options',
], ],
) )
@ -348,6 +341,7 @@ async def stream_stocks(portal, symbols):
async def test_quote_streaming(tmx_symbols, loglevel, stream_what): async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
"""Set up option streaming using the broker daemon. """Set up option streaming using the broker daemon.
""" """
brokermod = get_brokermod('questrade')
async with tractor.find_actor('brokerd') as portal: async with tractor.find_actor('brokerd') as portal:
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
# only one per host address, spawns an actor if None # only one per host address, spawns an actor if None
@ -360,6 +354,8 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
'piker.brokers.core' 'piker.brokers.core'
], ],
) )
feed = DataFeed(portal, brokermod)
if len(stream_what) > 1: if len(stream_what) > 1:
# stream disparate symbol sets per task # stream disparate symbol sets per task
first, *tail = tmx_symbols first, *tail = tmx_symbols
@ -369,7 +365,7 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for syms, func in zip(symbols, stream_what): for syms, func in zip(symbols, stream_what):
n.start_soon(func, portal, syms) n.start_soon(func, feed, syms)
# stop all spawned subactors # stop all spawned subactors
await nursery.cancel() await nursery.cancel()