diff --git a/piker/data/feed.py b/piker/data/feed.py
index 012df910..1a24b29a 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -148,12 +148,14 @@ async def _setup_persistent_brokerd(


 async def allocate_persistent_feed(
+
     ctx: tractor.Context,
     bus: _FeedsBus,
     brokername: str,
     symbol: str,
     loglevel: str,
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
 ) -> None:

     try:
@@ -204,7 +206,6 @@ async def allocate_persistent_feed(
     bus.feeds[symbol] = (cs, init_msg, first_quote)

     if opened:
-
         # start history backfill task ``backfill_bars()`` is
         # a required backend func this must block until shm is
         # filled with first set of ohlc bars
@@ -243,9 +244,9 @@ async def attach_feed_bus(
     brokername: str,
     symbol: str,
     loglevel: str,
+
 ) -> None:

-    # try:
     if loglevel is None:
         loglevel = tractor.current_actor().loglevel

@@ -256,15 +257,15 @@ async def attach_feed_bus(

     assert 'brokerd' in tractor.current_actor().name

     bus = get_feed_bus(brokername)
+    sub_only: bool = False
+    entry = bus.feeds.get(symbol)
+
+    # if no cached feed for this symbol has been created for this
+    # brokerd yet, start persistent stream and shm writer task in
+    # service nursery
     async with bus.task_lock:
-        task_cs = bus.feeds.get(symbol)
-        sub_only: bool = False
-
-        # if no cached feed for this symbol has been created for this
-        # brokerd yet, start persistent stream and shm writer task in
-        # service nursery
-        if task_cs is None:
+        if entry is None:
             init_msg, first_quote = await bus.nursery.start(
                 partial(
                     allocate_persistent_feed,
@@ -276,6 +277,8 @@ async def attach_feed_bus(
                 )
             )
             bus._subscribers.setdefault(symbol, []).append(ctx)
+            assert isinstance(bus.feeds[symbol], tuple)
+
         else:
             sub_only = True

@@ -371,9 +374,9 @@ class Feed:
                 # more then one?
                 topics=['local_trades'],
             ) as self._trade_stream:
+                yield self._trade_stream

         else:
-            yield self._trade_stream


@@ -386,9 +389,12 @@ def sym_to_shm_key(

 @asynccontextmanager
 async def install_brokerd_search(
+
     portal: tractor._portal.Portal,
     brokermod: ModuleType,
+
 ) -> None:
+
     async with portal.open_context(
         brokermod.open_symbol_search
     ) as (ctx, cache):
@@ -402,6 +408,7 @@ async def install_brokerd_search(
             return await stream.receive()

         async with _search.register_symbol_search(
+
             provider_name=brokermod.name,
             search_routine=search,
             pause_period=brokermod._search_conf.get('pause_period'),
@@ -412,18 +419,20 @@ async def install_brokerd_search(

 @asynccontextmanager
 async def open_feed(
+
     brokername: str,
     symbols: Sequence[str],
     loglevel: Optional[str] = None,
+
 ) -> AsyncIterator[Dict[str, Any]]:
-    """Open a "data feed" which provides streamed real-time quotes.
-
-    """
+    '''
+    Open a "data feed" which provides streamed real-time quotes.
+    '''
     sym = symbols[0].lower()

     # TODO: feed cache locking, right now this is causing
-    # issues when reconncting to a long running emsd?
+    # issues when reconnecting to a long running emsd?
     # global _searcher_cache
     # async with _cache_lock:
@@ -441,6 +450,7 @@ async def open_feed(
         mod = get_ingestormod(brokername)

     # no feed for broker exists so maybe spawn a data brokerd
+
     async with maybe_spawn_brokerd(
         brokername,
         loglevel=loglevel
@@ -497,12 +507,4 @@ async def open_feed(

     feed._max_sample_rate = max(ohlc_sample_rates)

-    if brokername in _search._searcher_cache:
-        yield feed
-
-    else:
-        async with install_brokerd_search(
-            feed._brokerd_portal,
-            mod,
-        ):
-            yield feed
+    yield feed
diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py
index be5aaf7a..749be1d0 100644
--- a/piker/ui/_chart.py
+++ b/piker/ui/_chart.py
@@ -1480,7 +1480,6 @@ async def chart_symbol(
         f'tick:{symbol.tick_size}'
     )

-    # await tractor.breakpoint()
     linked_charts = chart_app.linkedcharts
     linked_charts._symbol = symbol
     chart = linked_charts.plot_ohlc_main(symbol, bars)
@@ -1535,6 +1534,7 @@ async def chart_symbol(
         },
     })

+
     async with trio.open_nursery() as n:

         # load initial fsp chain (otherwise known as "indicators")
@@ -1558,9 +1558,8 @@ async def chart_symbol(
         )

         # wait for a first quote before we start any update tasks
-        quote = await feed.receive()
-
-        log.info(f'Received first quote {quote}')
+        # quote = await feed.receive()
+        # log.info(f'Received first quote {quote}')

         n.start_soon(
             check_for_new_bars,
@@ -1581,6 +1580,41 @@ async def chart_symbol(
         await start_order_mode(chart, symbol, brokername)


+async def load_providers(
+    brokernames: list[str],
+    loglevel: str,
+) -> None:
+
+    # TODO: seems like our incentive for brokerd caching lelel
+    backends = {}
+
+    async with AsyncExitStack() as stack:
+        # TODO: spawn these async in nursery.
+        # load all requested brokerd's at startup and load their
+        # search engines.
+        for broker in brokernames:
+
+            # spin up broker daemons for each provider
+            portal = await stack.enter_async_context(
+                maybe_spawn_brokerd(
+                    broker,
+                    loglevel=loglevel
+                )
+            )
+
+            backends[broker] = portal
+
+            await stack.enter_async_context(
+                feed.install_brokerd_search(
+                    portal,
+                    get_brokermod(broker),
+                )
+            )
+
+        # keep search engines up until cancelled
+        await trio.sleep_forever()
+
+
 async def _async_main(
     # implicit required argument provided by ``qtractor_run()``
     widgets: Dict[str, Any],
@@ -1644,53 +1678,32 @@ async def _async_main(
         # this internally starts a ``chart_symbol()`` task above
         chart_app.load_symbol(provider, symbol, loglevel)

-        # TODO: seems like our incentive for brokerd caching lelel
-        backends = {}
+        root_n.start_soon(load_providers, brokernames, loglevel)

-        async with AsyncExitStack() as stack:
+        # spin up a search engine for the local cached symbol set
+        async with _search.register_symbol_search(

-            # TODO: spawn these async in nursery.
+            provider_name='cache',
+            search_routine=partial(
+                _search.search_simple_dict,
+                source=chart_app._chart_cache,
+            ),

-            # load all requested brokerd's at startup and load their
-            # search engines.
-            for broker in brokernames:
-                portal = await stack.enter_async_context(
-                    maybe_spawn_brokerd(
-                        broker,
-                        loglevel=loglevel
-                    )
+        ):
+            # start handling search bar kb inputs
+            async with open_key_stream(
+                search.bar,
+            ) as key_stream:
+
+                # start kb handling task for searcher
+                root_n.start_soon(
+                    _search.handle_keyboard_input,
+                    # chart_app,
+                    search,
+                    key_stream,
                 )
-                backends[broker] = portal
-                await stack.enter_async_context(
-                    feed.install_brokerd_search(
-                        portal,
-                        get_brokermod(broker),
-                    )
-                )
-
-            async with _search.register_symbol_search(
-
-                provider_name='cache',
-                search_routine=partial(
-                    _search.search_simple_dict,
-                    source=chart_app._chart_cache,
-                ),
-
-            ):
-                async with open_key_stream(
-                    search.bar,
-                ) as key_stream:
-
-                    # start kb handling task for searcher
-                    root_n.start_soon(
-                        _search.handle_keyboard_input,
-                        # chart_app,
-                        search,
-                        key_stream,
-                    )
-
-                    await trio.sleep_forever()
+            await trio.sleep_forever()


 def _main(
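
For context, a minimal usage sketch (not part of the diff) of how the decoupled pieces compose after this change: ``open_feed()`` no longer installs a per-broker symbol-search engine, so a consumer starts the new ``load_providers()`` task itself and opens feeds independently. Assumptions: the import paths follow the files touched above, ``consume_quotes``/``main`` and the 'kraken'/'xbtusd' values are hypothetical, and in practice this logic runs inside piker's tractor/Qt runtime rather than as a standalone script.

import trio

from piker.data.feed import open_feed
from piker.ui._chart import load_providers  # added by this diff


async def consume_quotes(brokername: str, symbol: str, loglevel: str) -> None:
    # open a real-time quote feed; no search engine is installed here anymore
    async with open_feed(brokername, [symbol], loglevel=loglevel) as feed:
        # same call the chart code previously made to grab a first quote
        quote = await feed.receive()
        print(f'first quote: {quote}')


async def main() -> None:
    async with trio.open_nursery() as n:
        # background task: spawn brokerd daemons and install their search
        # engines, mirroring ``_async_main()``'s new
        # ``root_n.start_soon(load_providers, brokernames, loglevel)``
        n.start_soon(load_providers, ['kraken'], 'info')
        n.start_soon(consume_quotes, 'kraken', 'xbtusd', 'info')
        # runs until cancelled since ``load_providers()`` sleeps forever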