Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 9b6f4d24be Just warn on `ib` symbol search lags 2022-11-08 14:50:31 -05:00
Tyler Goodlet 4eea8042ff Start data feed layer test suite
Initial test that starts a `binance` feed and reads the quote messages
alongside shm buffers for 1s and 1m OHLC; just prints to console for
now.

Template out parametrization for multi-symbol quote-multiplexed feeds
which coming soon B)
2022-11-07 15:40:52 -05:00
Tyler Goodlet 9fc45c2bff Drop `tractor.log` level override fixture 2022-11-07 15:40:41 -05:00
Tyler Goodlet e7751cb5dd EMS: expect fqsn key in `Feed.symbols` 2022-11-07 15:40:01 -05:00
Tyler Goodlet 79b27899bf Use new `GodWidget.load_symbols()` from search 2022-11-07 15:39:28 -05:00
Tyler Goodlet 312c1552cd Expose `.open_feed()` and `open_piker_runtime()` eps at top level 2022-11-07 15:38:54 -05:00
Tyler Goodlet f1b5c6e62c Make all UI entrypoints accept an fqsn `list`
This is to prep for multi-symbol feeds and charts so we accept
a sequence of fqsns to the top level entrypoints as well as the
`.data.feed.open_feed()` API (though we're not actually supporting true
multiplexed feeds nor shm lookups per fqsn yet).
2022-11-07 15:33:52 -05:00
Tyler Goodlet 16699bdc88 Passthrough registry sockaddr from chart cmd to daemon 2022-11-07 13:05:52 -05:00
Tyler Goodlet 5151971131 Always set fqsn in `Feed.symbols: dict` 2022-11-07 13:04:58 -05:00
Tyler Goodlet 81f0fc77e3 Add registry socket cli flags to all client cmds
Allows starting UI apps and passing the `pikerd` registry socket-addr
args via `--host` or `--port` such that a separate actor tree can be
started by selecting an unused port. This is handy when hacking new
features but while also wishing to run a more stable version of the code
for trading on the same host.
2022-11-07 11:22:38 -05:00
13 changed files with 195 additions and 68 deletions

View File

@ -18,3 +18,10 @@
piker: trading gear for hackers. piker: trading gear for hackers.
""" """
from ._daemon import open_piker_runtime
from .data.feed import open_feed
__all__ = [
'open_piker_runtime',
'open_feed',
]

View File

@ -199,7 +199,7 @@ async def open_piker_runtime(
# for data daemons when running in production. # for data daemons when running in production.
debug_mode: bool = False, debug_mode: bool = False,
) -> Optional[tractor._portal.Portal]: ) -> tractor.Actor:
''' '''
Start a piker actor who's runtime will automatically sync with Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration. existing piker actors on the local link based on configuration.
@ -254,6 +254,7 @@ async def maybe_open_runtime(
@acm @acm
async def maybe_open_pikerd( async def maybe_open_pikerd(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
**kwargs, **kwargs,
) -> Union[tractor._portal.Portal, Services]: ) -> Union[tractor._portal.Portal, Services]:
@ -266,13 +267,21 @@ async def maybe_open_pikerd(
get_console_log(loglevel) get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail # subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs): async with (
maybe_open_runtime(loglevel, **kwargs),
async with tractor.find_actor(_root_dname) as portal: tractor.find_actor(_root_dname) as portal
# assert portal is not None ):
if portal is not None: # connect to any existing daemon presuming
yield portal # its registry socket was selected.
return if (
portal is not None
and (
registry_addr is None
or portal.channel.raddr == registry_addr
)
):
yield portal
return
# presume pikerd role since no daemon could be found at # presume pikerd role since no daemon could be found at
# configured address # configured address
@ -280,6 +289,7 @@ async def maybe_open_pikerd(
loglevel=loglevel, loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False), debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
) as _: ) as _:
# in the case where we're starting up the # in the case where we're starting up the

