Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 9b6f4d24be Just warn on `ib` symbol search lags 2022-11-08 14:50:31 -05:00
Tyler Goodlet 4eea8042ff Start data feed layer test suite
Initial test that starts a `binance` feed and reads the quote messages
alongside shm buffers for 1s and 1m OHLC; just prints to console for
now.

Template out parametrization for multi-symbol quote-multiplexed feeds
which coming soon B)
2022-11-07 15:40:52 -05:00
Tyler Goodlet 9fc45c2bff Drop `tractor.log` level override fixture 2022-11-07 15:40:41 -05:00
Tyler Goodlet e7751cb5dd EMS: expect fqsn key in `Feed.symbols` 2022-11-07 15:40:01 -05:00
Tyler Goodlet 79b27899bf Use new `GodWidget.load_symbols()` from search 2022-11-07 15:39:28 -05:00
Tyler Goodlet 312c1552cd Expose `.open_feed()` and `open_piker_runtime()` eps at top level 2022-11-07 15:38:54 -05:00
Tyler Goodlet f1b5c6e62c Make all UI entrypoints accept an fqsn `list`
This is to prep for multi-symbol feeds and charts so we accept
a sequence of fqsns to the top level entrypoints as well as the
`.data.feed.open_feed()` API (though we're not actually supporting true
multiplexed feeds nor shm lookups per fqsn yet).
2022-11-07 15:33:52 -05:00
Tyler Goodlet 16699bdc88 Passthrough registry sockaddr from chart cmd to daemon 2022-11-07 13:05:52 -05:00
Tyler Goodlet 5151971131 Always set fqsn in `Feed.symbols: dict` 2022-11-07 13:04:58 -05:00
Tyler Goodlet 81f0fc77e3 Add registry socket cli flags to all client cmds
Allows starting UI apps and passing the `pikerd` registry socket-addr
args via `--host` or `--port` such that a separate actor tree can be
started by selecting an unused port. This is handy when hacking new
features but while also wishing to run a more stable version of the code
for trading on the same host.
2022-11-07 11:22:38 -05:00
13 changed files with 195 additions and 68 deletions

View File

