Ensure all sub-services cancel on `pikerd` exit

Previously we were relying on implicit actor termination in
`maybe_spawn_daemon()`, but on `pikerd` teardown we should be sure to
tear down not only all service tasks in each actor but also the actor
runtimes themselves. This adjusts `Services.cancel_service()` to only
cancel the service task scope and then wait on the `complete` event,
and reworks the `open_context_in_task()` inner closure body (sketched
after the list below) to,

- always cancel the service actor at exit.
- not call `.cancel_service()` (potentially causing recursion issues on
  cancellation).
- allocate a `complete: trio.Event` to signal full task + actor termination.
- pop the service task from the `.service_tasks` registry.
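
As a rough standalone sketch of the new hand-off (illustrative names
only; plain `trio` with none of the `tractor` portal/context bits from
the actual patch):

    import trio


    async def service_task(
        task_status=trio.TASK_STATUS_IGNORED,
    ):
        with trio.CancelScope() as cs:
            complete = trio.Event()
            task_status.started((cs, complete))
            try:
                # stand-in for waiting on `ctx.result()`/`portal.result()`
                await trio.sleep_forever()
            finally:
                # the real closure also calls `portal.cancel_actor()`
                # and pops the `.service_tasks` entry here.
                complete.set()


    async def main():
        async with trio.open_nursery() as n:
            cs, complete = await n.start(service_task)

            # what `.cancel_service()` now boils down to:
            cs.cancel()
            await complete.wait()


    trio.run(main)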

Further, add a `maybe_set_global_registry_sockaddr()` helper-cm which
does the work of checking whether a registry socket address needs to
be (or already has been) set, and use it for discovery calls to the
`pikerd` service tree (usage sketched below).
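
A condensed, self-contained sketch of the helper's set-once/clear-on-exit
semantics (the default sockaddr is a placeholder and the real callers'
`tractor` root-actor setup is dropped):

    from contextlib import contextmanager

    _default_reg_addr = ('127.0.0.1', 6116)  # placeholder default
    _registry_addr = None


    @contextmanager
    def maybe_set_global_registry_sockaddr(registry_addr=None):
        global _registry_addr
        was_set = False
        if _registry_addr is None or registry_addr:
            _registry_addr = registry_addr or _default_reg_addr
            was_set = True
        try:
            yield _registry_addr
        finally:
            # only clear the global if this call set it, so the next
            # (set of) calls can apply whatever new addr is passed in.
            if was_set:
                _registry_addr = None


    # `open_pikerd()`/`open_piker_runtime()` wrap their root actor setup
    # in the cm and pass the yielded addr through as `arbiter_addr`:
    with maybe_set_global_registry_sockaddr(('127.0.0.1', 7777)) as reg_addr:
        assert reg_addr == ('127.0.0.1', 7777)

    assert _registry_addr is None  # cleared again on exit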
Tyler Goodlet 2023-01-10 15:25:25 -05:00
parent 6a1bb13feb
commit c8c641a038
1 changed file with 118 additions and 72 deletions


@@ -19,7 +19,10 @@ Structured, daemon tree service management.
 
 """
 from typing import Optional, Union, Callable, Any
-from contextlib import asynccontextmanager as acm
+from contextlib import (
+    asynccontextmanager as acm,
+    contextmanager as cm,
+)
 from collections import defaultdict
 
 import tractor
@@ -57,12 +60,21 @@ _root_modules = [
 ]
 
 
 # TODO: factor this into a ``tractor.highlevel`` extension
 # pack for the library.
 class Services:
 
     actor_n: tractor._supervise.ActorNursery
     service_n: trio.Nursery
     debug_mode: bool  # tractor sub-actor debug mode flag
-    service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
+    service_tasks: dict[
+        str,
+        tuple[
+            trio.CancelScope,
+            tractor.Portal,
+            trio.Event,
+        ]
+    ] = {}
     locks = defaultdict(trio.Lock)
 
     @classmethod
@@ -84,7 +96,12 @@ class Services:
 
         '''
        async def open_context_in_task(
            task_status: TaskStatus[
-                trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+                tuple[
+                    trio.CancelScope,
+                    trio.Event,
+                    Any,
+                ]
+            ] = trio.TASK_STATUS_IGNORED,
 
        ) -> Any:
@@ -96,28 +113,33 @@ class Services:
                ) as (ctx, first):
 
                    # unblock once the remote context has started
-                    task_status.started((cs, first))
+                    complete = trio.Event()
+                    task_status.started((cs, complete, first))
                    log.info(
                        f'`pikerd` service {name} started with value {first}'
                    )
                    try:
                        # wait on any context's return value
+                        # and any final portal result from the
+                        # sub-actor.
                        ctx_res = await ctx.result()
-                    except tractor.ContextCancelled:
-                        return await self.cancel_service(name)
-                    else:
-                        # wait on any error from the sub-actor
-                        # NOTE: this will block indefinitely until
-                        # cancelled either by error from the target
-                        # context function or by being cancelled here by
-                        # the surrounding cancel scope
+
+                        # NOTE: blocks indefinitely until cancelled
+                        # either by error from the target context
+                        # function or by being cancelled here by the
+                        # surrounding cancel scope.
                        return (await portal.result(), ctx_res)
 
-        cs, first = await self.service_n.start(open_context_in_task)
+                    finally:
+                        await portal.cancel_actor()
+                        complete.set()
+                        self.service_tasks.pop(name)
+
+        cs, complete, first = await self.service_n.start(open_context_in_task)
 
        # store the cancel scope and portal for later cancellation or
        # restart if needed.
-        self.service_tasks[name] = (cs, portal)
+        self.service_tasks[name] = (cs, portal, complete)
 
        return cs, first
@@ -127,13 +149,38 @@ class Services:
        name: str,
 
    ) -> Any:
+        '''
+        Cancel the service task and actor for the given ``name``.
+
+        '''
        log.info(f'Cancelling `pikerd` service {name}')
-        cs, portal = self.service_tasks[name]
-        # XXX: not entirely sure why this is required,
-        # and should probably be better fine tuned in
-        # ``tractor``?
+        cs, portal, complete = self.service_tasks[name]
        cs.cancel()
-        return await portal.cancel_actor()
+        await complete.wait()
+        assert name not in self.service_tasks, \
+            f'Service task for {name} not terminated?'
+
+
+@cm
+def maybe_set_global_registry_sockaddr(
+    registry_addr: None | tuple[str, int] = None,
+
+) -> None:
+
+    global _registry_addr
+    was_set: bool = False
+    if (
+        _registry_addr is None
+        or registry_addr
+    ):
+        _registry_addr = registry_addr or _default_reg_addr
+        was_set = True
+
+    try:
+        yield _registry_addr
+    finally:
+        # XXX: always clear the global addr if we set it so that the
+        # next (set of) calls will apply whatever new one is passed
+        # in.
+        if was_set:
+            _registry_addr = None
+
 
 @acm
@@ -155,20 +202,13 @@ async def open_pikerd(
    alive underling services (see below).
 
    '''
-    global _registry_addr
-    if (
-        _registry_addr is None
-        or registry_addr
-    ):
-        _registry_addr = registry_addr or _default_reg_addr
-
    # XXX: this may open a root actor as well
+    with maybe_set_global_registry_sockaddr(registry_addr) as reg_addr:
        async with (
            tractor.open_root_actor(
 
                # passed through to ``open_root_actor``
-                arbiter_addr=_registry_addr,
+                arbiter_addr=reg_addr,
                name=_root_dname,
                loglevel=loglevel,
                debug_mode=debug_mode,
@@ -188,7 +228,12 @@ async def open_pikerd(
                Services.actor_n = actor_nursery
                Services.service_n = service_nursery
                Services.debug_mode = debug_mode
+                try:
                    yield
+                finally:
+                    # if 'samplerd' in Services.service_tasks:
+                    #     await Services.cancel_service('samplerd')
+                    service_nursery.cancel_scope.cancel()
 
 
 @acm
@@ -209,20 +254,12 @@ async def open_piker_runtime(
    existing piker actors on the local link based on configuration.
 
    '''
-    global _registry_addr
-    if (
-        _registry_addr is None
-        or registry_addr
-    ):
-        _registry_addr = registry_addr or _default_reg_addr
-
    # XXX: this may open a root actor as well
+    with maybe_set_global_registry_sockaddr(registry_addr) as reg_addr:
        async with (
            tractor.open_root_actor(
 
                # passed through to ``open_root_actor``
-                arbiter_addr=_registry_addr,
+                arbiter_addr=reg_addr,
                name=name,
                loglevel=loglevel,
                debug_mode=debug_mode,
@@ -325,6 +362,11 @@ async def find_service(
    service_name: str,
 ) -> Optional[tractor.Portal]:
 
+    global _registry_addr
+    if not _registry_addr:
+        yield None
+        return
+
    log.info(f'Scanning for service `{service_name}`')
    # attach to existing daemon by name if possible
    async with tractor.find_actor(
@@ -342,6 +384,10 @@ async def check_for_service(
    Service daemon "liveness" predicate.
 
    '''
+    global _registry_addr
+    if not _registry_addr:
+        return None
+
    async with tractor.query_actor(
        service_name,
        arbiter_sockaddr=_registry_addr,