Compare commits

..

No commits in common. "9b6f4d24be5ad12b6ec7cf5deab9a9d3bf07d4be" and "3e45a61287bd848647dbff851a41b47da7e906b4" have entirely different histories.

13 changed files with 67 additions and 194 deletions

View File

@ -18,10 +18,3 @@
piker: trading gear for hackers.
"""
from ._daemon import open_piker_runtime
from .data.feed import open_feed
__all__ = [
'open_piker_runtime',
'open_feed',
]

View File

@ -199,7 +199,7 @@ async def open_piker_runtime(
# for data daemons when running in production.
debug_mode: bool = False,
) -> tractor.Actor:
) -> Optional[tractor._portal.Portal]:
'''
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.
@ -254,7 +254,6 @@ async def maybe_open_runtime(
@acm
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
**kwargs,
) -> Union[tractor._portal.Portal, Services]:
@ -267,21 +266,13 @@ async def maybe_open_pikerd(
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
async with (
maybe_open_runtime(loglevel, **kwargs),
tractor.find_actor(_root_dname) as portal
):
# connect to any existing daemon presuming
# its registry socket was selected.
if (
portal is not None
and (
registry_addr is None
or portal.channel.raddr == registry_addr
)
):
yield portal
return
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
# presume pikerd role since no daemon could be found at
# configured address
@ -289,7 +280,6 @@ async def maybe_open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
) as _:
# in the case where we're starting up the

View File

@ -1038,13 +1038,7 @@ async def open_symbol_search(
stock_results = []
async def stash_results(target: Awaitable[list]):
try:
results = await target
except tractor.trionics.Lagged:
print("IB SYM-SEARCH OVERRUN?!?")
return
stock_results.extend(results)
stock_results.extend(await target)
for i in range(10):
with trio.move_on_after(3) as cs:

View File

@ -1239,7 +1239,8 @@ async def process_client_order_cmds(
pred = mk_check(trigger_price, last, action)
spread_slap: float = 5
min_tick = feed.symbols[fqsn].tick_size
sym = fqsn.replace(f'.{brokers[0]}', '')
min_tick = feed.symbols[sym].tick_size
if action == 'buy':
tickfilter = ('ask', 'last', 'trade')

View File

@ -125,19 +125,8 @@ def pikerd(
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--configdir', '-c', help='Configuration directory')
@click.option('--host', '-h', default=None, help='Host addr to bind')
@click.option('--port', '-p', default=None, help='Port number to bind')
@click.pass_context
def cli(
ctx: click.Context,
brokers: list[str],
loglevel: str,
tl: bool,
configdir: str,
host: str,
port: int,
) -> None:
def cli(ctx, brokers, loglevel, tl, configdir):
if configdir is not None:
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
config._override_config_dir(configdir)
@ -149,13 +138,6 @@ def cli(
else:
brokermods = [get_brokermod(broker) for broker in brokers]
reg_addr: None | tuple[str, int] = None
if host or port:
reg_addr = (
host or _registry_host,
int(port) or _registry_port,
)
ctx.obj.update({
'brokers': brokers,
'brokermods': brokermods,
@ -164,7 +146,6 @@ def cli(
'log': get_console_log(loglevel),
'confdir': config._config_dir,
'wl_path': config._watchlists_data_path,
'registry_addr': reg_addr,
})
# allow enabling same loglevel in ``tractor`` machinery

View File

@ -1410,7 +1410,7 @@ async def open_feed(
# symbol.broker_info[brokername] = si
feed.symbols[fqsn] = symbol
feed.symbols[f'{sym}.{brokername}'] = symbol
feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this
for shm_key, shm in [

View File

@ -66,7 +66,7 @@ async def _async_main(
# implicit required argument provided by ``qtractor_run()``
main_widget: GodWidget,
syms: list[str],
sym: str,
brokernames: str,
loglevel: str,
@ -113,16 +113,12 @@ async def _async_main(
# godwidget.hbox.addWidget(search)
godwidget.search = search
symbols: list[str] = []
for sym in syms:
symbol, _, provider = sym.rpartition('.')
symbols.append(symbol)
symbol, _, provider = sym.rpartition('.')
# this internally starts a ``display_symbol_data()`` task above
order_mode_ready = await godwidget.load_symbols(
order_mode_ready = await godwidget.load_symbol(
provider,
symbols,
symbol,
loglevel
)
@ -170,7 +166,7 @@ async def _async_main(
def _main(
syms: list[str],
sym: str,
brokernames: [str],
piker_loglevel: str,
tractor_kwargs,
@ -182,7 +178,7 @@ def _main(
'''
run_qtractor(
func=_async_main,
args=(syms, brokernames, piker_loglevel),
args=(sym, brokernames, piker_loglevel),
main_widget_type=GodWidget,
tractor_kwargs=tractor_kwargs,
)

View File

@ -186,10 +186,10 @@ class GodWidget(QWidget):
) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore
return self._chart_cache.get(symbol_key)
async def load_symbols(
async def load_symbol(
self,
providername: str,
symbol_keys: list[str],
symbol_key: str,
loglevel: str,
reset: bool = False,
@ -200,20 +200,12 @@ class GodWidget(QWidget):
Expects a ``numpy`` structured array containing all the ohlcv fields.
'''
fqsns: list[str] = []
# our symbol key style is always lower case
for key in list(map(str.lower, symbol_keys)):
symbol_key = symbol_key.lower()
# fully qualified symbol name (SNS i guess is what we're making?)
fqsn = '.'.join([key, providername])
fqsns.append(fqsn)
# NOTE: for now we use the first symbol in the set as the "key"
# for the overlay of feeds on the chart.
group_key = fqsns[0]
all_linked = self.get_chart_symbol(group_key)
# fully qualified symbol name (SNS i guess is what we're making?)
fqsn = '.'.join([symbol_key, providername])
all_linked = self.get_chart_symbol(fqsn)
order_mode_started = trio.Event()
if not self.vbox.isEmpty():
@ -246,7 +238,7 @@ class GodWidget(QWidget):
display_symbol_data,
self,
providername,
fqsns,
symbol_key,
loglevel,
order_mode_started,
)

View File

@ -947,7 +947,7 @@ async def link_views_with_region(
async def display_symbol_data(
godwidget: GodWidget,
provider: str,
fqsns: list[str],
sym: str,
loglevel: str,
order_mode_started: trio.Event,
@ -961,6 +961,11 @@ async def display_symbol_data(
'''
sbar = godwidget.window.status_bar
loading_sym_key = sbar.open_status(
f'loading {sym}.{provider} ->',
group_key=True
)
# historical data fetch
# brokermod = brokers.get_brokermod(provider)
@ -969,18 +974,10 @@ async def display_symbol_data(
# clear_on_next=True,
# group_key=loading_sym_key,
# )
for fqsn in fqsns:
loading_sym_key = sbar.open_status(
f'loading {fqsn} ->',
group_key=True
)
first_fqsn = fqsns[0]
fqsn = '.'.join((sym, provider))
async with open_feed(
fqsns,
[fqsn],
loglevel=loglevel,
# limit to at least display's FPS
@ -991,7 +988,7 @@ async def display_symbol_data(
ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm
symbol = feed.symbols[first_fqsn]
symbol = feed.symbols[sym]
fqsn = symbol.front_fqsn()
step_size_s = 1
@ -1028,7 +1025,7 @@ async def display_symbol_data(
godwidget.pp_pane = pp_pane
# create main OHLC chart
ohlc_chart = rt_linked.plot_ohlc_main(
chart = rt_linked.plot_ohlc_main(
symbol,
ohlcv,
# in the case of history chart we explicitly set `False`
@ -1036,8 +1033,8 @@ async def display_symbol_data(
sidepane=pp_pane,
)
ohlc_chart._feeds[symbol.key] = feed
ohlc_chart.setFocus()
chart._feeds[symbol.key] = feed
chart.setFocus()
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
# plot historical vwap if available
@ -1047,7 +1044,7 @@ async def display_symbol_data(
# and 'bar_wap' in bars.dtype.fields
# ):
# wap_in_history = True
# ohlc_chart.draw_curve(
# chart.draw_curve(
# name='bar_wap',
# shm=ohlcv,
# color='default_light',
@ -1108,7 +1105,7 @@ async def display_symbol_data(
await trio.sleep(0)
# size view to data prior to order mode init
ohlc_chart.default_view()
chart.default_view()
rt_linked.graphics_cycle()
await trio.sleep(0)
@ -1122,7 +1119,7 @@ async def display_symbol_data(
godwidget.resize_all()
await link_views_with_region(
ohlc_chart,
chart,
hist_chart,
feed,
)
@ -1138,7 +1135,7 @@ async def display_symbol_data(
):
if not vlm_chart:
# trigger another view reset if no sub-chart
ohlc_chart.default_view()
chart.default_view()
rt_linked.mode = mode

View File

@ -665,9 +665,9 @@ class SearchWidget(QtWidgets.QWidget):
log.info(f'Requesting symbol: {symbol}.{provider}')
await godw.load_symbols(
await godw.load_symbol(
provider,
[symbol],
symbol,
'info',
)

View File

@ -46,10 +46,8 @@ def _kivy_import_hack():
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def monitor(config, rate, name, dhost, test, tl):
'''
Start a real-time watchlist UI
'''
"""Start a real-time watchlist UI
"""
# global opts
brokermod = config['brokermods'][0]
loglevel = config['loglevel']
@ -72,12 +70,8 @@ def monitor(config, rate, name, dhost, test, tl):
) as portal:
# run app "main"
await _async_main(
name,
portal,
tickers,
brokermod,
rate,
test=test,
name, portal, tickers,
brokermod, rate, test=test,
)
tractor.run(
@ -128,7 +122,7 @@ def optschain(config, symbol, date, rate, test):
@cli.command()
@click.option(
'--profile',
# '-p',
'-p',
default=None,
help='Enable pyqtgraph profiling'
)
@ -137,14 +131,9 @@ def optschain(config, symbol, date, rate, test):
is_flag=True,
help='Enable tractor debug mode'
)
@click.argument('symbols', nargs=-1, required=True)
@click.argument('symbol', required=True)
@click.pass_obj
def chart(
config,
symbols: list[str],
profile,
pdb: bool,
):
def chart(config, symbol, profile, pdb):
'''
Start a real-time chartng UI
@ -155,16 +144,14 @@ def chart(
_profile._pg_profile = True
_profile.ms_slower_then = float(profile)
# Qt UI entrypoint
from ._app import _main
for symbol in symbols:
if '.' not in symbol:
click.echo(click.style(
f'symbol: {symbol} must have a {symbol}.<provider> suffix',
fg='red',
))
return
if '.' not in symbol:
click.echo(click.style(
f'symbol: {symbol} must have a {symbol}.<provider> suffix',
fg='red',
))
return
# global opts
@ -172,9 +159,8 @@ def chart(
tractorloglevel = config['tractorloglevel']
pikerloglevel = config['loglevel']
_main(
syms=symbols,
sym=symbol,
brokernames=brokernames,
piker_loglevel=pikerloglevel,
tractor_kwargs={
@ -184,6 +170,5 @@ def chart(
'enable_modules': [
'piker.clearing._client'
],
'registry_addr': config.get('registry_addr'),
},
)

View File

@ -14,6 +14,15 @@ def pytest_addoption(parser):
help="Use a practice API account")
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel
log.get_console_log(level)
yield level
tractor.log._default_loglevel = orig
@pytest.fixture(scope='session')
def test_config():
dirname = os.path.dirname

View File

@ -1,65 +0,0 @@
'''
Data feed layer APIs, performance, msg throttling.
'''
from pprint import pprint
import pytest
import trio
from piker import (
open_piker_runtime,
open_feed,
)
from piker.data import ShmArray
@pytest.mark.parametrize(
'fqsns',
[
['btcusdt.binance']
],
ids=lambda param: f'fqsns={param}',
)
def test_basic_rt_feed(
fqsns: list[str],
):
'''
Start a real-time data feed for provided fqsn and pull
a few quotes then simply shut down.
'''
async def main():
async with (
open_piker_runtime('test_basic_rt_feed'),
open_feed(
fqsns,
loglevel='info',
# TODO: ensure throttle rate is applied
# limit to at least display's FPS
# avoiding needless Qt-in-guest-mode context switches
# tick_throttle=_quote_throttle_rate,
) as feed
):
for fqin in fqsns:
assert feed.symbols[fqin]
ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm
count: int = 0
async for quotes in feed.stream:
# print quote msg, rt and history
# buffer values on console.
pprint(quotes)
pprint(ohlcv.array[-1])
pprint(hist_ohlcv.array[-1])
if count >= 100:
break
count += 1
trio.run(main)