Test duplicate feed type quoting

kivy_mainline_and_py3.8
Tyler Goodlet 2018-12-23 21:27:47 -05:00
parent a7fb55179c
commit 6cc8b4cc2f
1 changed files with 25 additions and 8 deletions

View File

@ -186,7 +186,7 @@ async def stream_option_chain(portal, symbols):
``symbols`` arg is ignored here. ``symbols`` arg is ignored here.
""" """
symbol = 'APHA.TO' # your fave greenhouse LP symbol = symbols[0]
async with qt.get_client() as client: async with qt.get_client() as client:
contracts = await client.get_all_contracts([symbol]) contracts = await client.get_all_contracts([symbol])
@ -204,14 +204,18 @@ async def stream_option_chain(portal, symbols):
rate=4, rate=4,
diff_cached=False, diff_cached=False,
) )
# latency arithmetic
loops = 8
rate = 1/3. # 3 rps
timeout = loops / rate
try: try:
# wait on the data streamer to actually start # wait on the data streamer to actually start
# delivering # delivering
await agen.__anext__() await agen.__anext__()
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...
with trio.fail_after(2.1): with trio.fail_after(timeout):
loops = 8
count = 0 count = 0
async for quotes in agen: async for quotes in agen:
# print(f'got quotes for {quotes.keys()}') # print(f'got quotes for {quotes.keys()}')
@ -241,8 +245,7 @@ async def stream_option_chain(portal, symbols):
) )
await agen.__anext__() await agen.__anext__()
with trio.fail_after(2.1): with trio.fail_after(timeout):
loops = 8
count = 0 count = 0
async for quotes in agen: async for quotes in agen:
for symbol, quote in quotes.items(): for symbol, quote in quotes.items():
@ -263,6 +266,7 @@ async def stream_stocks(portal, symbols):
'start_quote_stream', 'start_quote_stream',
broker='questrade', broker='questrade',
symbols=symbols, symbols=symbols,
diff_cached=False,
) )
try: try:
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...
@ -282,8 +286,14 @@ async def stream_stocks(portal, symbols):
(stream_stocks,), (stream_stocks,),
(stream_option_chain,), (stream_option_chain,),
(stream_stocks, stream_option_chain), (stream_stocks, stream_option_chain),
(stream_stocks, stream_stocks),
(stream_option_chain, stream_option_chain),
],
ids=[
'stocks', 'options',
'stocks_and_options', 'stocks_and_stocks',
'options_and_options',
], ],
ids=['stocks', 'options', 'stocks_and_options'],
) )
@tractor_test @tractor_test
async def test_quote_streaming(tmx_symbols, loglevel, stream_what): async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
@ -301,9 +311,16 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
'piker.brokers.core' 'piker.brokers.core'
], ],
) )
if len(stream_what) > 1:
# stream disparate symbol sets per task
first, *tail = tmx_symbols
symbols = ([first], tail)
else:
symbols = [tmx_symbols]
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for func in stream_what: for syms, func in zip(symbols, stream_what):
n.start_soon(func, portal, tmx_symbols) n.start_soon(func, portal, syms)
# stop all spawned subactors # stop all spawned subactors
await nursery.cancel() await nursery.cancel()