Decouple symbol search from feed type

symbol_search
Tyler Goodlet 2021-05-12 08:36:18 -04:00
parent e5e9a7c582
commit f19f4348e0
1 changed files with 48 additions and 42 deletions

View File

@ -34,7 +34,6 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pydantic import BaseModel from pydantic import BaseModel
from fuzzywuzzy import process as fuzzy
from ..brokers import get_brokermod from ..brokers import get_brokermod
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
@ -376,38 +375,6 @@ class Feed:
yield self._trade_stream yield self._trade_stream
@asynccontextmanager
async def open_symbol_search(self) -> AsyncIterator[dict]:
open_search = getattr(self.mod, 'open_symbol_search', None)
if open_search is None:
# just return a pure pass through searcher
async def passthru(text: str) -> Dict[str, Any]:
return text
self.search = passthru
yield self.search
self.search = None
return
async with self._brokerd_portal.open_context(
open_search,
) as (ctx, cache):
# shield here since we expect the search rpc to be
# cancellable by the user as they see fit.
async with ctx.open_stream(shield=True) as stream:
async def search(text: str) -> Dict[str, Any]:
await stream.send(text)
return await stream.receive()
# deliver search func to consumer
self.search = search
yield search
self.search = None
def sym_to_shm_key( def sym_to_shm_key(
broker: str, broker: str,
@ -417,7 +384,7 @@ def sym_to_shm_key(
# cache of brokernames to feeds # cache of brokernames to feeds
_cache: Dict[str, Feed] = {} _cache: Dict[str, Callable] = {}
_cache_lock: trio.Lock = trio.Lock() _cache_lock: trio.Lock = trio.Lock()
@ -434,21 +401,60 @@ def get_multi_search() -> Callable[..., Awaitable]:
async def pack_matches( async def pack_matches(
brokername: str, brokername: str,
pattern: str, pattern: str,
search: Callable[..., Awaitable[dict]],
) -> None: ) -> None:
matches[brokername] = await feed.search(pattern) log.debug(f'Searching {brokername} for "{pattern}"')
matches[brokername] = await search(pattern)
# TODO: make this an async stream? # TODO: make this an async stream?
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for (brokername, startup_sym), feed in _cache.items(): for brokername, search in _cache.items():
if feed.search: n.start_soon(pack_matches, brokername, pattern, search)
n.start_soon(pack_matches, brokername, pattern)
return matches return matches
return multisearcher return multisearcher
@asynccontextmanager
async def open_symbol_search(
brokermod: ModuleType,
brokerd_portal: tractor._portal.Portal,
) -> AsyncIterator[dict]:
global _cache
open_search = getattr(brokermod, 'open_symbol_search', None)
if open_search is None:
# just return a pure pass through searcher
async def passthru(text: str) -> Dict[str, Any]:
return text
yield passthru
return
async with brokerd_portal.open_context(
open_search,
) as (ctx, cache):
# shield here since we expect the search rpc to be
# cancellable by the user as they see fit.
async with ctx.open_stream() as stream:
async def search(text: str) -> Dict[str, Any]:
await stream.send(text)
return await stream.receive()
# deliver search func to consumer
try:
_cache[brokermod.name] = search
yield search
finally:
_cache.pop(brokermod.name)
@asynccontextmanager @asynccontextmanager
async def open_feed( async def open_feed(
brokername: str, brokername: str,
@ -536,8 +542,8 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = max(ohlc_sample_rates)
_cache[(brokername, sym)] = feed if brokername in _cache:
async with feed.open_symbol_search():
yield feed yield feed
else:
async with open_symbol_search(mod, feed._brokerd_portal):
yield feed