Add a service checker predicate

incr_update_backup
Tyler Goodlet 2022-03-07 07:18:53 -05:00
parent d8d7757e88
commit 65609a35dc
1 changed files with 37 additions and 16 deletions

View File

@ -19,7 +19,7 @@ Structured, daemon tree service management.
""" """
from typing import Optional, Union, Callable, Any from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from collections import defaultdict from collections import defaultdict
from pydantic import BaseModel from pydantic import BaseModel
@ -130,7 +130,7 @@ class Services(BaseModel):
_services: Optional[Services] = None _services: Optional[Services] = None
@asynccontextmanager @acm
async def open_pikerd( async def open_pikerd(
start_method: str = 'trio', start_method: str = 'trio',
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
@ -185,7 +185,7 @@ async def open_pikerd(
yield _services yield _services
@asynccontextmanager @acm
async def open_piker_runtime( async def open_piker_runtime(
name: str, name: str,
enable_modules: list[str] = [], enable_modules: list[str] = [],
@ -226,7 +226,7 @@ async def open_piker_runtime(
yield tractor.current_actor() yield tractor.current_actor()
@asynccontextmanager @acm
async def maybe_open_runtime( async def maybe_open_runtime(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**kwargs, **kwargs,
@ -249,7 +249,7 @@ async def maybe_open_runtime(
yield yield
@asynccontextmanager @acm
async def maybe_open_pikerd( async def maybe_open_pikerd(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**kwargs, **kwargs,
@ -300,7 +300,34 @@ class Brokerd:
locks = defaultdict(trio.Lock) locks = defaultdict(trio.Lock)
@asynccontextmanager @acm
async def find_service(
service_name: str,
) -> tractor.Portal:
log.info(f'Scanning for existing {service_name}')
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as portal:
yield portal
async def check_for_service(
service_name: str,
) -> bool:
'''
Service daemon "liveness" predicate.
'''
async with find_service(service_name) as portal:
return portal is not None
@acm
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
service_name: str, service_name: str,
@ -330,13 +357,7 @@ async def maybe_spawn_daemon(
lock = Brokerd.locks[service_name] lock = Brokerd.locks[service_name]
await lock.acquire() await lock.acquire()
log.info(f'Scanning for existing {service_name}') async with find_service(service_name) as portal:
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as portal:
if portal is not None: if portal is not None:
lock.release() lock.release()
yield portal yield portal
@ -423,7 +444,7 @@ async def spawn_brokerd(
return True return True
@asynccontextmanager @acm
async def maybe_spawn_brokerd( async def maybe_spawn_brokerd(
brokername: str, brokername: str,
@ -485,7 +506,7 @@ async def spawn_emsd(
return True return True
@asynccontextmanager @acm
async def maybe_open_emsd( async def maybe_open_emsd(
brokername: str, brokername: str,