View File

@ -1038,7 +1038,13 @@ async def open_symbol_search(
stock_results = [] stock_results = []
async def stash_results(target: Awaitable[list]): async def stash_results(target: Awaitable[list]):
stock_results.extend(await target) try:
results = await target
except tractor.trionics.Lagged:
print("IB SYM-SEARCH OVERRUN?!?")
return
stock_results.extend(results)
for i in range(10): for i in range(10):
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:

View File

@ -1239,8 +1239,7 @@ async def process_client_order_cmds(
pred = mk_check(trigger_price, last, action) pred = mk_check(trigger_price, last, action)
spread_slap: float = 5 spread_slap: float = 5
sym = fqsn.replace(f'.{brokers[0]}', '') min_tick = feed.symbols[fqsn].tick_size
min_tick = feed.symbols[sym].tick_size
if action == 'buy': if action == 'buy':
tickfilter = ('ask', 'last', 'trade') tickfilter = ('ask', 'last', 'trade')

View File

@ -125,8 +125,19 @@ def pikerd(
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--configdir', '-c', help='Configuration directory') @click.option('--configdir', '-c', help='Configuration directory')
@click.option('--host', '-h', default=None, help='Host addr to bind')
@click.option('--port', '-p', default=None, help='Port number to bind')
@click.pass_context @click.pass_context
def cli(ctx, brokers, loglevel, tl, configdir): def cli(
ctx: click.Context,
brokers: list[str],
loglevel: str,
tl: bool,
configdir: str,
host: str,
port: int,
) -> None:
if configdir is not None: if configdir is not None:
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path" assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
config._override_config_dir(configdir) config._override_config_dir(configdir)
@ -138,6 +149,13 @@ def cli(ctx, brokers, loglevel, tl, configdir):
else: else:
brokermods = [get_brokermod(broker) for broker in brokers] brokermods = [get_brokermod(broker) for broker in brokers]
reg_addr: None | tuple[str, int] = None
if host or port:
reg_addr = (
host or _registry_host,
int(port) or _registry_port,
)
ctx.obj.update({ ctx.obj.update({
'brokers': brokers, 'brokers': brokers,
'brokermods': brokermods, 'brokermods': brokermods,
@ -146,6 +164,7 @@ def cli(ctx, brokers, loglevel, tl, configdir):
'log': get_console_log(loglevel), 'log': get_console_log(loglevel),
'confdir': config._config_dir, 'confdir': config._config_dir,
'wl_path': config._watchlists_data_path, 'wl_path': config._watchlists_data_path,
'registry_addr': reg_addr,
}) })
# allow enabling same loglevel in ``tractor`` machinery # allow enabling same loglevel in ``tractor`` machinery

View File

@ -1410,7 +1410,7 @@ async def open_feed(
# symbol.broker_info[brokername] = si # symbol.broker_info[brokername] = si
feed.symbols[fqsn] = symbol feed.symbols[fqsn] = symbol
feed.symbols[sym] = symbol feed.symbols[f'{sym}.{brokername}'] = symbol
# cast shm dtype to list... can't member why we need this # cast shm dtype to list... can't member why we need this
for shm_key, shm in [ for shm_key, shm in [

View File

@ -66,7 +66,7 @@ async def _async_main(
# implicit required argument provided by ``qtractor_run()`` # implicit required argument provided by ``qtractor_run()``
main_widget: GodWidget, main_widget: GodWidget,
sym: str, syms: list[str],
brokernames: str, brokernames: str,
loglevel: str, loglevel: str,
@ -113,12 +113,16 @@ async def _async_main(
# godwidget.hbox.addWidget(search) # godwidget.hbox.addWidget(search)
godwidget.search = search godwidget.search = search
symbol, _, provider = sym.rpartition('.') symbols: list[str] = []
for sym in syms:
symbol, _, provider = sym.rpartition('.')
symbols.append(symbol)
# this internally starts a ``display_symbol_data()`` task above # this internally starts a ``display_symbol_data()`` task above
order_mode_ready = await godwidget.load_symbol( order_mode_ready = await godwidget.load_symbols(
provider, provider,
symbol, symbols,
loglevel loglevel
) )
@ -166,7 +170,7 @@ async def _async_main(
def _main( def _main(
sym: str, syms: list[str],
brokernames: [str], brokernames: [str],
piker_loglevel: str, piker_loglevel: str,
tractor_kwargs, tractor_kwargs,
@ -178,7 +182,7 @@ def _main(
''' '''
run_qtractor( run_qtractor(
func=_async_main, func=_async_main,
args=(sym, brokernames, piker_loglevel), args=(syms, brokernames, piker_loglevel),
main_widget_type=GodWidget, main_widget_type=GodWidget,
tractor_kwargs=tractor_kwargs, tractor_kwargs=tractor_kwargs,
) )

View File

@ -186,10 +186,10 @@ class GodWidget(QWidget):
) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore ) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore
return self._chart_cache.get(symbol_key) return self._chart_cache.get(symbol_key)
async def load_symbol( async def load_symbols(
self, self,
providername: str, providername: str,
symbol_key: str, symbol_keys: list[str],
loglevel: str, loglevel: str,
reset: bool = False, reset: bool = False,
@ -200,12 +200,20 @@ class GodWidget(QWidget):
Expects a ``numpy`` structured array containing all the ohlcv fields. Expects a ``numpy`` structured array containing all the ohlcv fields.
''' '''
# our symbol key style is always lower case fqsns: list[str] = []
symbol_key = symbol_key.lower()
# fully qualified symbol name (SNS i guess is what we're making?) # our symbol key style is always lower case
fqsn = '.'.join([symbol_key, providername]) for key in list(map(str.lower, symbol_keys)):
all_linked = self.get_chart_symbol(fqsn)
# fully qualified symbol name (SNS i guess is what we're making?)
fqsn = '.'.join([key, providername])
fqsns.append(fqsn)
# NOTE: for now we use the first symbol in the set as the "key"
# for the overlay of feeds on the chart.
group_key = fqsns[0]
all_linked = self.get_chart_symbol(group_key)
order_mode_started = trio.Event() order_mode_started = trio.Event()
if not self.vbox.isEmpty(): if not self.vbox.isEmpty():
@ -238,7 +246,7 @@ class GodWidget(QWidget):
display_symbol_data, display_symbol_data,
self, self,
providername, providername,
symbol_key, fqsns,
loglevel, loglevel,
order_mode_started, order_mode_started,
) )

View File

@ -947,7 +947,7 @@ async def link_views_with_region(
async def display_symbol_data( async def display_symbol_data(
godwidget: GodWidget, godwidget: GodWidget,
provider: str, provider: str,
sym: str, fqsns: list[str],
loglevel: str, loglevel: str,
order_mode_started: trio.Event, order_mode_started: trio.Event,
@ -961,11 +961,6 @@ async def display_symbol_data(
''' '''
sbar = godwidget.window.status_bar sbar = godwidget.window.status_bar
loading_sym_key = sbar.open_status(
f'loading {sym}.{provider} ->',
group_key=True
)
# historical data fetch # historical data fetch
# brokermod = brokers.get_brokermod(provider) # brokermod = brokers.get_brokermod(provider)
@ -974,10 +969,18 @@ async def display_symbol_data(
# clear_on_next=True, # clear_on_next=True,
# group_key=loading_sym_key, # group_key=loading_sym_key,
# ) # )
fqsn = '.'.join((sym, provider))
for fqsn in fqsns:
loading_sym_key = sbar.open_status(
f'loading {fqsn} ->',
group_key=True
)
first_fqsn = fqsns[0]
async with open_feed( async with open_feed(
[fqsn], fqsns,
loglevel=loglevel, loglevel=loglevel,
# limit to at least display's FPS # limit to at least display's FPS
@ -988,7 +991,7 @@ async def display_symbol_data(
ohlcv: ShmArray = feed.rt_shm ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm hist_ohlcv: ShmArray = feed.hist_shm
symbol = feed.symbols[sym] symbol = feed.symbols[first_fqsn]
fqsn = symbol.front_fqsn() fqsn = symbol.front_fqsn()
step_size_s = 1 step_size_s = 1
@ -1025,7 +1028,7 @@ async def display_symbol_data(
godwidget.pp_pane = pp_pane godwidget.pp_pane = pp_pane
# create main OHLC chart # create main OHLC chart
chart = rt_linked.plot_ohlc_main( ohlc_chart = rt_linked.plot_ohlc_main(
symbol, symbol,
ohlcv, ohlcv,
# in the case of history chart we explicitly set `False` # in the case of history chart we explicitly set `False`
@ -1033,8 +1036,8 @@ async def display_symbol_data(
sidepane=pp_pane, sidepane=pp_pane,
) )
chart._feeds[symbol.key] = feed ohlc_chart._feeds[symbol.key] = feed
chart.setFocus() ohlc_chart.setFocus()
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
# plot historical vwap if available # plot historical vwap if available
@ -1044,7 +1047,7 @@ async def display_symbol_data(
# and 'bar_wap' in bars.dtype.fields # and 'bar_wap' in bars.dtype.fields
# ): # ):
# wap_in_history = True # wap_in_history = True
# chart.draw_curve( # ohlc_chart.draw_curve(
# name='bar_wap', # name='bar_wap',
# shm=ohlcv, # shm=ohlcv,
# color='default_light', # color='default_light',
@ -1105,7 +1108,7 @@ async def display_symbol_data(
await trio.sleep(0) await trio.sleep(0)
# size view to data prior to order mode init # size view to data prior to order mode init
chart.default_view() ohlc_chart.default_view()
rt_linked.graphics_cycle() rt_linked.graphics_cycle()
await trio.sleep(0) await trio.sleep(0)
@ -1119,7 +1122,7 @@ async def display_symbol_data(
godwidget.resize_all() godwidget.resize_all()
await link_views_with_region( await link_views_with_region(
chart, ohlc_chart,
hist_chart, hist_chart,
feed, feed,
) )
@ -1135,7 +1138,7 @@ async def display_symbol_data(
): ):
if not vlm_chart: if not vlm_chart:
# trigger another view reset if no sub-chart # trigger another view reset if no sub-chart
chart.default_view() ohlc_chart.default_view()
rt_linked.mode = mode rt_linked.mode = mode

View File

@ -665,9 +665,9 @@ class SearchWidget(QtWidgets.QWidget):
log.info(f'Requesting symbol: {symbol}.{provider}') log.info(f'Requesting symbol: {symbol}.{provider}')
await godw.load_symbol( await godw.load_symbols(
provider, provider,
symbol, [symbol],
'info', 'info',
) )

View File

@ -46,8 +46,10 @@ def _kivy_import_hack():
@click.argument('name', nargs=1, required=True) @click.argument('name', nargs=1, required=True)
@click.pass_obj @click.pass_obj
def monitor(config, rate, name, dhost, test, tl): def monitor(config, rate, name, dhost, test, tl):
"""Start a real-time watchlist UI '''
""" Start a real-time watchlist UI
'''
# global opts # global opts
brokermod = config['brokermods'][0] brokermod = config['brokermods'][0]
loglevel = config['loglevel'] loglevel = config['loglevel']
@ -70,8 +72,12 @@ def monitor(config, rate, name, dhost, test, tl):
) as portal: ) as portal:
# run app "main" # run app "main"
await _async_main( await _async_main(
name, portal, tickers, name,
brokermod, rate, test=test, portal,
tickers,
brokermod,
rate,
test=test,
) )
tractor.run( tractor.run(
@ -122,7 +128,7 @@ def optschain(config, symbol, date, rate, test):
@cli.command() @cli.command()
@click.option( @click.option(
'--profile', '--profile',
'-p', # '-p',
default=None, default=None,
help='Enable pyqtgraph profiling' help='Enable pyqtgraph profiling'
) )
@ -131,9 +137,14 @@ def optschain(config, symbol, date, rate, test):
is_flag=True, is_flag=True,
help='Enable tractor debug mode' help='Enable tractor debug mode'
) )
@click.argument('symbol', required=True) @click.argument('symbols', nargs=-1, required=True)
@click.pass_obj @click.pass_obj
def chart(config, symbol, profile, pdb): def chart(
config,
symbols: list[str],
profile,
pdb: bool,
):
''' '''
Start a real-time chartng UI Start a real-time chartng UI
@ -144,14 +155,16 @@ def chart(config, symbol, profile, pdb):
_profile._pg_profile = True _profile._pg_profile = True
_profile.ms_slower_then = float(profile) _profile.ms_slower_then = float(profile)
# Qt UI entrypoint
from ._app import _main from ._app import _main
if '.' not in symbol: for symbol in symbols:
click.echo(click.style( if '.' not in symbol:
f'symbol: {symbol} must have a {symbol}.<provider> suffix', click.echo(click.style(
fg='red', f'symbol: {symbol} must have a {symbol}.<provider> suffix',
)) fg='red',
return ))
return
# global opts # global opts
@ -159,8 +172,9 @@ def chart(config, symbol, profile, pdb):
tractorloglevel = config['tractorloglevel'] tractorloglevel = config['tractorloglevel']
pikerloglevel = config['loglevel'] pikerloglevel = config['loglevel']
_main( _main(
sym=symbol, syms=symbols,
brokernames=brokernames, brokernames=brokernames,
piker_loglevel=pikerloglevel, piker_loglevel=pikerloglevel,
tractor_kwargs={ tractor_kwargs={
@ -170,5 +184,6 @@ def chart(config, symbol, profile, pdb):
'enable_modules': [ 'enable_modules': [
'piker.clearing._client' 'piker.clearing._client'
], ],
'registry_addr': config.get('registry_addr'),
}, },
) )

View File

@ -14,15 +14,6 @@ def pytest_addoption(parser):
help="Use a practice API account") help="Use a practice API account")
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel
log.get_console_log(level)
yield level
tractor.log._default_loglevel = orig
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def test_config(): def test_config():
dirname = os.path.dirname dirname = os.path.dirname

View File

@ -0,0 +1,65 @@
'''
Data feed layer APIs, performance, msg throttling.
'''
from pprint import pprint
import pytest
import trio
from piker import (
open_piker_runtime,
open_feed,
)
from piker.data import ShmArray
@pytest.mark.parametrize(
'fqsns',
[
['btcusdt.binance']
],
ids=lambda param: f'fqsns={param}',
)
def test_basic_rt_feed(
fqsns: list[str],
):
'''
Start a real-time data feed for provided fqsn and pull
a few quotes then simply shut down.
'''
async def main():
async with (
open_piker_runtime('test_basic_rt_feed'),
open_feed(
fqsns,
loglevel='info',
# TODO: ensure throttle rate is applied
# limit to at least display's FPS
# avoiding needless Qt-in-guest-mode context switches
# tick_throttle=_quote_throttle_rate,
) as feed
):
for fqin in fqsns:
assert feed.symbols[fqin]
ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm
count: int = 0
async for quotes in feed.stream:
# print quote msg, rt and history
# buffer values on console.
pprint(quotes)
pprint(ohlcv.array[-1])
pprint(hist_ohlcv.array[-1])
if count >= 100:
break
count += 1
trio.run(main)