Aggregate streaming tests and test stocks + options together

kivy_mainline_and_py3.8
Tyler Goodlet 2018-12-01 16:14:33 -05:00
parent 2df5c76828
commit 12d5627860
2 changed files with 96 additions and 106 deletions

View File

@ -178,8 +178,98 @@ async def test_option_quote_latency(tmx_symbols):
await trio.sleep(0.1)
async def stream_option_chain(portal, symbols):
"""Start up an option quote stream.
``symbols`` arg is ignored here.
"""
symbol = 'APHA.TO' # your fave greenhouse LP
async with qt.get_client() as client:
contracts = await client.get_all_contracts([symbol])
contractkey = next(iter(contracts))
subs_keys = list(
map(lambda item: (item.symbol, item.expiry), contracts))
sub = subs_keys[0]
agen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='questrade',
symbols=[sub],
feed_type='option',
diff_cached=False,
)
try:
# wait on the data streamer to actually start
# delivering
await agen.__anext__()
# it'd sure be nice to have an asyncitertools here...
with trio.fail_after(2.1):
loops = 8
count = 0
async for quotes in agen:
# print(f'got quotes for {quotes.keys()}')
# we should receive all calls and puts
assert len(quotes) == len(contracts[contractkey]) * 2
for symbol, quote in quotes.items():
assert quote['key'] == sub
for key in _ex_quotes['option']:
quote.pop(key)
assert not quote
count += 1
if count == loops:
break
finally:
# unsub
await portal.run(
'piker.brokers.data',
'modify_quote_stream',
broker='questrade',
feed_type='option',
symbols=[],
)
async def stream_stocks(portal, symbols):
"""Start up a stock quote stream.
"""
agen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='questrade',
symbols=symbols,
)
try:
# it'd sure be nice to have an asyncitertools here...
async for quotes in agen:
assert quotes
for key in quotes:
assert key in symbols
break
finally:
# unsub
await portal.run(
'piker.brokers.data',
'modify_quote_stream',
broker='questrade',
feed_type='stock',
symbols=[],
)
@pytest.mark.parametrize(
'stream_what',
[
(stream_stocks,),
(stream_option_chain,),
(stream_stocks, stream_option_chain),
],
ids=['stocks', 'options', 'stocks_and_options'],
)
@tractor_test
async def test_option_streaming(tmx_symbols, loglevel):
async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
"""Set up option streaming using the broker daemon.
"""
async with tractor.find_actor('brokerd') as portal:
@ -194,53 +284,9 @@ async def test_option_streaming(tmx_symbols, loglevel):
'piker.brokers.core'
],
)
async with trio.open_nursery() as n:
for func in stream_what:
n.start_soon(func, portal, tmx_symbols)
symbol = 'APHA.TO' # your fave greenhouse LP
async with qt.get_client() as client:
contracts = await client.get_all_contracts([symbol])
contractkey = next(iter(contracts))
subs_keys = list(
map(lambda item: (item.symbol, item.expiry), contracts))
sub = subs_keys[0]
agen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='questrade',
symbols=[sub],
feed_type='option',
diff_cached=False,
)
try:
# wait on the data streamer to actually start
# delivering
await agen.__anext__()
# it'd sure be nice to have an asyncitertools here...
with trio.fail_after(2.1):
loops = 8
count = 0
async for quotes in agen:
# print(f'got quotes for {quotes.keys()}')
# we should receive all calls and puts
assert len(quotes) == len(contracts[contractkey]) * 2
for symbol, quote in quotes.items():
assert quote['key'] == sub
for key in _ex_quotes['option']:
quote.pop(key)
assert not quote
count += 1
if count == loops:
break
finally:
# unsub
await portal.run(
'piker.brokers.data',
'modify_quote_stream',
broker='questrade',
feed_type='option',
tickers=[],
)
# stop all spawned subactors
await nursery.cancel()
# stop all spawned subactors
await nursery.cancel()

View File

@ -1,56 +0,0 @@
"""
Actor model API testing
"""
import pytest
import tractor
async def rx_price_quotes_from_brokerd(us_symbols):
"""Verify we can spawn a daemon actor and retrieve streamed price data.
"""
async with tractor.find_actor('brokerd') as portals:
if not portals:
# only one per host address, spawns an actor if None
async with tractor.open_nursery() as nursery:
# no brokerd actor found
portal = await nursery.start_actor(
'brokerd',
rpc_module_paths=['piker.brokers.data'],
)
# gotta expose in a broker agnostic way...
# retrieve initial symbol data
# sd = await portal.run(
# 'piker.brokers.data', 'symbol_data', symbols=us_symbols)
# assert list(sd.keys()) == us_symbols
gen = await portal.run(
'piker.brokers.data',
'start_quote_stream',
broker='robinhood',
tickers=us_symbols,
)
# it'd sure be nice to have an asyncitertools here...
async for quotes in gen:
assert quotes
for key in quotes:
assert key in us_symbols
break
# terminate far-end async-gen
# await gen.asend(None)
# break
# stop all spawned subactors
await nursery.cancel()
# arbitter is cancelled here due to `find_actors()` internals
# (which internally uses `get_arbiter` which kills its channel
# server scope on exit)
def test_rx_price_quotes_from_brokerd(us_symbols):
tractor.run(
rx_price_quotes_from_brokerd,
us_symbols,
name='arbiter',
)