From cc5b21a7e6f40b031a82fe629f615f9080f2a506 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Jun 2024 15:34:57 -0400 Subject: [PATCH] Prep service mngr for move to `tractor.hilevel` Given it's a fairly simple yet useful abstraction, it makes sense to offer this sub-sys alongside the core `tractor` runtime lib. Without going into extreme detail on the impl changes (it'll come in the commit that moves to the other repo) here is the high level summary: ------ - ------ - rename `Services` -> `ServiceMngr` and use an factory `@acm` to guarantee a single-instance-per-actor using a niche approach for a singleton object using a default keyword-arg B) - the mod level `open_service_mngr()` and `get_service_mngr()` are the new allocation/access API. - add a `ServiceMngr.start_service()` method which does the work of both spawning a new subactor (for the daemon) and uses its portal to start the mngr side supervision task. - open actor/task nurseries inside the `@acm` allocator. Adjust other dependent subsystems to match: ------ - ------ - use `open_service_mngr()` when first allocated in `open_pikerd()`. - use `get_service_mngr()` instead of importing the class ref inside `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()` and `.data._sampling.spawn_samplerd()` using a `partial` to pack in the endpoint ctx kwargs (unpacked inside `.start_service()` XD). --- piker/brokers/_daemon.py | 53 +++-- piker/data/_sampling.py | 61 +++--- piker/service/__init__.py | 6 +- piker/service/_actor_runtime.py | 46 ++--- piker/service/_ahab.py | 7 +- piker/service/_daemon.py | 17 +- piker/service/_mngr.py | 352 +++++++++++++++++++++++++++----- piker/service/elastic.py | 4 +- piker/service/marketstore.py | 4 +- tests/conftest.py | 4 +- tests/test_ems.py | 4 +- tests/test_services.py | 4 +- 12 files changed, 420 insertions(+), 142 deletions(-) diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index 5efb03dd..da92f246 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -23,6 +23,7 @@ from __future__ import annotations from contextlib import ( asynccontextmanager as acm, ) +from functools import partial from types import ModuleType from typing import ( TYPE_CHECKING, @@ -190,14 +191,17 @@ def broker_init( async def spawn_brokerd( - brokername: str, loglevel: str | None = None, **tractor_kwargs, ) -> bool: + ''' + Spawn a `brokerd.` subactor service daemon + using `pikerd`'s service mngr. + ''' from piker.service._util import log # use service mngr log log.info(f'Spawning {brokername} broker daemon') @@ -217,27 +221,35 @@ async def spawn_brokerd( # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery - from piker.service import Services - + from piker.service import ( + get_service_mngr, + ServiceMngr, + ) dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' - portal = await Services.actor_n.start_actor( - dname, - enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), - debug_mode=Services.debug_mode, + mngr: ServiceMngr = get_service_mngr() + ctx: tractor.Context = await mngr.start_service( + daemon_name=dname, + ctx_ep=partial( + # signature of target root-task endpoint + daemon_fixture_ep, + + # passed to daemon_fixture_ep(**kwargs) + brokername=brokername, + loglevel=loglevel, + ), + debug_mode=mngr.debug_mode, + loglevel=loglevel, + enable_modules=( + _data_mods + + + tractor_kwargs.pop('enable_modules') + ), **tractor_kwargs ) - - # NOTE: the service mngr expects an already spawned actor + its - # portal ref in order to do non-blocking setup of brokerd - # service nursery. - await Services.start_service_task( - dname, - portal, - - # signature of target root-task endpoint - daemon_fixture_ep, - brokername=brokername, - loglevel=loglevel, + assert ( + not ctx.cancel_called + and ctx.portal # parent side + and dname in ctx.chan.uid # subactor is named as desired ) return True @@ -262,8 +274,7 @@ async def maybe_spawn_brokerd( from piker.service import maybe_spawn_daemon async with maybe_spawn_daemon( - - f'brokerd.{brokername}', + service_name=f'brokerd.{brokername}', service_task_target=spawn_brokerd, spawn_args={ 'brokername': brokername, diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 7bb0231d..83cd363d 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -25,6 +25,7 @@ from collections import ( defaultdict, ) from contextlib import asynccontextmanager as acm +from functools import partial import time from typing import ( Any, @@ -42,7 +43,7 @@ from tractor.trionics import ( maybe_open_nursery, ) import trio -from trio_typing import TaskStatus +from trio import TaskStatus from .ticktools import ( frame_ticks, @@ -70,6 +71,7 @@ if TYPE_CHECKING: _default_delay_s: float = 1.0 +# TODO: use new `tractor.singleton_acm` API for this! class Sampler: ''' Global sampling engine registry. @@ -79,9 +81,9 @@ class Sampler: This non-instantiated type is meant to be a singleton within a `samplerd` actor-service spawned once by the user wishing to - time-step-sample (real-time) quote feeds, see - ``.service.maybe_open_samplerd()`` and the below - ``register_with_sampler()``. + time-step-sample a (real-time) quote feeds, see + `.service.maybe_open_samplerd()` and the below + `register_with_sampler()`. ''' service_nursery: None | trio.Nursery = None @@ -375,7 +377,10 @@ async def register_with_sampler( assert Sampler.ohlcv_shms # unblock caller - await ctx.started(set(Sampler.ohlcv_shms.keys())) + await ctx.started( + # XXX bc msgpack only allows one array type! + list(Sampler.ohlcv_shms.keys()) + ) if open_index_stream: try: @@ -419,7 +424,6 @@ async def register_with_sampler( async def spawn_samplerd( - loglevel: str | None = None, **extra_tractor_kwargs @@ -429,7 +433,10 @@ async def spawn_samplerd( update and increment count write and stream broadcasting. ''' - from piker.service import Services + from piker.service import ( + get_service_mngr, + ServiceMngr, + ) dname = 'samplerd' log.info(f'Spawning `{dname}`') @@ -437,26 +444,33 @@ async def spawn_samplerd( # singleton lock creation of ``samplerd`` since we only ever want # one daemon per ``pikerd`` proc tree. # TODO: make this built-into the service api? - async with Services.locks[dname + '_singleton']: + mngr: ServiceMngr = get_service_mngr() + already_started: bool = dname in mngr.service_tasks - if dname not in Services.service_tasks: - - portal = await Services.actor_n.start_actor( - dname, - enable_modules=[ - 'piker.data._sampling', - ], - loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag - **extra_tractor_kwargs - ) - - await Services.start_service_task( - dname, - portal, + async with mngr._locks[dname + '_singleton']: + ctx: Context = await mngr.start_service( + daemon_name=dname, + ctx_ep=partial( register_with_sampler, period_s=1, sub_for_broadcasts=False, + ), + debug_mode=mngr.debug_mode, # set by pikerd flag + + # proxy-through to tractor + enable_modules=[ + 'piker.data._sampling', + ], + loglevel=loglevel, + **extra_tractor_kwargs + ) + if not already_started: + assert ( + ctx + and + ctx.portal + and + not ctx.cancel_called ) return True @@ -889,6 +903,7 @@ async def uniform_rate_send( # to consumers which crash or lose network connection. # I.e. we **DO NOT** want to crash and propagate up to # ``pikerd`` these kinds of errors! + trio.EndOfChannel, trio.ClosedResourceError, trio.BrokenResourceError, ConnectionResetError, diff --git a/piker/service/__init__.py b/piker/service/__init__.py index 29360620..beb9c70b 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for, => TODO: maybe to (re)move elsewhere? ''' -from ._mngr import Services as Services +from ._mngr import ( + get_service_mngr as get_service_mngr, + open_service_mngr as open_service_mngr, + ServiceMngr as ServiceMngr, +) from ._registry import ( _tractor_kwargs as _tractor_kwargs, _default_reg_addr as _default_reg_addr, diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index a4e3ccf2..42440f82 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -21,7 +21,6 @@ from __future__ import annotations import os from typing import ( - Optional, Any, ClassVar, ) @@ -30,13 +29,13 @@ from contextlib import ( ) import tractor -import trio from ._util import ( get_console_log, ) from ._mngr import ( - Services, + open_service_mngr, + ServiceMngr, ) from ._registry import ( # noqa _tractor_kwargs, @@ -59,7 +58,7 @@ async def open_piker_runtime( registry_addrs: list[tuple[str, int]] = [], enable_modules: list[str] = [], - loglevel: Optional[str] = None, + loglevel: str|None = None, # XXX NOTE XXX: you should pretty much never want debug mode # for data daemons when running in production. @@ -69,7 +68,7 @@ async def open_piker_runtime( # and spawn the service tree distributed per that. start_method: str = 'trio', - tractor_runtime_overrides: dict | None = None, + tractor_runtime_overrides: dict|None = None, **tractor_kwargs, ) -> tuple[ @@ -119,6 +118,10 @@ async def open_piker_runtime( # spawn other specialized daemons I think? enable_modules=enable_modules, + # TODO: how to configure this? + # keep it on by default if debug mode is set? + # maybe_enable_greenback=debug_mode, + **tractor_kwargs, ) as actor, @@ -167,12 +170,13 @@ async def open_pikerd( **kwargs, -) -> Services: +) -> ServiceMngr: ''' - Start a root piker daemon with an indefinite lifetime. + Start a root piker daemon actor (aka `pikerd`) with an indefinite + lifetime. - A root actor nursery is created which can be used to create and keep - alive underling services (see below). + A root actor-nursery is created which can be used to spawn and + supervise underling service sub-actors (see below). ''' # NOTE: for the root daemon we always enable the root @@ -199,8 +203,6 @@ async def open_pikerd( root_actor, reg_addrs, ), - tractor.open_nursery() as actor_nursery, - trio.open_nursery() as service_nursery, ): for addr in reg_addrs: if addr not in root_actor.accept_addrs: @@ -209,25 +211,17 @@ async def open_pikerd( 'Maybe you have another daemon already running?' ) - # assign globally for future daemon/task creation - Services.actor_n = actor_nursery - Services.service_n = service_nursery - Services.debug_mode = debug_mode - - try: - yield Services - - finally: - # TODO: is this more clever/efficient? - # if 'samplerd' in Services.service_tasks: - # await Services.cancel_service('samplerd') - service_nursery.cancel_scope.cancel() + mngr: ServiceMngr + async with open_service_mngr( + debug_mode=debug_mode, + ) as mngr: + yield mngr # TODO: do we even need this? # @acm # async def maybe_open_runtime( -# loglevel: Optional[str] = None, +# loglevel: str|None = None, # **kwargs, # ) -> None: @@ -256,7 +250,7 @@ async def maybe_open_pikerd( loglevel: str | None = None, **kwargs, -) -> tractor._portal.Portal | ClassVar[Services]: +) -> tractor._portal.Portal | ClassVar[ServiceMngr]: ''' If no ``pikerd`` daemon-root-actor can be found start it and yield up (we should probably figure out returning a portal to self diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 4cccf855..0bdd1688 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -49,7 +49,7 @@ from requests.exceptions import ( ReadTimeout, ) -from ._mngr import Services +from ._mngr import ServiceMngr from ._util import ( log, # sub-sys logger get_console_log, @@ -453,7 +453,7 @@ async def open_ahabd( @acm async def start_ahab_service( - services: Services, + services: ServiceMngr, service_name: str, # endpoint config passed as **kwargs @@ -549,7 +549,8 @@ async def start_ahab_service( log.warning('Failed to cancel root permsed container') except ( - trio.MultiError, + # trio.MultiError, + ExceptionGroup, ) as err: for subexc in err.exceptions: if isinstance(subexc, PermissionError): diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 1e7ff096..a76918ec 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -26,14 +26,17 @@ from typing import ( from contextlib import ( asynccontextmanager as acm, ) +from collections import defaultdict import tractor +import trio from ._util import ( log, # sub-sys logger ) from ._mngr import ( - Services, + get_service_mngr, + ServiceMngr, ) from ._actor_runtime import maybe_open_pikerd from ._registry import find_service @@ -41,15 +44,14 @@ from ._registry import find_service @acm async def maybe_spawn_daemon( - service_name: str, service_task_target: Callable, - spawn_args: dict[str, Any], loglevel: str | None = None, singleton: bool = False, + _locks = defaultdict(trio.Lock), **pikerd_kwargs, ) -> tractor.Portal: @@ -67,7 +69,7 @@ async def maybe_spawn_daemon( ''' # serialize access to this section to avoid # 2 or more tasks racing to create a daemon - lock = Services.locks[service_name] + lock = _locks[service_name] await lock.acquire() async with find_service( @@ -147,21 +149,22 @@ async def spawn_emsd( """ log.info('Spawning emsd') - portal = await Services.actor_n.start_actor( + smngr: ServiceMngr = get_service_mngr() + portal = await smngr.actor_n.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', 'piker.clearing._client', ], loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag + debug_mode=smngr.debug_mode, # set by pikerd flag **extra_tractor_kwargs ) # non-blocking setup of clearing service from ..clearing._ems import _setup_persistent_emsd - await Services.start_service_task( + await smngr.start_service_task( 'emsd', portal, diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 89e98411..3197bef3 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -18,16 +18,29 @@ daemon-service management API. """ +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, + # contextmanager as cm, +) from collections import defaultdict +from dataclasses import ( + dataclass, + field, +) +import functools +import inspect from typing import ( Callable, Any, ) -import trio -from trio_typing import TaskStatus +import msgspec import tractor +import trio +from trio import TaskStatus from tractor import ( + ActorNursery, current_actor, ContextCancelled, Context, @@ -39,6 +52,130 @@ from ._util import ( ) +# TODO: implement a singleton deco-API for wrapping the below +# factory's impl for general actor-singleton use? +# +# @singleton +# async def open_service_mngr( +# **init_kwargs, +# ) -> ServiceMngr: +# ''' +# Note this function body is invoke IFF no existing singleton instance already +# exists in this proc's memory. + +# ''' +# # setup +# yield ServiceMngr(**init_kwargs) +# # teardown + + + +# TODO: singleton factory API instead of a class API +@acm +async def open_service_mngr( + *, + debug_mode: bool = False, + + # impl deat which ensures a single global instance + _singleton: list[ServiceMngr|None] = [None], + **init_kwargs, + +) -> ServiceMngr: + ''' + Open a multi-subactor-as-service-daemon tree supervisor. + + The delivered `ServiceMngr` is a singleton instance for each + actor-process and is allocated on first open and never + de-allocated unless explicitly deleted by al call to + `del_service_mngr()`. + + ''' + # TODO: factor this an allocation into + # a `._mngr.open_service_mngr()` and put in the + # once-n-only-once setup/`.__aenter__()` part! + # -[ ] how to make this only happen on the `mngr == None` case? + # |_ use `.trionics.maybe_open_context()` (for generic + # async-with-style-only-once of the factory impl, though + # what do we do for the allocation case? + # / `.maybe_open_nursery()` (since for this specific case + # it's simpler?) to activate + async with ( + tractor.open_nursery() as an, + trio.open_nursery() as tn, + ): + # impl specific obvi.. + init_kwargs.update({ + 'actor_n': an, + 'service_n': tn, + }) + + mngr: ServiceMngr|None + if (mngr := _singleton[0]) is None: + + log.info('Allocating a new service mngr!') + mngr = _singleton[0] = ServiceMngr(**init_kwargs) + + # TODO: put into `.__aenter__()` section of + # eventual `@singleton_acm` API wrapper. + # + # assign globally for future daemon/task creation + mngr.actor_n = an + mngr.service_n = tn + + else: + assert ( + mngr.actor_n + and + mngr.service_tn + ) + log.info( + 'Using extant service mngr!\n\n' + f'{mngr!r}\n' # it has a nice `.__repr__()` of services state + ) + + try: + # NOTE: this is a singleton factory impl specific detail + # which should be supported in the condensed + # `@singleton_acm` API? + mngr.debug_mode = debug_mode + + yield mngr + finally: + # TODO: is this more clever/efficient? + # if 'samplerd' in mngr.service_tasks: + # await mngr.cancel_service('samplerd') + tn.cancel_scope.cancel() + + + +def get_service_mngr() -> ServiceMngr: + ''' + Try to get the singleton service-mngr for this actor presuming it + has already been allocated using, + + .. code:: python + + async with open_<@singleton_acm(func)>() as mngr` + ... this block kept open ... + + If not yet allocated raise a `ServiceError`. + + ''' + # https://stackoverflow.com/a/12627202 + # https://docs.python.org/3/library/inspect.html#inspect.Signature + maybe_mngr: ServiceMngr|None = inspect.signature( + open_service_mngr + ).parameters['_singleton'].default[0] + + if maybe_mngr is None: + raise RuntimeError( + 'Someone must allocate a `ServiceMngr` using\n\n' + '`async with open_service_mngr()` beforehand!!\n' + ) + + return maybe_mngr + + # TODO: we need remote wrapping and a general soln: # - factor this into a ``tractor.highlevel`` extension # pack for the # library. @@ -46,31 +183,46 @@ from ._util import ( # to the pikerd actor for starting services remotely! # - prolly rename this to ActorServicesNursery since it spawns # new actors and supervises them to completion? -class Services: +@dataclass +class ServiceMngr: +# class ServiceMngr(msgspec.Struct): + ''' + A multi-subactor-as-service manager. - actor_n: tractor._supervise.ActorNursery + Spawn, supervise and monitor service/daemon subactors in a SC + process tree. + + ''' + actor_n: ActorNursery service_n: trio.Nursery - debug_mode: bool # tractor sub-actor debug mode flag + debug_mode: bool = False # tractor sub-actor debug mode flag + service_tasks: dict[ str, tuple[ trio.CancelScope, + Context, Portal, trio.Event, ] - ] = {} - locks = defaultdict(trio.Lock) + ] = field(default_factory=dict) + + # internal per-service task mutexs + _locks = defaultdict(trio.Lock) - @classmethod async def start_service_task( self, name: str, portal: Portal, + + # TODO: typevar for the return type of the target and then + # use it below for `ctx_res`? target: Callable, + allow_overruns: bool = False, **ctx_kwargs, - ) -> (trio.CancelScope, Context): + ) -> (trio.CancelScope, Context, Any): ''' Open a context in a service sub-actor, add to a stack that gets unwound at ``pikerd`` teardown. @@ -83,6 +235,7 @@ class Services: task_status: TaskStatus[ tuple[ trio.CancelScope, + Context, trio.Event, Any, ] @@ -90,64 +243,87 @@ class Services: ) -> Any: + # TODO: use the ctx._scope directly here instead? + # -[ ] actually what semantics do we expect for this + # usage!? with trio.CancelScope() as cs: + try: + async with portal.open_context( + target, + allow_overruns=allow_overruns, + **ctx_kwargs, - async with portal.open_context( - target, - allow_overruns=allow_overruns, - **ctx_kwargs, + ) as (ctx, started): - ) as (ctx, first): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started((cs, complete, first)) - log.info( - f'`pikerd` service {name} started with value {first}' - ) - try: + # unblock once the remote context has started + complete = trio.Event() + task_status.started(( + cs, + ctx, + complete, + started, + )) + log.info( + f'`pikerd` service {name} started with value {started}' + ) # wait on any context's return value # and any final portal result from the # sub-actor. - ctx_res: Any = await ctx.result() + ctx_res: Any = await ctx.wait_for_result() # NOTE: blocks indefinitely until cancelled # either by error from the target context # function or by being cancelled here by the # surrounding cancel scope. - return (await portal.result(), ctx_res) - except ContextCancelled as ctxe: - canceller: tuple[str, str] = ctxe.canceller - our_uid: tuple[str, str] = current_actor().uid - if ( - canceller != portal.channel.uid - and - canceller != our_uid - ): - log.cancel( - f'Actor-service {name} was remotely cancelled?\n' - f'remote canceller: {canceller}\n' - f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n' - ) - else: - raise + return ( + await portal.wait_for_result(), + ctx_res, + ) + except ContextCancelled as ctxe: + canceller: tuple[str, str] = ctxe.canceller + our_uid: tuple[str, str] = current_actor().uid + if ( + canceller != portal.chan.uid + and + canceller != our_uid + ): + log.cancel( + f'Actor-service `{name}` was remotely cancelled by a peer?\n' + # TODO: this would be a good spot to use + # a respawn feature Bo + f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' - finally: - await portal.cancel_actor() - complete.set() - self.service_tasks.pop(name) + f'cancellee: {portal.chan.uid}\n' + f'canceller: {canceller}\n' + ) + else: + raise - cs, complete, first = await self.service_n.start(open_context_in_task) + finally: + # NOTE: the ctx MUST be cancelled first if we + # don't want the above `ctx.wait_for_result()` to + # raise a self-ctxc. WHY, well since from the ctx's + # perspective the cancel request will have + # arrived out-out-of-band at the `Actor.cancel()` + # level, thus `Context.cancel_called == False`, + # meaning `ctx._is_self_cancelled() == False`. + # with trio.CancelScope(shield=True): + # await ctx.cancel() + await portal.cancel_actor() + complete.set() + self.service_tasks.pop(name) + + cs, sub_ctx, complete, started = await self.service_n.start( + open_context_in_task + ) # store the cancel scope and portal for later cancellation or # retstart if needed. - self.service_tasks[name] = (cs, portal, complete) + self.service_tasks[name] = (cs, sub_ctx, portal, complete) + return cs, sub_ctx, started - return cs, first - - @classmethod async def cancel_service( self, name: str, @@ -158,8 +334,80 @@ class Services: ''' log.info(f'Cancelling `pikerd` service {name}') - cs, portal, complete = self.service_tasks[name] - cs.cancel() + cs, sub_ctx, portal, complete = self.service_tasks[name] + + # cs.cancel() + await sub_ctx.cancel() await complete.wait() - assert name not in self.service_tasks, \ - f'Serice task for {name} not terminated?' + + if name in self.service_tasks: + # TODO: custom err? + # raise ServiceError( + raise RuntimeError( + f'Serice task for {name} not terminated?' + ) + + # assert name not in self.service_tasks, \ + # f'Serice task for {name} not terminated?' + + async def start_service( + self, + daemon_name: str, + ctx_ep: Callable, # kwargs must `partial`-ed in! + + debug_mode: bool = False, + **tractor_actor_kwargs, + + ) -> Context: + ''' + Start a "service" task in a new sub-actor (daemon) and manage it's lifetime + indefinitely. + + Services can be cancelled/shutdown using `.cancel_service()`. + + ''' + entry: tuple|None = self.service_tasks.get(daemon_name) + if entry: + (cs, sub_ctx, portal, complete) = entry + return sub_ctx + + if daemon_name not in self.service_tasks: + portal = await self.actor_n.start_actor( + daemon_name, + debug_mode=( # maybe set globally during allocate + debug_mode + or + self.debug_mode + ), + **tractor_actor_kwargs, + ) + ctx_kwargs: dict[str, Any] = {} + if isinstance(ctx_ep, functools.partial): + ctx_kwargs: dict[str, Any] = ctx_ep.keywords + ctx_ep: Callable = ctx_ep.func + + (cs, sub_ctx, started) = await self.start_service_task( + daemon_name, + portal, + ctx_ep, + **ctx_kwargs, + ) + + return sub_ctx + + +# TODO: +# -[ ] factor all the common shit from `.data._sampling` +# and `.brokers._daemon` into here / `ServiceMngr` +# in terms of allocating the `Portal` as part of the +# "service-in-subactor" starting! +# -[ ] move to `tractor.hilevel._service`, import and use here! +# NOTE: purposely leaks the ref to the mod-scope Bo +# import tractor +# from tractor.hilevel import ( +# open_service_mngr, +# ServiceMngr, +# ) +# mngr: ServiceMngr|None = None +# with tractor.hilevel.open_service_mngr() as mngr: +# Services = proxy(mngr) diff --git a/piker/service/elastic.py b/piker/service/elastic.py index 902f4fde..b1a13722 100644 --- a/piker/service/elastic.py +++ b/piker/service/elastic.py @@ -21,11 +21,13 @@ from typing import ( TYPE_CHECKING, ) +# TODO: oof, needs to be changed to `httpx`! import asks if TYPE_CHECKING: import docker from ._ahab import DockerContainer + from . import ServiceMngr from ._util import log # sub-sys logger from ._util import ( @@ -127,7 +129,7 @@ def start_elasticsearch( @acm async def start_ahab_daemon( - service_mngr: Services, + service_mngr: ServiceMngr, user_config: dict | None = None, loglevel: str | None = None, diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index c9f49420..852b967c 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -53,7 +53,7 @@ import pendulum # import purerpc from ..data.feed import maybe_open_feed -from . import Services +from . import ServiceMngr from ._util import ( log, # sub-sys logger get_console_log, @@ -233,7 +233,7 @@ def start_marketstore( @acm async def start_ahab_daemon( - service_mngr: Services, + service_mngr: ServiceMngr, user_config: dict | None = None, loglevel: str | None = None, diff --git a/tests/conftest.py b/tests/conftest.py index 366d5d95..cf77e76e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ from piker import ( config, ) from piker.service import ( - Services, + get_service_mngr, ) from piker.log import get_console_log @@ -129,7 +129,7 @@ async def _open_test_pikerd( ) as service_manager, ): # this proc/actor is the pikerd - assert service_manager is Services + assert service_manager is get_service_mngr() async with tractor.wait_for_actor( 'pikerd', diff --git a/tests/test_ems.py b/tests/test_ems.py index c2f5d7a8..e0305999 100644 --- a/tests/test_ems.py +++ b/tests/test_ems.py @@ -26,7 +26,7 @@ import pytest import tractor from uuid import uuid4 -from piker.service import Services +from piker.service import ServiceMngr from piker.log import get_logger from piker.clearing._messages import ( Order, @@ -158,7 +158,7 @@ def load_and_check_pos( def test_ems_err_on_bad_broker( - open_test_pikerd: Services, + open_test_pikerd: ServiceMngr, loglevel: str, ): async def load_bad_fqme(): diff --git a/tests/test_services.py b/tests/test_services.py index 433e97f3..ca093929 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -15,7 +15,7 @@ import tractor from piker.service import ( find_service, - Services, + ServiceMngr, ) from piker.data import ( open_feed, @@ -44,7 +44,7 @@ def test_runtime_boot( async def main(): port = 6666 daemon_addr = ('127.0.0.1', port) - services: Services + services: ServiceMngr async with ( open_test_pikerd(