Register context-stream with multi-search for each feed

symbol_search
Tyler Goodlet 2021-05-16 20:53:21 -04:00
parent 0163a582a5
commit c9c686c98d
1 changed files with 22 additions and 78 deletions

View File

@ -27,7 +27,7 @@ from types import ModuleType
from typing import ( from typing import (
Dict, Any, Sequence, Dict, Any, Sequence,
AsyncIterator, Optional, AsyncIterator, Optional,
List, Awaitable, Callable List, Awaitable, Callable,
) )
import trio import trio
@ -47,6 +47,7 @@ from ._sharedmem import (
) )
from .ingest import get_ingestormod from .ingest import get_ingestormod
from ._source import base_iohlc_dtype, Symbol from ._source import base_iohlc_dtype, Symbol
from ..ui import _search
from ._sampling import ( from ._sampling import (
_shms, _shms,
_incrementers, _incrementers,
@ -383,78 +384,6 @@ def sym_to_shm_key(
return f'{broker}.{symbol}' return f'{broker}.{symbol}'
# cache of brokernames to feeds
_cache: Dict[str, Callable] = {}
_cache_lock: trio.Lock = trio.Lock()
def get_multi_search() -> Callable[..., Awaitable]:
global _cache
async def multisearcher(
pattern: str,
) -> dict:
matches = {}
async def pack_matches(
brokername: str,
pattern: str,
search: Callable[..., Awaitable[dict]],
) -> None:
log.debug(f'Searching {brokername} for "{pattern}"')
matches[brokername] = await search(pattern)
# TODO: make this an async stream?
async with trio.open_nursery() as n:
for brokername, search in _cache.items():
n.start_soon(pack_matches, brokername, pattern, search)
return matches
return multisearcher
@asynccontextmanager
async def open_symbol_search(
brokermod: ModuleType,
brokerd_portal: tractor._portal.Portal,
) -> AsyncIterator[dict]:
global _cache
open_search = getattr(brokermod, 'open_symbol_search', None)
if open_search is None:
# just return a pure pass through searcher
async def passthru(text: str) -> Dict[str, Any]:
return text
yield passthru
return
async with brokerd_portal.open_context(
open_search,
) as (ctx, cache):
# shield here since we expect the search rpc to be
# cancellable by the user as they see fit.
async with ctx.open_stream() as stream:
async def search(text: str) -> Dict[str, Any]:
await stream.send(text)
return await stream.receive()
# deliver search func to consumer
try:
_cache[brokermod.name] = search
yield search
finally:
_cache.pop(brokermod.name)
@asynccontextmanager @asynccontextmanager
async def open_feed( async def open_feed(
brokername: str, brokername: str,
@ -464,15 +393,15 @@ async def open_feed(
"""Open a "data feed" which provides streamed real-time quotes. """Open a "data feed" which provides streamed real-time quotes.
""" """
global _cache, _cache_lock
sym = symbols[0].lower() sym = symbols[0].lower()
# TODO: feed cache locking, right now this is causing # TODO: feed cache locking, right now this is causing
# issues when reconncting to a long running emsd? # issues when reconncting to a long running emsd?
# global _searcher_cache
# async with _cache_lock: # async with _cache_lock:
# feed = _cache.get((brokername, sym)) # feed = _searcher_cache.get((brokername, sym))
# # if feed is not None and sym in feed.symbols: # # if feed is not None and sym in feed.symbols:
# if feed is not None: # if feed is not None:
@ -542,8 +471,23 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = max(ohlc_sample_rates)
if brokername in _cache: if brokername in _search._searcher_cache:
yield feed yield feed
else: else:
async with open_symbol_search(mod, feed._brokerd_portal): async with feed._brokerd_portal.open_context(
yield feed mod.open_symbol_search
) as (ctx, cache):
# shield here since we expect the search rpc to be
# cancellable by the user as they see fit.
async with ctx.open_stream() as stream:
async def search(text: str) -> Dict[str, Any]:
await stream.send(text)
return await stream.receive()
async with _search.register_symbol_search(
provider_name=brokername,
search_routine=search,
):
yield feed