Add a service checker predicate
parent
881b1afc12
commit
dfc3fb76fd
|
@ -19,7 +19,7 @@ Structured, daemon tree service management.
|
|||
|
||||
"""
|
||||
from typing import Optional, Union, Callable, Any
|
||||
from contextlib import asynccontextmanager
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from collections import defaultdict
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
@ -130,7 +130,7 @@ class Services(BaseModel):
|
|||
_services: Optional[Services] = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def open_pikerd(
|
||||
start_method: str = 'trio',
|
||||
loglevel: Optional[str] = None,
|
||||
|
@ -185,7 +185,7 @@ async def open_pikerd(
|
|||
yield _services
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def open_piker_runtime(
|
||||
name: str,
|
||||
enable_modules: list[str] = [],
|
||||
|
@ -226,7 +226,7 @@ async def open_piker_runtime(
|
|||
yield tractor.current_actor()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def maybe_open_runtime(
|
||||
loglevel: Optional[str] = None,
|
||||
**kwargs,
|
||||
|
@ -249,7 +249,7 @@ async def maybe_open_runtime(
|
|||
yield
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def maybe_open_pikerd(
|
||||
loglevel: Optional[str] = None,
|
||||
**kwargs,
|
||||
|
@ -300,7 +300,34 @@ class Brokerd:
|
|||
locks = defaultdict(trio.Lock)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def find_service(
|
||||
service_name: str,
|
||||
|
||||
) -> tractor.Portal:
|
||||
|
||||
log.info(f'Scanning for existing {service_name}')
|
||||
# attach to existing daemon by name if possible
|
||||
async with tractor.find_actor(
|
||||
service_name,
|
||||
arbiter_sockaddr=_registry_addr,
|
||||
) as portal:
|
||||
yield portal
|
||||
|
||||
|
||||
async def check_for_service(
|
||||
service_name: str,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
Service daemon "liveness" predicate.
|
||||
|
||||
'''
|
||||
async with find_service(service_name) as portal:
|
||||
return portal is not None
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_spawn_daemon(
|
||||
|
||||
service_name: str,
|
||||
|
@ -330,19 +357,13 @@ async def maybe_spawn_daemon(
|
|||
lock = Brokerd.locks[service_name]
|
||||
await lock.acquire()
|
||||
|
||||
log.info(f'Scanning for existing {service_name}')
|
||||
# attach to existing daemon by name if possible
|
||||
async with tractor.find_actor(
|
||||
service_name,
|
||||
arbiter_sockaddr=_registry_addr,
|
||||
|
||||
) as portal:
|
||||
async with find_service(service_name) as portal:
|
||||
if portal is not None:
|
||||
lock.release()
|
||||
yield portal
|
||||
return
|
||||
|
||||
log.warning(f"Couldn't find any existing {service_name}")
|
||||
log.warning(f"Couldn't find any existing {service_name}")
|
||||
|
||||
# ask root ``pikerd`` daemon to spawn the daemon we need if
|
||||
# pikerd is not live we now become the root of the
|
||||
|
@ -423,7 +444,7 @@ async def spawn_brokerd(
|
|||
return True
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def maybe_spawn_brokerd(
|
||||
|
||||
brokername: str,
|
||||
|
@ -485,7 +506,7 @@ async def spawn_emsd(
|
|||
return True
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def maybe_open_emsd(
|
||||
|
||||
brokername: str,
|
||||
|
|
Loading…
Reference in New Issue