# piker: trading gear for hackers # Copyright (C) 2018-present Tyler Goodlet # (in stewardship for pikers, everywhere.) # This program is free software: you can redistribute it and/or # modify it under the terms of the GNU Affero General Public # License as published by the Free Software Foundation, either # version 3 of the License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public # License along with this program. If not, see # . ''' CLI commons. ''' import os # from contextlib import AsyncExitStack from types import ModuleType import click import trio import tractor from tractor._multiaddr import parse_maddr from ..log import ( get_console_log, get_logger, colorize_json, ) from ..brokers import get_brokermod from ..service import ( _default_registry_host, _default_registry_port, ) from .. import config log = get_logger('piker.cli') def load_trans_eps( network: dict | None = None, maddrs: list[tuple] | None = None, ) -> dict[str, dict[str, dict]]: # transport-oriented endpoint multi-addresses eps: dict[ str, # service name, eg. `pikerd`, `emsd`.. # libp2p style multi-addresses parsed into prot layers list[dict[str, str | int]] ] = {} if ( network and not maddrs ): # load network section and (attempt to) connect all endpoints # which are reachable B) for key, maddrs in network.items(): match key: # TODO: resolve table across multiple discov # prots Bo case 'resolv': pass case 'pikerd': dname: str = key for maddr in maddrs: layers: dict = parse_maddr(maddr) eps.setdefault( dname, [], ).append(layers) elif maddrs: # presume user is manually specifying the root actor ep. eps['pikerd'] = [parse_maddr(maddr)] return eps @click.command() @click.option( '--loglevel', '-l', default='warning', help='Logging level', ) @click.option( '--tl', is_flag=True, help='Enable tractor-runtime logs', ) @click.option( '--pdb', is_flag=True, help='Enable tractor debug mode', ) @click.option( '--maddr', '-m', default=None, help='Multiaddrs to bind or contact', ) def pikerd( maddr: list[str] | None, loglevel: str, tl: bool, pdb: bool, ): ''' Start the "root service actor", `pikerd`, run it until cancellation. This "root daemon" operates as the top most service-mngr and subsys-as-subactor supervisor, think of it as the "init proc" of any of any `piker` application or daemon-process tree. ''' # from tractor.devx import maybe_open_crash_handler # with maybe_open_crash_handler(pdb=False): log = get_console_log( level=loglevel, with_tractor_log=tl, ) if pdb: log.warning(( "\n" "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" "When a `piker` daemon crashes it will block the " "task-thread until resumed from console!\n" "\n" )) # service-actor registry endpoint socket-address set regaddrs: list[tuple[str, int]] = [] conf, _ = config.load( conf_name='conf', ) network: dict = conf.get('network') if ( network is None and not maddr ): regaddrs = [( _default_registry_host, _default_registry_port, )] else: eps: dict = load_trans_eps( network, maddr, ) for layers in eps['pikerd']: regaddrs.append(( layers['ipv4']['addr'], layers['tcp']['port'], )) from .. import service async def main(): service_mngr: service.Services async with ( service.open_pikerd( registry_addrs=regaddrs, loglevel=loglevel, debug_mode=pdb, # enable_transports=['uds'], enable_transports=['tcp'], ) as service_mngr, ): assert service_mngr # ?TODO? spawn all other sub-actor daemons according to # multiaddress endpoint spec defined by user config await trio.sleep_forever() trio.run(main) @click.group(context_settings=config._context_defaults) @click.option( '--brokers', '-b', default=None, multiple=True, help='Broker backend to use' ) @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--configdir', '-c', help='Configuration directory') @click.option( '--pdb', is_flag=True, help='Enable runtime debug mode ', ) @click.option( '--maddr', '-m', default=None, multiple=True, help='Multiaddr to bind', ) @click.option( '--regaddr', '-r', default=None, help='Registrar addr to contact', ) @click.pass_context def cli( ctx: click.Context, brokers: list[str], loglevel: str, tl: bool, configdir: str, pdb: bool, # TODO: make these list[str] with multiple -m maddr0 -m maddr1 maddr: list[str], regaddr: str, ) -> None: ''' The "root" `piker`-cmd CLI endpoint. NOTE, this def generally relies on and requires a sub-cmd to be provided by the user, OW only a `--help` msg (listing said subcmds) will be dumped to console. ''' if configdir is not None: assert os.path.isdir(configdir), f"`{configdir}` is not a valid path" config._override_config_dir(configdir) # TODO: for typer see # https://typer.tiangolo.com/tutorial/commands/context/ ctx.ensure_object(dict) if not brokers: # (try to) load all (supposedly) supported data/broker backends from piker.brokers import __brokers__ brokers = __brokers__ brokermods: dict[str, ModuleType] = { broker: get_brokermod(broker) for broker in brokers } assert brokermods # TODO: load endpoints from `conf::[network].pikerd` # - pikerd vs. regd, separate registry daemon? # - expose datad vs. brokerd? # - bind emsd with certain perms on public iface? regaddrs: list[tuple[str, int]] = regaddr or [( _default_registry_host, _default_registry_port, )] # TODO: factor [network] section parsing out from pikerd # above and call it here as well. # if maddr: # for addr in maddr: # layers: dict = parse_maddr(addr) ctx.obj.update({ 'brokers': brokers, 'brokermods': brokermods, 'loglevel': loglevel, 'tractorloglevel': None, 'log': get_console_log(loglevel), 'confdir': config._config_dir, 'wl_path': config._watchlists_data_path, 'registry_addrs': regaddrs, 'pdb': pdb, # debug mode flag # TODO: endpoint parsing, pinging and binding # on no existing server. # 'maddrs': maddr, }) # allow enabling same loglevel in ``tractor`` machinery if tl: ctx.obj.update({'tractorloglevel': loglevel}) @cli.command() @click.option('--tl', is_flag=True, help='Enable tractor logging') @click.argument('ports', nargs=-1, required=False) @click.pass_obj def services( config, tl: bool, ports: list[int], ): ''' List all `piker` "service deamons" to the console in a `json`-table which maps each actor's UID in the form, `{service_name}.{subservice_name}.{UUID}` to its (primary) IPC server address. (^TODO, should be its multiaddr form once we support it) Note that by convention actors which operate as "headless" processes (those without GUIs/graphics, and which generally parent some noteworthy subsystem) are normally suffixed by a "d" such as, - pikerd: the root runtime supervisor - brokerd: a broker-backend order ctl daemon - emsd: the internal dark-clearing and order routing daemon - datad: a data-provider-backend data feed daemon - samplerd: the real-time data sampling and clock-syncing daemon "Headed units" are normally just given an obvious app-like name with subactors indexed by `.` such as, - chart: the primary modal charting iface, a Qt app - chart.fsp_0: a financial-sig-proc cascade instance which delivers graphics to a parent `chart` app. - polars_boi: some (presumably) `polars` using console app. ''' from piker.service import ( open_piker_runtime, _default_registry_port, _default_registry_host, ) # !TODO, mk this to work with UDS! host: str = _default_registry_host if not ports: ports: list[int] = [_default_registry_port] addr = tractor._addr.wrap_address( addr=(host, ports[0]) ) async def list_services(): nonlocal host async with ( open_piker_runtime( name='service_query', loglevel=( config['loglevel'] if tl else None ), ), tractor.get_registry( addr=addr, ) as portal ): registry = await portal.run_from_ns( 'self', 'get_registry', ) json_d = {} for key, socket in registry.items(): json_d[key] = f'{socket}' click.echo(f"{colorize_json(json_d)}") trio.run(list_services) def _load_clis() -> None: ''' Dynamically load and register all subsys CLI endpoints (at call time). NOTE, obviously this is normally expected to be called at `import` time and implicitly relies on our use of various `click`/`typer` decorator APIs. ''' from ..brokers import cli # noqa from ..ui import cli # noqa from ..watchlists import cli # noqa # typer implemented from ..storage import cli # noqa from ..accounting import cli # noqa # load all subsytem cli eps _load_clis()