Compare commits

..

No commits in common. "9b6f4d24be5ad12b6ec7cf5deab9a9d3bf07d4be" and "3e45a61287bd848647dbff851a41b47da7e906b4" have entirely different histories.

13 changed files with 67 additions and 194 deletions

View File

@ -18,10 +18,3 @@
piker: trading gear for hackers. piker: trading gear for hackers.
""" """
from ._daemon import open_piker_runtime
from .data.feed import open_feed
__all__ = [
'open_piker_runtime',
'open_feed',
]

View File

@ -199,7 +199,7 @@ async def open_piker_runtime(
# for data daemons when running in production. # for data daemons when running in production.
debug_mode: bool = False, debug_mode: bool = False,
) -> tractor.Actor: ) -> Optional[tractor._portal.Portal]:
''' '''
Start a piker actor who's runtime will automatically sync with Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration. existing piker actors on the local link based on configuration.
@ -254,7 +254,6 @@ async def maybe_open_runtime(
@acm @acm
async def maybe_open_pikerd( async def maybe_open_pikerd(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
**kwargs, **kwargs,
) -> Union[tractor._portal.Portal, Services]: ) -> Union[tractor._portal.Portal, Services]:
@ -267,19 +266,11 @@ async def maybe_open_pikerd(
get_console_log(loglevel) get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail # subtle, we must have the runtime up here or portal lookup will fail
async with ( async with maybe_open_runtime(loglevel, **kwargs):
maybe_open_runtime(loglevel, **kwargs),
tractor.find_actor(_root_dname) as portal async with tractor.find_actor(_root_dname) as portal:
): # assert portal is not None
# connect to any existing daemon presuming if portal is not None:
# its registry socket was selected.
if (
portal is not None
and (
registry_addr is None
or portal.channel.raddr == registry_addr
)
):
yield portal yield portal
return return
@ -289,7 +280,6 @@ async def maybe_open_pikerd(
loglevel=loglevel, loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False), debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
) as _: ) as _:
# in the case where we're starting up the # in the case where we're starting up the

View File

