Merge pull request #417 from pikers/daemon_sockaddr_config

Daemon sockaddr config
pre_multifeed_hotfix
goodboy 2022-11-10 17:31:24 -05:00 committed by GitHub
commit a44b8e3e22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 31 deletions

View File

@ -18,3 +18,10 @@
piker: trading gear for hackers.
"""
from ._daemon import open_piker_runtime
from .data.feed import open_feed
__all__ = [
'open_piker_runtime',
'open_feed',
]

View File

@ -35,7 +35,12 @@ log = get_logger(__name__)
_root_dname = 'pikerd'
_registry_addr = ('127.0.0.1', 6116)
_registry_host: str = '127.0.0.1'
_registry_port: int = 6116
_registry_addr = (
_registry_host,
_registry_port,
)
_tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default
'arbiter_addr': _registry_addr
@ -135,6 +140,7 @@ async def open_pikerd(
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
) -> Optional[tractor._portal.Portal]:
'''
@ -146,14 +152,13 @@ async def open_pikerd(
'''
global _services
assert _services is None
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
arbiter_addr=registry_addr or _registry_addr,
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
@ -192,22 +197,22 @@ async def open_piker_runtime(
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = _registry_addr,
) -> Optional[tractor._portal.Portal]:
) -> tractor.Actor:
'''
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.
'''
global _services
assert _services is None
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
arbiter_addr=registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
@ -248,6 +253,7 @@ async def maybe_open_runtime(
@acm
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
**kwargs,
) -> Union[tractor._portal.Portal, Services]:
@ -260,13 +266,21 @@ async def maybe_open_pikerd(
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
async with (
maybe_open_runtime(loglevel, **kwargs),
tractor.find_actor(_root_dname) as portal
):
# connect to any existing daemon presuming
# its registry socket was selected.
if (
portal is not None
and (
registry_addr is None
or portal.channel.raddr == registry_addr
)
):
yield portal
return
# presume pikerd role since no daemon could be found at
# configured address
@ -274,6 +288,7 @@ async def maybe_open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
) as _:
# in the case where we're starting up the

View File

@ -27,7 +27,11 @@ import tractor
from ..log import get_console_log, get_logger, colorize_json
from ..brokers import get_brokermod
from .._daemon import _tractor_kwargs
from .._daemon import (
_tractor_kwargs,
_registry_host,
_registry_port,
)
from .. import config
@ -39,13 +43,21 @@ DEFAULT_BROKER = 'questrade'
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--pdb', is_flag=True, help='Enable tractor debug mode')
@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
@click.option('--host', '-h', default=None, help='Host addr to bind')
@click.option('--port', '-p', default=None, help='Port number to bind')
@click.option(
'--tsdb',
is_flag=True,
help='Enable local ``marketstore`` instance'
)
def pikerd(loglevel, host, tl, pdb, tsdb):
def pikerd(
loglevel: str,
host: str,
port: int,
tl: bool,
pdb: bool,
tsdb: bool,
):
'''
Spawn the piker broker-daemon.
@ -62,12 +74,21 @@ def pikerd(loglevel, host, tl, pdb, tsdb):
"\n"
))
reg_addr: None | tuple[str, int] = None
if host or port:
reg_addr = (
host or _registry_host,
int(port) or _registry_port,
)
async def main():
async with (
open_pikerd(
loglevel=loglevel,
debug_mode=pdb,
registry_addr=reg_addr,
), # normally delivers a ``Services`` handle
trio.open_nursery() as n,
):
@ -104,8 +125,19 @@ def pikerd(loglevel, host, tl, pdb, tsdb):
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--configdir', '-c', help='Configuration directory')
@click.option('--host', '-h', default=None, help='Host addr to bind')
@click.option('--port', '-p', default=None, help='Port number to bind')
@click.pass_context
def cli(ctx, brokers, loglevel, tl, configdir):
def cli(
ctx: click.Context,
brokers: list[str],
loglevel: str,
tl: bool,
configdir: str,
host: str,
port: int,
) -> None:
if configdir is not None:
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
config._override_config_dir(configdir)
@ -117,6 +149,13 @@ def cli(ctx, brokers, loglevel, tl, configdir):
else:
brokermods = [get_brokermod(broker) for broker in brokers]
reg_addr: None | tuple[str, int] = None
if host or port:
reg_addr = (
host or _registry_host,
int(port) or _registry_port,
)
ctx.obj.update({
'brokers': brokers,
'brokermods': brokermods,
@ -125,6 +164,7 @@ def cli(ctx, brokers, loglevel, tl, configdir):
'log': get_console_log(loglevel),
'confdir': config._config_dir,
'wl_path': config._watchlists_data_path,
'registry_addr': reg_addr,
})
# allow enabling same loglevel in ``tractor`` machinery

View File

@ -46,8 +46,10 @@ def _kivy_import_hack():
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def monitor(config, rate, name, dhost, test, tl):
"""Start a real-time watchlist UI
"""
'''
Start a real-time watchlist UI
'''
# global opts
brokermod = config['brokermods'][0]
loglevel = config['loglevel']
@ -70,8 +72,12 @@ def monitor(config, rate, name, dhost, test, tl):
) as portal:
# run app "main"
await _async_main(
name, portal, tickers,
brokermod, rate, test=test,
name,
portal,
tickers,
brokermod,
rate,
test=test,
)
tractor.run(
@ -122,7 +128,7 @@ def optschain(config, symbol, date, rate, test):
@cli.command()
@click.option(
'--profile',
'-p',
# '-p',
default=None,
help='Enable pyqtgraph profiling'
)
@ -131,9 +137,14 @@ def optschain(config, symbol, date, rate, test):
is_flag=True,
help='Enable tractor debug mode'
)
@click.argument('symbol', required=True)
@click.argument('symbols', nargs=-1, required=True)
@click.pass_obj
def chart(config, symbol, profile, pdb):
def chart(
config,
symbols: list[str],
profile,
pdb: bool,
):
'''
Start a real-time chartng UI
@ -144,14 +155,16 @@ def chart(config, symbol, profile, pdb):
_profile._pg_profile = True
_profile.ms_slower_then = float(profile)
# Qt UI entrypoint
from ._app import _main
if '.' not in symbol:
click.echo(click.style(
f'symbol: {symbol} must have a {symbol}.<provider> suffix',
fg='red',
))
return
for symbol in symbols:
if '.' not in symbol:
click.echo(click.style(
f'symbol: {symbol} must have a {symbol}.<provider> suffix',
fg='red',
))
return
# global opts
@ -159,8 +172,9 @@ def chart(config, symbol, profile, pdb):
tractorloglevel = config['tractorloglevel']
pikerloglevel = config['loglevel']
_main(
sym=symbol,
syms=symbols,
brokernames=brokernames,
piker_loglevel=pikerloglevel,
tractor_kwargs={
@ -170,5 +184,6 @@ def chart(config, symbol, profile, pdb):
'enable_modules': [
'piker.clearing._client'
],
'registry_addr': config.get('registry_addr'),
},
)