Compare commits
10 Commits
3e45a61287
...
9b6f4d24be
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 9b6f4d24be | |
Tyler Goodlet | 4eea8042ff | |
Tyler Goodlet | 9fc45c2bff | |
Tyler Goodlet | e7751cb5dd | |
Tyler Goodlet | 79b27899bf | |
Tyler Goodlet | 312c1552cd | |
Tyler Goodlet | f1b5c6e62c | |
Tyler Goodlet | 16699bdc88 | |
Tyler Goodlet | 5151971131 | |
Tyler Goodlet | 81f0fc77e3 |
|
@ -18,3 +18,10 @@
|
|||
piker: trading gear for hackers.
|
||||
|
||||
"""
|
||||
from ._daemon import open_piker_runtime
|
||||
from .data.feed import open_feed
|
||||
|
||||
__all__ = [
|
||||
'open_piker_runtime',
|
||||
'open_feed',
|
||||
]
|
||||
|
|
|
@ -199,7 +199,7 @@ async def open_piker_runtime(
|
|||
# for data daemons when running in production.
|
||||
debug_mode: bool = False,
|
||||
|
||||
) -> Optional[tractor._portal.Portal]:
|
||||
) -> tractor.Actor:
|
||||
'''
|
||||
Start a piker actor who's runtime will automatically sync with
|
||||
existing piker actors on the local link based on configuration.
|
||||
|
@ -254,6 +254,7 @@ async def maybe_open_runtime(
|
|||
@acm
|
||||
async def maybe_open_pikerd(
|
||||
loglevel: Optional[str] = None,
|
||||
registry_addr: None | tuple = None,
|
||||
**kwargs,
|
||||
|
||||
) -> Union[tractor._portal.Portal, Services]:
|
||||
|
@ -266,13 +267,21 @@ async def maybe_open_pikerd(
|
|||
get_console_log(loglevel)
|
||||
|
||||
# subtle, we must have the runtime up here or portal lookup will fail
|
||||
async with maybe_open_runtime(loglevel, **kwargs):
|
||||
|
||||
async with tractor.find_actor(_root_dname) as portal:
|
||||
# assert portal is not None
|
||||
if portal is not None:
|
||||
yield portal
|
||||
return
|
||||
async with (
|
||||
maybe_open_runtime(loglevel, **kwargs),
|
||||
tractor.find_actor(_root_dname) as portal
|
||||
):
|
||||
# connect to any existing daemon presuming
|
||||
# its registry socket was selected.
|
||||
if (
|
||||
portal is not None
|
||||
and (
|
||||
registry_addr is None
|
||||
or portal.channel.raddr == registry_addr
|
||||
)
|
||||
):
|
||||
yield portal
|
||||
return
|
||||
|
||||
# presume pikerd role since no daemon could be found at
|
||||
# configured address
|
||||
|
@ -280,6 +289,7 @@ async def maybe_open_pikerd(
|
|||
|
||||
loglevel=loglevel,
|
||||
debug_mode=kwargs.get('debug_mode', False),
|
||||
registry_addr=registry_addr,
|
||||
|
||||
) as _:
|
||||
# in the case where we're starting up the
|
||||
|
|
|
@ -1038,7 +1038,13 @@ async def open_symbol_search(
|
|||
stock_results = []
|
||||
|
||||
async def stash_results(target: Awaitable[list]):
|
||||
stock_results.extend(await target)
|
||||
try:
|
||||
results = await target
|
||||
except tractor.trionics.Lagged:
|
||||
print("IB SYM-SEARCH OVERRUN?!?")
|
||||
return
|
||||
|
||||
stock_results.extend(results)
|
||||
|
||||
for i in range(10):
|
||||
with trio.move_on_after(3) as cs:
|
||||
|
|
|
@ -1239,8 +1239,7 @@ async def process_client_order_cmds(
|
|||
pred = mk_check(trigger_price, last, action)
|
||||
|
||||
spread_slap: float = 5
|
||||
sym = fqsn.replace(f'.{brokers[0]}', '')
|
||||
min_tick = feed.symbols[sym].tick_size
|
||||
min_tick = feed.symbols[fqsn].tick_size
|
||||
|
||||
if action == 'buy':
|
||||
tickfilter = ('ask', 'last', 'trade')
|
||||
|
|
|
@ -125,8 +125,19 @@ def pikerd(
|
|||
@click.option('--loglevel', '-l', default='warning', help='Logging level')
|
||||
@click.option('--tl', is_flag=True, help='Enable tractor logging')
|
||||
@click.option('--configdir', '-c', help='Configuration directory')
|
||||
@click.option('--host', '-h', default=None, help='Host addr to bind')
|
||||
@click.option('--port', '-p', default=None, help='Port number to bind')
|
||||
@click.pass_context
|
||||
def cli(ctx, brokers, loglevel, tl, configdir):
|
||||
def cli(
|
||||
ctx: click.Context,
|
||||
brokers: list[str],
|
||||
loglevel: str,
|
||||
tl: bool,
|
||||
configdir: str,
|
||||
host: str,
|
||||
port: int,
|
||||
|
||||
) -> None:
|
||||
if configdir is not None:
|
||||
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
|
||||
config._override_config_dir(configdir)
|
||||
|
@ -138,6 +149,13 @@ def cli(ctx, brokers, loglevel, tl, configdir):
|
|||
else:
|
||||
brokermods = [get_brokermod(broker) for broker in brokers]
|
||||
|
||||
reg_addr: None | tuple[str, int] = None
|
||||
if host or port:
|
||||
reg_addr = (
|
||||
host or _registry_host,
|
||||
int(port) or _registry_port,
|
||||
)
|
||||
|
||||
ctx.obj.update({
|
||||
'brokers': brokers,
|
||||
'brokermods': brokermods,
|
||||
|
@ -146,6 +164,7 @@ def cli(ctx, brokers, loglevel, tl, configdir):
|
|||
'log': get_console_log(loglevel),
|
||||
'confdir': config._config_dir,
|
||||
'wl_path': config._watchlists_data_path,
|
||||
'registry_addr': reg_addr,
|
||||
})
|
||||
|
||||
# allow enabling same loglevel in ``tractor`` machinery
|
||||
|
|
|
@ -1410,7 +1410,7 @@ async def open_feed(
|
|||
|
||||
# symbol.broker_info[brokername] = si
|
||||
feed.symbols[fqsn] = symbol
|
||||
feed.symbols[sym] = symbol
|
||||
feed.symbols[f'{sym}.{brokername}'] = symbol
|
||||
|
||||
# cast shm dtype to list... can't member why we need this
|
||||
for shm_key, shm in [
|
||||
|
|
|
@ -66,7 +66,7 @@ async def _async_main(
|
|||
# implicit required argument provided by ``qtractor_run()``
|
||||
main_widget: GodWidget,
|
||||
|
||||
sym: str,
|
||||
syms: list[str],
|
||||
brokernames: str,
|
||||
loglevel: str,
|
||||
|
||||
|
@ -113,12 +113,16 @@ async def _async_main(
|
|||
# godwidget.hbox.addWidget(search)
|
||||
godwidget.search = search
|
||||
|
||||
symbol, _, provider = sym.rpartition('.')
|
||||
symbols: list[str] = []
|
||||
|
||||
for sym in syms:
|
||||
symbol, _, provider = sym.rpartition('.')
|
||||
symbols.append(symbol)
|
||||
|
||||
# this internally starts a ``display_symbol_data()`` task above
|
||||
order_mode_ready = await godwidget.load_symbol(
|
||||
order_mode_ready = await godwidget.load_symbols(
|
||||
provider,
|
||||
symbol,
|
||||
symbols,
|
||||
loglevel
|
||||
)
|
||||
|
||||
|
@ -166,7 +170,7 @@ async def _async_main(
|
|||
|
||||
|
||||
def _main(
|
||||
sym: str,
|
||||
syms: list[str],
|
||||
brokernames: [str],
|
||||
piker_loglevel: str,
|
||||
tractor_kwargs,
|
||||
|
@ -178,7 +182,7 @@ def _main(
|
|||
'''
|
||||
run_qtractor(
|
||||
func=_async_main,
|
||||
args=(sym, brokernames, piker_loglevel),
|
||||
args=(syms, brokernames, piker_loglevel),
|
||||
main_widget_type=GodWidget,
|
||||
tractor_kwargs=tractor_kwargs,
|
||||
)
|
||||
|
|
|
@ -186,10 +186,10 @@ class GodWidget(QWidget):
|
|||
) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore
|
||||
return self._chart_cache.get(symbol_key)
|
||||
|
||||
async def load_symbol(
|
||||
async def load_symbols(
|
||||
self,
|
||||
providername: str,
|
||||
symbol_key: str,
|
||||
symbol_keys: list[str],
|
||||
loglevel: str,
|
||||
reset: bool = False,
|
||||
|
||||
|
@ -200,12 +200,20 @@ class GodWidget(QWidget):
|
|||
Expects a ``numpy`` structured array containing all the ohlcv fields.
|
||||
|
||||
'''
|
||||
# our symbol key style is always lower case
|
||||
symbol_key = symbol_key.lower()
|
||||
fqsns: list[str] = []
|
||||
|
||||
# fully qualified symbol name (SNS i guess is what we're making?)
|
||||
fqsn = '.'.join([symbol_key, providername])
|
||||
all_linked = self.get_chart_symbol(fqsn)
|
||||
# our symbol key style is always lower case
|
||||
for key in list(map(str.lower, symbol_keys)):
|
||||
|
||||
# fully qualified symbol name (SNS i guess is what we're making?)
|
||||
fqsn = '.'.join([key, providername])
|
||||
fqsns.append(fqsn)
|
||||
|
||||
# NOTE: for now we use the first symbol in the set as the "key"
|
||||
# for the overlay of feeds on the chart.
|
||||
group_key = fqsns[0]
|
||||
|
||||
all_linked = self.get_chart_symbol(group_key)
|
||||
order_mode_started = trio.Event()
|
||||
|
||||
if not self.vbox.isEmpty():
|
||||
|
@ -238,7 +246,7 @@ class GodWidget(QWidget):
|
|||
display_symbol_data,
|
||||
self,
|
||||
providername,
|
||||
symbol_key,
|
||||
fqsns,
|
||||
loglevel,
|
||||
order_mode_started,
|
||||
)
|
||||
|
|
|
@ -947,7 +947,7 @@ async def link_views_with_region(
|
|||
async def display_symbol_data(
|
||||
godwidget: GodWidget,
|
||||
provider: str,
|
||||
sym: str,
|
||||
fqsns: list[str],
|
||||
loglevel: str,
|
||||
order_mode_started: trio.Event,
|
||||
|
||||
|
@ -961,11 +961,6 @@ async def display_symbol_data(
|
|||
|
||||
'''
|
||||
sbar = godwidget.window.status_bar
|
||||
loading_sym_key = sbar.open_status(
|
||||
f'loading {sym}.{provider} ->',
|
||||
group_key=True
|
||||
)
|
||||
|
||||
# historical data fetch
|
||||
# brokermod = brokers.get_brokermod(provider)
|
||||
|
||||
|
@ -974,10 +969,18 @@ async def display_symbol_data(
|
|||
# clear_on_next=True,
|
||||
# group_key=loading_sym_key,
|
||||
# )
|
||||
fqsn = '.'.join((sym, provider))
|
||||
|
||||
for fqsn in fqsns:
|
||||
|
||||
loading_sym_key = sbar.open_status(
|
||||
f'loading {fqsn} ->',
|
||||
group_key=True
|
||||
)
|
||||
|
||||
first_fqsn = fqsns[0]
|
||||
|
||||
async with open_feed(
|
||||
[fqsn],
|
||||
fqsns,
|
||||
loglevel=loglevel,
|
||||
|
||||
# limit to at least display's FPS
|
||||
|
@ -988,7 +991,7 @@ async def display_symbol_data(
|
|||
ohlcv: ShmArray = feed.rt_shm
|
||||
hist_ohlcv: ShmArray = feed.hist_shm
|
||||
|
||||
symbol = feed.symbols[sym]
|
||||
symbol = feed.symbols[first_fqsn]
|
||||
fqsn = symbol.front_fqsn()
|
||||
|
||||
step_size_s = 1
|
||||
|
@ -1025,7 +1028,7 @@ async def display_symbol_data(
|
|||
godwidget.pp_pane = pp_pane
|
||||
|
||||
# create main OHLC chart
|
||||
chart = rt_linked.plot_ohlc_main(
|
||||
ohlc_chart = rt_linked.plot_ohlc_main(
|
||||
symbol,
|
||||
ohlcv,
|
||||
# in the case of history chart we explicitly set `False`
|
||||
|
@ -1033,8 +1036,8 @@ async def display_symbol_data(
|
|||
sidepane=pp_pane,
|
||||
)
|
||||
|
||||
chart._feeds[symbol.key] = feed
|
||||
chart.setFocus()
|
||||
ohlc_chart._feeds[symbol.key] = feed
|
||||
ohlc_chart.setFocus()
|
||||
|
||||
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
|
||||
# plot historical vwap if available
|
||||
|
@ -1044,7 +1047,7 @@ async def display_symbol_data(
|
|||
# and 'bar_wap' in bars.dtype.fields
|
||||
# ):
|
||||
# wap_in_history = True
|
||||
# chart.draw_curve(
|
||||
# ohlc_chart.draw_curve(
|
||||
# name='bar_wap',
|
||||
# shm=ohlcv,
|
||||
# color='default_light',
|
||||
|
@ -1105,7 +1108,7 @@ async def display_symbol_data(
|
|||
await trio.sleep(0)
|
||||
|
||||
# size view to data prior to order mode init
|
||||
chart.default_view()
|
||||
ohlc_chart.default_view()
|
||||
rt_linked.graphics_cycle()
|
||||
await trio.sleep(0)
|
||||
|
||||
|
@ -1119,7 +1122,7 @@ async def display_symbol_data(
|
|||
godwidget.resize_all()
|
||||
|
||||
await link_views_with_region(
|
||||
chart,
|
||||
ohlc_chart,
|
||||
hist_chart,
|
||||
feed,
|
||||
)
|
||||
|
@ -1135,7 +1138,7 @@ async def display_symbol_data(
|
|||
):
|
||||
if not vlm_chart:
|
||||
# trigger another view reset if no sub-chart
|
||||
chart.default_view()
|
||||
ohlc_chart.default_view()
|
||||
|
||||
rt_linked.mode = mode
|
||||
|
||||
|
|
|
@ -665,9 +665,9 @@ class SearchWidget(QtWidgets.QWidget):
|
|||
|
||||
log.info(f'Requesting symbol: {symbol}.{provider}')
|
||||
|
||||
await godw.load_symbol(
|
||||
await godw.load_symbols(
|
||||
provider,
|
||||
symbol,
|
||||
[symbol],
|
||||
'info',
|
||||
)
|
||||
|
||||
|
|
|
@ -46,8 +46,10 @@ def _kivy_import_hack():
|
|||
@click.argument('name', nargs=1, required=True)
|
||||
@click.pass_obj
|
||||
def monitor(config, rate, name, dhost, test, tl):
|
||||
"""Start a real-time watchlist UI
|
||||
"""
|
||||
'''
|
||||
Start a real-time watchlist UI
|
||||
|
||||
'''
|
||||
# global opts
|
||||
brokermod = config['brokermods'][0]
|
||||
loglevel = config['loglevel']
|
||||
|
@ -70,8 +72,12 @@ def monitor(config, rate, name, dhost, test, tl):
|
|||
) as portal:
|
||||
# run app "main"
|
||||
await _async_main(
|
||||
name, portal, tickers,
|
||||
brokermod, rate, test=test,
|
||||
name,
|
||||
portal,
|
||||
tickers,
|
||||
brokermod,
|
||||
rate,
|
||||
test=test,
|
||||
)
|
||||
|
||||
tractor.run(
|
||||
|
@ -122,7 +128,7 @@ def optschain(config, symbol, date, rate, test):
|
|||
@cli.command()
|
||||
@click.option(
|
||||
'--profile',
|
||||
'-p',
|
||||
# '-p',
|
||||
default=None,
|
||||
help='Enable pyqtgraph profiling'
|
||||
)
|
||||
|
@ -131,9 +137,14 @@ def optschain(config, symbol, date, rate, test):
|
|||
is_flag=True,
|
||||
help='Enable tractor debug mode'
|
||||
)
|
||||
@click.argument('symbol', required=True)
|
||||
@click.argument('symbols', nargs=-1, required=True)
|
||||
@click.pass_obj
|
||||
def chart(config, symbol, profile, pdb):
|
||||
def chart(
|
||||
config,
|
||||
symbols: list[str],
|
||||
profile,
|
||||
pdb: bool,
|
||||
):
|
||||
'''
|
||||
Start a real-time chartng UI
|
||||
|
||||
|
@ -144,14 +155,16 @@ def chart(config, symbol, profile, pdb):
|
|||
_profile._pg_profile = True
|
||||
_profile.ms_slower_then = float(profile)
|
||||
|
||||
# Qt UI entrypoint
|
||||
from ._app import _main
|
||||
|
||||
if '.' not in symbol:
|
||||
click.echo(click.style(
|
||||
f'symbol: {symbol} must have a {symbol}.<provider> suffix',
|
||||
fg='red',
|
||||
))
|
||||
return
|
||||
for symbol in symbols:
|
||||
if '.' not in symbol:
|
||||
click.echo(click.style(
|
||||
f'symbol: {symbol} must have a {symbol}.<provider> suffix',
|
||||
fg='red',
|
||||
))
|
||||
return
|
||||
|
||||
|
||||
# global opts
|
||||
|
@ -159,8 +172,9 @@ def chart(config, symbol, profile, pdb):
|
|||
tractorloglevel = config['tractorloglevel']
|
||||
pikerloglevel = config['loglevel']
|
||||
|
||||
|
||||
_main(
|
||||
sym=symbol,
|
||||
syms=symbols,
|
||||
brokernames=brokernames,
|
||||
piker_loglevel=pikerloglevel,
|
||||
tractor_kwargs={
|
||||
|
@ -170,5 +184,6 @@ def chart(config, symbol, profile, pdb):
|
|||
'enable_modules': [
|
||||
'piker.clearing._client'
|
||||
],
|
||||
'registry_addr': config.get('registry_addr'),
|
||||
},
|
||||
)
|
||||
|
|
|
@ -14,15 +14,6 @@ def pytest_addoption(parser):
|
|||
help="Use a practice API account")
|
||||
|
||||
|
||||
@pytest.fixture(scope='session', autouse=True)
|
||||
def loglevel(request):
|
||||
orig = tractor.log._default_loglevel
|
||||
level = tractor.log._default_loglevel = request.config.option.loglevel
|
||||
log.get_console_log(level)
|
||||
yield level
|
||||
tractor.log._default_loglevel = orig
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def test_config():
|
||||
dirname = os.path.dirname
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
'''
|
||||
Data feed layer APIs, performance, msg throttling.
|
||||
|
||||
'''
|
||||
from pprint import pprint
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
from piker import (
|
||||
open_piker_runtime,
|
||||
open_feed,
|
||||
)
|
||||
from piker.data import ShmArray
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'fqsns',
|
||||
[
|
||||
['btcusdt.binance']
|
||||
],
|
||||
ids=lambda param: f'fqsns={param}',
|
||||
)
|
||||
def test_basic_rt_feed(
|
||||
fqsns: list[str],
|
||||
):
|
||||
'''
|
||||
Start a real-time data feed for provided fqsn and pull
|
||||
a few quotes then simply shut down.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
async with (
|
||||
open_piker_runtime('test_basic_rt_feed'),
|
||||
open_feed(
|
||||
fqsns,
|
||||
loglevel='info',
|
||||
|
||||
# TODO: ensure throttle rate is applied
|
||||
# limit to at least display's FPS
|
||||
# avoiding needless Qt-in-guest-mode context switches
|
||||
# tick_throttle=_quote_throttle_rate,
|
||||
|
||||
) as feed
|
||||
):
|
||||
for fqin in fqsns:
|
||||
assert feed.symbols[fqin]
|
||||
|
||||
ohlcv: ShmArray = feed.rt_shm
|
||||
hist_ohlcv: ShmArray = feed.hist_shm
|
||||
|
||||
count: int = 0
|
||||
async for quotes in feed.stream:
|
||||
|
||||
# print quote msg, rt and history
|
||||
# buffer values on console.
|
||||
pprint(quotes)
|
||||
pprint(ohlcv.array[-1])
|
||||
pprint(hist_ohlcv.array[-1])
|
||||
|
||||
if count >= 100:
|
||||
break
|
||||
|
||||
count += 1
|
||||
|
||||
trio.run(main)
|
Loading…
Reference in New Issue