Port `.cli` & `.service` to latest `tractor` registry APIs

Namely changes for the `registry_addrs: list`, enable_transports: list`
and related `tractor._addr` primitive requirements.

Other updates include,
- passing `maybe_enable_greenback=True`,
- additional exc logging around `pikerd` syncing/booting,
- changing to newer `Context.wait_for_result()`,
- dropping (unnecessary?) `maybe_open_crash_handler()` around `pikerd` ep.
testing_utils
Tyler Goodlet 2025-09-20 22:38:47 -04:00
parent f9610c9e26
commit 46285a601e
5 changed files with 176 additions and 126 deletions

View File

@ -134,8 +134,8 @@ def pikerd(
Spawn the piker broker-daemon.
'''
from tractor.devx import maybe_open_crash_handler
with maybe_open_crash_handler(pdb=pdb):
# from tractor.devx import maybe_open_crash_handler
# with maybe_open_crash_handler(pdb=False):
log = get_console_log(loglevel, name='cli')
if pdb:
@ -178,13 +178,13 @@ def pikerd(
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
enable_transports=['uds'],
# enable_transports=['tcp'],
) as service_mngr,
):
assert service_mngr
@ -307,6 +307,10 @@ def services(config, tl, ports):
if not ports:
ports = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
)
async def list_services():
nonlocal host
async with (
@ -315,15 +319,17 @@ def services(config, tl, ports):
loglevel=config['loglevel'] if tl else None,
),
tractor.get_registry(
host=host,
port=ports[0]
addr=addr,
) as portal
):
registry = await portal.run_from_ns('self', 'get_registry')
registry = await portal.run_from_ns(
'self',
'get_registry',
)
json_d = {}
for key, socket in registry.items():
host, port = socket
json_d[key] = f'{host}:{port}'
json_d[key] = f'{socket}'
click.echo(f"{colorize_json(json_d)}")
trio.run(list_services)

View File

@ -107,17 +107,22 @@ async def open_piker_runtime(
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
# passed through to `open_root_actor`
registry_addrs=registry_addrs,
name=name,
start_method=start_method,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# XXX NOTE MEMBER DAT der's a perf hit yo!!
# https://greenback.readthedocs.io/en/latest/principle.html#performance
maybe_enable_greenback=True,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=enable_modules,
hide_tb=False,
**tractor_kwargs,
) as actor,
@ -257,7 +262,10 @@ async def maybe_open_pikerd(
loglevel: str | None = None,
**kwargs,
) -> tractor._portal.Portal | ClassVar[Services]:
) -> (
tractor._portal.Portal
|ClassVar[Services]
):
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
@ -282,10 +290,11 @@ async def maybe_open_pikerd(
registry_addrs: list[tuple[str, int]] = (
registry_addrs
or [_default_reg_addr]
or
[_default_reg_addr]
)
pikerd_portal: tractor.Portal | None
pikerd_portal: tractor.Portal|None
async with (
open_piker_runtime(
name=query_name,

View File

@ -28,6 +28,7 @@ from contextlib import (
)
import tractor
from trio.lowlevel import current_task
from ._util import (
log, # sub-sys logger
@ -70,6 +71,7 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name]
await lock.acquire()
try:
async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
@ -134,6 +136,20 @@ async def maybe_spawn_daemon(
yield portal
await portal.cancel_actor()
except BaseException as _err:
err = _err
if (
lock.locked()
and
lock.statistics().owner is current_task()
):
log.exception(
f'Releasing stale lock after crash..?'
f'{err!r}\n'
)
lock.release()
raise err
async def spawn_emsd(

View File

@ -109,7 +109,7 @@ class Services:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.result()
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context

View File

@ -101,13 +101,15 @@ async def open_registry(
if (
not tractor.is_root_process()
and not Registry.addrs
and
not Registry.addrs
):
Registry.addrs.extend(actor.reg_addrs)
if (
ensure_exists
and not Registry.addrs
and
not Registry.addrs
):
raise RuntimeError(
f"`{uid}` registry should already exist but doesn't?"
@ -146,7 +148,7 @@ async def find_service(
| list[Portal]
| None
):
# try:
reg_addrs: list[tuple[str, int]]
async with open_registry(
addrs=(
@ -157,22 +159,39 @@ async def find_service(
or Registry.addrs
),
) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
maybe_portals: list[Portal] | Portal | None
log.info(
f'Scanning for service {service_name!r}'
)
# attach to existing daemon by name if possible
maybe_portals: list[Portal]|Portal|None
async with tractor.find_actor(
service_name,
registry_addrs=reg_addrs,
only_first=first_only, # if set only returns single ref
) as maybe_portals:
if not maybe_portals:
# log.info(
print(
f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
)
yield None
return
# log.info(
print(
f'Found service {service_name!r} -> {maybe_portals}'
)
yield maybe_portals
# except BaseException as _berr:
# berr = _berr
# log.exception(
# 'tractor.find_actor() failed with,\n'
# )
# raise berr
async def check_for_service(
service_name: str,