Merge pull request #168 from pikers/symbol_search

Symbol search
Branch: ems_hotfixes
goodboy 2021-05-28 14:10:15 -04:00, committed by GitHub
commit 2d7608cee9
21 changed files with 1908 additions and 278 deletions

View File

@@ -18,7 +18,6 @@
 Structured, daemon tree service management.

 """
-from functools import partial
 from typing import Optional, Union, Callable, Any
 from contextlib import asynccontextmanager, AsyncExitStack
 from collections import defaultdict
@@ -72,7 +71,7 @@ class Services(BaseModel):
         ctx, first = await self.ctx_stack.enter_async_context(
             portal.open_context(
                 target,
                 **kwargs,
             )
         )
         return ctx
@@ -143,8 +142,14 @@ async def maybe_open_runtime(
    Start the ``tractor`` runtime (a root actor) if none exists.

    """
+    settings = _tractor_kwargs
+    settings.update(kwargs)
+
    if not tractor.current_actor(err_on_no_runtime=False):
-        async with tractor.open_root_actor(loglevel=loglevel, **kwargs):
+        async with tractor.open_root_actor(
+            loglevel=loglevel,
+            **settings,
+        ):
            yield
    else:
        yield
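For orientation, a minimal usage sketch of the merged-settings behaviour above: caller kwargs are layered over the daemon-wide `_tractor_kwargs` defaults before a root actor is opened. The `piker._daemon` import path is inferred from context and the example is illustrative only.

```python
import trio
import tractor

from piker._daemon import maybe_open_runtime  # assumed module path


async def main() -> None:
    # opens a ``tractor`` root actor with the merged settings iff one
    # isn't already running; otherwise it just yields into it.
    async with maybe_open_runtime(loglevel='info'):
        assert tractor.current_actor(err_on_no_runtime=False) is not None


trio.run(main)
```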

View File

@@ -32,6 +32,10 @@ class SymbolNotFound(BrokerError):
     "Symbol not found by broker search"


+class NoData(BrokerError):
+    "Symbol data not permitted"
+
+
 def resproc(
     resp: asks.response_objects.Response,
     log: logging.Logger,

View File

@@ -82,7 +82,7 @@ _ohlc_dtype = [
 ohlc_dtype = np.dtype(_ohlc_dtype)

 _show_wap_in_history = False
-_search_conf = {'pause_period': 0.375}
+_search_conf = {'pause_period': 0.0616}


 # https://binance-docs.github.io/apidocs/spot/en/#exchange-information
@@ -207,6 +207,25 @@ class Client:
         return self._pairs

+    async def search_symbols(
+        self,
+        pattern: str,
+        limit: int = None,
+    ) -> Dict[str, Any]:
+        if self._pairs is not None:
+            data = self._pairs
+        else:
+            data = await self.symbol_info()
+
+        matches = fuzzy.extractBests(
+            pattern,
+            data,
+            score_cutoff=50,
+        )
+        # repack in dict form
+        return {item[0]['symbol']: item[0]
+                for item in matches}
+
     async def bars(
         self,
         symbol: str,
@@ -298,7 +317,7 @@ async def stream_messages(ws):
         if cs.cancelled_caught:
             timeouts += 1
-            if timeouts > 2:
+            if timeouts > 10:
                 raise trio.TooSlowError("binance feed seems down?")

             continue
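A self-contained sketch of the fuzzy-match "repack" pattern used by the new `search_symbols()` above (assumes the `fuzzywuzzy` dependency the backend imports; the pairs dict is a stand-in for binance's real `symbol_info()` payload):

```python
from fuzzywuzzy import process as fuzzy

pairs = {
    'BTCUSDT': {'symbol': 'BTCUSDT', 'baseAsset': 'BTC', 'quoteAsset': 'USDT'},
    'ETHUSDT': {'symbol': 'ETHUSDT', 'baseAsset': 'ETH', 'quoteAsset': 'USDT'},
}

# when searching a dict, each match is a ``(value, score, key)`` tuple
matches = fuzzy.extractBests('btc', pairs, score_cutoff=50)

# repack keyed by the exchange's symbol name, mirroring
# ``Client.search_symbols()`` in the hunk above
by_symbol = {item[0]['symbol']: item[0] for item in matches}
print(by_symbol)  # closest pairs keyed by symbol, e.g. 'BTCUSDT'
```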

View File

@@ -30,7 +30,7 @@ import tractor
 from ..cli import cli
 from .. import watchlists as wl
 from ..log import get_console_log, colorize_json, get_logger
-from .._daemon import maybe_spawn_brokerd
+from .._daemon import maybe_spawn_brokerd, maybe_open_pikerd
 from ..brokers import core, get_brokermod, data

 log = get_logger('cli')
@@ -50,7 +50,7 @@ def api(config, meth, kwargs, keys):
     """Make a broker-client API method call
     """
     # global opts
-    broker = config['broker']
+    broker = config['brokers'][0]

     _kwargs = {}
     for kwarg in kwargs:
@@ -87,7 +87,7 @@ def quote(config, tickers, df_output):
     """Print symbol quotes to the console
     """
     # global opts
-    brokermod = config['brokermod']
+    brokermod = config['brokermods'][0]

     quotes = trio.run(partial(core.stocks_quote, brokermod, tickers))
     if not quotes:
@@ -123,7 +123,7 @@ def bars(config, symbol, count, df_output):
     """Retreive 1m bars for symbol and print on the console
     """
     # global opts
-    brokermod = config['brokermod']
+    brokermod = config['brokermods'][0]

     # broker backend should return at the least a
     # list of candle dictionaries
@@ -159,7 +159,7 @@ def record(config, rate, name, dhost, filename):
     """Record client side quotes to a file on disk
     """
     # global opts
-    brokermod = config['brokermod']
+    brokermod = config['brokermods'][0]
     loglevel = config['loglevel']
     log = config['log']
@@ -222,7 +222,7 @@ def optsquote(config, symbol, df_output, date):
     """Retreive symbol option quotes on the console
     """
     # global opts
-    brokermod = config['brokermod']
+    brokermod = config['brokermods'][0]

     quotes = trio.run(
         partial(
@@ -250,7 +250,7 @@ def symbol_info(config, tickers):
     """Print symbol quotes to the console
     """
     # global opts
-    brokermod = config['brokermod']
+    brokermod = config['brokermods'][0]

     quotes = trio.run(partial(core.symbol_info, brokermod, tickers))
     if not quotes:
@@ -273,13 +273,25 @@ def search(config, pattern):
     """Search for symbols from broker backend(s).
     """
     # global opts
-    brokermod = config['brokermod']
+    brokermods = config['brokermods']

-    quotes = tractor.run(
-        partial(core.symbol_search, brokermod, pattern),
-        start_method='forkserver',
-        loglevel='info',
+    # define tractor entrypoint
+    async def main(func):
+
+        async with maybe_open_pikerd(
+            loglevel=config['loglevel'],
+        ):
+            return await func()
+
+    quotes = trio.run(
+        main,
+        partial(
+            core.symbol_search,
+            brokermods,
+            pattern,
+        ),
     )
     if not quotes:
         log.error(f"No matches could be found for {pattern}?")
         return
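A self-contained sketch of the `trio.run(main, func)` pattern the reworked `search` command uses: the CLI stays synchronous while the async work runs inside a (possibly freshly spawned) `pikerd` tree. The piker-specific calls (`maybe_open_pikerd`, `core.symbol_search`) are replaced by stand-ins here; only the control flow is the point.

```python
from functools import partial

import trio


async def fake_symbol_search(brokers: list, pattern: str) -> dict:
    # stand-in for ``core.symbol_search()`` fanning out to brokerds
    await trio.sleep(0)
    return {b: {pattern.upper(): {}} for b in brokers}


async def main(func):
    # stand-in for ``async with maybe_open_pikerd(loglevel=...):``
    return await func()


quotes = trio.run(
    main,
    partial(fake_symbol_search, ['kraken', 'binance'], 'xbt'),
)
print(quotes)
```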

View File

@@ -24,8 +24,12 @@ import inspect
 from types import ModuleType
 from typing import List, Dict, Any, Optional

+import trio
+
 from ..log import get_logger
 from . import get_brokermod
+from .._daemon import maybe_spawn_brokerd
+from .api import open_cached_client

 log = get_logger(__name__)
@@ -126,13 +130,41 @@
     return await client.symbol_info(symbol, **kwargs)


+async def search_w_brokerd(name: str, pattern: str) -> dict:
+
+    async with open_cached_client(name) as client:
+
+        # TODO: support multiple asset type concurrent searches.
+        return await client.search_symbols(pattern=pattern)
+
+
 async def symbol_search(
-    brokermod: ModuleType,
+    brokermods: list[ModuleType],
     pattern: str,
     **kwargs,
 ) -> Dict[str, Dict[str, Dict[str, Any]]]:
     """Return symbol info from broker.
     """
-    async with brokermod.get_client() as client:
-        # TODO: support multiple asset type concurrent searches.
-        return await client.search_stocks(pattern=pattern, **kwargs)
+    results = []
+
+    async def search_backend(brokername: str) -> None:
+
+        async with maybe_spawn_brokerd(
+            brokername,
+        ) as portal:
+
+            results.append((
+                brokername,
+                await portal.run(
+                    search_w_brokerd,
+                    name=brokername,
+                    pattern=pattern,
+                ),
+            ))
+
+    async with trio.open_nursery() as n:
+
+        for mod in brokermods:
+            n.start_soon(search_backend, mod.name)
+
+    return results
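The concurrent fan-out above boils down to "one task per backend, all results appended to a shared list before the nursery closes". A runnable sketch of just that shape (the fake backend stands in for the real `brokerd` portal call):

```python
import trio


async def fake_backend_search(name: str, pattern: str) -> dict:
    await trio.sleep(0.1)  # pretend network latency
    return {f'{pattern.upper()}.{name}': {}}


async def symbol_search(backends: list, pattern: str) -> list:
    results = []

    async def search_backend(name: str) -> None:
        results.append((name, await fake_backend_search(name, pattern)))

    async with trio.open_nursery() as n:
        for name in backends:
            n.start_soon(search_backend, name)

    # nursery exit guarantees every append has completed
    return results


print(trio.run(symbol_search, ['kraken', 'binance'], 'xbt'))
```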

View File

@@ -45,12 +45,14 @@ from ib_insync.objects import Position
 import ib_insync as ibis
 from ib_insync.wrapper import Wrapper
 from ib_insync.client import Client as ib_Client
+from fuzzywuzzy import process as fuzzy
+import numpy as np

 from ..log import get_logger, get_console_log
 from .._daemon import maybe_spawn_brokerd
 from ..data._source import from_df
 from ..data._sharedmem import ShmArray
-from ._util import SymbolNotFound
+from ._util import SymbolNotFound, NoData

 log = get_logger(__name__)
@@ -87,7 +89,15 @@ _time_frames = {
     'Y': 'OneYear',
 }

-_show_wap_in_history = False
+_show_wap_in_history: bool = False
+
+# optional search config the backend can register for
+# it's symbol search handling (in this case we avoid
+# accepting patterns before the kb has settled more then
+# a quarter second).
+_search_conf = {
+    'pause_period': 6/16,
+}


 # overrides to sidestep pretty questionable design decisions in
@@ -141,11 +151,39 @@ class NonShittyIB(ibis.IB):

 # map of symbols to contract ids
 _adhoc_cmdty_data_map = {
     # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924

-    # NOTE: cmdtys don't have trade data:
+    # NOTE: some cmdtys/metals don't have trade data like gold/usd:
     # https://groups.io/g/twsapi/message/44174
     'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
 }

+_adhoc_futes_set = {
+    # equities
+    'nq.globex',
+    'mnq.globex',
+
+    'es.globex',
+    'mes.globex',
+
+    # cypto$
+    'brr.cmecrypto',
+    'ethusdrr.cmecrypto',
+
+    # metals
+    'xauusd.cmdty',
+}
+
+# exchanges we don't support at the moment due to not knowing
+# how to do symbol-contract lookup correctly likely due
+# to not having the data feeds subscribed.
+_exch_skip_list = {
+    'ASX',  # aussie stocks
+    'MEXI',  # mexican stocks
+    'VALUE',  # no idea
+}
+
+# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
+
 _enters = 0
@@ -245,27 +283,45 @@
         """
         descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)

-        futs = []
-        for d in descriptions:
-            con = d.contract
-            futs.append(self.ib.reqContractDetailsAsync(con))
-
-        # batch request all details
-        results = await asyncio.gather(*futs)
-
-        # XXX: if there is more then one entry in the details list
-        details = {}
-        for details_set in results:
-            # then the contract is so called "ambiguous".
-            for d in details_set:
-                con = d.contract
-                unique_sym = f'{con.symbol}.{con.primaryExchange}'
-                details[unique_sym] = asdict(d) if asdicts else d
-
-                if len(details) == upto:
-                    return details
-
-        return details
+        if descriptions is not None:
+
+            futs = []
+            for d in descriptions:
+                con = d.contract
+                if con.primaryExchange not in _exch_skip_list:
+                    futs.append(self.ib.reqContractDetailsAsync(con))
+
+            # batch request all details
+            results = await asyncio.gather(*futs)
+
+            # XXX: if there is more then one entry in the details list
+            details = {}
+            for details_set in results:
+                # then the contract is so called "ambiguous".
+                for d in details_set:
+                    con = d.contract
+                    unique_sym = f'{con.symbol}.{con.primaryExchange}'
+                    details[unique_sym] = asdict(d) if asdicts else d
+
+                    if len(details) == upto:
+                        return details
+
+            return details
+
+        else:
+            return {}
+
+    async def search_symbols(
+        self,
+        pattern: str,
+        # how many contracts to search "up to"
+        upto: int = 3,
+        asdicts: bool = True,
+    ) -> Dict[str, ContractDetails]:
+
+        # TODO add search though our adhoc-locally defined symbol set
+        # for futes/cmdtys/
+        return await self.search_stocks(pattern, upto, asdicts)

     async def search_futes(
         self,
@@ -318,7 +374,7 @@
             sym, exch = symbol.upper().rsplit('.', maxsplit=1)
         except ValueError:
             # likely there's an embedded `.` for a forex pair
-            await tractor.breakpoint()
+            breakpoint()

         # futes
         if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'):
@@ -346,10 +402,13 @@
             if exch in ('PURE', 'TSE'):  # non-yankee
                 currency = 'CAD'
-                if exch in ('PURE', 'TSE'):
-                    # stupid ib...
-                    primaryExchange = exch
-                    exch = 'SMART'
+                # stupid ib...
+                primaryExchange = exch
+                exch = 'SMART'
+
+            else:
+                exch = 'SMART'
+                primaryExchange = exch

             con = ibis.Stock(
                 symbol=sym,
@@ -562,7 +621,7 @@
 # default config ports
 _tws_port: int = 7497
 _gw_port: int = 4002
-_try_ports = [_tws_port, _gw_port]
+_try_ports = [_gw_port, _tws_port]
 _client_ids = itertools.count()
 _client_cache = {}
@@ -641,6 +700,8 @@ async def _aio_run_client_method(
     if to_trio and 'to_trio' in args:
         kwargs['to_trio'] = to_trio

+    log.runtime(f'Running {meth}({kwargs})')
+
     return await async_meth(**kwargs)
@@ -777,13 +838,76 @@ def normalize(
     return data


+async def get_bars(
+    sym: str,
+    end_dt: str = "",
+) -> (dict, np.ndarray):
+
+    _err = None
+
+    fails = 0
+    for _ in range(2):
+        try:
+
+            bars, bars_array = await _trio_run_client_method(
+                method='bars',
+                symbol=sym,
+                end_dt=end_dt,
+            )
+
+            if bars_array is None:
+                raise SymbolNotFound(sym)
+
+            next_dt = bars[0].date
+
+            return (bars, bars_array, next_dt), fails
+
+        except RequestError as err:
+            _err = err
+
+            # TODO: retreive underlying ``ib_insync`` error?
+            if err.code == 162:
+
+                if 'HMDS query returned no data' in err.message:
+                    # means we hit some kind of historical "dead zone"
+                    # and further requests seem to always cause
+                    # throttling despite the rps being low
+                    break
+
+                elif 'No market data permissions for' in err.message:
+
+                    # TODO: signalling for no permissions searches
+                    raise NoData(f'Symbol: {sym}')
+                    break
+
+                else:
+                    log.exception(
+                        "Data query rate reached: Press `ctrl-alt-f`"
+                        "in TWS"
+                    )
+
+                    # TODO: should probably create some alert on screen
+                    # and then somehow get that to trigger an event here
+                    # that restarts/resumes this task?
+                    await tractor.breakpoint()
+                    fails += 1
+                    continue
+
+    return (None, None)
+
+    # else:  # throttle wasn't fixed so error out immediately
+    #     raise _err
+
+
 async def backfill_bars(
     sym: str,
     shm: ShmArray,  # type: ignore # noqa
     # count: int = 20,  # NOTE: any more and we'll overrun underlying buffer
-    count: int = 10,  # NOTE: any more and we'll overrun the underlying buffer
+    count: int = 6,  # NOTE: any more and we'll overrun the underlying buffer
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 ) -> None:
     """Fill historical bars into shared mem / storage afap.
@@ -791,10 +915,7 @@ async def backfill_bars(
     https://github.com/pikers/piker/issues/128
     """
-    first_bars, bars_array = await _trio_run_client_method(
-        method='bars',
-        symbol=sym,
-    )
+    (first_bars, bars_array, next_dt), fails = await get_bars(sym)

     # write historical data to buffer
     shm.push(bars_array)
@@ -803,46 +924,24 @@
         task_status.started(cs)

-        next_dt = first_bars[0].date
-
         i = 0
         while i < count:
-            try:
-                bars, bars_array = await _trio_run_client_method(
-                    method='bars',
-                    symbol=sym,
-                    end_dt=next_dt,
-                )
-
-                if bars_array is None:
-                    raise SymbolNotFound(sym)
-
-                shm.push(bars_array, prepend=True)
-                i += 1
-                next_dt = bars[0].date
-
-            except RequestError as err:
-                # TODO: retreive underlying ``ib_insync`` error?
-
-                if err.code == 162:
-                    if 'HMDS query returned no data' in err.message:
-                        # means we hit some kind of historical "dead zone"
-                        # and further requests seem to always cause
-                        # throttling despite the rps being low
-                        break
-                    else:
-                        log.exception(
-                            "Data query rate reached: Press `ctrl-alt-f`"
-                            "in TWS"
-                        )
-
-                        # TODO: should probably create some alert on screen
-                        # and then somehow get that to trigger an event here
-                        # that restarts/resumes this task?
-                        await tractor.breakpoint()
+
+            out, fails = await get_bars(sym, end_dt=next_dt)
+
+            if fails is None or fails > 1:
+                break
+
+            if out is (None, None):
+                # could be trying to retreive bars over weekend
+                # TODO: add logic here to handle tradable hours and only grab
+                # valid bars in the range
+                log.error(f"Can't grab bars starting at {next_dt}!?!?")
+                continue
+
+            bars, bars_array, next_dt = out
+            shm.push(bars_array, prepend=True)
+            i += 1


 asset_type_map = {
@@ -990,23 +1089,31 @@ async def stream_quotes(
         }
     }

+    con = first_ticker.contract
+
+    # should be real volume for this contract by default
+    calc_price = False
+
     # check for special contract types
     if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
-        suffix = 'exchange'
-        # should be real volume for this contract
-        calc_price = False
+
+        suffix = con.primaryExchange
+        if not suffix:
+            suffix = con.exchange
+
     else:
         # commodities and forex don't have an exchange name and
         # no real volume so we have to calculate the price
-        suffix = 'secType'
+        suffix = con.secType
+        # no real volume on this tract
         calc_price = True

-    # pass first quote asap
     quote = normalize(first_ticker, calc_price=calc_price)
     con = quote['contract']
-    topic = '.'.join((con['symbol'], con[suffix])).lower()
+    topic = '.'.join((con['symbol'], suffix)).lower()
     quote['symbol'] = topic

+    # pass first quote asap
     first_quote = {topic: quote}

     # ugh, clear ticks since we've consumed them
@@ -1018,50 +1125,53 @@
     task_status.started((init_msgs, first_quote))

     if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
-        suffix = 'exchange'
-        calc_price = False  # should be real volume for contract
+        # suffix = 'exchange'
+        # calc_price = False  # should be real volume for contract

         # wait for real volume on feed (trading might be closed)
-        async with aclosing(stream):
-            async for ticker in stream:
-
-                # for a real volume contract we rait for the first
-                # "real" trade to take place
-                if not calc_price and not ticker.rtTime:
-                    # spin consuming tickers until we get a real market datum
-                    log.debug(f"New unsent ticker: {ticker}")
-                    continue
-                else:
-                    log.debug("Received first real volume tick")
-                    # ugh, clear ticks since we've consumed them
-                    # (ahem, ib_insync is truly stateful trash)
-                    ticker.ticks = []
-
-                    # XXX: this works because we don't use
-                    # ``aclosing()`` above?
-                    break
-
-            # tell caller quotes are now coming in live
-            feed_is_live.set()
-
-            async for ticker in stream:
-
-                # print(ticker.vwap)
-                quote = normalize(
-                    ticker,
-                    calc_price=calc_price
-                )
-
-                con = quote['contract']
-                topic = '.'.join((con['symbol'], con[suffix])).lower()
-                quote['symbol'] = topic
-
-                await send_chan.send({topic: quote})
-
-                # ugh, clear ticks since we've consumed them
-                ticker.ticks = []
+        while True:
+
+            ticker = await stream.receive()
+
+            # for a real volume contract we rait for the first
+            # "real" trade to take place
+            if not calc_price and not ticker.rtTime:
+                # spin consuming tickers until we get a real market datum
+                log.debug(f"New unsent ticker: {ticker}")
+                continue
+            else:
+                log.debug("Received first real volume tick")
+                # ugh, clear ticks since we've consumed them
+                # (ahem, ib_insync is truly stateful trash)
+                ticker.ticks = []
+
+                # XXX: this works because we don't use
+                # ``aclosing()`` above?
+                break
+
+    # tell caller quotes are now coming in live
+    feed_is_live.set()
+
+    # last = time.time()
+    async with aclosing(stream):
+        async for ticker in stream:
+
+            # print(f'ticker rate: {1/(time.time() - last)}')
+
+            # print(ticker.vwap)
+            quote = normalize(
+                ticker,
+                calc_price=calc_price
+            )
+
+            # con = quote['contract']
+            # topic = '.'.join((con['symbol'], suffix)).lower()
+            quote['symbol'] = topic
+
+            await send_chan.send({topic: quote})
+
+            # ugh, clear ticks since we've consumed them
+            ticker.ticks = []
+
+            # last = time.time()
 def pack_position(pos: Position) -> Dict[str, Any]:
     con = pos.contract
@@ -1179,3 +1289,63 @@ async def stream_trades(
             continue

         yield {'local_trades': (event_name, msg)}
+
+
+@tractor.context
+async def open_symbol_search(
+    ctx: tractor.Context,
+) -> None:
+    # async with open_cached_client('ib') as client:
+
+    # load all symbols locally for fast search
+    await ctx.started({})
+
+    async with ctx.open_stream() as stream:
+
+        last = time.time()
+
+        async for pattern in stream:
+            log.debug(f'received {pattern}')
+            now = time.time()
+
+            assert pattern, 'IB can not accept blank search pattern'
+
+            # throttle search requests to no faster then 1Hz
+            diff = now - last
+            if diff < 1.0:
+                log.debug('throttle sleeping')
+                await trio.sleep(diff)
+                try:
+                    pattern = stream.receive_nowait()
+                except trio.WouldBlock:
+                    pass
+
+            if not pattern or pattern.isspace():
+                log.warning('empty pattern received, skipping..')
+
+                # XXX: this unblocks the far end search task which may
+                # hold up a multi-search nursery block
+                await stream.send({})
+
+                continue
+
+            log.debug(f'searching for {pattern}')
+            # await tractor.breakpoint()
+            last = time.time()
+            results = await _trio_run_client_method(
+                method='search_stocks',
+                pattern=pattern,
+                upto=5,
+            )
+            log.debug(f'got results {results.keys()}')
+
+            log.debug("fuzzy matching")
+            matches = fuzzy.extractBests(
+                pattern,
+                results,
+                score_cutoff=50,
+            )
+
+            matches = {item[2]: item[0] for item in matches}
+            log.debug(f"sending matches: {matches.keys()}")
+            await stream.send(matches)
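The interesting bit of `open_symbol_search()` is its throttle-then-drain behaviour: requests are limited to roughly 1 Hz and, after sleeping, any newer pending pattern replaces the stale one. A runnable sketch of that pattern against a plain `trio` memory channel instead of a `tractor` stream (a variant that sleeps off the remainder of the window; names are illustrative only):

```python
import time
import trio


async def throttled_consumer(recv: trio.MemoryReceiveChannel) -> None:
    last = time.time()

    async for pattern in recv:
        diff = time.time() - last

        if diff < 1.0:
            # too soon: sleep off the window, then keep only the newest
            # pending pattern, dropping stale keystrokes
            await trio.sleep(1.0 - diff)
            try:
                while True:
                    pattern = recv.receive_nowait()
            except trio.WouldBlock:
                pass

        last = time.time()
        print(f'searching for {pattern!r}')


async def main() -> None:
    send, recv = trio.open_memory_channel(16)
    async with trio.open_nursery() as n:
        n.start_soon(throttled_consumer, recv)
        for p in ('a', 'ap', 'app', 'appl', 'aapl'):
            await send.send(p)
            await trio.sleep(0.2)
        await send.aclose()


trio.run(main)
```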

View File

@@ -21,7 +21,7 @@ Kraken backend.
 from contextlib import asynccontextmanager, AsyncExitStack
 from dataclasses import asdict, field
 from types import ModuleType
-from typing import List, Dict, Any, Tuple
+from typing import List, Dict, Any, Tuple, Optional
 import json
 import time
@@ -37,6 +37,7 @@ from trio_websocket._impl import (
 import arrow
 import asks
+from fuzzywuzzy import process as fuzzy
 import numpy as np
 import trio
 import tractor
@@ -56,6 +57,11 @@ log = get_logger(__name__)

 _url = 'https://api.kraken.com/0'

+_search_conf = {
+    'pause_period': 0.0616
+}
+
 # Broker specific ohlc schema which includes a vwap field
 _ohlc_dtype = [
     ('index', int),
@@ -147,6 +153,17 @@ class Client:
             'User-Agent':
                 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
         })
+        self._pairs: list[str] = []
+
+    @property
+    def pairs(self) -> Dict[str, Any]:
+        if self._pairs is None:
+            raise RuntimeError(
+                "Make sure to run `cache_symbols()` on startup!"
+            )
+            # retreive and cache all symbols
+
+        return self._pairs

     async def _public(
         self,
@@ -162,14 +179,51 @@
     async def symbol_info(
         self,
-        pair: str = 'all',
+        pair: Optional[str] = None,
     ):
-        resp = await self._public('AssetPairs', {'pair': pair})
+        if pair is not None:
+            pairs = {'pair': pair}
+        else:
+            pairs = None  # get all pairs
+
+        resp = await self._public('AssetPairs', pairs)
         err = resp['error']
         if err:
             raise BrokerError(err)
-        true_pair_key, data = next(iter(resp['result'].items()))
-        return data
+
+        pairs = resp['result']
+
+        if pair is not None:
+            _, data = next(iter(pairs.items()))
+            return data
+        else:
+            return pairs
+
+    async def cache_symbols(
+        self,
+    ) -> dict:
+        if not self._pairs:
+            self._pairs = await self.symbol_info()
+
+        return self._pairs
+
+    async def search_symbols(
+        self,
+        pattern: str,
+        limit: int = None,
+    ) -> Dict[str, Any]:
+
+        if self._pairs is not None:
+            data = self._pairs
+        else:
+            data = await self.symbol_info()
+
+        matches = fuzzy.extractBests(
+            pattern,
+            data,
+            score_cutoff=50,
+        )
+        # repack in dict form
+        return {item[0]['altname']: item[0] for item in matches}

     async def bars(
         self,
@@ -182,6 +236,7 @@
         if since is None:
             since = arrow.utcnow().floor('minute').shift(
                 minutes=-count).timestamp()
+
         # UTC 2017-07-02 12:53:20 is oldest seconds value
         since = str(max(1499000000, since))
         json = await self._public(
@@ -232,7 +287,12 @@
 @asynccontextmanager
 async def get_client() -> Client:
-    yield Client()
+    client = Client()
+
+    # at startup, load all symbols locally for fast search
+    await client.cache_symbols()
+
+    yield client


 async def stream_messages(ws):
@@ -249,7 +309,7 @@
             too_slow_count += 1
-            if too_slow_count > 10:
+            if too_slow_count > 20:
                 log.warning(
                     "Heartbeat is too slow, resetting ws connection")
@@ -317,7 +377,8 @@ def normalize(
     # seriously eh? what's with this non-symmetry everywhere
     # in subscription systems...
-    topic = quote['pair'].replace('/', '')
+    # XXX: piker style is always lowercases symbols.
+    topic = quote['pair'].replace('/', '').lower()

     # print(quote)
     return topic, quote
@@ -368,10 +429,13 @@ class AutoReconWs:
         self,
         tries: int = 10000,
     ) -> None:
-        try:
-            await self._stack.aclose()
-        except (DisconnectionTimeout, RuntimeError):
-            await trio.sleep(1)
+        while True:
+            try:
+                await self._stack.aclose()
+            except (DisconnectionTimeout, RuntimeError):
+                await trio.sleep(1)
+            else:
+                break

         last_err = None
         for i in range(tries):
@@ -430,12 +494,12 @@ async def open_autorecon_ws(url):

 async def backfill_bars(
     sym: str,
     shm: ShmArray,  # type: ignore # noqa
     count: int = 10,  # NOTE: any more and we'll overrun the underlying buffer
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 ) -> None:
     """Fill historical bars into shared mem / storage afap.
     """
@@ -475,6 +539,10 @@ async def stream_quotes(
         # keep client cached for real-time section
         for sym in symbols:
+
+            # transform to upper since piker style is always lower
+            sym = sym.upper()
+
             si = Pair(**await client.symbol_info(sym))  # validation
             syminfo = si.dict()
             syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
@@ -482,7 +550,7 @@
             sym_infos[sym] = syminfo
             ws_pairs[sym] = si.wsname

-        symbol = symbols[0]
+        symbol = symbols[0].lower()

         init_msgs = {
             # pass back token, and bool, signalling if we're the writer
@@ -570,8 +638,34 @@
                 elif typ == 'l1':
                     quote = ohlc
-                    topic = quote['symbol']
+                    topic = quote['symbol'].lower()

                 # XXX: format required by ``tractor.msg.pub``
                 # requires a ``Dict[topic: str, quote: dict]``
                 await send_chan.send({topic: quote})
+
+
+@tractor.context
+async def open_symbol_search(
+    ctx: tractor.Context,
+) -> Client:
+
+    async with open_cached_client('kraken') as client:
+
+        # load all symbols locally for fast search
+        cache = await client.cache_symbols()
+        await ctx.started(cache)
+
+        async with ctx.open_stream() as stream:
+
+            async for pattern in stream:
+
+                matches = fuzzy.extractBests(
+                    pattern,
+                    cache,
+                    score_cutoff=50,
+                )
+                # repack in dict form
+                await stream.send(
+                    {item[0]['altname']: item[0]
+                     for item in matches}
+                )

View File

@@ -628,7 +628,7 @@
             f"Took {time.time() - start} seconds to retreive {len(bars)} bars")
         return bars

-    async def search_stocks(
+    async def search_symbols(
         self,
         pattern: str,
         # how many contracts to return

View File

@@ -136,7 +136,6 @@ def get_orders(
     return _orders


-# TODO: make this a ``tractor.msg.pub``
 async def send_order_cmds(symbol_key: str):
     """
     Order streaming task: deliver orders transmitted from UI

View File

@@ -57,21 +57,31 @@ def pikerd(loglevel, host, tl, pdb):

 @click.group(context_settings=_context_defaults)
-@click.option('--broker', '-b', default=DEFAULT_BROKER,
-              help='Broker backend to use')
+@click.option(
+    '--brokers', '-b',
+    default=[DEFAULT_BROKER],
+    multiple=True,
+    help='Broker backend to use'
+)
 @click.option('--loglevel', '-l', default='warning', help='Logging level')
 @click.option('--tl', is_flag=True, help='Enable tractor logging')
 @click.option('--configdir', '-c', help='Configuration directory')
 @click.pass_context
-def cli(ctx, broker, loglevel, tl, configdir):
+def cli(ctx, brokers, loglevel, tl, configdir):
     if configdir is not None:
         assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
         config._override_config_dir(configdir)

     ctx.ensure_object(dict)
+
+    if len(brokers) == 1:
+        brokermods = [get_brokermod(brokers[0])]
+    else:
+        brokermods = [get_brokermod(broker) for broker in brokers]
+
     ctx.obj.update({
-        'broker': broker,
-        'brokermod': get_brokermod(broker),
+        'brokers': brokers,
+        'brokermods': brokermods,
         'loglevel': loglevel,
         'tractorloglevel': None,
         'log': get_console_log(loglevel),

View File

@@ -227,7 +227,8 @@ async def sample_and_broadcast(
                 # end up triggering backpressure which which will
                 # eventually block this producer end of the feed and
                 # thus other consumers still attached.
-                subs = bus._subscribers[sym]
+                subs = bus._subscribers[sym.lower()]
+
                 for ctx in subs:
                     # print(f'sub is {ctx.chan.uid}')
                     try:

View File

@@ -17,6 +17,8 @@
 """
 Data feed apis and infra.

+This module is enabled for ``brokerd`` daemons.
+
 """
 from dataclasses import dataclass, field
 from contextlib import asynccontextmanager
@@ -25,7 +27,7 @@ from types import ModuleType
 from typing import (
     Dict, Any, Sequence,
     AsyncIterator, Optional,
-    List
+    List, Awaitable, Callable,
 )

 import trio
@@ -43,7 +45,9 @@ from ._sharedmem import (
     attach_shm_array,
     ShmArray,
 )
+from .ingest import get_ingestormod
 from ._source import base_iohlc_dtype, Symbol
+from ..ui import _search
 from ._sampling import (
     _shms,
     _incrementers,
@@ -51,7 +55,6 @@ from ._sampling import (
     iter_ohlc_periods,
     sample_and_broadcast,
 )
-from .ingest import get_ingestormod

 log = get_logger(__name__)
@@ -145,12 +148,14 @@ async def _setup_persistent_brokerd(

 async def allocate_persistent_feed(
+
     ctx: tractor.Context,
     bus: _FeedsBus,
     brokername: str,
     symbol: str,
     loglevel: str,
+
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 ) -> None:

     try:
@@ -172,7 +177,7 @@
         )

         # do history validation?
-        assert opened, f'Persistent shm for {symbol} was already open?!'
+        # assert opened, f'Persistent shm for {symbol} was already open?!'
         # if not opened:
         #     raise RuntimeError("Persistent shm for sym was already open?!")
@@ -198,10 +203,12 @@
     # TODO: make this into a composed type which also
     # contains the backfiller cs for individual super-based
     # resspawns when needed.
-    bus.feeds[symbol] = (cs, init_msg, first_quote)
+
+    # XXX: the ``symbol`` here is put into our native piker format (i.e.
+    # lower case).
+    bus.feeds[symbol.lower()] = (cs, init_msg, first_quote)

     if opened:
         # start history backfill task ``backfill_bars()`` is
         # a required backend func this must block until shm is
         # filled with first set of ohlc bars
@@ -235,13 +242,14 @@

 @tractor.stream
 async def attach_feed_bus(
+
     ctx: tractor.Context,
     brokername: str,
     symbol: str,
     loglevel: str,
 ) -> None:
+    # try:

     if loglevel is None:
         loglevel = tractor.current_actor().loglevel
@@ -252,26 +260,33 @@
     assert 'brokerd' in tractor.current_actor().name

     bus = get_feed_bus(brokername)

+    sub_only: bool = False
+    entry = bus.feeds.get(symbol)
+
+    # if no cached feed for this symbol has been created for this
+    # brokerd yet, start persistent stream and shm writer task in
+    # service nursery
     async with bus.task_lock:
-        task_cs = bus.feeds.get(symbol)
-        sub_only: bool = False
-
-        # if no cached feed for this symbol has been created for this
-        # brokerd yet, start persistent stream and shm writer task in
-        # service nursery
-        if task_cs is None:
+        if entry is None:
             init_msg, first_quote = await bus.nursery.start(
                 partial(
                     allocate_persistent_feed,
                     ctx=ctx,
                     bus=bus,
                     brokername=brokername,
+
+                    # here we pass through the selected symbol in native
+                    # "format" (i.e. upper vs. lowercase depending on
+                    # provider).
                     symbol=symbol,
+
                     loglevel=loglevel,
                 )
             )
             bus._subscribers.setdefault(symbol, []).append(ctx)
+            assert isinstance(bus.feeds[symbol], tuple)

         else:
             sub_only = True
@@ -313,6 +328,8 @@ class Feed:
     _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
     _max_sample_rate: int = 0

+    search: Callable[..., Awaitable] = None
+
     # cache of symbol info messages received as first message when
     # a stream startsc.
     symbols: Dict[str, Symbol] = field(default_factory=dict)
@@ -335,6 +352,7 @@ class Feed:
                 iter_ohlc_periods,
                 delay_s=delay_s or self._max_sample_rate,
             ) as self._index_stream:
+
                 yield self._index_stream
         else:
             yield self._index_stream
@@ -364,6 +382,7 @@
                 # more then one?
                 topics=['local_trades'],
             ) as self._trade_stream:
+
                 yield self._trade_stream
         else:
             yield self._trade_stream
@@ -376,27 +395,78 @@ def sym_to_shm_key(
     return f'{broker}.{symbol}'


+@asynccontextmanager
+async def install_brokerd_search(
+
+    portal: tractor._portal.Portal,
+    brokermod: ModuleType,
+
+) -> None:
+
+    async with portal.open_context(
+        brokermod.open_symbol_search
+    ) as (ctx, cache):
+
+        # shield here since we expect the search rpc to be
+        # cancellable by the user as they see fit.
+        async with ctx.open_stream() as stream:
+
+            async def search(text: str) -> Dict[str, Any]:
+                await stream.send(text)
+                return await stream.receive()
+
+            async with _search.register_symbol_search(
+
+                provider_name=brokermod.name,
+                search_routine=search,
+
+                # TODO: should be make this a required attr of
+                # a backend module?
+                pause_period=getattr(
+                    brokermod, '_search_conf', {}
+                ).get('pause_period', 0.0616),
+
+            ):
+                yield
+
+
 @asynccontextmanager
 async def open_feed(
     brokername: str,
     symbols: Sequence[str],
     loglevel: Optional[str] = None,
 ) -> AsyncIterator[Dict[str, Any]]:
-    """Open a "data feed" which provides streamed real-time quotes.
-    """
+    '''
+    Open a "data feed" which provides streamed real-time quotes.
+
+    '''
+    sym = symbols[0].lower()
+
+    # TODO: feed cache locking, right now this is causing
+    # issues when reconnecting to a long running emsd?
+    # global _searcher_cache
+
+    # async with _cache_lock:
+    #     feed = _searcher_cache.get((brokername, sym))
+
+    #     # if feed is not None and sym in feed.symbols:
+    #     if feed is not None:
+    #         yield feed
+    #         # short circuit
+    #         return
+
     try:
         mod = get_brokermod(brokername)
     except ImportError:
         mod = get_ingestormod(brokername)

-    if loglevel is None:
-        loglevel = tractor.current_actor().loglevel
-
-    # TODO: do all!
-    sym = symbols[0]
-
-    # TODO: compress these to one line with py3.9+
-    async with maybe_spawn_brokerd(brokername, loglevel=loglevel) as portal:
+    # no feed for broker exists so maybe spawn a data brokerd
+    async with maybe_spawn_brokerd(
+        brokername,
+        loglevel=loglevel
+    ) as portal:

         async with portal.open_stream_from(
@@ -449,10 +519,4 @@ async def open_feed(

     feed._max_sample_rate = max(ohlc_sample_rates)

-    try:
-        yield feed
-    finally:
-        # always cancel the far end producer task
-        with trio.CancelScope(shield=True):
-            await stream.aclose()
+    yield feed
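The UI-side `register_symbol_search()` that `install_brokerd_search()` relies on is not shown in this diff. A plausible minimal shape (purely illustrative, not the actual implementation) is a context manager that adds a provider's async search routine to a registry and removes it on exit, which is all the hunk above requires:

```python
from contextlib import asynccontextmanager
from typing import Any, Awaitable, Callable, Dict

_searchers: Dict[str, dict] = {}


@asynccontextmanager
async def register_symbol_search(
    provider_name: str,
    search_routine: Callable[[str], Awaitable[Dict[str, Any]]],
    pause_period: float = 0.0616,
):
    # registered for the lifetime of the enclosing ``async with`` block
    _searchers[provider_name] = {
        'routine': search_routine,
        'pause_period': pause_period,
    }
    try:
        yield _searchers
    finally:
        _searchers.pop(provider_name, None)
```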

View File

@@ -19,17 +19,22 @@ High level Qt chart widgets.
 """
 import time
+from contextlib import AsyncExitStack
 from typing import Tuple, Dict, Any, Optional, Callable
 from types import ModuleType
 from functools import partial

 from PyQt5 import QtCore, QtGui
+from PyQt5.QtCore import Qt
 import numpy as np
 import pyqtgraph as pg
 import tractor
 import trio
-from trio_typing import TaskStatus

+from .._daemon import (
+    maybe_spawn_brokerd,
+)
+from ..brokers import get_brokermod
 from ._axes import (
     DynamicDateAxis,
     PriceAxis,
@@ -55,16 +60,19 @@ from ._style import (
     _bars_from_right_in_follow_mode,
     _bars_to_left_in_follow_mode,
 )
+from . import _search
+from ._event import open_key_stream
 from ..data._source import Symbol
 from ..data._sharedmem import ShmArray
-from ..data import maybe_open_shm_array
 from .. import brokers
 from .. import data
+from ..data import maybe_open_shm_array
 from ..log import get_logger
 from ._exec import run_qtractor, current_screen
 from ._interaction import ChartView
 from .order_mode import start_order_mode
 from .. import fsp
+from ..data import feed


 log = get_logger(__name__)
@@ -78,36 +86,44 @@ class ChartSpace(QtGui.QWidget):

     def __init__(self, parent=None):
         super().__init__(parent)
-        self.v_layout = QtGui.QVBoxLayout(self)
-        self.v_layout.setContentsMargins(0, 0, 0, 0)
-        self.v_layout.setSpacing(0)
+        self.hbox = QtGui.QHBoxLayout(self)
+        self.hbox.setContentsMargins(0, 0, 0, 0)
+        self.hbox.setSpacing(2)
+
+        self.vbox = QtGui.QVBoxLayout()
+        self.vbox.setContentsMargins(0, 0, 0, 0)
+        self.vbox.setSpacing(2)
+
+        self.hbox.addLayout(self.vbox)

         self.toolbar_layout = QtGui.QHBoxLayout()
         self.toolbar_layout.setContentsMargins(0, 0, 0, 0)
-        self.h_layout = QtGui.QHBoxLayout()
-        self.h_layout.setContentsMargins(0, 0, 0, 0)

         # self.init_timeframes_ui()
         # self.init_strategy_ui()

-        self.v_layout.addLayout(self.toolbar_layout)
-        self.v_layout.addLayout(self.h_layout)
+        self.vbox.addLayout(self.toolbar_layout)
+        # self.vbox.addLayout(self.hbox)
+
         self._chart_cache = {}
-        self.symbol_label: Optional[QtGui.QLabel] = None
-
-    def init_search(self):
-        self.symbol_label = label = QtGui.QLabel()
-        label.setTextFormat(3)  # markdown
-        label.setFont(_font.font)
-        label.setMargin(0)
-        # title = f'sym: {self.symbol}'
-        # label.setText(title)
-
-        label.setAlignment(
-            QtCore.Qt.AlignVCenter
-            | QtCore.Qt.AlignLeft
-        )
-        self.v_layout.addWidget(label)
+        self.linkedcharts: 'LinkedSplitCharts' = None
+        self._root_n: Optional[trio.Nursery] = None
+
+    def set_chart_symbol(
+        self,
+        symbol_key: str,  # of form <fqsn>.<providername>
+        linked_charts: 'LinkedSplitCharts',  # type: ignore
+    ) -> None:
+        # re-sort org cache symbol list in LIFO order
+        cache = self._chart_cache
+        cache.pop(symbol_key, None)
+        cache[symbol_key] = linked_charts
+
+    def get_chart_symbol(
+        self,
+        symbol_key: str,
+    ) -> 'LinkedSplitCharts':  # type: ignore
+        return self._chart_cache.get(symbol_key)

     def init_timeframes_ui(self):
         self.tf_layout = QtGui.QHBoxLayout()
@@ -130,42 +146,73 @@
     # def init_strategy_ui(self):
     #     self.strategy_box = StrategyBoxWidget(self)
     #     self.toolbar_layout.addWidget(self.strategy_box)

     def load_symbol(
         self,
-        brokername: str,
+        providername: str,
         symbol_key: str,
-        data: np.ndarray,
+        loglevel: str,
         ohlc: bool = True,
+        reset: bool = False,
     ) -> None:
         """Load a new contract into the charting app.

         Expects a ``numpy`` structured array containing all the ohlcv fields.
-        """
-        # TODO: symbol search
-        # # of course this doesn't work :eyeroll:
-        # h = _font.boundingRect('Ag').height()
-        # print(f'HEIGHT {h}')
-        # self.symbol_label.setFixedHeight(h + 4)
-        # self.v_layout.update()
-        # self.symbol_label.setText(f'/`{symbol}`')
-
-        linkedcharts = self._chart_cache.setdefault(
-            symbol_key,
-            LinkedSplitCharts(self)
-        )
+
+        """
+        # our symbol key style is always lower case
+        symbol_key = symbol_key.lower()
+
+        # fully qualified symbol name (SNS i guess is what we're making?)
+        fqsn = '.'.join([symbol_key, providername])
+
+        linkedcharts = self.get_chart_symbol(fqsn)
+
+        if not self.vbox.isEmpty():
+            # XXX: this is CRITICAL especially with pixel buffer caching
+            self.linkedcharts.hide()
+
+            # XXX: pretty sure we don't need this
+            # remove any existing plots?
+            # XXX: ahh we might want to support cache unloading..
+            self.vbox.removeWidget(self.linkedcharts)
+
+        # switching to a new viewable chart
+        if linkedcharts is None or reset:
+
+            # we must load a fresh linked charts set
+            linkedcharts = LinkedSplitCharts(self)
+
+            # spawn new task to start up and update new sub-chart instances
+            self._root_n.start_soon(
+                chart_symbol,
+                self,
+                providername,
+                symbol_key,
+                loglevel,
+            )
+
+            self.set_chart_symbol(fqsn, linkedcharts)
+
+        self.vbox.addWidget(linkedcharts)
+
+        # chart is already in memory so just focus it
+        if self.linkedcharts:
+            self.linkedcharts.unfocus()
+
+        # self.vbox.addWidget(linkedcharts)
+        linkedcharts.show()
+        linkedcharts.focus()
         self.linkedcharts = linkedcharts

-        # remove any existing plots
-        if not self.v_layout.isEmpty():
-            self.v_layout.removeWidget(linkedcharts)
-
-        self.v_layout.addWidget(linkedcharts)
-
-        return linkedcharts
-
-    # TODO: add signalling painter system
-    # def add_signals(self):
-    #     self.chart.add_signals()
+        symbol = linkedcharts.symbol
+
+        if symbol is not None:
+            self.window.setWindowTitle(
+                f'{symbol.key}@{symbol.brokers} '
+                f'tick:{symbol.tick_size}'
+            )


 class LinkedSplitCharts(QtGui.QWidget):
@@ -184,8 +231,10 @@ class LinkedSplitCharts(QtGui.QWidget):
     zoomIsDisabled = QtCore.pyqtSignal(bool)

     def __init__(
+
         self,
         chart_space: ChartSpace,
+
     ) -> None:
         super().__init__()
         self.signals_visible: bool = False
@@ -194,6 +243,8 @@ class LinkedSplitCharts(QtGui.QWidget):
         self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {}

         self.chart_space = chart_space
+        self.chart_space = chart_space
+
         self.xaxis = DynamicDateAxis(
             orientation='bottom',
             linked_charts=self
@@ -232,6 +283,14 @@
         sizes.extend([min_h_ind] * len(self.subplots))
         self.splitter.setSizes(sizes)  # , int(self.height()*0.2)

+    def focus(self) -> None:
+        if self.chart is not None:
+            self.chart.focus()
+
+    def unfocus(self) -> None:
+        if self.chart is not None:
+            self.chart.clearFocus()
+
     def plot_ohlc_main(
         self,
         symbol: Symbol,
@@ -250,7 +309,7 @@
         self.chart = self.add_plot(
             name=symbol.key,
             array=array,
-            xaxis=self.xaxis,
+            # xaxis=self.xaxis,
             style=style,
             _is_main=True,
         )
@@ -447,6 +506,10 @@ class ChartPlotWidget(pg.PlotWidget):
         # for when the splitter(s) are resized
         self._vb.sigResized.connect(self._set_yrange)

+    def focus(self) -> None:
+        # self.setFocus()
+        self._vb.setFocus()
+
     def last_bar_in_view(self) -> int:
         self._ohlc[-1]['index']
@@ -1127,6 +1190,9 @@ async def spawn_fsps(
     Pass target entrypoint and historical data.

     """
+
+    linked_charts.focus()
+
     # spawns sub-processes which execute cpu bound FSP code
     async with tractor.open_nursery(loglevel=loglevel) as n:
@@ -1159,7 +1225,7 @@

             # XXX: fsp may have been opened by a duplicate chart. Error for
             # now until we figure out how to wrap fsps as "feeds".
-            assert opened, f"A chart for {key} likely already exists?"
+            # assert opened, f"A chart for {key} likely already exists?"

             conf['shm'] = shm
@@ -1219,9 +1285,6 @@ async def run_fsp(
         # data-array as first msg
         _ = await stream.receive()

-        conf['stream'] = stream
-        conf['portal'] = portal
-
         shm = conf['shm']

         if conf.get('overlay'):
@@ -1253,16 +1316,27 @@
                 # fsp_func_name
             )

-            # read last value
+            # XXX: ONLY for sub-chart fsps, overlays have their
+            # data looked up from the chart's internal array set.
+            # TODO: we must get a data view api going STAT!!
+            chart._shm = shm
+
+            # should **not** be the same sub-chart widget
+            assert chart.name != linked_charts.chart.name
+
+            # sticky only on sub-charts atm
+            last_val_sticky = chart._ysticks[chart.name]
+
+            # read from last calculated value
             array = shm.array
             value = array[fsp_func_name][-1]
-            last_val_sticky = chart._ysticks[chart.name]
             last_val_sticky.update_from_data(-1, value)

-            chart.update_curve_from_array(fsp_func_name, array)
-            chart._shm = shm
+            chart._lc.focus()
+
+            # works also for overlays in which case data is looked up from
+            # internal chart array set....
+            chart.update_curve_from_array(fsp_func_name, shm.array)

         # TODO: figure out if we can roll our own `FillToThreshold` to
         # get brush filled polygons for OS/OB conditions.
@@ -1284,8 +1358,6 @@
         chart._set_yrange()

-        stream = conf['stream']
-
         last = time.time()

         # update chart graphics
@@ -1306,7 +1378,6 @@
                 # re-compute steps.
                 read_tries = 2
                 while read_tries > 0:
-
                     try:
                         # read last
                         array = shm.array
@@ -1387,7 +1458,6 @@ async def chart_symbol(
     brokername: str,
     sym: str,
     loglevel: str,
-    task_status: TaskStatus[Symbol] = trio.TASK_STATUS_IGNORED,
 ) -> None:
     """Spawn a real-time chart widget for this symbol and app session.
@@ -1408,19 +1478,15 @@
     bars = ohlcv.array
     symbol = feed.symbols[sym]

-    task_status.started(symbol)
-
     # load in symbol's ohlc data
     chart_app.window.setWindowTitle(
         f'{symbol.key}@{symbol.brokers} '
         f'tick:{symbol.tick_size}'
     )

-    # await tractor.breakpoint()
     linked_charts = chart_app.linkedcharts
     linked_charts._symbol = symbol
     chart = linked_charts.plot_ohlc_main(symbol, bars)
-
     chart.setFocus()

     # plot historical vwap if available
@@ -1495,9 +1561,8 @@
         )

         # wait for a first quote before we start any update tasks
-        quote = await feed.receive()
-
-        log.info(f'Received first quote {quote}')
+        # quote = await feed.receive()
+        # log.info(f'Received first quote {quote}')

         n.start_soon(
             check_for_new_bars,
@@ -1518,12 +1583,48 @@
         await start_order_mode(chart, symbol, brokername)


+async def load_providers(
+    brokernames: list[str],
+    loglevel: str,
+) -> None:
+
+    # TODO: seems like our incentive for brokerd caching lelel
+    backends = {}
+
+    async with AsyncExitStack() as stack:
+        # TODO: spawn these async in nursery.
+        # load all requested brokerd's at startup and load their
+        # search engines.
+        for broker in brokernames:
+            log.info(f'Loading brokerd for {broker}')
+
+            # spin up broker daemons for each provider
+            portal = await stack.enter_async_context(
+                maybe_spawn_brokerd(
+                    broker,
+                    loglevel=loglevel
+                )
+            )
+
+            backends[broker] = portal
+
+            await stack.enter_async_context(
+                feed.install_brokerd_search(
+                    portal,
+                    get_brokermod(broker),
+                )
+            )
+
+        # keep search engines up until cancelled
+        await trio.sleep_forever()
+
+
 async def _async_main(
     # implicit required argument provided by ``qtractor_run()``
     widgets: Dict[str, Any],

-    symbol_key: str,
-    brokername: str,
+    sym: str,
+    brokernames: str,
     loglevel: str,

 ) -> None:
@@ -1556,27 +1657,58 @@
         # that run cached in the bg
         chart_app._root_n = root_n

-        chart_app.load_symbol(brokername, symbol_key, loglevel)
-
-        symbol = await root_n.start(
-            chart_symbol,
-            chart_app,
-            brokername,
-            symbol_key,
-            loglevel,
+        # setup search widget
+        search = _search.SearchWidget(chart_space=chart_app)
+
+        # the main chart's view is given focus at startup
+        search.bar.unfocus()
+
+        # add search singleton to global chart-space widget
+        chart_app.hbox.addWidget(
+            search,
+
+            # alights to top and uses minmial space based on
+            # search bar size hint (i think?)
+            alignment=Qt.AlignTop
         )
-
-        chart_app.window.setWindowTitle(
-            f'{symbol.key}@{symbol.brokers} '
-            f'tick:{symbol.tick_size}'
-        )
-
-        await trio.sleep_forever()
+        chart_app.search = search
+
+        symbol, _, provider = sym.rpartition('.')
+
+        # this internally starts a ``chart_symbol()`` task above
+        chart_app.load_symbol(provider, symbol, loglevel)
+
+        root_n.start_soon(load_providers, brokernames, loglevel)
+
+        # spin up a search engine for the local cached symbol set
+        async with _search.register_symbol_search(
+
+            provider_name='cache',
+            search_routine=partial(
+                _search.search_simple_dict,
+                source=chart_app._chart_cache,
+            ),
+
+        ):
+            # start handling search bar kb inputs
+            async with open_key_stream(
+                search.bar,
+            ) as key_stream:
+
+                # start kb handling task for searcher
+                root_n.start_soon(
+                    _search.handle_keyboard_input,
+                    # chart_app,
+                    search,
+                    key_stream,
+                )
+
+                await trio.sleep_forever()


 def _main(
     sym: str,
-    brokername: str,
+    brokernames: [str],
     piker_loglevel: str,
     tractor_kwargs,
 ) -> None:
@@ -1586,7 +1718,7 @@ def _main(
     # Qt entry point
     run_qtractor(
         func=_async_main,
-        args=(sym, brokername, piker_loglevel),
+        args=(sym, brokernames, piker_loglevel),
         main_widget=ChartSpace,
         tractor_kwargs=tractor_kwargs,
     )
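The chart-cache bookkeeping added above leans on a small trick worth spelling out: `set_chart_symbol()` "re-sorts the cache in LIFO order" simply by popping and re-inserting the key, since Python dicts preserve insertion order. A tiny self-contained sketch:

```python
_chart_cache: dict = {}


def set_chart_symbol(symbol_key: str, linked_charts: object) -> None:
    # pop-then-reinsert moves the key to the most-recently-used slot
    _chart_cache.pop(symbol_key, None)
    _chart_cache[symbol_key] = linked_charts


set_chart_symbol('xbtusd.kraken', 'charts-A')
set_chart_symbol('es.globex.ib', 'charts-B')
set_chart_symbol('xbtusd.kraken', 'charts-A')  # re-selecting bumps it
print(list(_chart_cache))  # ['es.globex.ib', 'xbtusd.kraken']
```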

piker/ui/_event.py (new file, mode 100644, 91 lines added)
View File

@ -0,0 +1,91 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Qt event proxying and processing using ``trio`` mem chans.
"""
from contextlib import asynccontextmanager
from PyQt5 import QtCore, QtGui
from PyQt5.QtCore import QEvent
import trio
class EventCloner(QtCore.QObject):
"""Clone and forward keyboard events over a trio memory channel
for later async processing.
"""
_event_types: set[QEvent] = set()
_send_chan: trio.abc.SendChannel = None
def eventFilter(
self,
source: QtGui.QWidget,
ev: QEvent,
) -> bool:
if ev.type() in self._event_types:
# TODO: what's the right way to allow this?
# if ev.isAutoRepeat():
# ev.ignore()
# XXX: we unpack here because apparently doing it
# after pop from the mem chan isn't showing the same
# event object? no clue wtf is going on there, likely
# something to do with Qt internals and calling the
# parent handler?
key = ev.key()
mods = ev.modifiers()
txt = ev.text()
# run async processing
self._send_chan.send_nowait((ev, key, mods, txt))
# never intercept the event
return False
@asynccontextmanager
async def open_key_stream(
source_widget: QtGui.QWidget,
event_types: set[QEvent] = {QEvent.KeyPress},
# TODO: should we offer some kinda option for toggling releases?
# would it require a channel per event type?
# QEvent.KeyRelease,
) -> trio.abc.ReceiveChannel:
# small buffer so bursts of keystrokes don't overflow ``send_nowait()``
send, recv = trio.open_memory_channel(16)
kc = EventCloner()
kc._send_chan = send
kc._event_types = event_types
source_widget.installEventFilter(kc)
try:
yield recv
finally:
await send.aclose()
source_widget.removeEventFilter(kc)
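As a rough usage sketch (not part of this diff), the receive channel yielded by ``open_key_stream()`` can be iterated from any ``trio`` task; the widget and consumer below are hypothetical and assume a Qt app is already running under guest mode (e.g. via ``run_qtractor()``):

# hypothetical consumer of ``open_key_stream()``
async def log_keys(widget: QtGui.QWidget) -> None:
    async with open_key_stream(widget) as key_stream:
        # each item matches the tuple packed in ``EventCloner.eventFilter()``
        async for ev, key, mods, txt in key_stream:
            print(f'key: {key}, mods: {mods}, txt: {txt!r}')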

View File

@ -38,6 +38,7 @@ from PyQt5.QtCore import (
QCoreApplication,
)
import qdarkstyle
# import qdarkgraystyle
import trio
import tractor
from outcome import Error
@ -144,6 +145,9 @@ def run_qtractor(
# currently seem tricky..
app.setQuitOnLastWindowClosed(False)
# XXX: lmfao, this is how you disable text edit cursor blinking..smh
app.setCursorFlashTime(0)
# set global app singleton
global _qt_app
_qt_app = app

View File

@ -435,7 +435,12 @@ class Cursor(pg.GraphicsObject):
self,
y_label_level: float = None,
) -> None:
plot = self.active_plot
if not plot:
return
g = self.graphics[plot]
# show horiz line and y-label
g['hl'].show()
g['vl'].show()

View File

@ -188,7 +188,7 @@ class SelectRect(QtGui.QGraphicsRectItem):
self._abs_top_right = label_anchor
self._label_proxy.setPos(self.vb.mapFromView(label_anchor))
# self._label.show()
def clear(self):
"""Clear the selection box from view.
@ -486,6 +486,8 @@ class ChartView(ViewBox):
self._key_buffer = []
self._key_active: bool = False
self.setFocusPolicy(QtCore.Qt.StrongFocus)
@property
def chart(self) -> 'ChartPlotWidget':  # type: ignore # noqa
return self._chart
@ -692,7 +694,7 @@ class ChartView(ViewBox):
ev.accept()
self.mode.submit_exec()
def keyReleaseEvent(self, ev: QtCore.QEvent):
"""
Key release normally triggers release of the input mode
@ -711,6 +713,10 @@ class ChartView(ViewBox):
# if self.state['mouseMode'] == ViewBox.RectMode:
self.setMouseMode(ViewBox.PanMode)
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
#     ctlalt = True
# if self.state['mouseMode'] == ViewBox.RectMode:
# if key == QtCore.Qt.Key_Space:
if mods == QtCore.Qt.ControlModifier or key == QtCore.Qt.Key_Control:
@ -722,7 +728,7 @@ class ChartView(ViewBox):
self._key_active = False
def keyPressEvent(self, ev: QtCore.QEvent) -> None:
"""
This routine should capture key presses in the current view box.
@ -747,15 +753,22 @@ class ChartView(ViewBox):
ctrl = False
if mods == QtCore.Qt.ControlModifier:
ctrl = True
if mods == QtCore.Qt.ControlModifier:
self.mode._exec_mode = 'live'
self._key_active = True
# ctrl + alt
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
#     ctlalt = True
# ctrl-<space>/<l> for "lookup", "search" -> open search tree
if ctrl and key in {
QtCore.Qt.Key_L,
QtCore.Qt.Key_Space,
}:
search = self._chart._lc.chart_space.search
search.focus()
# esc
if key == QtCore.Qt.Key_Escape or (ctrl and key == QtCore.Qt.Key_C):
@ -802,5 +815,6 @@ class ChartView(ViewBox):
# elif ev.key() == QtCore.Qt.Key_Backspace:
#     self.scaleHistory(len(self.axHistory))
else:
# maybe propagate to parent widget
ev.ignore()
self._key_active = False

piker/ui/_search.py 100644
View File

@ -0,0 +1,963 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
qompleterz: embeddable search and complete using trio, Qt and fuzzywuzzy.
"""
# link set for hackzin on this stuff:
# https://doc.qt.io/qt-5/qheaderview.html#moving-header-sections
# https://doc.qt.io/qt-5/model-view-programming.html
# https://doc.qt.io/qt-5/modelview.html
# https://doc.qt.io/qt-5/qtreeview.html#selectedIndexes
# https://doc.qt.io/qt-5/qmodelindex.html#siblingAtColumn
# https://doc.qt.io/qt-5/qitemselectionmodel.html#currentIndex
# https://www.programcreek.com/python/example/108109/PyQt5.QtWidgets.QTreeView
# https://doc.qt.io/qt-5/qsyntaxhighlighter.html
# https://github.com/qutebrowser/qutebrowser/blob/master/qutebrowser/completion/completiondelegate.py#L243
# https://forum.qt.io/topic/61343/highlight-matched-substrings-in-qstyleditemdelegate
from collections import defaultdict
from contextlib import asynccontextmanager
from functools import partial
from typing import (
List, Optional, Callable,
Awaitable, Sequence, Dict,
Any, AsyncIterator, Tuple,
)
import time
# from pprint import pformat
from fuzzywuzzy import process as fuzzy
import trio
from trio_typing import TaskStatus
from PyQt5 import QtCore, QtGui
from PyQt5 import QtWidgets
from PyQt5.QtCore import (
Qt,
# QSize,
QModelIndex,
QItemSelectionModel,
)
from PyQt5.QtGui import (
# QLayout,
QStandardItem,
QStandardItemModel,
)
from PyQt5.QtWidgets import (
QWidget,
QTreeView,
# QListWidgetItem,
# QAbstractScrollArea,
QStyledItemDelegate,
)
from ..log import get_logger
from ._style import (
_font,
DpiAwareFont,
# hcolor,
)
log = get_logger(__name__)
class SimpleDelegate(QStyledItemDelegate):
"""
Super simple view delegate to render text in the same
font size as the search widget.
"""
def __init__(
self,
parent=None,
font: DpiAwareFont = _font,
) -> None:
super().__init__(parent)
self.dpi_font = font
class CompleterView(QTreeView):
# XXX: relevant docs links:
# - simple widget version of this:
# https://doc.qt.io/qt-5/qtreewidget.html#details
# - MVC high level instructional:
# https://doc.qt.io/qt-5/model-view-programming.html
# - MV tut:
# https://doc.qt.io/qt-5/modelview.html
# - custom header view (for doing stuff like we have in kivy?):
# https://doc.qt.io/qt-5/qheaderview.html#moving-header-sections
# TODO: selection model stuff for eventual aggregate feeds
# charting and mgmt;
# https://doc.qt.io/qt-5/qabstractitemview.html#setSelectionModel
# https://doc.qt.io/qt-5/qitemselectionmodel.html
# https://doc.qt.io/qt-5/modelview.html#3-2-working-with-selections
# https://doc.qt.io/qt-5/model-view-programming.html#handling-selections-of-items
# TODO: mouse extended handling:
# https://doc.qt.io/qt-5/qabstractitemview.html#entered
def __init__(
self,
parent=None,
labels: List[str] = [],
) -> None:
super().__init__(parent)
model = QStandardItemModel(self)
self.labels = labels
# a std "tabular" config
self.setItemDelegate(SimpleDelegate())
self.setModel(model)
self.setAlternatingRowColors(True)
# TODO: size this based on DPI font
self.setIndentation(20)
self.pressed.connect(self.on_pressed)
# self.setUniformRowHeights(True)
# self.setColumnWidth(0, 3)
# self.setVerticalBarPolicy(Qt.ScrollBarAlwaysOff)
# self.setSizeAdjustPolicy(QAbstractScrollArea.AdjustIgnored)
# ux settings
self.setItemsExpandable(True)
self.setExpandsOnDoubleClick(False)
self.setAnimated(False)
self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
# column headers
model.setHorizontalHeaderLabels(labels)
self._font_size: int = 0 # pixels
def on_pressed(self, idx: QModelIndex) -> None:
search = self.parent()
search.chart_current_item(clear_to_cache=False)
search.focus()
def set_font_size(self, size: int = 18):
# dpi_px_size = _font.px_size
# print(size)
if size < 0:
size = 16
self._font_size = size
self.setStyleSheet(f"font: {size}px")
def resize(self):
model = self.model()
cols = model.columnCount()
for i in range(cols):
self.resizeColumnToContents(i)
# inclusive of search bar and header "rows" in pixel terms
rows = 100
# max_rows = 8 # 6 + search and headers
row_px = self.rowHeight(self.currentIndex())
# print(f'font_h: {font_h}\n px_height: {px_height}')
# TODO: probably make this more general / less hacky
self.setMinimumSize(self.width(), rows * row_px)
self.setMaximumSize(self.width() + 10, rows * row_px)
self.setFixedWidth(333)
def is_selecting_d1(self) -> bool:
cidx = self.selectionModel().currentIndex()
return cidx.parent() == QModelIndex()
def previous_index(self) -> QModelIndex:
cidx = self.selectionModel().currentIndex()
one_above = self.indexAbove(cidx)
if one_above.parent() == QModelIndex():
# if the next node up's parent is the root we don't want to select
# the next node up since it's a top level node and we only
# select entries depth >= 2.
# see if one more up is not the root and we can select it.
two_above = self.indexAbove(one_above)
if two_above != QModelIndex():
return two_above
else:
return cidx
return one_above # just next up
def next_index(self) -> QModelIndex:
cidx = self.selectionModel().currentIndex()
one_below = self.indexBelow(cidx)
if one_below.parent() == QModelIndex():
# if the next node down's parent is the root we don't want to select
# it since it's a top level node and we only select entries with
# depth >= 2.
# see if one more down is not the root and we can select it.
two_below = self.indexBelow(one_below)
if two_below != QModelIndex():
return two_below
else:
return cidx
return one_below # just next down
def select_from_idx(
self,
idx: QModelIndex,
) -> QStandardItem:
'''Select and return the item at index ``idx``.
'''
sel = self.selectionModel()
model = self.model()
sel.setCurrentIndex(
idx,
QItemSelectionModel.ClearAndSelect |
QItemSelectionModel.Rows
)
return model.itemFromIndex(idx)
def select_first(self) -> QStandardItem:
'''Select the first depth >= 2 entry from the completer tree and
return its item.
'''
# ensure we're **not** selecting the first level parent node and
# instead its child.
model = self.model()
for idx, item in self.iter_d1():
if model.rowCount(idx) == 0:
continue
else:
return self.select_from_idx(self.indexBelow(idx))
def select_next(self) -> QStandardItem:
idx = self.next_index()
assert idx.isValid()
return self.select_from_idx(idx)
def select_previous(self) -> QStandardItem:
idx = self.previous_index()
assert idx.isValid()
return self.select_from_idx(idx)
def next_section(self, direction: str = 'down') -> QModelIndex:
cidx = start_idx = self.selectionModel().currentIndex()
# step up levels to depth == 1
while cidx.parent() != QModelIndex():
cidx = cidx.parent()
# move to next section in `direction`
op = {'up': -1, 'down': +1}[direction]
next_row = cidx.row() + op
nidx = self.model().index(next_row, cidx.column(), QModelIndex())
# do nothing, if there is no valid "next" section
if not nidx.isValid():
return self.select_from_idx(start_idx)
# go to next selectable child item
self.select_from_idx(nidx)
return self.select_next()
def iter_d1(
self,
) -> tuple[QModelIndex, QStandardItem]:
model = self.model()
isections = model.rowCount()
# much thanks to following code to figure out breadth-first
# traversing from the root node:
# https://stackoverflow.com/a/33126689
for i in range(isections):
idx = model.index(i, 0, QModelIndex())
item = model.itemFromIndex(idx)
yield idx, item
def find_section(
self,
section: str,
) -> Optional[QModelIndex]:
'''Find the *first* depth = 1 section matching ``section`` in
the tree and return its index.
'''
for idx, item in self.iter_d1():
if item.text() == section:
return idx
else:
# caller must expect this
return None
def clear_section(
self,
section: str,
status_field: str = None,
) -> None:
'''Clear all result-rows from under the depth = 1 section.
'''
idx = self.find_section(section)
model = self.model()
if idx is not None:
if model.hasChildren(idx):
rows = model.rowCount(idx)
# print(f'removing {rows} from {section}')
assert model.removeRows(0, rows, parent=idx)
# remove section as well ?
# model.removeRow(i, QModelIndex())
if status_field is not None:
model.setItem(idx.row(), 1, QStandardItem(status_field))
else:
model.setItem(idx.row(), 1, QStandardItem())
self.resize()
return idx
else:
return None
def set_section_entries(
self,
section: str,
values: Sequence[str],
clear_all: bool = False,
) -> None:
'''Set result-rows for depth = 1 tree section ``section``.
'''
model = self.model()
if clear_all:
# XXX: rewrite the model from scratch if caller requests it
model.clear()
model.setHorizontalHeaderLabels(self.labels)
section_idx = self.clear_section(section)
# if we can't find a section start adding to the root
if section_idx is None:
root = model.invisibleRootItem()
section_item = QStandardItem(section)
blank = QStandardItem('')
root.appendRow([section_item, blank])
else:
section_item = model.itemFromIndex(section_idx)
# values just needs to be sequence-like
for i, s in enumerate(values):
ix = QStandardItem(str(i))
item = QStandardItem(s)
# Add the item to the model
section_item.appendRow([ix, item])
self.expandAll()
# TODO: figure out if we can avoid this line in a better way
# such that "re-selection" doesn't happen tree-wise for each new
# sub-search:
# https://doc.qt.io/qt-5/model-view-programming.html#handling-selections-in-item-views
# XXX: THE BELOW LINE MUST BE CALLED.
# this stuff is super finicky and if not done right will cause
# Qt crashes out our buttz. it's required in order to get the
# view to show right after typing input.
self.select_first()
# TODO: the way we might be able to do this more sanely is,
# 1. for the currently selected item, when start rewriting
# a section figure out the row, column, parent "abstract"
# position in the tree view and store it
# 2. take that position and re-apply the selection to the new
# model/tree by looking up the new "equivalent element" and
# selecting
self.show_matches()
def show_matches(self) -> None:
self.show()
self.resize()
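For illustration only, this is roughly how a provider's results land in the completer tree via ``set_section_entries()``; the section name and symbols are placeholders and a running ``QApplication`` is assumed:

# hypothetical example: render a depth-1 "kraken" section with two rows
view = CompleterView(labels=['src', 'symbol'])
view.set_section_entries(
    section='kraken',
    values=['xbtusd', 'ethusd'],
)
# a later call replaces that section's rows in place
view.set_section_entries(section='kraken', values=['adausd'])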
class SearchBar(QtWidgets.QLineEdit):
def __init__(
self,
parent: QWidget,
parent_chart: QWidget, # noqa
view: Optional[CompleterView] = None,
font: DpiAwareFont = _font,
) -> None:
super().__init__(parent)
# self.setContextMenuPolicy(Qt.CustomContextMenu)
# self.customContextMenuRequested.connect(self.show_menu)
# self.setStyleSheet(f"font: 18px")
self.view: CompleterView = view
self.dpi_font = font
self.chart_app = parent_chart
# size it as we specify
# https://doc.qt.io/qt-5/qsizepolicy.html#Policy-enum
self.setSizePolicy(
QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Fixed,
)
self.setFont(font.font)
# witty bit of margin
self.setTextMargins(2, 2, 2, 2)
def focus(self) -> None:
self.selectAll()
self.show()
self.setFocus()
def show(self) -> None:
super().show()
self.view.show_matches()
def sizeHint(self) -> QtCore.QSize:
"""
Scale edit box to size of dpi aware font.
"""
psh = super().sizeHint()
psh.setHeight(self.dpi_font.px_size + 2)
return psh
def unfocus(self) -> None:
self.parent().hide()
self.clearFocus()
if self.view:
self.view.hide()
class SearchWidget(QtGui.QWidget):
'''Composed widget of ``SearchBar`` + ``CompleterView``.
Includes helper methods for item management in the sub-widgets.
'''
def __init__(
self,
chart_space: 'ChartSpace', # type: ignore # noqa
columns: List[str] = ['src', 'symbol'],
parent=None,
) -> None:
super().__init__(parent or chart_space)
# size it as we specify
self.setSizePolicy(
QtWidgets.QSizePolicy.Fixed,
QtWidgets.QSizePolicy.Expanding,
)
self.chart_app = chart_space
self.vbox = QtGui.QVBoxLayout(self)
self.vbox.setContentsMargins(0, 0, 0, 0)
self.vbox.setSpacing(4)
# split layout for the (label:| search bar entry)
self.bar_hbox = QtGui.QHBoxLayout()
self.bar_hbox.setContentsMargins(0, 0, 0, 0)
self.bar_hbox.setSpacing(4)
# add label to left of search bar
self.label = label = QtGui.QLabel(parent=self)
label.setTextFormat(3) # markdown
label.setFont(_font.font)
label.setMargin(4)
label.setText("`search`:")
label.show()
label.setAlignment(
QtCore.Qt.AlignVCenter
| QtCore.Qt.AlignLeft
)
self.bar_hbox.addWidget(label)
self.view = CompleterView(
parent=self,
labels=columns,
)
self.bar = SearchBar(
parent=self,
parent_chart=chart_space,
view=self.view,
)
self.bar_hbox.addWidget(self.bar)
self.vbox.addLayout(self.bar_hbox)
self.vbox.setAlignment(self.bar, Qt.AlignTop | Qt.AlignRight)
self.vbox.addWidget(self.bar.view)
self.vbox.setAlignment(self.view, Qt.AlignTop | Qt.AlignLeft)
def focus(self) -> None:
if self.view.model().rowCount(QModelIndex()) == 0:
# fill cache list if nothing existing
self.view.set_section_entries(
'cache',
list(reversed(self.chart_app._chart_cache)),
clear_all=True,
)
self.bar.focus()
self.show()
def get_current_item(self) -> Optional[Tuple[str, str]]:
'''Return the current completer tree selection as
a tuple ``(parent: str, child: str)`` if valid, else ``None``.
'''
model = self.view.model()
sel = self.view.selectionModel()
cidx = sel.currentIndex()
# TODO: get rid of this hard coded column -> 1
# and use the ``CompleterView`` schema/settings
# to figure out the desired field(s)
# https://doc.qt.io/qt-5/qstandarditemmodel.html#itemFromIndex
node = model.itemFromIndex(cidx.siblingAtColumn(1))
if node:
symbol = node.text()
try:
provider = node.parent().text()
except AttributeError:
# no text set
return None
# TODO: move this to somewhere non-search machinery specific?
if provider == 'cache':
symbol, _, provider = symbol.rpartition('.')
return provider, symbol
else:
return None
def chart_current_item(
self,
clear_to_cache: bool = True,
) -> Optional[str]:
'''Attempt to load and switch the current selected
completion result to the affiliated chart app.
Return any loaded symbol
'''
value = self.get_current_item()
if value is None:
return None
provider, symbol = value
chart = self.chart_app
log.info(f'Requesting symbol: {symbol}.{provider}')
chart.load_symbol(
provider,
symbol,
'info',
)
# fully qualified symbol name (SNS i guess is what we're
# making?)
fqsn = '.'.join([symbol, provider]).lower()
# Re-order the symbol cache on the chart to display in
# LIFO order. this is normally only done internally by
# the chart on new symbols being loaded into memory
chart.set_chart_symbol(fqsn, chart.linkedcharts)
if clear_to_cache:
self.bar.clear()
self.view.set_section_entries(
'cache',
values=list(reversed(chart._chart_cache)),
# remove all other completion results except for cache
clear_all=True,
)
return fqsn
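A worked example of the fully-qualified name built above, using placeholder values for the selected provider and symbol:

provider, symbol = 'kraken', 'XBTUSD'   # placeholder completer selection
fqsn = '.'.join([symbol, provider]).lower()
assert fqsn == 'xbtusd.kraken'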
_search_active: trio.Event = trio.Event()
_search_enabled: bool = False
async def pack_matches(
view: CompleterView,
has_results: dict[str, set[str]],
matches: dict[(str, str), [str]],
provider: str,
pattern: str,
search: Callable[..., Awaitable[dict]],
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
log.info(f'Searching {provider} for "{pattern}"')
if provider != 'cache':
# insert provider entries with search status
view.set_section_entries(
section=provider,
values=[],
)
view.clear_section(provider, status_field='-> searchin..')
else:  # for the cache just clear its entries and don't put a status
view.clear_section(provider)
with trio.CancelScope() as cs:
task_status.started(cs)
# ensure ^ status is updated
results = await search(pattern)
if provider != 'cache': # XXX: don't cache the cache results xD
matches[(provider, pattern)] = results
# print(f'results from {provider}: {results}')
has_results[pattern].add(provider)
if results:
# display completion results
view.set_section_entries(
section=provider,
values=results,
)
else:
view.clear_section(provider)
async def fill_results(
search: SearchBar,
recv_chan: trio.abc.ReceiveChannel,
# kb debouncing pauses (bracket defaults)
min_pause_time: float = 0.1,
max_pause_time: float = 6/16,
) -> None:
"""Task to search through providers and fill in possible
completion results.
"""
global _search_active, _search_enabled, _searcher_cache
bar = search.bar
view = bar.view
view.select_from_idx(QModelIndex())
last_text = bar.text()
repeats = 0
# cache of prior patterns to search results
matches = defaultdict(list)
has_results: defaultdict[str, set[str]] = defaultdict(set)
while True:
await _search_active.wait()
period = None
while True:
last_text = bar.text()
wait_start = time.time()
with trio.move_on_after(max_pause_time):
pattern = await recv_chan.receive()
period = time.time() - wait_start
print(f'{pattern} after {period}')
# during fast multiple key inputs, wait until a pause
# (in typing) to initiate search
if period < min_pause_time:
log.debug(f'Ignoring fast input for {pattern}')
continue
text = bar.text()
# print(f'search: {text}')
if not text or text.isspace():
# print('idling')
_search_active = trio.Event()
break
if repeats > 2 and period >= max_pause_time:
_search_active = trio.Event()
repeats = 0
break
if text == last_text:
repeats += 1
if not _search_enabled:
# print('search currently disabled')
break
log.debug(f'Search req for {text}')
already_has_results = has_results[text]
# issue multi-provider fan-out search request and place
# "searching.." statuses on outstanding results providers
async with trio.open_nursery() as n:
for provider, (search, pause) in (
_searcher_cache.copy().items()
):
if provider != 'cache':
view.clear_section(
provider, status_field='-> searchin..')
# only conduct search on this backend if it's
# registered for the corresponding pause period.
if (period >= pause) and (
provider not in already_has_results
):
await n.start(
pack_matches,
view,
has_results,
matches,
provider,
text,
search
)
else: # already has results for this input text
results = matches[(provider, text)]
if results and provider != 'cache':
view.set_section_entries(
section=provider,
values=results,
)
else:
view.clear_section(provider)
bar.show()
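The per-provider gating inside the loop above can be summarized as a small predicate; this is a sketch for clarity only, not code from this diff:

def should_query(
    period: float,                # seconds since the previous keystroke
    pause: float,                 # provider's registered ``pause_period``
    already_has_results: bool,    # provider answered this pattern before
) -> bool:
    # a backend is only searched once the user has paused typing for at
    # least its registered period and it hasn't already returned results
    # for this exact pattern
    return period >= pause and not already_has_results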
async def handle_keyboard_input(
search: SearchWidget,
recv_chan: trio.abc.ReceiveChannel,
) -> None:
global _search_active, _search_enabled
# startup
chart = search.chart_app
bar = search.bar
view = bar.view
view.set_font_size(bar.dpi_font.px_size)
send, recv = trio.open_memory_channel(16)
async with trio.open_nursery() as n:
# start a background multi-searcher task which receives
# patterns relayed from this keyboard input handler and
# async updates the completer view's results.
n.start_soon(
partial(
fill_results,
search,
recv,
)
)
async for event, key, mods, txt in recv_chan:
log.debug(f'key: {key}, mods: {mods}, txt: {txt}')
ctl = False
if mods == Qt.ControlModifier:
ctl = True
# # ctl + alt as combo
# ctlalt = False
# if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods:
# ctlalt = True
if key in (Qt.Key_Enter, Qt.Key_Return):
search.chart_current_item(clear_to_cache=True)
_search_enabled = False
continue
elif not ctl and not bar.text():
# if nothing in search text show the cache
view.set_section_entries(
'cache',
list(reversed(chart._chart_cache)),
clear_all=True,
)
continue
# cancel and close
if ctl and key in {
Qt.Key_C,
Qt.Key_Space, # i feel like this is the "native" one
Qt.Key_Alt,
}:
search.bar.unfocus()
# kill the search and focus back on main chart
if chart:
chart.linkedcharts.focus()
continue
if ctl and key in {
Qt.Key_L,
}:
# like url (link) highlight in a web browser
bar.focus()
# selection navigation controls
elif ctl and key in {
Qt.Key_D,
}:
view.next_section(direction='down')
_search_enabled = False
elif ctl and key in {
Qt.Key_U,
}:
view.next_section(direction='up')
_search_enabled = False
# selection navigation controls
elif (ctl and key in {
Qt.Key_K,
Qt.Key_J,
}) or key in {
Qt.Key_Up,
Qt.Key_Down,
}:
_search_enabled = False
if key in {Qt.Key_K, Qt.Key_Up}:
item = view.select_previous()
elif key in {Qt.Key_J, Qt.Key_Down}:
item = view.select_next()
if item:
parent_item = item.parent()
if parent_item and parent_item.text() == 'cache':
# if it's a cache item, switch and show it immediately
search.chart_current_item(clear_to_cache=False)
elif not ctl:
# relay to completer task
_search_enabled = True
send.send_nowait(search.bar.text())
_search_active.set()
async def search_simple_dict(
text: str,
source: dict,
) -> Dict[str, Any]:
# search routine can be specified as a function such
# as in the case of the current app's local symbol cache
matches = fuzzy.extractBests(
text,
source.keys(),
score_cutoff=90,
)
return [item[0] for item in matches]
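For example (hypothetical cache contents), matching is done with ``fuzzywuzzy`` over the dict keys and only the matching key strings are returned:

# hypothetical chart cache keyed by fqsn strings; call from any async context
cache = {'xbtusd.kraken': object(), 'ethusd.kraken': object()}
results = await search_simple_dict('xbt', source=cache)
# ``results`` is a list of the keys clearing the 90 point fuzz score cutoff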
# cache of provider names to async search routines
_searcher_cache: Dict[str, Callable[..., Awaitable]] = {}
@asynccontextmanager
async def register_symbol_search(
provider_name: str,
search_routine: Callable,
pause_period: Optional[float] = None,
) -> AsyncIterator[dict]:
global _searcher_cache
pause_period = pause_period or 0.125
# deliver search func to consumer
try:
_searcher_cache[provider_name] = (search_routine, pause_period)
yield search_routine
finally:
_searcher_cache.pop(provider_name)
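A backend hooks itself in by handing this context manager an async search routine; the provider name and symbol set below are placeholders, shown only to sketch the expected shape:

# hypothetical provider registration
async def demo_search(pattern: str) -> Dict[str, Any]:
    # pretend symbol map a real backend would fetch from its API
    symbols = {'fakeusdt': {}, 'fakebtc': {}}
    return {
        sym: info for sym, info in symbols.items()
        if pattern.lower() in sym
    }

async def install_demo_search() -> None:
    async with register_symbol_search(
        provider_name='demo',
        search_routine=demo_search,
        pause_period=0.125,
    ):
        # while this block is open ``fill_results()`` will fan out
        # searches to ``demo_search()`` on keyboard input
        await trio.sleep_forever()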

View File

@ -49,7 +49,7 @@ def monitor(config, rate, name, dhost, test, tl):
"""Start a real-time watchlist UI """Start a real-time watchlist UI
""" """
# global opts # global opts
brokermod = config['brokermod'] brokermod = config['brokermods'][0]
loglevel = config['loglevel'] loglevel = config['loglevel']
log = config['log'] log = config['log']
@ -138,17 +138,24 @@ def chart(config, symbol, profile, pdb):
from .. import _profile
from ._chart import _main
if '.' not in symbol:
click.echo(click.style(
f'symbol: {symbol} must have a {symbol}.<provider> suffix',
fg='red',
))
return
# toggle to enable profiling
_profile._pg_profile = profile
# global opts
brokernames = config['brokers']
tractorloglevel = config['tractorloglevel']
pikerloglevel = config['loglevel']
_main(
sym=symbol,
brokernames=brokernames,
piker_loglevel=pikerloglevel,
tractor_kwargs={
'debug_mode': pdb,

View File

@ -402,7 +402,9 @@ async def start_order_mode(
# each clearing tick is responded individually
elif resp in ('broker_filled',):
action = msg['action']
# TODO: some kinda progress system
order_mode.on_fill(
oid,

View File

@ -71,9 +71,11 @@ setup(
'PyQt5',
'pyqtgraph',
'qdarkstyle==2.8.1',
#'kivy', see requirement.txt; using a custom branch atm
# tsdbs
'pymarketstore',
#'kivy', see requirement.txt; using a custom branch atm
# fuzzy search