Speedup: load provider searches async at startup

symbol_search
Tyler Goodlet 2021-05-24 08:31:53 -04:00
parent c9cf72d554
commit 9bfc230dde
2 changed files with 84 additions and 69 deletions

View File

@ -148,12 +148,14 @@ async def _setup_persistent_brokerd(
async def allocate_persistent_feed( async def allocate_persistent_feed(
ctx: tractor.Context, ctx: tractor.Context,
bus: _FeedsBus, bus: _FeedsBus,
brokername: str, brokername: str,
symbol: str, symbol: str,
loglevel: str, loglevel: str,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
try: try:
@ -204,7 +206,6 @@ async def allocate_persistent_feed(
bus.feeds[symbol] = (cs, init_msg, first_quote) bus.feeds[symbol] = (cs, init_msg, first_quote)
if opened: if opened:
# start history backfill task ``backfill_bars()`` is # start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is # a required backend func this must block until shm is
# filled with first set of ohlc bars # filled with first set of ohlc bars
@ -243,9 +244,9 @@ async def attach_feed_bus(
brokername: str, brokername: str,
symbol: str, symbol: str,
loglevel: str, loglevel: str,
) -> None: ) -> None:
# try:
if loglevel is None: if loglevel is None:
loglevel = tractor.current_actor().loglevel loglevel = tractor.current_actor().loglevel
@ -256,15 +257,15 @@ async def attach_feed_bus(
assert 'brokerd' in tractor.current_actor().name assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername) bus = get_feed_bus(brokername)
async with bus.task_lock:
task_cs = bus.feeds.get(symbol)
sub_only: bool = False sub_only: bool = False
entry = bus.feeds.get(symbol)
# if no cached feed for this symbol has been created for this # if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in # brokerd yet, start persistent stream and shm writer task in
# service nursery # service nursery
if task_cs is None: async with bus.task_lock:
if entry is None:
init_msg, first_quote = await bus.nursery.start( init_msg, first_quote = await bus.nursery.start(
partial( partial(
allocate_persistent_feed, allocate_persistent_feed,
@ -276,6 +277,8 @@ async def attach_feed_bus(
) )
) )
bus._subscribers.setdefault(symbol, []).append(ctx) bus._subscribers.setdefault(symbol, []).append(ctx)
assert isinstance(bus.feeds[symbol], tuple)
else: else:
sub_only = True sub_only = True
@ -371,9 +374,9 @@ class Feed:
# more then one? # more then one?
topics=['local_trades'], topics=['local_trades'],
) as self._trade_stream: ) as self._trade_stream:
yield self._trade_stream yield self._trade_stream
else: else:
yield self._trade_stream yield self._trade_stream
@ -386,9 +389,12 @@ def sym_to_shm_key(
@asynccontextmanager @asynccontextmanager
async def install_brokerd_search( async def install_brokerd_search(
portal: tractor._portal.Portal, portal: tractor._portal.Portal,
brokermod: ModuleType, brokermod: ModuleType,
) -> None: ) -> None:
async with portal.open_context( async with portal.open_context(
brokermod.open_symbol_search brokermod.open_symbol_search
) as (ctx, cache): ) as (ctx, cache):
@ -402,6 +408,7 @@ async def install_brokerd_search(
return await stream.receive() return await stream.receive()
async with _search.register_symbol_search( async with _search.register_symbol_search(
provider_name=brokermod.name, provider_name=brokermod.name,
search_routine=search, search_routine=search,
pause_period=brokermod._search_conf.get('pause_period'), pause_period=brokermod._search_conf.get('pause_period'),
@ -412,18 +419,20 @@ async def install_brokerd_search(
@asynccontextmanager @asynccontextmanager
async def open_feed( async def open_feed(
brokername: str, brokername: str,
symbols: Sequence[str], symbols: Sequence[str],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]: ) -> AsyncIterator[Dict[str, Any]]:
"""Open a "data feed" which provides streamed real-time quotes. '''
Open a "data feed" which provides streamed real-time quotes.
"""
'''
sym = symbols[0].lower() sym = symbols[0].lower()
# TODO: feed cache locking, right now this is causing # TODO: feed cache locking, right now this is causing
# issues when reconncting to a long running emsd? # issues when reconnecting to a long running emsd?
# global _searcher_cache # global _searcher_cache
# async with _cache_lock: # async with _cache_lock:
@ -441,6 +450,7 @@ async def open_feed(
mod = get_ingestormod(brokername) mod = get_ingestormod(brokername)
# no feed for broker exists so maybe spawn a data brokerd # no feed for broker exists so maybe spawn a data brokerd
async with maybe_spawn_brokerd( async with maybe_spawn_brokerd(
brokername, brokername,
loglevel=loglevel loglevel=loglevel
@ -497,12 +507,4 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = max(ohlc_sample_rates)
if brokername in _search._searcher_cache:
yield feed
else:
async with install_brokerd_search(
feed._brokerd_portal,
mod,
):
yield feed yield feed

View File

@ -1480,7 +1480,6 @@ async def chart_symbol(
f'tick:{symbol.tick_size}' f'tick:{symbol.tick_size}'
) )
# await tractor.breakpoint()
linked_charts = chart_app.linkedcharts linked_charts = chart_app.linkedcharts
linked_charts._symbol = symbol linked_charts._symbol = symbol
chart = linked_charts.plot_ohlc_main(symbol, bars) chart = linked_charts.plot_ohlc_main(symbol, bars)
@ -1535,6 +1534,7 @@ async def chart_symbol(
}, },
}) })
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
@ -1558,9 +1558,8 @@ async def chart_symbol(
) )
# wait for a first quote before we start any update tasks # wait for a first quote before we start any update tasks
quote = await feed.receive() # quote = await feed.receive()
# log.info(f'Received first quote {quote}')
log.info(f'Received first quote {quote}')
n.start_soon( n.start_soon(
check_for_new_bars, check_for_new_bars,
@ -1581,6 +1580,41 @@ async def chart_symbol(
await start_order_mode(chart, symbol, brokername) await start_order_mode(chart, symbol, brokername)
async def load_providers(
brokernames: list[str],
loglevel: str,
) -> None:
# TODO: seems like our incentive for brokerd caching lelel
backends = {}
async with AsyncExitStack() as stack:
# TODO: spawn these async in nursery.
# load all requested brokerd's at startup and load their
# search engines.
for broker in brokernames:
# spin up broker daemons for each provider
portal = await stack.enter_async_context(
maybe_spawn_brokerd(
broker,
loglevel=loglevel
)
)
backends[broker] = portal
await stack.enter_async_context(
feed.install_brokerd_search(
portal,
get_brokermod(broker),
)
)
# keep search engines up until cancelled
await trio.sleep_forever()
async def _async_main( async def _async_main(
# implicit required argument provided by ``qtractor_run()`` # implicit required argument provided by ``qtractor_run()``
widgets: Dict[str, Any], widgets: Dict[str, Any],
@ -1644,31 +1678,9 @@ async def _async_main(
# this internally starts a ``chart_symbol()`` task above # this internally starts a ``chart_symbol()`` task above
chart_app.load_symbol(provider, symbol, loglevel) chart_app.load_symbol(provider, symbol, loglevel)
# TODO: seems like our incentive for brokerd caching lelel root_n.start_soon(load_providers, brokernames, loglevel)
backends = {}
async with AsyncExitStack() as stack:
# TODO: spawn these async in nursery.
# load all requested brokerd's at startup and load their
# search engines.
for broker in brokernames:
portal = await stack.enter_async_context(
maybe_spawn_brokerd(
broker,
loglevel=loglevel
)
)
backends[broker] = portal
await stack.enter_async_context(
feed.install_brokerd_search(
portal,
get_brokermod(broker),
)
)
# spin up a search engine for the local cached symbol set
async with _search.register_symbol_search( async with _search.register_symbol_search(
provider_name='cache', provider_name='cache',
@ -1678,6 +1690,7 @@ async def _async_main(
), ),
): ):
# start handling search bar kb inputs
async with open_key_stream( async with open_key_stream(
search.bar, search.bar,
) as key_stream: ) as key_stream: