Add client side multi-provider feed symbol search

symbol_search
Tyler Goodlet 2021-05-06 16:37:23 -04:00
parent 4b818ea2f2
commit 534553a6f5
1 changed file with 90 additions and 12 deletions
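
In rough sketch form the intended client-side flow looks like this (a hypothetical caller; the broker name, symbol and surrounding setup are illustrative, not part of this diff):

# open a feed so its per-provider ``search`` endpoint is live,
# then fan one fuzzy pattern out to every cached provider
async with open_feed('questrade', ['SPY']) as feed:
    multisearch = get_multi_search()
    matches = await multisearch('tesla')
    # -> a matches dict per provider, keyed by brokername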


@@ -17,6 +17,8 @@
"""
Data feed apis and infra.
This module is enabled for ``brokerd`` daemons.
"""
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
@@ -25,13 +27,14 @@ from types import ModuleType
from typing import (
Dict, Any, Sequence,
AsyncIterator, Optional,
List
List, Tuple, Awaitable, Callable
)
import trio
from trio_typing import TaskStatus
import tractor
from pydantic import BaseModel
from fuzzywuzzy import process as fuzzy
from ..brokers import get_brokermod
from ..log import get_logger, get_console_log
@@ -43,6 +46,7 @@ from ._sharedmem import (
attach_shm_array,
ShmArray,
)
from .ingest import get_ingestormod
from ._source import base_iohlc_dtype, Symbol
from ._sampling import (
_shms,
@@ -51,7 +55,6 @@ from ._sampling import (
iter_ohlc_periods,
sample_and_broadcast,
)
from .ingest import get_ingestormod
log = get_logger(__name__)
@@ -172,7 +175,7 @@ async def allocate_persistent_feed(
)
# do history validation?
assert opened, f'Persistent shm for {symbol} was already open?!'
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
@@ -235,6 +238,7 @@ async def allocate_persistent_feed(
@tractor.stream
async def attach_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str,
@@ -313,6 +317,8 @@ class Feed:
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
_max_sample_rate: int = 0
search: Optional[Callable[..., Awaitable]] = None
# cache of symbol info messages received as first message when
# a stream starts.
symbols: Dict[str, Symbol] = field(default_factory=dict)
@@ -335,6 +341,7 @@ class Feed:
iter_ohlc_periods,
delay_s=delay_s or self._max_sample_rate,
) as self._index_stream:
yield self._index_stream
else:
yield self._index_stream
@@ -366,8 +373,29 @@ class Feed:
) as self._trade_stream:
yield self._trade_stream
else:
yield self._trade_stream
@asynccontextmanager
async def open_symbol_search(self) -> AsyncIterator[Callable[..., Awaitable]]:
async with self._brokerd_portal.open_context(
self.mod.open_symbol_search,
) as (ctx, cache):
async with ctx.open_stream() as stream:
async def search(text: str) -> Dict[str, Any]:
await stream.send(text)
return await stream.receive()
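# NOTE: a simple request/reply protocol over the ``tractor``
# msg stream: send the pattern text, then await the matches
# dict computed by the brokerd-side search task.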
# deliver search func to consumer
self.search = search
yield search
self.search = None
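# usage sketch (hypothetical consumer code, assuming a live
# ``Feed`` instance ``feed``):
#
# async with feed.open_symbol_search() as search:
#     matches = await search('tsla')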
def sym_to_shm_key(
broker: str,
@@ -376,6 +404,39 @@ def sym_to_shm_key(
return f'{broker}.{symbol}'
# cache of (brokername, startup symbol) keys to feeds
_cache: Dict[Tuple[str, str], Feed] = {}
_cache_lock: trio.Lock = trio.Lock()
def get_multi_search() -> Callable[..., Awaitable]:
global _cache
async def multisearcher(
pattern: str,
) -> dict:
matches = {}
async def pack_matches(
brokername: str,
feed: Feed,
pattern: str,
) -> None:
matches[brokername] = await feed.search(pattern)
# TODO: make this an async stream?
async with trio.open_nursery() as n:
for (brokername, startup_sym), feed in _cache.items():
if feed.search:
# pass ``feed`` through explicitly: relying on closure
# capture would late-bind the loop variable and every
# task would end up searching the same (last) feed
n.start_soon(pack_matches, brokername, feed, pattern)
return matches
return multisearcher
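# usage sketch (hypothetical caller; assumes one or more feeds were
# opened via ``open_feed()`` so ``_cache`` is populated):
#
# multisearch = get_multi_search()
# all_matches = await multisearch('spy')
# # -> one matches dict per provider, keyed by brokername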
@asynccontextmanager
async def open_feed(
brokername: str,
@@ -383,20 +444,34 @@ async def open_feed(
loglevel: Optional[str] = None,
) -> AsyncIterator[Feed]:
"""Open a "data feed" which provides streamed real-time quotes.
"""
global _cache, _cache_lock
sym = symbols[0]
# TODO: feed cache locking, right now this is causing
# issues when reconnecting to a long-running emsd?
# async with _cache_lock:
# feed = _cache.get((brokername, sym))
# # if feed is not None and sym in feed.symbols:
# if feed is not None:
# yield feed
# # short circuit
# return
try:
mod = get_brokermod(brokername)
except ImportError:
mod = get_ingestormod(brokername)
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# TODO: do all!
sym = symbols[0]
# TODO: compress these to one line with py3.9+
async with maybe_spawn_brokerd(brokername, loglevel=loglevel) as portal:
# no feed for broker exists so maybe spawn a data brokerd
async with maybe_spawn_brokerd(
brokername,
loglevel=loglevel
) as portal:
async with portal.open_stream_from(
@@ -449,8 +524,11 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates)
_cache[(brokername, sym)] = feed
try:
yield feed
async with feed.open_symbol_search():
yield feed
finally:
# always cancel the far end producer task