@ -18,3 +18,10 @@
piker: trading gear for hackers.
"""
from ._daemon import open_piker_runtime
from .data.feed import open_feed
__all__ = [
'open_piker_runtime',
'open_feed',
]

View File

@ -199,7 +199,7 @@ async def open_piker_runtime(
# for data daemons when running in production.
debug_mode: bool = False,
) -> Optional[tractor._portal.Portal]:
) -> tractor.Actor:
'''
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.
@ -254,6 +254,7 @@ async def maybe_open_runtime(
@acm
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
**kwargs,
) -> Union[tractor._portal.Portal, Services]:
@ -266,11 +267,19 @@ async def maybe_open_pikerd(
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
async with (
maybe_open_runtime(loglevel, **kwargs),
tractor.find_actor(_root_dname) as portal
):
# connect to any existing daemon presuming
# its registry socket was selected.
if (
portal is not None
and (
registry_addr is None
or portal.channel.raddr == registry_addr
)
):
yield portal
return
@ -280,6 +289,7 @@ async def maybe_open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
) as _:
# in the case where we're starting up the

View File

@ -1038,7 +1038,13 @@ async def open_symbol_search(
stock_results = []
async def stash_results(target: Awaitable[list]):
stock_results.extend(await target)
try:
results = await target
except tractor.trionics.Lagged:
print("IB SYM-SEARCH OVERRUN?!?")
return
stock_results.extend(results)
for i in range(10):
with trio.move_on_after(3) as cs:

View File

@ -1239,8 +1239,7 @@ async def process_client_order_cmds(
pred = mk_check(trigger_price, last, action)
spread_slap: float = 5
sym = fqsn.replace(f'.{brokers[0]}', '')
min_tick = feed.symbols[sym].tick_size
min_tick = feed.symbols[fqsn].tick_size
if action == 'buy':
tickfilter = ('ask', 'last', 'trade')

View File

@ -125,8 +125,19 @@ def pikerd(
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--configdir', '-c', help='Configuration directory')
@click.option('--host', '-h', default=None, help='Host addr to bind')
@click.option('--port', '-p', default=None, help='Port number to bind')
@click.pass_context
def cli(ctx, brokers, loglevel, tl, configdir):
def cli(
ctx: click.Context,
brokers: list[str],
loglevel: str,
tl: bool,
configdir: str,
host: str,
port: int,
) -> None:
if configdir is not None:
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
config._override_config_dir(configdir)
@ -138,6 +149,13 @@ def cli(ctx, brokers, loglevel, tl, configdir):
else:
brokermods = [get_brokermod(broker) for broker in brokers]
reg_addr: None | tuple[str, int] = None
if host or port:
reg_addr = (
host or _registry_host,
int(port) or _registry_port,
)
ctx.obj.update({
'brokers': brokers,
'brokermods': brokermods,
@ -146,6 +164,7 @@ def cli(ctx, brokers, loglevel, tl, configdir):
'log': get_console_log(loglevel),
'confdir': config._config_dir,
'wl_path': config._watchlists_data_path,
'registry_addr': reg_addr,
})
# allow enabling same loglevel in ``tractor`` machinery

View File

@ -1410,7 +1410,7 @@ async def open_feed(
# symbol.broker_info[brokername] = si
feed.symbols[fqsn] = symbol
feed.symbols[sym] = symbol
feed.symbols[f'{sym}.{brokername}'] = symbol
# cast shm dtype to list... can't member why we need this
for shm_key, shm in [

View File

@ -66,7 +66,7 @@ async def _async_main(
# implicit required argument provided by ``qtractor_run()``
main_widget: GodWidget,
sym: str,
syms: list[str],
brokernames: str,
loglevel: str,
@ -113,12 +113,16 @@ async def _async_main(
# godwidget.hbox.addWidget(search)
godwidget.search = search
symbols: list[str] = []
for sym in syms:
symbol, _, provider = sym.rpartition('.')
symbols.append(symbol)
# this internally starts a ``display_symbol_data()`` task above
order_mode_ready = await godwidget.load_symbol(
order_mode_ready = await godwidget.load_symbols(
provider,
symbol,
symbols,
loglevel
)
@ -166,7 +170,7 @@ async def _async_main(
def _main(
sym: str,
syms: list[str],
brokernames: [str],
piker_loglevel: str,
tractor_kwargs,
@ -178,7 +182,7 @@ def _main(
'''
run_qtractor(
func=_async_main,
args=(sym, brokernames, piker_loglevel),
args=(syms, brokernames, piker_loglevel),
main_widget_type=GodWidget,
tractor_kwargs=tractor_kwargs,
)

View File

@ -186,10 +186,10 @@ class GodWidget(QWidget):
) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore
return self._chart_cache.get(symbol_key)
async def load_symbol(
async def load_symbols(
self,
providername: str,
symbol_key: str,
symbol_keys: list[str],
loglevel: str,
reset: bool = False,
@ -200,12 +200,20 @@ class GodWidget(QWidget):
Expects a ``numpy`` structured array containing all the ohlcv fields.
'''
fqsns: list[str] = []
# our symbol key style is always lower case
symbol_key = symbol_key.lower()
for key in list(map(str.lower, symbol_keys)):
# fully qualified symbol name (SNS i guess is what we're making?)
fqsn = '.'.join([symbol_key, providername])
all_linked = self.get_chart_symbol(fqsn)
fqsn = '.'.join([key, providername])
fqsns.append(fqsn)
# NOTE: for now we use the first symbol in the set as the "key"
# for the overlay of feeds on the chart.
group_key = fqsns[0]
all_linked = self.get_chart_symbol(group_key)
order_mode_started = trio.Event()
if not self.vbox.isEmpty():
@ -238,7 +246,7 @@ class GodWidget(QWidget):
display_symbol_data,
self,
providername,
symbol_key,
fqsns,
loglevel,
order_mode_started,
)

View File

@ -947,7 +947,7 @@ async def link_views_with_region(
async def display_symbol_data(
godwidget: GodWidget,
provider: str,
sym: str,
fqsns: list[str],
loglevel: str,
order_mode_started: trio.Event,
@ -961,11 +961,6 @@ async def display_symbol_data(
'''
sbar = godwidget.window.status_bar
loading_sym_key = sbar.open_status(
f'loading {sym}.{provider} ->',
group_key=True
)
# historical data fetch
# brokermod = brokers.get_brokermod(provider)
@ -974,10 +969,18 @@ async def display_symbol_data(
# clear_on_next=True,
# group_key=loading_sym_key,
# )
fqsn = '.'.join((sym, provider))
for fqsn in fqsns:
loading_sym_key = sbar.open_status(
f'loading {fqsn} ->',
group_key=True
)
first_fqsn = fqsns[0]
async with open_feed(
[fqsn],
fqsns,
loglevel=loglevel,
# limit to at least display's FPS
@ -988,7 +991,7 @@ async def display_symbol_data(
ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm
symbol = feed.symbols[sym]
symbol = feed.symbols[first_fqsn]
fqsn = symbol.front_fqsn()
step_size_s = 1
@ -1025,7 +1028,7 @@ async def display_symbol_data(
godwidget.pp_pane = pp_pane
# create main OHLC chart
chart = rt_linked.plot_ohlc_main(
ohlc_chart = rt_linked.plot_ohlc_main(
symbol,
ohlcv,
# in the case of history chart we explicitly set `False`
@ -1033,8 +1036,8 @@ async def display_symbol_data(
sidepane=pp_pane,
)
chart._feeds[symbol.key] = feed
chart.setFocus()
ohlc_chart._feeds[symbol.key] = feed
ohlc_chart.setFocus()
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
# plot historical vwap if available
@ -1044,7 +1047,7 @@ async def display_symbol_data(
# and 'bar_wap' in bars.dtype.fields
# ):
# wap_in_history = True
# chart.draw_curve(
# ohlc_chart.draw_curve(
# name='bar_wap',
# shm=ohlcv,
# color='default_light',
@ -1105,7 +1108,7 @@ async def display_symbol_data(
await trio.sleep(0)
# size view to data prior to order mode init
chart.default_view()
ohlc_chart.default_view()
rt_linked.graphics_cycle()
await trio.sleep(0)
@ -1119,7 +1122,7 @@ async def display_symbol_data(
godwidget.resize_all()
await link_views_with_region(
chart,
ohlc_chart,
hist_chart,
feed,
)
@ -1135,7 +1138,7 @@ async def display_symbol_data(
):
if not vlm_chart:
# trigger another view reset if no sub-chart
chart.default_view()
ohlc_chart.default_view()
rt_linked.mode = mode

View File

@ -665,9 +665,9 @@ class SearchWidget(QtWidgets.QWidget):
log.info(f'Requesting symbol: {symbol}.{provider}')
await godw.load_symbol(
await godw.load_symbols(
provider,
symbol,
[symbol],
'info',
)

View File

@ -46,8 +46,10 @@ def _kivy_import_hack():
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def monitor(config, rate, name, dhost, test, tl):
"""Start a real-time watchlist UI
"""
'''
Start a real-time watchlist UI
'''
# global opts
brokermod = config['brokermods'][0]
loglevel = config['loglevel']
@ -70,8 +72,12 @@ def monitor(config, rate, name, dhost, test, tl):
) as portal:
# run app "main"
await _async_main(
name, portal, tickers,
brokermod, rate, test=test,
name,
portal,
tickers,
brokermod,
rate,
test=test,
)
tractor.run(
@ -122,7 +128,7 @@ def optschain(config, symbol, date, rate, test):
@cli.command()
@click.option(
'--profile',
'-p',
# '-p',
default=None,
help='Enable pyqtgraph profiling'
)
@ -131,9 +137,14 @@ def optschain(config, symbol, date, rate, test):
is_flag=True,
help='Enable tractor debug mode'
)
@click.argument('symbol', required=True)
@click.argument('symbols', nargs=-1, required=True)
@click.pass_obj
def chart(config, symbol, profile, pdb):
def chart(
config,
symbols: list[str],
profile,
pdb: bool,
):
'''
Start a real-time chartng UI
@ -144,8 +155,10 @@ def chart(config, symbol, profile, pdb):
_profile._pg_profile = True
_profile.ms_slower_then = float(profile)
# Qt UI entrypoint
from ._app import _main
for symbol in symbols:
if '.' not in symbol:
click.echo(click.style(
f'symbol: {symbol} must have a {symbol}.<provider> suffix',
@ -159,8 +172,9 @@ def chart(config, symbol, profile, pdb):
tractorloglevel = config['tractorloglevel']
pikerloglevel = config['loglevel']
_main(
sym=symbol,
syms=symbols,
brokernames=brokernames,
piker_loglevel=pikerloglevel,
tractor_kwargs={
@ -170,5 +184,6 @@ def chart(config, symbol, profile, pdb):
'enable_modules': [
'piker.clearing._client'
],
'registry_addr': config.get('registry_addr'),
},
)

View File

@ -14,15 +14,6 @@ def pytest_addoption(parser):
help="Use a practice API account")
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel
log.get_console_log(level)
yield level
tractor.log._default_loglevel = orig
@pytest.fixture(scope='session')
def test_config():
dirname = os.path.dirname

View File

@ -0,0 +1,65 @@
'''
Data feed layer APIs, performance, msg throttling.
'''
from pprint import pprint
import pytest
import trio
from piker import (
open_piker_runtime,
open_feed,
)
from piker.data import ShmArray
@pytest.mark.parametrize(
'fqsns',
[
['btcusdt.binance']
],
ids=lambda param: f'fqsns={param}',
)
def test_basic_rt_feed(
fqsns: list[str],
):
'''
Start a real-time data feed for provided fqsn and pull
a few quotes then simply shut down.
'''
async def main():
async with (
open_piker_runtime('test_basic_rt_feed'),
open_feed(
fqsns,
loglevel='info',
# TODO: ensure throttle rate is applied
# limit to at least display's FPS
# avoiding needless Qt-in-guest-mode context switches
# tick_throttle=_quote_throttle_rate,
) as feed
):
for fqin in fqsns:
assert feed.symbols[fqin]
ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm
count: int = 0
async for quotes in feed.stream:
# print quote msg, rt and history
# buffer values on console.
pprint(quotes)
pprint(ohlcv.array[-1])
pprint(hist_ohlcv.array[-1])
if count >= 100:
break
count += 1
trio.run(main)