Add `open_piker_runtime()` to setup actor runtime correctly from non-daemons

async_hist_loading
Tyler Goodlet 2022-02-18 12:13:16 -05:00
parent b1dd24d1f7
commit 7252094f90
1 changed files with 53 additions and 3 deletions

View File

@ -34,9 +34,11 @@ from .brokers import get_brokermod
log = get_logger(__name__) log = get_logger(__name__)
_root_dname = 'pikerd' _root_dname = 'pikerd'
_registry_addr = ('127.0.0.1', 6116)
_tractor_kwargs: dict[str, Any] = { _tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default # use a different registry addr then tractor's default
'arbiter_addr': ('127.0.0.1', 6116), 'arbiter_addr': _registry_addr
} }
_root_modules = [ _root_modules = [
__name__, __name__,
@ -150,7 +152,7 @@ async def open_pikerd(
tractor.open_root_actor( tractor.open_root_actor(
# passed through to ``open_root_actor`` # passed through to ``open_root_actor``
arbiter_addr=_tractor_kwargs['arbiter_addr'], arbiter_addr=_registry_addr,
name=_root_dname, name=_root_dname,
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode, debug_mode=debug_mode,
@ -179,6 +181,47 @@ async def open_pikerd(
yield _services yield _services
@asynccontextmanager
async def open_piker_runtime(
name: str,
enable_modules: list[str] = [],
start_method: str = 'trio',
loglevel: Optional[str] = None,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
) -> Optional[tractor._portal.Portal]:
'''
Start a piker actor who's runtime will automatically
sync with existing piker actors in local network
based on configuration.
'''
global _services
assert _services is None
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
):
yield tractor.current_actor()
@asynccontextmanager @asynccontextmanager
async def maybe_open_runtime( async def maybe_open_runtime(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
@ -283,13 +326,20 @@ async def maybe_spawn_daemon(
lock = Brokerd.locks[service_name] lock = Brokerd.locks[service_name]
await lock.acquire() await lock.acquire()
log.info(f'Scanning for existing {service_name}')
# attach to existing daemon by name if possible # attach to existing daemon by name if possible
async with tractor.find_actor(service_name) as portal: async with tractor.find_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as portal:
if portal is not None: if portal is not None:
lock.release() lock.release()
yield portal yield portal
return return
log.warning(f"Couldn't find any existing {service_name}")
# ask root ``pikerd`` daemon to spawn the daemon we need if # ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the # pikerd is not live we now become the root of the
# process tree # process tree