Load provider search engines in tasks instead of exit stack

asyncify_input_modes
Tyler Goodlet 2021-07-05 09:53:19 -04:00
parent 90588018a6
commit 83ad071cb4
1 changed files with 49 additions and 63 deletions

View File

@ -19,7 +19,6 @@ High level Qt chart widgets.
""" """
import time import time
from contextlib import AsyncExitStack
from typing import Tuple, Dict, Any, Optional from typing import Tuple, Dict, Any, Optional
from types import ModuleType from types import ModuleType
from functools import partial from functools import partial
@ -844,7 +843,7 @@ class ChartPlotWidget(pg.PlotWidget):
# istart=max(lbar, l), iend=min(rbar, r), just_history=True) # istart=max(lbar, l), iend=min(rbar, r), just_history=True)
# bars_len = rbar - lbar # bars_len = rbar - lbar
# log.trace( # log.debug(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n" # f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n" # f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}" # f"begin: {begin}, end: {end}, extra: {extra}"
@ -1474,7 +1473,6 @@ async def display_symbol_data(
) as feed, ) as feed,
trio.open_nursery() as n,
): ):
ohlcv: ShmArray = feed.shm ohlcv: ShmArray = feed.shm
@ -1542,77 +1540,66 @@ async def display_symbol_data(
}, },
}) })
# load initial fsp chain (otherwise known as "indicators") async with trio.open_nursery() as n:
n.start_soon( # load initial fsp chain (otherwise known as "indicators")
spawn_fsps, n.start_soon(
linkedsplits, spawn_fsps,
fsp_conf, linkedsplits,
sym, fsp_conf,
ohlcv, sym,
brokermod, ohlcv,
loading_sym_key, brokermod,
loglevel, loading_sym_key,
) loglevel,
)
# start graphics update loop(s)after receiving first live quote # start graphics update loop(s)after receiving first live quote
n.start_soon( n.start_soon(
chart_from_quotes, chart_from_quotes,
chart, chart,
feed.stream, feed.stream,
ohlcv, ohlcv,
wap_in_history, wap_in_history,
) )
# TODO: instead we should start based on instrument trading hours? # TODO: instead we should start based on instrument trading hours?
# wait for a first quote before we start any update tasks # wait for a first quote before we start any update tasks
# quote = await feed.receive() # quote = await feed.receive()
# log.info(f'Received first quote {quote}') # log.info(f'Received first quote {quote}')
n.start_soon( n.start_soon(
check_for_new_bars, check_for_new_bars,
feed, feed,
ohlcv, ohlcv,
linkedsplits linkedsplits
) )
await start_order_mode(chart, symbol, provider, order_mode_started) await start_order_mode(chart, symbol, provider, order_mode_started)
async def load_providers( async def load_provider_search(
brokernames: list[str], broker: str,
loglevel: str, loglevel: str,
) -> None: ) -> None:
# TODO: seems like our incentive for brokerd caching lelel log.info(f'loading brokerd for {broker}..')
backends = {}
async with AsyncExitStack() as stack: async with (
# TODO: spawn these async in nursery.
# load all requested brokerd's at startup and load their
# search engines.
for broker in brokernames:
log.info(f'loading brokerd for {broker}..') maybe_spawn_brokerd(
# spin up broker daemons for each provider broker,
portal = await stack.enter_async_context( loglevel=loglevel
maybe_spawn_brokerd( ) as portal,
broker,
loglevel=loglevel
)
)
backends[broker] = portal feed.install_brokerd_search(
portal,
get_brokermod(broker),
),
):
await stack.enter_async_context( # keep search engine stream up until cancelled
feed.install_brokerd_search(
portal,
get_brokermod(broker),
)
)
# keep search engines up until cancelled
await trio.sleep_forever() await trio.sleep_forever()
@ -1653,9 +1640,7 @@ async def _async_main(
sbar = godwidget.window.status_bar sbar = godwidget.window.status_bar
starting_done = sbar.open_status('starting ze sexy chartz') starting_done = sbar.open_status('starting ze sexy chartz')
async with ( async with trio.open_nursery() as root_n:
trio.open_nursery() as root_n,
):
# set root nursery and task stack for spawning other charts/feeds # set root nursery and task stack for spawning other charts/feeds
# that run cached in the bg # that run cached in the bg
@ -1694,7 +1679,8 @@ async def _async_main(
): ):
# load other providers into search **after** # load other providers into search **after**
# the chart's select cache # the chart's select cache
root_n.start_soon(load_providers, brokernames, loglevel) for broker in brokernames:
root_n.start_soon(load_provider_search, broker, loglevel)
await order_mode_ready.wait() await order_mode_ready.wait()