Better formalize `pikerd` service semantics
An async exit stack around the new `@tractor.context` is problematic since a pushed context can't bubble errors unless the exit stack has been closed. But in that case, why do you need the exit stack at all if you're going to push the context and wait on it right away? It seems more correct to use a nursery and spawn a task in `pikerd` that waits on both the target context's completion first (thus being able to bubble up any errors from the remote, top-level service task) and then the sub-actor portal. (Sub)service daemons are spawned with `.start_actor()` and thus will block forever until cancelled, so add a way to cancel them explicitly, which we'll need eventually for restarts and dynamic feed management.

The big lesson here is that async exit stacks are not conducive to spawning and monitoring service tasks, especially when a `@tractor.context` is involved: if the `.open_context()` call isn't exited (only possible by closing the stack), there is no way for `trio` to cancel the task that pushed that context (since it can't run a checkpoint while yielded inside the stack) without also cancelling every other context pushed on that stack. Presuming one `pikerd` task does the original pushing (which it did), any error would have to kill all service daemon tasks, which obviously won't work.

I see this mostly as the pains of hammering out an SC service manager with `tractor` / `trio` for the first time, so go easy on the process ;)
parent cc9a720af7
commit 7b6e34aaf4

piker/_daemon.py (140 lines changed)
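The gist of the supervision pattern argued for above, as a minimal trio-only sketch (illustrative names only, not the actual `piker` api; the real version in the diff below also juggles a `tractor` portal and remote context):

    import trio

    async def start_service(
        nursery: trio.Nursery,
        registry: dict[str, trio.CancelScope],
        name: str,
    ) -> None:

        async def _run(
            task_status=trio.TASK_STATUS_IGNORED,
        ) -> None:
            # each service owns a cancel scope so it can be torn down
            # individually without nuking its siblings
            with trio.CancelScope() as cs:
                # unblock the spawner once the service is "up"
                task_status.started(cs)
                # stand-in for waiting on the remote context/portal;
                # errors raised here bubble to the supervising nursery
                await trio.sleep_forever()

        registry[name] = await nursery.start(_run)

    async def main() -> None:
        registry: dict[str, trio.CancelScope] = {}
        async with trio.open_nursery() as nursery:
            await start_service(nursery, registry, 'svc1')
            # later: cancel one service explicitly, leaving the
            # nursery (and any sibling services) running
            registry['svc1'].cancel()

    trio.run(main)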
@@ -19,11 +19,12 @@ Structured, daemon tree service management.
 """
 from typing import Optional, Union, Callable, Any
-from contextlib import asynccontextmanager, AsyncExitStack
+from contextlib import asynccontextmanager
 from collections import defaultdict
 
 from pydantic import BaseModel
 import trio
+from trio_typing import TaskStatus
 import tractor
 
 from .log import get_logger, get_console_log
@@ -49,40 +50,75 @@ class Services(BaseModel):
     actor_n: tractor._trionics.ActorNursery
     service_n: trio.Nursery
     debug_mode: bool  # tractor sub-actor debug mode flag
-    ctx_stack: AsyncExitStack
+    service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
 
     class Config:
         arbitrary_types_allowed = True
 
-    async def open_remote_ctx(
+    async def start_service_task(
         self,
+        name: str,
         portal: tractor.Portal,
         target: Callable,
         **kwargs,
 
-    ) -> tractor.Context:
+    ) -> (trio.CancelScope, tractor.Context):
         '''
         Open a context in a service sub-actor, add to a stack
-        that gets unwound at ``pikerd`` tearodwn.
+        that gets unwound at ``pikerd`` teardown.
 
         This allows for allocating long-running sub-services in our main
         daemon and explicitly controlling their lifetimes.
 
         '''
-        async def open_context_in_task():
-            async with portal.open_context(
-                target,
-                **kwargs,
-            ) as (ctx, first):
+        async def open_context_in_task(
+            task_status: TaskStatus[
+                trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
+        ) -> Any:
 
-                await ctx.result()
+            with trio.CancelScope() as cs:
 
-                await portal.result()
+                async with portal.open_context(
+                    target,
+                    **kwargs,
 
-        self.service_n.start_soon(open_context_in_task)
+                ) as (ctx, first):
 
-        return 'yo, dis a daemon yo.'
+                    # unblock once the remote context has started
+                    task_status.started((cs, first))
+
+                    # wait on any context's return value
+                    ctx_res = await ctx.result()
+                    log.info(
+                        f'`pikerd` service {name} started with value {ctx_res}'
+                    )
+
+                    # wait on any error from the sub-actor
+                    # NOTE: this will block indefinitely until cancelled
+                    # either by error from the target context function or
+                    # by being cancelled here by the surrounding cancel
+                    # scope
+                    return (await portal.result(), ctx_res)
+
+        cs, first = await self.service_n.start(open_context_in_task)
+
+        # store the cancel scope and portal for later cancellation or
+        # restart if needed.
+        self.service_tasks[name] = (cs, portal)
+
+        return cs, first
+
+    async def cancel_service(
+        self,
+        name: str,
+
+    ) -> Any:
+
+        log.info(f'Cancelling `pikerd` service {name}')
+        cs, portal = self.service_tasks[name]
+        cs.cancel()
+        return await portal.cancel_actor()
 
 
 _services: Optional[Services] = None
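For example, tearing down a running service by name from within the `pikerd` tree reads like so (a hedged sketch, assuming the `_services` singleton has been populated by `open_pikerd()` below and following the `f'brokerd.{brokername}'` naming used later in this diff):

    from piker import _daemon

    async def stop_brokerd(brokername: str) -> None:
        services = _daemon._services
        assert services is not None

        name = f'brokerd.{brokername}'
        if name in services.service_tasks:
            # cancels the local waiter task first, then the
            # sub-actor itself via its portal
            await services.cancel_service(name)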
@@ -125,22 +161,22 @@ async def open_pikerd(
         # spawn other specialized daemons I think?
         enable_modules=_root_modules,
     ) as _,
 
     tractor.open_nursery() as actor_nursery,
 ):
     async with trio.open_nursery() as service_nursery:
 
-        # setup service mngr singleton instance
-        async with AsyncExitStack() as stack:
+        # # setup service mngr singleton instance
+        # async with AsyncExitStack() as stack:
 
-            # assign globally for future daemon/task creation
-            _services = Services(
-                actor_n=actor_nursery,
-                service_n=service_nursery,
-                debug_mode=debug_mode,
-                ctx_stack=stack,
-            )
+        # assign globally for future daemon/task creation
+        _services = Services(
+            actor_n=actor_nursery,
+            service_n=service_nursery,
+            debug_mode=debug_mode,
+        )
 
-            yield _services
+        yield _services
 
 
 @asynccontextmanager
@@ -182,16 +218,20 @@ async def maybe_open_pikerd(
     # subtle, we must have the runtime up here or portal lookup will fail
     async with maybe_open_runtime(loglevel, **kwargs):
 
         async with tractor.find_actor(_root_dname) as portal:
             # assert portal is not None
             if portal is not None:
                 yield portal
                 return
 
-        # presume pikerd role
+        # presume pikerd role since no daemon could be found at
+        # configured address
         async with open_pikerd(
+
             loglevel=loglevel,
             debug_mode=kwargs.get('debug_mode', False),
+
         ) as _:
             # in the case where we're starting up the
             # tractor-piker runtime stack in **this** process
@@ -217,7 +257,7 @@ class Brokerd:
 async def maybe_spawn_daemon(
 
     service_name: str,
-    spawn_func: Callable,
+    service_task_target: Callable,
     spawn_args: dict[str, Any],
     loglevel: Optional[str] = None,
     **kwargs,
@@ -227,6 +267,13 @@ async def maybe_spawn_daemon(
     If no ``service_name`` daemon-actor can be found,
     spawn one in a local subactor and return a portal to it.
 
+    If this function is called from a non-pikerd actor, the
+    spawned service will persist as long as pikerd does or
+    until it is requested to be cancelled.
+
+    This can be seen as a service-starting api for remote-actor
+    clients.
+
     """
     if loglevel:
         get_console_log(loglevel)
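In client terms that api reads roughly like this (a sketch only; it assumes the `maybe_spawn_brokerd()` wiring further down this diff, that `brokername` is its first positional argument, and that a portal to the daemon is what gets yielded):

    from piker._daemon import maybe_spawn_brokerd

    async def with_brokerd(brokername: str) -> None:
        # spawns pikerd and/or the brokerd sub-actor only if they
        # don't already exist, then hands back a portal to it
        async with maybe_spawn_brokerd(
            brokername,
            loglevel='info',
        ) as portal:
            # issue requests to the (possibly pre-existing) daemon
            ...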
@@ -254,13 +301,24 @@ async def maybe_spawn_daemon(
     ) as pikerd_portal:
 
         if pikerd_portal is None:
-            # we are root so spawn brokerd directly in our tree
-            # the root nursery is accessed through process global state
-            await spawn_func(**spawn_args)
+            # we are the root and thus are `pikerd`
+            # so spawn the target service directly by calling
+            # the provided target routine.
+            # XXX: this assumes that the target is well formed and will
+            # do the right things to setup both a sub-actor **and** call
+            # the ``_Services`` api from above to start the top level
+            # service task for that actor.
+            await service_task_target(**spawn_args)
 
         else:
+            # tell the remote `pikerd` to start the target,
+            # the target can't return a non-serializable value
+            # since it is expected that service starting is
+            # non-blocking and the target task will persist running
+            # on `pikerd` after the client requesting its start
+            # disconnects.
             await pikerd_portal.run(
-                spawn_func,
+                service_task_target,
                 **spawn_args,
             )
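Note the constraint this dispatch encodes: since `service_task_target` may be invoked remotely via `pikerd_portal.run()`, its return value must be serializable over the IPC channel. This is why `spawn_brokerd()` and `spawn_emsd()` below switch from returning a portal or name-string to a plain `bool`.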
@@ -275,7 +333,7 @@ async def spawn_brokerd(
     loglevel: Optional[str] = None,
     **tractor_kwargs,
 
-) -> tractor._portal.Portal:
+) -> bool:
 
     log.info(f'Spawning {brokername} broker daemon')
@@ -288,6 +346,8 @@ async def spawn_brokerd(
     global _services
     assert _services
 
+    # ask `pikerd` to spawn a new sub-actor and manage it under its
+    # actor nursery
     portal = await _services.actor_n.start_actor(
         dname,
         enable_modules=_data_mods + [brokermod.__name__],
@@ -299,13 +359,13 @@ async def spawn_brokerd(
     # non-blocking setup of brokerd service nursery
     from .data import _setup_persistent_brokerd
 
-    await _services.open_remote_ctx(
+    await _services.start_service_task(
+        dname,
         portal,
         _setup_persistent_brokerd,
         brokername=brokername,
     )
 
-    return dname
+    return True
 
 
 @asynccontextmanager
@@ -322,7 +382,7 @@ async def maybe_spawn_brokerd(
     async with maybe_spawn_daemon(
 
         f'brokerd.{brokername}',
-        spawn_func=spawn_brokerd,
+        service_task_target=spawn_brokerd,
         spawn_args={'brokername': brokername, 'loglevel': loglevel},
         loglevel=loglevel,
         **kwargs,
@@ -336,7 +396,7 @@ async def spawn_emsd(
     loglevel: Optional[str] = None,
     **extra_tractor_kwargs
 
-) -> tractor._portal.Portal:
+) -> bool:
     """
     Start the clearing engine under ``pikerd``.
 
@@ -360,12 +420,12 @@ async def spawn_emsd(
     # non-blocking setup of clearing service
     from .clearing._ems import _setup_persistent_emsd
 
-    await _services.open_remote_ctx(
+    await _services.start_service_task(
+        'emsd',
         portal,
         _setup_persistent_emsd,
     )
 
-    return 'emsd'
+    return True
 
 
 @asynccontextmanager
@@ -380,7 +440,7 @@ async def maybe_open_emsd(
     async with maybe_spawn_daemon(
 
         'emsd',
-        spawn_func=spawn_emsd,
+        service_task_target=spawn_emsd,
         spawn_args={'loglevel': loglevel},
         loglevel=loglevel,
         **kwargs,