From 31f2b01c3ecb4e2411d116159bc5bc6f5bbd9c5a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 15:59:19 -0500 Subject: [PATCH] Move `Services` api to `.service._mngr` mod --- piker/service/__init__.py | 110 ++---------------------------- piker/service/_mngr.py | 136 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 105 deletions(-) create mode 100644 piker/service/_mngr.py diff --git a/piker/service/__init__.py b/piker/service/__init__.py index 04ec4c28..e5e2c1fa 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -15,7 +15,7 @@ # along with this program. If not, see . """ -Structured, daemon tree service management. +Actor-runtime service orchestration machinery. """ from __future__ import annotations @@ -31,17 +31,18 @@ from typing import ( from contextlib import ( asynccontextmanager as acm, ) -from collections import defaultdict import tractor import trio -from trio_typing import TaskStatus from ..log import ( get_logger, get_console_log, ) from ..brokers import get_brokermod +from ._mngr import ( + Services, +) log = get_logger(__name__) @@ -142,107 +143,6 @@ _root_modules = [ ] -# TODO: factor this into a ``tractor.highlevel`` extension -# pack for the library. -class Services: - - actor_n: tractor._supervise.ActorNursery - service_n: trio.Nursery - debug_mode: bool # tractor sub-actor debug mode flag - service_tasks: dict[ - str, - tuple[ - trio.CancelScope, - tractor.Portal, - trio.Event, - ] - ] = {} - locks = defaultdict(trio.Lock) - - @classmethod - async def start_service_task( - self, - name: str, - portal: tractor.Portal, - target: Callable, - **kwargs, - - ) -> (trio.CancelScope, tractor.Context): - ''' - Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` teardown. - - This allows for allocating long-running sub-services in our main - daemon and explicitly controlling their lifetimes. - - ''' - async def open_context_in_task( - task_status: TaskStatus[ - tuple[ - trio.CancelScope, - trio.Event, - Any, - ] - ] = trio.TASK_STATUS_IGNORED, - - ) -> Any: - - with trio.CancelScope() as cs: - async with portal.open_context( - target, - **kwargs, - - ) as (ctx, first): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started((cs, complete, first)) - log.info( - f'`pikerd` service {name} started with value {first}' - ) - try: - # wait on any context's return value - # and any final portal result from the - # sub-actor. - ctx_res = await ctx.result() - - # NOTE: blocks indefinitely until cancelled - # either by error from the target context - # function or by being cancelled here by the - # surrounding cancel scope. - return (await portal.result(), ctx_res) - - finally: - await portal.cancel_actor() - complete.set() - self.service_tasks.pop(name) - - cs, complete, first = await self.service_n.start(open_context_in_task) - - # store the cancel scope and portal for later cancellation or - # retstart if needed. - self.service_tasks[name] = (cs, portal, complete) - - return cs, first - - @classmethod - async def cancel_service( - self, - name: str, - - ) -> Any: - ''' - Cancel the service task and actor for the given ``name``. - - ''' - log.info(f'Cancelling `pikerd` service {name}') - cs, portal, complete = self.service_tasks[name] - cs.cancel() - await complete.wait() - assert name not in self.service_tasks, \ - f'Serice task for {name} not terminated?' - - @acm async def open_piker_runtime( name: str, diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py new file mode 100644 index 00000000..04f396af --- /dev/null +++ b/piker/service/_mngr.py @@ -0,0 +1,136 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +daemon-service management API. + +""" +from collections import defaultdict +from typing import ( + Callable, + Any, +) + +import trio +from trio_typing import TaskStatus +import tractor + +from ..log import ( + get_logger, +) + +log = get_logger(__name__) + + +# TODO: factor this into a ``tractor.highlevel`` extension +# pack for the library. +class Services: + + actor_n: tractor._supervise.ActorNursery + service_n: trio.Nursery + debug_mode: bool # tractor sub-actor debug mode flag + service_tasks: dict[ + str, + tuple[ + trio.CancelScope, + tractor.Portal, + trio.Event, + ] + ] = {} + locks = defaultdict(trio.Lock) + + @classmethod + async def start_service_task( + self, + name: str, + portal: tractor.Portal, + target: Callable, + **kwargs, + + ) -> (trio.CancelScope, tractor.Context): + ''' + Open a context in a service sub-actor, add to a stack + that gets unwound at ``pikerd`` teardown. + + This allows for allocating long-running sub-services in our main + daemon and explicitly controlling their lifetimes. + + ''' + async def open_context_in_task( + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + trio.Event, + Any, + ] + ] = trio.TASK_STATUS_IGNORED, + + ) -> Any: + + with trio.CancelScope() as cs: + async with portal.open_context( + target, + **kwargs, + + ) as (ctx, first): + + # unblock once the remote context has started + complete = trio.Event() + task_status.started((cs, complete, first)) + log.info( + f'`pikerd` service {name} started with value {first}' + ) + try: + # wait on any context's return value + # and any final portal result from the + # sub-actor. + ctx_res = await ctx.result() + + # NOTE: blocks indefinitely until cancelled + # either by error from the target context + # function or by being cancelled here by the + # surrounding cancel scope. + return (await portal.result(), ctx_res) + + finally: + await portal.cancel_actor() + complete.set() + self.service_tasks.pop(name) + + cs, complete, first = await self.service_n.start(open_context_in_task) + + # store the cancel scope and portal for later cancellation or + # retstart if needed. + self.service_tasks[name] = (cs, portal, complete) + + return cs, first + + @classmethod + async def cancel_service( + self, + name: str, + + ) -> Any: + ''' + Cancel the service task and actor for the given ``name``. + + ''' + log.info(f'Cancelling `pikerd` service {name}') + cs, portal, complete = self.service_tasks[name] + cs.cancel() + await complete.wait() + assert name not in self.service_tasks, \ + f'Serice task for {name} not terminated?'