Include option stream subscription change in test
parent
743ca6bfe3
commit
1d1be9dd77
|
@ -51,6 +51,7 @@ _ex_quotes = {
|
|||
'askSize': 0,
|
||||
'bidPrice': None,
|
||||
'bidSize': 0,
|
||||
'contract_type': 'call',
|
||||
'delay': 0,
|
||||
'delta': -0.212857,
|
||||
"expiry": "2021-01-15T00:00:00.000000-05:00",
|
||||
|
@ -200,6 +201,7 @@ async def stream_option_chain(portal, symbols):
|
|||
broker='questrade',
|
||||
symbols=[sub],
|
||||
feed_type='option',
|
||||
rate=4,
|
||||
diff_cached=False,
|
||||
)
|
||||
try:
|
||||
|
@ -223,15 +225,34 @@ async def stream_option_chain(portal, symbols):
|
|||
count += 1
|
||||
if count == loops:
|
||||
break
|
||||
|
||||
# switch the subscription and make sure
|
||||
# stream is still working
|
||||
sub = subs_keys[1]
|
||||
await agen.aclose()
|
||||
agen = await portal.run(
|
||||
'piker.brokers.data',
|
||||
'start_quote_stream',
|
||||
broker='questrade',
|
||||
symbols=[sub],
|
||||
feed_type='option',
|
||||
rate=4,
|
||||
diff_cached=False,
|
||||
)
|
||||
|
||||
await agen.__anext__()
|
||||
with trio.fail_after(2.1):
|
||||
loops = 8
|
||||
count = 0
|
||||
async for quotes in agen:
|
||||
for symbol, quote in quotes.items():
|
||||
assert quote['key'] == sub
|
||||
count += 1
|
||||
if count == loops:
|
||||
break
|
||||
finally:
|
||||
# unsub
|
||||
await portal.run(
|
||||
'piker.brokers.data',
|
||||
'modify_quote_stream',
|
||||
broker='questrade',
|
||||
feed_type='option',
|
||||
symbols=[],
|
||||
)
|
||||
await agen.aclose()
|
||||
|
||||
|
||||
async def stream_stocks(portal, symbols):
|
||||
|
@ -252,13 +273,7 @@ async def stream_stocks(portal, symbols):
|
|||
break
|
||||
finally:
|
||||
# unsub
|
||||
await portal.run(
|
||||
'piker.brokers.data',
|
||||
'modify_quote_stream',
|
||||
broker='questrade',
|
||||
feed_type='stock',
|
||||
symbols=[],
|
||||
)
|
||||
await agen.aclose()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -286,9 +301,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
|
|||
'piker.brokers.core'
|
||||
],
|
||||
)
|
||||
async with trio.open_nursery() as n:
|
||||
for func in stream_what:
|
||||
n.start_soon(func, portal, tmx_symbols)
|
||||
async with trio.open_nursery() as n:
|
||||
for func in stream_what:
|
||||
n.start_soon(func, portal, tmx_symbols)
|
||||
|
||||
# stop all spawned subactors
|
||||
await nursery.cancel()
|
||||
|
|
Loading…
Reference in New Issue