From 534553a6f59d97c9662121675e4edef09c972949 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 May 2021 16:37:23 -0400 Subject: [PATCH] Add client side multi-provider feed symbol search --- piker/data/feed.py | 102 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 90 insertions(+), 12 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 76c2bc23..b9df8595 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -17,6 +17,8 @@ """ Data feed apis and infra. +This module is enabled for ``brokerd`` daemons. + """ from dataclasses import dataclass, field from contextlib import asynccontextmanager @@ -25,13 +27,14 @@ from types import ModuleType from typing import ( Dict, Any, Sequence, AsyncIterator, Optional, - List + List, Awaitable, Callable ) import trio from trio_typing import TaskStatus import tractor from pydantic import BaseModel +from fuzzywuzzy import process as fuzzy from ..brokers import get_brokermod from ..log import get_logger, get_console_log @@ -43,6 +46,7 @@ from ._sharedmem import ( attach_shm_array, ShmArray, ) +from .ingest import get_ingestormod from ._source import base_iohlc_dtype, Symbol from ._sampling import ( _shms, @@ -51,7 +55,6 @@ from ._sampling import ( iter_ohlc_periods, sample_and_broadcast, ) -from .ingest import get_ingestormod log = get_logger(__name__) @@ -172,7 +175,7 @@ async def allocate_persistent_feed( ) # do history validation? - assert opened, f'Persistent shm for {symbol} was already open?!' + # assert opened, f'Persistent shm for {symbol} was already open?!' # if not opened: # raise RuntimeError("Persistent shm for sym was already open?!") @@ -235,6 +238,7 @@ async def allocate_persistent_feed( @tractor.stream async def attach_feed_bus( + ctx: tractor.Context, brokername: str, symbol: str, @@ -313,6 +317,8 @@ class Feed: _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None _max_sample_rate: int = 0 + search: Callable[..., Awaitable] = None + # cache of symbol info messages received as first message when # a stream startsc. symbols: Dict[str, Symbol] = field(default_factory=dict) @@ -335,6 +341,7 @@ class Feed: iter_ohlc_periods, delay_s=delay_s or self._max_sample_rate, ) as self._index_stream: + yield self._index_stream else: yield self._index_stream @@ -366,8 +373,29 @@ class Feed: ) as self._trade_stream: yield self._trade_stream else: + yield self._trade_stream + @asynccontextmanager + async def open_symbol_search(self) -> AsyncIterator[dict]: + + async with self._brokerd_portal.open_context( + + self.mod.open_symbol_search, + + ) as (ctx, cache): + + async with ctx.open_stream() as stream: + + async def search(text: str) -> Dict[str, Any]: + await stream.send(text) + return await stream.receive() + + # deliver search func to consumer + self.search = search + yield search + self.search = None + def sym_to_shm_key( broker: str, @@ -376,6 +404,39 @@ def sym_to_shm_key( return f'{broker}.{symbol}' +# cache of brokernames to feeds +_cache: Dict[str, Feed] = {} +_cache_lock: trio.Lock = trio.Lock() + + +def get_multi_search() -> Callable[..., Awaitable]: + + global _cache + + async def multisearcher( + pattern: str, + ) -> dict: + + matches = {} + + async def pack_matches( + brokername: str, + pattern: str, + ) -> None: + matches[brokername] = await feed.search(pattern) + + # TODO: make this an async stream? + async with trio.open_nursery() as n: + + for (brokername, startup_sym), feed in _cache.items(): + if feed.search: + n.start_soon(pack_matches, brokername, pattern) + + return matches + + return multisearcher + + @asynccontextmanager async def open_feed( brokername: str, @@ -383,20 +444,34 @@ async def open_feed( loglevel: Optional[str] = None, ) -> AsyncIterator[Dict[str, Any]]: """Open a "data feed" which provides streamed real-time quotes. + """ + global _cache, _cache_lock + + sym = symbols[0] + + # TODO: feed cache locking, right now this is causing + # issues when reconncting to a long running emsd? + + # async with _cache_lock: + # feed = _cache.get((brokername, sym)) + + # # if feed is not None and sym in feed.symbols: + # if feed is not None: + # yield feed + # # short circuit + # return + try: mod = get_brokermod(brokername) except ImportError: mod = get_ingestormod(brokername) - if loglevel is None: - loglevel = tractor.current_actor().loglevel - - # TODO: do all! - sym = symbols[0] - - # TODO: compress these to one line with py3.9+ - async with maybe_spawn_brokerd(brokername, loglevel=loglevel) as portal: + # no feed for broker exists so maybe spawn a data brokerd + async with maybe_spawn_brokerd( + brokername, + loglevel=loglevel + ) as portal: async with portal.open_stream_from( @@ -449,8 +524,11 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) + _cache[(brokername, sym)] = feed + try: - yield feed + async with feed.open_symbol_search(): + yield feed finally: # always cancel the far end producer task