From c9c686c98d4640a660fe490453844b41e536c3c3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 16 May 2021 20:53:21 -0400 Subject: [PATCH] Register context-stream with multi-search for each feed --- piker/data/feed.py | 100 ++++++++++----------------------------------- 1 file changed, 22 insertions(+), 78 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index bde8fe72..5300bb85 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -27,7 +27,7 @@ from types import ModuleType from typing import ( Dict, Any, Sequence, AsyncIterator, Optional, - List, Awaitable, Callable + List, Awaitable, Callable, ) import trio @@ -47,6 +47,7 @@ from ._sharedmem import ( ) from .ingest import get_ingestormod from ._source import base_iohlc_dtype, Symbol +from ..ui import _search from ._sampling import ( _shms, _incrementers, @@ -383,78 +384,6 @@ def sym_to_shm_key( return f'{broker}.{symbol}' -# cache of brokernames to feeds -_cache: Dict[str, Callable] = {} -_cache_lock: trio.Lock = trio.Lock() - - -def get_multi_search() -> Callable[..., Awaitable]: - - global _cache - - async def multisearcher( - pattern: str, - ) -> dict: - - matches = {} - - async def pack_matches( - brokername: str, - pattern: str, - search: Callable[..., Awaitable[dict]], - ) -> None: - log.debug(f'Searching {brokername} for "{pattern}"') - matches[brokername] = await search(pattern) - - # TODO: make this an async stream? - async with trio.open_nursery() as n: - - for brokername, search in _cache.items(): - n.start_soon(pack_matches, brokername, pattern, search) - - return matches - - return multisearcher - - -@asynccontextmanager -async def open_symbol_search( - brokermod: ModuleType, - brokerd_portal: tractor._portal.Portal, -) -> AsyncIterator[dict]: - - global _cache - - open_search = getattr(brokermod, 'open_symbol_search', None) - if open_search is None: - - # just return a pure pass through searcher - async def passthru(text: str) -> Dict[str, Any]: - return text - - yield passthru - return - - async with brokerd_portal.open_context( - open_search, - ) as (ctx, cache): - - # shield here since we expect the search rpc to be - # cancellable by the user as they see fit. - async with ctx.open_stream() as stream: - - async def search(text: str) -> Dict[str, Any]: - await stream.send(text) - return await stream.receive() - - # deliver search func to consumer - try: - _cache[brokermod.name] = search - yield search - finally: - _cache.pop(brokermod.name) - - @asynccontextmanager async def open_feed( brokername: str, @@ -464,15 +393,15 @@ async def open_feed( """Open a "data feed" which provides streamed real-time quotes. """ - global _cache, _cache_lock sym = symbols[0].lower() # TODO: feed cache locking, right now this is causing # issues when reconncting to a long running emsd? + # global _searcher_cache # async with _cache_lock: - # feed = _cache.get((brokername, sym)) + # feed = _searcher_cache.get((brokername, sym)) # # if feed is not None and sym in feed.symbols: # if feed is not None: @@ -542,8 +471,23 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) - if brokername in _cache: + if brokername in _search._searcher_cache: yield feed else: - async with open_symbol_search(mod, feed._brokerd_portal): - yield feed + async with feed._brokerd_portal.open_context( + mod.open_symbol_search + ) as (ctx, cache): + + # shield here since we expect the search rpc to be + # cancellable by the user as they see fit. + async with ctx.open_stream() as stream: + + async def search(text: str) -> Dict[str, Any]: + await stream.send(text) + return await stream.receive() + + async with _search.register_symbol_search( + provider_name=brokername, + search_routine=search, + ): + yield feed