Add a service checker predicate
parent
7fbd4a95e3
commit
855d02ef5a
|
@ -19,7 +19,7 @@ Structured, daemon tree service management.
|
|||
|
||||
"""
|
||||
from typing import Optional, Union, Callable, Any
|
||||
from contextlib import asynccontextmanager
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from collections import defaultdict
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
@ -123,7 +123,7 @@ class Services(BaseModel):
|
|||
_services: Optional[Services] = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def open_pikerd(
|
||||
start_method: str = 'trio',
|
||||
loglevel: Optional[str] = None,
|
||||
|
@ -178,7 +178,7 @@ async def open_pikerd(
|
|||
yield _services
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def open_piker_runtime(
|
||||
name: str,
|
||||
enable_modules: list[str] = [],
|
||||
|
@ -219,7 +219,7 @@ async def open_piker_runtime(
|
|||
yield tractor.current_actor()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def maybe_open_runtime(
|
||||
loglevel: Optional[str] = None,
|
||||
**kwargs,
|
||||
|
@ -242,7 +242,7 @@ async def maybe_open_runtime(
|
|||
yield
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def maybe_open_pikerd(
|
||||
loglevel: Optional[str] = None,
|
||||
**kwargs,
|
||||
|
@ -293,7 +293,34 @@ class Brokerd:
|
|||
locks = defaultdict(trio.Lock)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def find_service(
|
||||
service_name: str,
|
||||
|
||||
) -> tractor.Portal:
|
||||
|
||||
log.info(f'Scanning for existing {service_name}')
|
||||
# attach to existing daemon by name if possible
|
||||
async with tractor.find_actor(
|
||||
service_name,
|
||||
arbiter_sockaddr=_registry_addr,
|
||||
) as portal:
|
||||
yield portal
|
||||
|
||||
|
||||
async def check_for_service(
|
||||
service_name: str,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
Service daemon "liveness" predicate.
|
||||
|
||||
'''
|
||||
async with find_service(service_name) as portal:
|
||||
return portal is not None
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_spawn_daemon(
|
||||
|
||||
service_name: str,
|
||||
|
@ -323,19 +350,13 @@ async def maybe_spawn_daemon(
|
|||
lock = Brokerd.locks[service_name]
|
||||
await lock.acquire()
|
||||
|
||||
log.info(f'Scanning for existing {service_name}')
|
||||
# attach to existing daemon by name if possible
|
||||
async with tractor.find_actor(
|
||||
service_name,
|
||||
arbiter_sockaddr=_registry_addr,
|
||||
|
||||
) as portal:
|
||||
async with find_service(service_name) as portal:
|
||||
if portal is not None:
|
||||
lock.release()
|
||||
yield portal
|
||||
return
|
||||
|
||||
log.warning(f"Couldn't find any existing {service_name}")
|
||||
log.warning(f"Couldn't find any existing {service_name}")
|
||||
|
||||
# ask root ``pikerd`` daemon to spawn the daemon we need if
|
||||
# pikerd is not live we now become the root of the
|
||||
|
@ -415,7 +436,7 @@ async def spawn_brokerd(
|
|||
return True
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def maybe_spawn_brokerd(
|
||||
|
||||
brokername: str,
|
||||
|
@ -477,7 +498,7 @@ async def spawn_emsd(
|
|||
return True
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def maybe_open_emsd(
|
||||
|
||||
brokername: str,
|
||||
|
|
Loading…
Reference in New Issue