@ -1038,13 +1038,7 @@ async def open_symbol_search(
stock_results = [] stock_results = []
async def stash_results(target: Awaitable[list]): async def stash_results(target: Awaitable[list]):
try: stock_results.extend(await target)
results = await target
except tractor.trionics.Lagged:
print("IB SYM-SEARCH OVERRUN?!?")
return
stock_results.extend(results)
for i in range(10): for i in range(10):
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:

View File

@ -1239,7 +1239,8 @@ async def process_client_order_cmds(
pred = mk_check(trigger_price, last, action) pred = mk_check(trigger_price, last, action)
spread_slap: float = 5 spread_slap: float = 5
min_tick = feed.symbols[fqsn].tick_size sym = fqsn.replace(f'.{brokers[0]}', '')
min_tick = feed.symbols[sym].tick_size
if action == 'buy': if action == 'buy':
tickfilter = ('ask', 'last', 'trade') tickfilter = ('ask', 'last', 'trade')

View File

@ -125,19 +125,8 @@ def pikerd(
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--configdir', '-c', help='Configuration directory') @click.option('--configdir', '-c', help='Configuration directory')
@click.option('--host', '-h', default=None, help='Host addr to bind')
@click.option('--port', '-p', default=None, help='Port number to bind')
@click.pass_context @click.pass_context
def cli( def cli(ctx, brokers, loglevel, tl, configdir):
ctx: click.Context,
brokers: list[str],
loglevel: str,
tl: bool,
configdir: str,
host: str,
port: int,
) -> None:
if configdir is not None: if configdir is not None:
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path" assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
config._override_config_dir(configdir) config._override_config_dir(configdir)
@ -149,13 +138,6 @@ def cli(
else: else:
brokermods = [get_brokermod(broker) for broker in brokers] brokermods = [get_brokermod(broker) for broker in brokers]
reg_addr: None | tuple[str, int] = None
if host or port:
reg_addr = (
host or _registry_host,
int(port) or _registry_port,
)
ctx.obj.update({ ctx.obj.update({
'brokers': brokers, 'brokers': brokers,
'brokermods': brokermods, 'brokermods': brokermods,
@ -164,7 +146,6 @@ def cli(
'log': get_console_log(loglevel), 'log': get_console_log(loglevel),
'confdir': config._config_dir, 'confdir': config._config_dir,
'wl_path': config._watchlists_data_path, 'wl_path': config._watchlists_data_path,
'registry_addr': reg_addr,
}) })
# allow enabling same loglevel in ``tractor`` machinery # allow enabling same loglevel in ``tractor`` machinery

View File

@ -1410,7 +1410,7 @@ async def open_feed(
# symbol.broker_info[brokername] = si # symbol.broker_info[brokername] = si
feed.symbols[fqsn] = symbol feed.symbols[fqsn] = symbol
feed.symbols[f'{sym}.{brokername}'] = symbol feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this # cast shm dtype to list... can't member why we need this
for shm_key, shm in [ for shm_key, shm in [

View File

@ -66,7 +66,7 @@ async def _async_main(
# implicit required argument provided by ``qtractor_run()`` # implicit required argument provided by ``qtractor_run()``
main_widget: GodWidget, main_widget: GodWidget,
syms: list[str], sym: str,
brokernames: str, brokernames: str,
loglevel: str, loglevel: str,
@ -113,16 +113,12 @@ async def _async_main(
# godwidget.hbox.addWidget(search) # godwidget.hbox.addWidget(search)
godwidget.search = search godwidget.search = search
symbols: list[str] = []
for sym in syms:
symbol, _, provider = sym.rpartition('.') symbol, _, provider = sym.rpartition('.')
symbols.append(symbol)
# this internally starts a ``display_symbol_data()`` task above # this internally starts a ``display_symbol_data()`` task above
order_mode_ready = await godwidget.load_symbols( order_mode_ready = await godwidget.load_symbol(
provider, provider,
symbols, symbol,
loglevel loglevel
) )
@ -170,7 +166,7 @@ async def _async_main(
def _main( def _main(
syms: list[str], sym: str,
brokernames: [str], brokernames: [str],
piker_loglevel: str, piker_loglevel: str,
tractor_kwargs, tractor_kwargs,
@ -182,7 +178,7 @@ def _main(
''' '''
run_qtractor( run_qtractor(
func=_async_main, func=_async_main,
args=(syms, brokernames, piker_loglevel), args=(sym, brokernames, piker_loglevel),
main_widget_type=GodWidget, main_widget_type=GodWidget,
tractor_kwargs=tractor_kwargs, tractor_kwargs=tractor_kwargs,
) )

View File

@ -186,10 +186,10 @@ class GodWidget(QWidget):
) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore ) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore
return self._chart_cache.get(symbol_key) return self._chart_cache.get(symbol_key)
async def load_symbols( async def load_symbol(
self, self,
providername: str, providername: str,
symbol_keys: list[str], symbol_key: str,
loglevel: str, loglevel: str,
reset: bool = False, reset: bool = False,
@ -200,20 +200,12 @@ class GodWidget(QWidget):
Expects a ``numpy`` structured array containing all the ohlcv fields. Expects a ``numpy`` structured array containing all the ohlcv fields.
''' '''
fqsns: list[str] = []
# our symbol key style is always lower case # our symbol key style is always lower case
for key in list(map(str.lower, symbol_keys)): symbol_key = symbol_key.lower()
# fully qualified symbol name (SNS i guess is what we're making?) # fully qualified symbol name (SNS i guess is what we're making?)
fqsn = '.'.join([key, providername]) fqsn = '.'.join([symbol_key, providername])
fqsns.append(fqsn) all_linked = self.get_chart_symbol(fqsn)
# NOTE: for now we use the first symbol in the set as the "key"
# for the overlay of feeds on the chart.
group_key = fqsns[0]
all_linked = self.get_chart_symbol(group_key)
order_mode_started = trio.Event() order_mode_started = trio.Event()
if not self.vbox.isEmpty(): if not self.vbox.isEmpty():
@ -246,7 +238,7 @@ class GodWidget(QWidget):
display_symbol_data, display_symbol_data,
self, self,
providername, providername,
fqsns, symbol_key,
loglevel, loglevel,
order_mode_started, order_mode_started,
) )

View File

@ -947,7 +947,7 @@ async def link_views_with_region(
async def display_symbol_data( async def display_symbol_data(
godwidget: GodWidget, godwidget: GodWidget,
provider: str, provider: str,
fqsns: list[str], sym: str,
loglevel: str, loglevel: str,
order_mode_started: trio.Event, order_mode_started: trio.Event,
@ -961,6 +961,11 @@ async def display_symbol_data(
''' '''
sbar = godwidget.window.status_bar sbar = godwidget.window.status_bar
loading_sym_key = sbar.open_status(
f'loading {sym}.{provider} ->',
group_key=True
)
# historical data fetch # historical data fetch
# brokermod = brokers.get_brokermod(provider) # brokermod = brokers.get_brokermod(provider)
@ -969,18 +974,10 @@ async def display_symbol_data(
# clear_on_next=True, # clear_on_next=True,
# group_key=loading_sym_key, # group_key=loading_sym_key,
# ) # )
fqsn = '.'.join((sym, provider))
for fqsn in fqsns:
loading_sym_key = sbar.open_status(
f'loading {fqsn} ->',
group_key=True
)
first_fqsn = fqsns[0]
async with open_feed( async with open_feed(
fqsns, [fqsn],
loglevel=loglevel, loglevel=loglevel,
# limit to at least display's FPS # limit to at least display's FPS
@ -991,7 +988,7 @@ async def display_symbol_data(
ohlcv: ShmArray = feed.rt_shm ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm hist_ohlcv: ShmArray = feed.hist_shm
symbol = feed.symbols[first_fqsn] symbol = feed.symbols[sym]
fqsn = symbol.front_fqsn() fqsn = symbol.front_fqsn()
step_size_s = 1 step_size_s = 1
@ -1028,7 +1025,7 @@ async def display_symbol_data(
godwidget.pp_pane = pp_pane godwidget.pp_pane = pp_pane
# create main OHLC chart # create main OHLC chart
ohlc_chart = rt_linked.plot_ohlc_main( chart = rt_linked.plot_ohlc_main(
symbol, symbol,
ohlcv, ohlcv,
# in the case of history chart we explicitly set `False` # in the case of history chart we explicitly set `False`
@ -1036,8 +1033,8 @@ async def display_symbol_data(
sidepane=pp_pane, sidepane=pp_pane,
) )
ohlc_chart._feeds[symbol.key] = feed chart._feeds[symbol.key] = feed
ohlc_chart.setFocus() chart.setFocus()
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
# plot historical vwap if available # plot historical vwap if available
@ -1047,7 +1044,7 @@ async def display_symbol_data(
# and 'bar_wap' in bars.dtype.fields # and 'bar_wap' in bars.dtype.fields
# ): # ):
# wap_in_history = True # wap_in_history = True
# ohlc_chart.draw_curve( # chart.draw_curve(
# name='bar_wap', # name='bar_wap',
# shm=ohlcv, # shm=ohlcv,
# color='default_light', # color='default_light',
@ -1108,7 +1105,7 @@ async def display_symbol_data(
await trio.sleep(0) await trio.sleep(0)
# size view to data prior to order mode init # size view to data prior to order mode init
ohlc_chart.default_view() chart.default_view()
rt_linked.graphics_cycle() rt_linked.graphics_cycle()
await trio.sleep(0) await trio.sleep(0)
@ -1122,7 +1119,7 @@ async def display_symbol_data(
godwidget.resize_all() godwidget.resize_all()
await link_views_with_region( await link_views_with_region(
ohlc_chart, chart,
hist_chart, hist_chart,
feed, feed,
) )
@ -1138,7 +1135,7 @@ async def display_symbol_data(
): ):
if not vlm_chart: if not vlm_chart:
# trigger another view reset if no sub-chart # trigger another view reset if no sub-chart
ohlc_chart.default_view() chart.default_view()
rt_linked.mode = mode rt_linked.mode = mode

View File

@ -665,9 +665,9 @@ class SearchWidget(QtWidgets.QWidget):
log.info(f'Requesting symbol: {symbol}.{provider}') log.info(f'Requesting symbol: {symbol}.{provider}')
await godw.load_symbols( await godw.load_symbol(
provider, provider,
[symbol], symbol,
'info', 'info',
) )

View File

@ -46,10 +46,8 @@ def _kivy_import_hack():
@click.argument('name', nargs=1, required=True) @click.argument('name', nargs=1, required=True)
@click.pass_obj @click.pass_obj
def monitor(config, rate, name, dhost, test, tl): def monitor(config, rate, name, dhost, test, tl):
''' """Start a real-time watchlist UI
Start a real-time watchlist UI """
'''
# global opts # global opts
brokermod = config['brokermods'][0] brokermod = config['brokermods'][0]
loglevel = config['loglevel'] loglevel = config['loglevel']
@ -72,12 +70,8 @@ def monitor(config, rate, name, dhost, test, tl):
) as portal: ) as portal:
# run app "main" # run app "main"
await _async_main( await _async_main(
name, name, portal, tickers,
portal, brokermod, rate, test=test,
tickers,
brokermod,
rate,
test=test,
) )
tractor.run( tractor.run(
@ -128,7 +122,7 @@ def optschain(config, symbol, date, rate, test):
@cli.command() @cli.command()
@click.option( @click.option(
'--profile', '--profile',
# '-p', '-p',
default=None, default=None,
help='Enable pyqtgraph profiling' help='Enable pyqtgraph profiling'
) )
@ -137,14 +131,9 @@ def optschain(config, symbol, date, rate, test):
is_flag=True, is_flag=True,
help='Enable tractor debug mode' help='Enable tractor debug mode'
) )
@click.argument('symbols', nargs=-1, required=True) @click.argument('symbol', required=True)
@click.pass_obj @click.pass_obj
def chart( def chart(config, symbol, profile, pdb):
config,
symbols: list[str],
profile,
pdb: bool,
):
''' '''
Start a real-time chartng UI Start a real-time chartng UI
@ -155,10 +144,8 @@ def chart(
_profile._pg_profile = True _profile._pg_profile = True
_profile.ms_slower_then = float(profile) _profile.ms_slower_then = float(profile)
# Qt UI entrypoint
from ._app import _main from ._app import _main
for symbol in symbols:
if '.' not in symbol: if '.' not in symbol:
click.echo(click.style( click.echo(click.style(
f'symbol: {symbol} must have a {symbol}.<provider> suffix', f'symbol: {symbol} must have a {symbol}.<provider> suffix',
@ -172,9 +159,8 @@ def chart(
tractorloglevel = config['tractorloglevel'] tractorloglevel = config['tractorloglevel']
pikerloglevel = config['loglevel'] pikerloglevel = config['loglevel']
_main( _main(
syms=symbols, sym=symbol,
brokernames=brokernames, brokernames=brokernames,
piker_loglevel=pikerloglevel, piker_loglevel=pikerloglevel,
tractor_kwargs={ tractor_kwargs={
@ -184,6 +170,5 @@ def chart(
'enable_modules': [ 'enable_modules': [
'piker.clearing._client' 'piker.clearing._client'
], ],
'registry_addr': config.get('registry_addr'),
}, },
) )

View File

@ -14,6 +14,15 @@ def pytest_addoption(parser):
help="Use a practice API account") help="Use a practice API account")
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel
log.get_console_log(level)
yield level
tractor.log._default_loglevel = orig
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def test_config(): def test_config():
dirname = os.path.dirname dirname = os.path.dirname

View File

@ -1,65 +0,0 @@
'''
Data feed layer APIs, performance, msg throttling.
'''
from pprint import pprint
import pytest
import trio
from piker import (
open_piker_runtime,
open_feed,
)
from piker.data import ShmArray
@pytest.mark.parametrize(
'fqsns',
[
['btcusdt.binance']
],
ids=lambda param: f'fqsns={param}',
)
def test_basic_rt_feed(
fqsns: list[str],
):
'''
Start a real-time data feed for provided fqsn and pull
a few quotes then simply shut down.
'''
async def main():
async with (
open_piker_runtime('test_basic_rt_feed'),
open_feed(
fqsns,
loglevel='info',
# TODO: ensure throttle rate is applied
# limit to at least display's FPS
# avoiding needless Qt-in-guest-mode context switches
# tick_throttle=_quote_throttle_rate,
) as feed
):
for fqin in fqsns:
assert feed.symbols[fqin]
ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm
count: int = 0
async for quotes in feed.stream:
# print quote msg, rt and history
# buffer values on console.
pprint(quotes)
pprint(ohlcv.array[-1])
pprint(hist_ohlcv.array[-1])
if count >= 100:
break
count += 1
trio.run(main)