From dfc3fb76fd89ef3085bfab1cc2142b4c285d0910 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Mar 2022 07:18:53 -0500 Subject: [PATCH] Add a service checker predicate --- piker/_daemon.py | 53 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index ff7a129d..a77b189d 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -19,7 +19,7 @@ Structured, daemon tree service management. """ from typing import Optional, Union, Callable, Any -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from collections import defaultdict from pydantic import BaseModel @@ -130,7 +130,7 @@ class Services(BaseModel): _services: Optional[Services] = None -@asynccontextmanager +@acm async def open_pikerd( start_method: str = 'trio', loglevel: Optional[str] = None, @@ -185,7 +185,7 @@ async def open_pikerd( yield _services -@asynccontextmanager +@acm async def open_piker_runtime( name: str, enable_modules: list[str] = [], @@ -226,7 +226,7 @@ async def open_piker_runtime( yield tractor.current_actor() -@asynccontextmanager +@acm async def maybe_open_runtime( loglevel: Optional[str] = None, **kwargs, @@ -249,7 +249,7 @@ async def maybe_open_runtime( yield -@asynccontextmanager +@acm async def maybe_open_pikerd( loglevel: Optional[str] = None, **kwargs, @@ -300,7 +300,34 @@ class Brokerd: locks = defaultdict(trio.Lock) -@asynccontextmanager +@acm +async def find_service( + service_name: str, + +) -> tractor.Portal: + + log.info(f'Scanning for existing {service_name}') + # attach to existing daemon by name if possible + async with tractor.find_actor( + service_name, + arbiter_sockaddr=_registry_addr, + ) as portal: + yield portal + + +async def check_for_service( + service_name: str, + +) -> bool: + ''' + Service daemon "liveness" predicate. + + ''' + async with find_service(service_name) as portal: + return portal is not None + + +@acm async def maybe_spawn_daemon( service_name: str, @@ -330,19 +357,13 @@ async def maybe_spawn_daemon( lock = Brokerd.locks[service_name] await lock.acquire() - log.info(f'Scanning for existing {service_name}') - # attach to existing daemon by name if possible - async with tractor.find_actor( - service_name, - arbiter_sockaddr=_registry_addr, - - ) as portal: + async with find_service(service_name) as portal: if portal is not None: lock.release() yield portal return - log.warning(f"Couldn't find any existing {service_name}") + log.warning(f"Couldn't find any existing {service_name}") # ask root ``pikerd`` daemon to spawn the daemon we need if # pikerd is not live we now become the root of the @@ -423,7 +444,7 @@ async def spawn_brokerd( return True -@asynccontextmanager +@acm async def maybe_spawn_brokerd( brokername: str, @@ -485,7 +506,7 @@ async def spawn_emsd( return True -@asynccontextmanager +@acm async def maybe_open_emsd( brokername: str,