Support loading multi-brokerds search at startup

symbol_search
Tyler Goodlet 2021-05-18 11:22:29 -04:00
parent fd8dc4f1a3
commit 1bd0ee8746
2 changed files with 115 additions and 36 deletions

View File

@ -384,6 +384,32 @@ def sym_to_shm_key(
return f'{broker}.{symbol}' return f'{broker}.{symbol}'
@asynccontextmanager
async def install_brokerd_search(
portal: tractor._portal.Portal,
brokermod: ModuleType,
) -> None:
async with portal.open_context(
brokermod.open_symbol_search
) as (ctx, cache):
# shield here since we expect the search rpc to be
# cancellable by the user as they see fit.
async with ctx.open_stream() as stream:
async def search(text: str) -> Dict[str, Any]:
await stream.send(text)
return await stream.receive()
async with _search.register_symbol_search(
provider_name=brokermod.name,
search_routine=search,
pause_period=brokermod._search_conf.get('pause_period'),
):
yield
@asynccontextmanager @asynccontextmanager
async def open_feed( async def open_feed(
brokername: str, brokername: str,
@ -475,22 +501,28 @@ async def open_feed(
yield feed yield feed
else: else:
async with feed._brokerd_portal.open_context( async with install_brokerd_search(
mod.open_symbol_search feed._brokerd_portal,
) as (ctx, cache): mod,
):
yield feed
# shield here since we expect the search rpc to be # async with feed._brokerd_portal.open_context(
# cancellable by the user as they see fit. # mod.open_symbol_search
async with ctx.open_stream() as stream: # ) as (ctx, cache):
async def search(text: str) -> Dict[str, Any]: # # shield here since we expect the search rpc to be
await stream.send(text) # # cancellable by the user as they see fit.
return await stream.receive() # async with ctx.open_stream() as stream:
async with _search.register_symbol_search( # async def search(text: str) -> Dict[str, Any]:
provider_name=brokername, # await stream.send(text)
search_routine=search, # return await stream.receive()
pause_period=mod._search_conf.get('pause_period'),
): # async with _search.register_symbol_search(
yield feed # provider_name=brokername,
# search_routine=search,
# pause_period=mod._search_conf.get('pause_period'),
# ):
# yield feed

View File

@ -22,6 +22,7 @@ import time
from typing import Tuple, Dict, Any, Optional, Callable from typing import Tuple, Dict, Any, Optional, Callable
from types import ModuleType from types import ModuleType
from functools import partial from functools import partial
from contextlib import AsyncExitStack
from PyQt5 import QtCore, QtGui from PyQt5 import QtCore, QtGui
from PyQt5.QtCore import Qt from PyQt5.QtCore import Qt
@ -30,6 +31,10 @@ import pyqtgraph as pg
import tractor import tractor
import trio import trio
from .._daemon import (
maybe_spawn_brokerd,
)
from ..brokers import get_brokermod
from ._axes import ( from ._axes import (
DynamicDateAxis, DynamicDateAxis,
PriceAxis, PriceAxis,
@ -67,6 +72,7 @@ from ._exec import run_qtractor, current_screen
from ._interaction import ChartView from ._interaction import ChartView
from .order_mode import start_order_mode from .order_mode import start_order_mode
from .. import fsp from .. import fsp
from ..data import feed
log = get_logger(__name__) log = get_logger(__name__)
@ -104,6 +110,19 @@ class ChartSpace(QtGui.QWidget):
self._root_n: Optional[trio.Nursery] = None self._root_n: Optional[trio.Nursery] = None
def set_chart_symbol(
self,
symbol_key: str, # of form <fqsn>.<providername>
linked_charts: 'LinkedSplitCharts', # type: ignore
) -> None:
self._chart_cache[symbol_key] = linked_charts
def get_chart_symbol(
self,
symbol_key: str,
) -> 'LinkedSplitCharts': # type: ignore
return self._chart_cache.get(symbol_key)
def init_timeframes_ui(self): def init_timeframes_ui(self):
self.tf_layout = QtGui.QHBoxLayout() self.tf_layout = QtGui.QHBoxLayout()
self.tf_layout.setSpacing(0) self.tf_layout.setSpacing(0)
@ -128,7 +147,7 @@ class ChartSpace(QtGui.QWidget):
def load_symbol( def load_symbol(
self, self,
brokername: str, providername: str,
symbol_key: str, symbol_key: str,
loglevel: str, loglevel: str,
ohlc: bool = True, ohlc: bool = True,
@ -142,7 +161,10 @@ class ChartSpace(QtGui.QWidget):
# our symbol key style is always lower case # our symbol key style is always lower case
symbol_key = symbol_key.lower() symbol_key = symbol_key.lower()
linkedcharts = self._chart_cache.get(symbol_key) # fully qualified symbol name (SNS i guess is what we're making?)
fqsn = '.'.join([symbol_key, providername])
linkedcharts = self.get_chart_symbol(fqsn)
if not self.vbox.isEmpty(): if not self.vbox.isEmpty():
# XXX: this is CRITICAL especially with pixel buffer caching # XXX: this is CRITICAL especially with pixel buffer caching
@ -162,13 +184,13 @@ class ChartSpace(QtGui.QWidget):
self._root_n.start_soon( self._root_n.start_soon(
chart_symbol, chart_symbol,
self, self,
brokername, providername,
symbol_key, symbol_key,
loglevel, loglevel,
) )
self.vbox.addWidget(linkedcharts) self.vbox.addWidget(linkedcharts)
self._chart_cache[symbol_key] = linkedcharts self.set_chart_symbol(fqsn, linkedcharts)
# chart is already in memory so just focus it # chart is already in memory so just focus it
if self.linkedcharts: if self.linkedcharts:
@ -1619,28 +1641,53 @@ async def _async_main(
# this internally starts a ``chart_symbol()`` task above # this internally starts a ``chart_symbol()`` task above
chart_app.load_symbol(brokernames[0], sym, loglevel) chart_app.load_symbol(brokernames[0], sym, loglevel)
async with _search.register_symbol_search( # TODO: seems like our incentive for brokerd caching lelel
backends = {}
provider_name='cache', async with AsyncExitStack() as stack:
search_routine=partial(
_search.search_simple_dict,
source=chart_app._chart_cache,
),
): # TODO: spawn these async in nursery.
async with open_key_stream(
search.bar,
) as key_stream:
# start kb handling task for searcher # load all requested brokerd's at startup and load their
root_n.start_soon( # search engines.
_search.handle_keyboard_input, for broker in brokernames:
# chart_app, portal = await stack.enter_async_context(
search, maybe_spawn_brokerd(
key_stream, broker,
loglevel=loglevel
)
) )
await trio.sleep_forever() backends[broker] = portal
await stack.enter_async_context(
feed.install_brokerd_search(
portal,
get_brokermod(broker),
)
)
async with _search.register_symbol_search(
provider_name='cache',
search_routine=partial(
_search.search_simple_dict,
source=chart_app._chart_cache,
),
):
async with open_key_stream(
search.bar,
) as key_stream:
# start kb handling task for searcher
root_n.start_soon(
_search.handle_keyboard_input,
# chart_app,
search,
key_stream,
)
await trio.sleep_forever()
def _main( def _main(