From c11948aeeb1f2a8e004c51a3ca06882066b705f3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 11:21:29 -0500 Subject: [PATCH 1/7] Delegate to `tractor.msg.pretty_struct` since it was factored from here! --- piker/types.py | 228 +------------------------------------------------ 1 file changed, 1 insertion(+), 227 deletions(-) diff --git a/piker/types.py b/piker/types.py index cda3fb44..385f83b0 100644 --- a/piker/types.py +++ b/piker/types.py @@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib types. ''' -from __future__ import annotations -from collections import UserList -from pprint import ( - saferepr, -) -from typing import Any - -from msgspec import ( - msgpack, - Struct as _Struct, - structs, -) - - -class DiffDump(UserList): - ''' - Very simple list delegator that repr() dumps (presumed) tuple - elements of the form `tuple[str, Any, Any]` in a nice - multi-line readable form for analyzing `Struct` diffs. - - ''' - def __repr__(self) -> str: - if not len(self): - return super().__repr__() - - # format by displaying item pair's ``repr()`` on multiple, - # indented lines such that they are more easily visually - # comparable when printed to console when printed to - # console. - repstr: str = '[\n' - for k, left, right in self: - repstr += ( - f'({k},\n' - f'\t{repr(left)},\n' - f'\t{repr(right)},\n' - ')\n' - ) - repstr += ']\n' - return repstr - - -class Struct( - _Struct, - - # https://jcristharif.com/msgspec/structs.html#tagged-unions - # tag='pikerstruct', - # tag=True, -): - ''' - A "human friendlier" (aka repl buddy) struct subtype. - - ''' - def _sin_props(self) -> Iterator[ - tuple[ - structs.FieldIinfo, - str, - Any, - ] - ]: - ''' - Iterate over all non-@property fields of this struct. 
- - ''' - fi: structs.FieldInfo - for fi in structs.fields(self): - key: str = fi.name - val: Any = getattr(self, key) - yield fi, key, val - - def to_dict( - self, - include_non_members: bool = True, - - ) -> dict: - ''' - Like it sounds.. direct delegation to: - https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict - - BUT, by default we pop all non-member (aka not defined as - struct fields) fields by default. - - ''' - asdict: dict = structs.asdict(self) - if include_non_members: - return asdict - - # only return a dict of the struct members - # which were provided as input, NOT anything - # added as type-defined `@property` methods! - sin_props: dict = {} - fi: structs.FieldInfo - for fi, k, v in self._sin_props(): - sin_props[k] = asdict[k] - - return sin_props - - def pformat( - self, - field_indent: int = 2, - indent: int = 0, - - ) -> str: - ''' - Recursion-safe `pprint.pformat()` style formatting of - a `msgspec.Struct` for sane reading by a human using a REPL. - - ''' - # global whitespace indent - ws: str = ' '*indent - - # field whitespace indent - field_ws: str = ' '*(field_indent + indent) - - # qtn: str = ws + self.__class__.__qualname__ - qtn: str = self.__class__.__qualname__ - - obj_str: str = '' # accumulator - fi: structs.FieldInfo - k: str - v: Any - for fi, k, v in self._sin_props(): - - # TODO: how can we prefer `Literal['option1', 'option2, - # ..]` over .__name__ == `Literal` but still get only the - # latter for simple types like `str | int | None` etc..? 
- ft: type = fi.type - typ_name: str = getattr(ft, '__name__', str(ft)) - - # recurse to get sub-struct's `.pformat()` output Bo - if isinstance(v, Struct): - val_str: str = v.pformat( - indent=field_indent + indent, - field_indent=indent + field_indent, - ) - - else: # the `pprint` recursion-safe format: - # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr - val_str: str = saferepr(v) - - obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') - - return ( - f'{qtn}(\n' - f'{obj_str}' - f'{ws})' - ) - - # TODO: use a pprint.PrettyPrinter instance around ONLY rendering - # inside a known tty? - # def __repr__(self) -> str: - # ... - - # __str__ = __repr__ = pformat - __repr__ = pformat - - def copy( - self, - update: dict | None = None, - - ) -> Struct: - ''' - Validate-typecast all self defined fields, return a copy of - us with all such fields. - - NOTE: This is kinda like the default behaviour in - `pydantic.BaseModel` except a copy of the object is - returned making it compat with `frozen=True`. - - ''' - if update: - for k, v in update.items(): - setattr(self, k, v) - - # NOTE: roundtrip serialize to validate - # - enode to msgpack binary format, - # - decode that back to a struct. - return msgpack.Decoder(type=type(self)).decode( - msgpack.Encoder().encode(self) - ) - - def typecast( - self, - - # TODO: allow only casting a named subset? - # fields: set[str] | None = None, - - ) -> None: - ''' - Cast all fields using their declared type annotations - (kinda like what `pydantic` does by default). - - NOTE: this of course won't work on frozen types, use - ``.copy()`` above in such cases. 
- - ''' - # https://jcristharif.com/msgspec/api.html#msgspec.structs.fields - fi: structs.FieldInfo - for fi in structs.fields(self): - setattr( - self, - fi.name, - fi.type(getattr(self, fi.name)), - ) - - def __sub__( - self, - other: Struct, - - ) -> DiffDump[tuple[str, Any, Any]]: - ''' - Compare fields/items key-wise and return a ``DiffDump`` - for easy visual REPL comparison B) - - ''' - diffs: DiffDump[tuple[str, Any, Any]] = DiffDump() - for fi in structs.fields(self): - attr_name: str = fi.name - ours: Any = getattr(self, attr_name) - theirs: Any = getattr(other, attr_name) - if ours != theirs: - diffs.append(( - attr_name, - ours, - theirs, - )) - - return diffs +from tractor.msg import Struct as Struct -- 2.34.1 From 2539d1a2892558a3df1f7c3f586d9e1c4257f20e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 12:30:41 -0500 Subject: [PATCH 2/7] Enable `greenback` for `.pause_from_sync()` by default? --- piker/service/_actor_runtime.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 33f23453..14751304 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -124,6 +124,10 @@ async def open_piker_runtime( enable_modules=enable_modules, hide_tb=False, + # TODO: how to configure this? + # keep it on by default if debug mode is set? + maybe_enable_greenback=False, + **tractor_kwargs, ) as actor, -- 2.34.1 From 13e8ad96142d21273e5ca77a31b6fdaf318cc550 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Jun 2024 15:34:57 -0400 Subject: [PATCH 3/7] Prep service mngr for move to `tractor.hilevel` Given it's a fairly simple yet useful abstraction, it makes sense to offer this sub-sys alongside the core `tractor` runtime lib. 
Without going into extreme detail on the impl changes (it'll come in the commit that moves to the other repo) here is the high level summary: ------ - ------ - rename `Services` -> `ServiceMngr` and use a factory `@acm` to guarantee a single-instance-per-actor using a niche approach for a singleton object using a default keyword-arg B) - the mod level `open_service_mngr()` and `get_service_mngr()` are the new allocation/access API. - add a `ServiceMngr.start_service()` method which does the work of both spawning a new subactor (for the daemon) and using its portal to start the mngr side supervision task. - open actor/task nurseries inside the `@acm` allocator. Adjust other dependent subsystems to match: ------ - ------ - use `open_service_mngr()` when first allocated in `open_pikerd()`. - use `get_service_mngr()` instead of importing the class ref inside `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()` and `.data._sampling.spawn_samplerd()` using a `partial` to pack in the endpoint ctx kwargs (unpacked inside `.start_service()` XD). 
--- piker/brokers/_daemon.py | 53 +++-- piker/data/_sampling.py | 60 +++--- piker/service/__init__.py | 6 +- piker/service/_actor_runtime.py | 45 ++-- piker/service/_ahab.py | 7 +- piker/service/_daemon.py | 20 +- piker/service/_mngr.py | 350 +++++++++++++++++++++++++++----- piker/service/elastic.py | 4 +- piker/service/marketstore.py | 4 +- tests/conftest.py | 4 +- tests/test_ems.py | 4 +- tests/test_services.py | 4 +- 12 files changed, 416 insertions(+), 145 deletions(-) diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index 5414bfb9..a683a501 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -23,6 +23,7 @@ from __future__ import annotations from contextlib import ( asynccontextmanager as acm, ) +from functools import partial from types import ModuleType from typing import ( TYPE_CHECKING, @@ -193,14 +194,17 @@ def broker_init( async def spawn_brokerd( - brokername: str, loglevel: str | None = None, **tractor_kwargs, ) -> bool: + ''' + Spawn a `brokerd.` subactor service daemon + using `pikerd`'s service mngr. 
+ ''' from piker.service._util import log # use service mngr log log.info(f'Spawning {brokername} broker daemon') @@ -220,27 +224,35 @@ async def spawn_brokerd( # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery - from piker.service import Services - + from piker.service import ( + get_service_mngr, + ServiceMngr, + ) dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' - portal = await Services.actor_n.start_actor( - dname, - enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), - debug_mode=Services.debug_mode, + mngr: ServiceMngr = get_service_mngr() + ctx: tractor.Context = await mngr.start_service( + daemon_name=dname, + ctx_ep=partial( + # signature of target root-task endpoint + daemon_fixture_ep, + + # passed to daemon_fixture_ep(**kwargs) + brokername=brokername, + loglevel=loglevel, + ), + debug_mode=mngr.debug_mode, + loglevel=loglevel, + enable_modules=( + _data_mods + + + tractor_kwargs.pop('enable_modules') + ), **tractor_kwargs ) - - # NOTE: the service mngr expects an already spawned actor + its - # portal ref in order to do non-blocking setup of brokerd - # service nursery. 
- await Services.start_service_task( - dname, - portal, - - # signature of target root-task endpoint - daemon_fixture_ep, - brokername=brokername, - loglevel=loglevel, + assert ( + not ctx.cancel_called + and ctx.portal # parent side + and dname in ctx.chan.uid # subactor is named as desired ) return True @@ -265,8 +277,7 @@ async def maybe_spawn_brokerd( from piker.service import maybe_spawn_daemon async with maybe_spawn_daemon( - - f'brokerd.{brokername}', + service_name=f'brokerd.{brokername}', service_task_target=spawn_brokerd, spawn_args={ 'brokername': brokername, diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index cc32af91..2feb6ad4 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -25,6 +25,7 @@ from collections import ( defaultdict, ) from contextlib import asynccontextmanager as acm +from functools import partial import time from typing import ( Any, @@ -42,7 +43,7 @@ from tractor.trionics import ( maybe_open_nursery, ) import trio -from trio_typing import TaskStatus +from trio import TaskStatus from .ticktools import ( frame_ticks, @@ -70,6 +71,7 @@ if TYPE_CHECKING: _default_delay_s: float = 1.0 +# TODO: use new `tractor.singleton_acm` API for this! class Sampler: ''' Global sampling engine registry. @@ -79,9 +81,9 @@ class Sampler: This non-instantiated type is meant to be a singleton within a `samplerd` actor-service spawned once by the user wishing to - time-step-sample (real-time) quote feeds, see - ``.service.maybe_open_samplerd()`` and the below - ``register_with_sampler()``. + time-step-sample a (real-time) quote feeds, see + `.service.maybe_open_samplerd()` and the below + `register_with_sampler()`. ''' service_nursery: None | trio.Nursery = None @@ -381,7 +383,10 @@ async def register_with_sampler( assert Sampler.ohlcv_shms # unblock caller - await ctx.started(set(Sampler.ohlcv_shms.keys())) + await ctx.started( + # XXX bc msgpack only allows one array type! 
+ list(Sampler.ohlcv_shms.keys()) + ) if open_index_stream: try: @@ -426,7 +431,6 @@ async def register_with_sampler( async def spawn_samplerd( - loglevel: str | None = None, **extra_tractor_kwargs @@ -436,7 +440,10 @@ async def spawn_samplerd( update and increment count write and stream broadcasting. ''' - from piker.service import Services + from piker.service import ( + get_service_mngr, + ServiceMngr, + ) dname = 'samplerd' log.info(f'Spawning `{dname}`') @@ -444,26 +451,33 @@ async def spawn_samplerd( # singleton lock creation of ``samplerd`` since we only ever want # one daemon per ``pikerd`` proc tree. # TODO: make this built-into the service api? - async with Services.locks[dname + '_singleton']: + mngr: ServiceMngr = get_service_mngr() + already_started: bool = dname in mngr.service_tasks - if dname not in Services.service_tasks: - - portal = await Services.actor_n.start_actor( - dname, - enable_modules=[ - 'piker.data._sampling', - ], - loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag - **extra_tractor_kwargs - ) - - await Services.start_service_task( - dname, - portal, + async with mngr._locks[dname + '_singleton']: + ctx: Context = await mngr.start_service( + daemon_name=dname, + ctx_ep=partial( register_with_sampler, period_s=1, sub_for_broadcasts=False, + ), + debug_mode=mngr.debug_mode, # set by pikerd flag + + # proxy-through to tractor + enable_modules=[ + 'piker.data._sampling', + ], + loglevel=loglevel, + **extra_tractor_kwargs + ) + if not already_started: + assert ( + ctx + and + ctx.portal + and + not ctx.cancel_called ) return True diff --git a/piker/service/__init__.py b/piker/service/__init__.py index 29360620..beb9c70b 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for, => TODO: maybe to (re)move elsewhere? 
''' -from ._mngr import Services as Services +from ._mngr import ( + get_service_mngr as get_service_mngr, + open_service_mngr as open_service_mngr, + ServiceMngr as ServiceMngr, +) from ._registry import ( _tractor_kwargs as _tractor_kwargs, _default_reg_addr as _default_reg_addr, diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 14751304..43a57f8c 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -21,7 +21,6 @@ from __future__ import annotations import os from typing import ( - Optional, Any, ClassVar, ) @@ -30,13 +29,13 @@ from contextlib import ( ) import tractor -import trio from ._util import ( get_console_log, ) from ._mngr import ( - Services, + open_service_mngr, + ServiceMngr, ) from ._registry import ( # noqa _tractor_kwargs, @@ -59,7 +58,7 @@ async def open_piker_runtime( registry_addrs: list[tuple[str, int]] = [], enable_modules: list[str] = [], - loglevel: Optional[str] = None, + loglevel: str|None = None, # XXX NOTE XXX: you should pretty much never want debug mode # for data daemons when running in production. @@ -69,7 +68,7 @@ async def open_piker_runtime( # and spawn the service tree distributed per that. start_method: str = 'trio', - tractor_runtime_overrides: dict | None = None, + tractor_runtime_overrides: dict|None = None, **tractor_kwargs, ) -> tuple[ @@ -126,7 +125,7 @@ async def open_piker_runtime( # TODO: how to configure this? # keep it on by default if debug mode is set? - maybe_enable_greenback=False, + # maybe_enable_greenback=debug_mode, **tractor_kwargs, ) as actor, @@ -176,12 +175,13 @@ async def open_pikerd( **kwargs, -) -> Services: +) -> ServiceMngr: ''' - Start a root piker daemon with an indefinite lifetime. + Start a root piker daemon actor (aka `pikerd`) with an indefinite + lifetime. - A root actor nursery is created which can be used to create and keep - alive underling services (see below). 
+ A root actor-nursery is created which can be used to spawn and + supervise underling service sub-actors (see below). ''' # NOTE: for the root daemon we always enable the root @@ -208,9 +208,6 @@ async def open_pikerd( root_actor, reg_addrs, ), - tractor.open_nursery() as actor_nursery, - tractor.trionics.collapse_eg(), - trio.open_nursery() as service_tn, ): for addr in reg_addrs: if addr not in root_actor.accept_addrs: @@ -219,25 +216,17 @@ async def open_pikerd( 'Maybe you have another daemon already running?' ) - # assign globally for future daemon/task creation - Services.actor_n = actor_nursery - Services.service_n = service_tn - Services.debug_mode = debug_mode - - try: - yield Services - - finally: - # TODO: is this more clever/efficient? - # if 'samplerd' in Services.service_tasks: - # await Services.cancel_service('samplerd') - service_tn.cancel_scope.cancel() + mngr: ServiceMngr + async with open_service_mngr( + debug_mode=debug_mode, + ) as mngr: + yield mngr # TODO: do we even need this? 
# @acm # async def maybe_open_runtime( -# loglevel: Optional[str] = None, +# loglevel: str|None = None, # **kwargs, # ) -> None: @@ -268,7 +257,7 @@ async def maybe_open_pikerd( ) -> ( tractor._portal.Portal - |ClassVar[Services] + |ClassVar[ServiceMngr] ): ''' If no ``pikerd`` daemon-root-actor can be found start it and diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 4cccf855..0bdd1688 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -49,7 +49,7 @@ from requests.exceptions import ( ReadTimeout, ) -from ._mngr import Services +from ._mngr import ServiceMngr from ._util import ( log, # sub-sys logger get_console_log, @@ -453,7 +453,7 @@ async def open_ahabd( @acm async def start_ahab_service( - services: Services, + services: ServiceMngr, service_name: str, # endpoint config passed as **kwargs @@ -549,7 +549,8 @@ async def start_ahab_service( log.warning('Failed to cancel root permsed container') except ( - trio.MultiError, + # trio.MultiError, + ExceptionGroup, ) as err: for subexc in err.exceptions: if isinstance(subexc, PermissionError): diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 89d7f28d..1f385520 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -26,15 +26,17 @@ from typing import ( from contextlib import ( asynccontextmanager as acm, ) +from collections import defaultdict import tractor -from trio.lowlevel import current_task +import trio from ._util import ( log, # sub-sys logger ) from ._mngr import ( - Services, + get_service_mngr, + ServiceMngr, ) from ._actor_runtime import maybe_open_pikerd from ._registry import find_service @@ -42,15 +44,14 @@ from ._registry import find_service @acm async def maybe_spawn_daemon( - service_name: str, service_task_target: Callable, - spawn_args: dict[str, Any], loglevel: str | None = None, singleton: bool = False, + _locks = defaultdict(trio.Lock), **pikerd_kwargs, ) -> tractor.Portal: @@ -68,7 +69,7 @@ async def maybe_spawn_daemon( 
''' # serialize access to this section to avoid # 2 or more tasks racing to create a daemon - lock = Services.locks[service_name] + lock = _locks[service_name] await lock.acquire() try: @@ -141,7 +142,7 @@ async def maybe_spawn_daemon( if ( lock.locked() and - lock.statistics().owner is current_task() + lock.statistics().owner is trio.lowlevel.current_task() ): log.exception( f'Releasing stale lock after crash..?' @@ -163,21 +164,22 @@ async def spawn_emsd( """ log.info('Spawning emsd') - portal = await Services.actor_n.start_actor( + smngr: ServiceMngr = get_service_mngr() + portal = await smngr.actor_n.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', 'piker.clearing._client', ], loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag + debug_mode=smngr.debug_mode, # set by pikerd flag **extra_tractor_kwargs ) # non-blocking setup of clearing service from ..clearing._ems import _setup_persistent_emsd - await Services.start_service_task( + await smngr.start_service_task( 'emsd', portal, diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 726a34c8..9bc79aeb 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -18,16 +18,29 @@ daemon-service management API. """ +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, + # contextmanager as cm, +) from collections import defaultdict +from dataclasses import ( + dataclass, + field, +) +import functools +import inspect from typing import ( Callable, Any, ) -import trio -from trio_typing import TaskStatus +# import msgspec import tractor +import trio +from trio import TaskStatus from tractor import ( + ActorNursery, current_actor, ContextCancelled, Context, @@ -39,6 +52,130 @@ from ._util import ( ) +# TODO: implement a singleton deco-API for wrapping the below +# factory's impl for general actor-singleton use? 
+# +# @singleton +# async def open_service_mngr( +# **init_kwargs, +# ) -> ServiceMngr: +# ''' +# Note this function body is invoke IFF no existing singleton instance already +# exists in this proc's memory. + +# ''' +# # setup +# yield ServiceMngr(**init_kwargs) +# # teardown + + + +# TODO: singleton factory API instead of a class API +@acm +async def open_service_mngr( + *, + debug_mode: bool = False, + + # impl deat which ensures a single global instance + _singleton: list[ServiceMngr|None] = [None], + **init_kwargs, + +) -> ServiceMngr: + ''' + Open a multi-subactor-as-service-daemon tree supervisor. + + The delivered `ServiceMngr` is a singleton instance for each + actor-process and is allocated on first open and never + de-allocated unless explicitly deleted by al call to + `del_service_mngr()`. + + ''' + # TODO: factor this an allocation into + # a `._mngr.open_service_mngr()` and put in the + # once-n-only-once setup/`.__aenter__()` part! + # -[ ] how to make this only happen on the `mngr == None` case? + # |_ use `.trionics.maybe_open_context()` (for generic + # async-with-style-only-once of the factory impl, though + # what do we do for the allocation case? + # / `.maybe_open_nursery()` (since for this specific case + # it's simpler?) to activate + async with ( + tractor.open_nursery() as an, + trio.open_nursery() as tn, + ): + # impl specific obvi.. + init_kwargs.update({ + 'actor_n': an, + 'service_n': tn, + }) + + mngr: ServiceMngr|None + if (mngr := _singleton[0]) is None: + + log.info('Allocating a new service mngr!') + mngr = _singleton[0] = ServiceMngr(**init_kwargs) + + # TODO: put into `.__aenter__()` section of + # eventual `@singleton_acm` API wrapper. 
+ # + # assign globally for future daemon/task creation + mngr.actor_n = an + mngr.service_n = tn + + else: + assert ( + mngr.actor_n + and + mngr.service_tn + ) + log.info( + 'Using extant service mngr!\n\n' + f'{mngr!r}\n' # it has a nice `.__repr__()` of services state + ) + + try: + # NOTE: this is a singleton factory impl specific detail + # which should be supported in the condensed + # `@singleton_acm` API? + mngr.debug_mode = debug_mode + + yield mngr + finally: + # TODO: is this more clever/efficient? + # if 'samplerd' in mngr.service_tasks: + # await mngr.cancel_service('samplerd') + tn.cancel_scope.cancel() + + + +def get_service_mngr() -> ServiceMngr: + ''' + Try to get the singleton service-mngr for this actor presuming it + has already been allocated using, + + .. code:: python + + async with open_<@singleton_acm(func)>() as mngr` + ... this block kept open ... + + If not yet allocated raise a `ServiceError`. + + ''' + # https://stackoverflow.com/a/12627202 + # https://docs.python.org/3/library/inspect.html#inspect.Signature + maybe_mngr: ServiceMngr|None = inspect.signature( + open_service_mngr + ).parameters['_singleton'].default[0] + + if maybe_mngr is None: + raise RuntimeError( + 'Someone must allocate a `ServiceMngr` using\n\n' + '`async with open_service_mngr()` beforehand!!\n' + ) + + return maybe_mngr + + # TODO: we need remote wrapping and a general soln: # - factor this into a ``tractor.highlevel`` extension # pack for the # library. @@ -46,31 +183,46 @@ from ._util import ( # to the pikerd actor for starting services remotely! # - prolly rename this to ActorServicesNursery since it spawns # new actors and supervises them to completion? -class Services: +@dataclass +class ServiceMngr: +# class ServiceMngr(msgspec.Struct): + ''' + A multi-subactor-as-service manager. - actor_n: tractor._supervise.ActorNursery + Spawn, supervise and monitor service/daemon subactors in a SC + process tree. 
+ + ''' + actor_n: ActorNursery service_n: trio.Nursery - debug_mode: bool # tractor sub-actor debug mode flag + debug_mode: bool = False # tractor sub-actor debug mode flag + service_tasks: dict[ str, tuple[ trio.CancelScope, + Context, Portal, trio.Event, ] - ] = {} - locks = defaultdict(trio.Lock) + ] = field(default_factory=dict) + + # internal per-service task mutexs + _locks = defaultdict(trio.Lock) - @classmethod async def start_service_task( self, name: str, portal: Portal, + + # TODO: typevar for the return type of the target and then + # use it below for `ctx_res`? target: Callable, + allow_overruns: bool = False, **ctx_kwargs, - ) -> (trio.CancelScope, Context): + ) -> (trio.CancelScope, Context, Any): ''' Open a context in a service sub-actor, add to a stack that gets unwound at ``pikerd`` teardown. @@ -83,6 +235,7 @@ class Services: task_status: TaskStatus[ tuple[ trio.CancelScope, + Context, trio.Event, Any, ] @@ -90,22 +243,29 @@ class Services: ) -> Any: + # TODO: use the ctx._scope directly here instead? + # -[ ] actually what semantics do we expect for this + # usage!? with trio.CancelScope() as cs: + try: + async with portal.open_context( + target, + allow_overruns=allow_overruns, + **ctx_kwargs, - async with portal.open_context( - target, - allow_overruns=allow_overruns, - **ctx_kwargs, + ) as (ctx, started): - ) as (ctx, first): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started((cs, complete, first)) - log.info( - f'`pikerd` service {name} started with value {first}' - ) - try: + # unblock once the remote context has started + complete = trio.Event() + task_status.started(( + cs, + ctx, + complete, + started, + )) + log.info( + f'`pikerd` service {name} started with value {started}' + ) # wait on any context's return value # and any final portal result from the # sub-actor. 
@@ -115,39 +275,55 @@ class Services: # either by error from the target context # function or by being cancelled here by the # surrounding cancel scope. - return (await portal.result(), ctx_res) - except ContextCancelled as ctxe: - canceller: tuple[str, str] = ctxe.canceller - our_uid: tuple[str, str] = current_actor().uid - if ( - canceller != portal.channel.uid - and - canceller != our_uid - ): - log.cancel( - f'Actor-service {name} was remotely cancelled?\n' - f'remote canceller: {canceller}\n' - f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n' - ) - else: - raise + return ( + await portal.wait_for_result(), + ctx_res, + ) + except ContextCancelled as ctxe: + canceller: tuple[str, str] = ctxe.canceller + our_uid: tuple[str, str] = current_actor().uid + if ( + canceller != portal.chan.uid + and + canceller != our_uid + ): + log.cancel( + f'Actor-service `{name}` was remotely cancelled by a peer?\n' + # TODO: this would be a good spot to use + # a respawn feature Bo + f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' - finally: - await portal.cancel_actor() - complete.set() - self.service_tasks.pop(name) + f'cancellee: {portal.chan.uid}\n' + f'canceller: {canceller}\n' + ) + else: + raise - cs, complete, first = await self.service_n.start(open_context_in_task) + finally: + # NOTE: the ctx MUST be cancelled first if we + # don't want the above `ctx.wait_for_result()` to + # raise a self-ctxc. WHY, well since from the ctx's + # perspective the cancel request will have + # arrived out-out-of-band at the `Actor.cancel()` + # level, thus `Context.cancel_called == False`, + # meaning `ctx._is_self_cancelled() == False`. + # with trio.CancelScope(shield=True): + # await ctx.cancel() + await portal.cancel_actor() + complete.set() + self.service_tasks.pop(name) + + cs, sub_ctx, complete, started = await self.service_n.start( + open_context_in_task + ) # store the cancel scope and portal for later cancellation or # retstart if needed. 
- self.service_tasks[name] = (cs, portal, complete) + self.service_tasks[name] = (cs, sub_ctx, portal, complete) + return cs, sub_ctx, started - return cs, first - - @classmethod async def cancel_service( self, name: str, @@ -158,8 +334,80 @@ class Services: ''' log.info(f'Cancelling `pikerd` service {name}') - cs, portal, complete = self.service_tasks[name] - cs.cancel() + cs, sub_ctx, portal, complete = self.service_tasks[name] + + # cs.cancel() + await sub_ctx.cancel() await complete.wait() - assert name not in self.service_tasks, \ - f'Serice task for {name} not terminated?' + + if name in self.service_tasks: + # TODO: custom err? + # raise ServiceError( + raise RuntimeError( + f'Serice task for {name} not terminated?' + ) + + # assert name not in self.service_tasks, \ + # f'Serice task for {name} not terminated?' + + async def start_service( + self, + daemon_name: str, + ctx_ep: Callable, # kwargs must `partial`-ed in! + + debug_mode: bool = False, + **tractor_actor_kwargs, + + ) -> Context: + ''' + Start a "service" task in a new sub-actor (daemon) and manage it's lifetime + indefinitely. + + Services can be cancelled/shutdown using `.cancel_service()`. 
+ + ''' + entry: tuple|None = self.service_tasks.get(daemon_name) + if entry: + (cs, sub_ctx, portal, complete) = entry + return sub_ctx + + if daemon_name not in self.service_tasks: + portal = await self.actor_n.start_actor( + daemon_name, + debug_mode=( # maybe set globally during allocate + debug_mode + or + self.debug_mode + ), + **tractor_actor_kwargs, + ) + ctx_kwargs: dict[str, Any] = {} + if isinstance(ctx_ep, functools.partial): + ctx_kwargs: dict[str, Any] = ctx_ep.keywords + ctx_ep: Callable = ctx_ep.func + + (cs, sub_ctx, started) = await self.start_service_task( + daemon_name, + portal, + ctx_ep, + **ctx_kwargs, + ) + + return sub_ctx + + +# TODO: +# -[ ] factor all the common shit from `.data._sampling` +# and `.brokers._daemon` into here / `ServiceMngr` +# in terms of allocating the `Portal` as part of the +# "service-in-subactor" starting! +# -[ ] move to `tractor.hilevel._service`, import and use here! +# NOTE: purposely leaks the ref to the mod-scope Bo +# import tractor +# from tractor.hilevel import ( +# open_service_mngr, +# ServiceMngr, +# ) +# mngr: ServiceMngr|None = None +# with tractor.hilevel.open_service_mngr() as mngr: +# Services = proxy(mngr) diff --git a/piker/service/elastic.py b/piker/service/elastic.py index 902f4fde..b1a13722 100644 --- a/piker/service/elastic.py +++ b/piker/service/elastic.py @@ -21,11 +21,13 @@ from typing import ( TYPE_CHECKING, ) +# TODO: oof, needs to be changed to `httpx`! import asks if TYPE_CHECKING: import docker from ._ahab import DockerContainer + from . 
import ServiceMngr from ._util import log # sub-sys logger from ._util import ( @@ -127,7 +129,7 @@ def start_elasticsearch( @acm async def start_ahab_daemon( - service_mngr: Services, + service_mngr: ServiceMngr, user_config: dict | None = None, loglevel: str | None = None, diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index c9f49420..852b967c 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -53,7 +53,7 @@ import pendulum # import purerpc from ..data.feed import maybe_open_feed -from . import Services +from . import ServiceMngr from ._util import ( log, # sub-sys logger get_console_log, @@ -233,7 +233,7 @@ def start_marketstore( @acm async def start_ahab_daemon( - service_mngr: Services, + service_mngr: ServiceMngr, user_config: dict | None = None, loglevel: str | None = None, diff --git a/tests/conftest.py b/tests/conftest.py index 22d1af3c..db071054 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ from piker import ( config, ) from piker.service import ( - Services, + get_service_mngr, ) from piker.log import get_console_log @@ -135,7 +135,7 @@ async def _open_test_pikerd( ) as service_manager, ): # this proc/actor is the pikerd - assert service_manager is Services + assert service_manager is get_service_mngr() async with tractor.wait_for_actor( 'pikerd', diff --git a/tests/test_ems.py b/tests/test_ems.py index 07e28c33..e348fc8b 100644 --- a/tests/test_ems.py +++ b/tests/test_ems.py @@ -26,7 +26,7 @@ import pytest import tractor from uuid import uuid4 -from piker.service import Services +from piker.service import ServiceMngr from piker.log import get_logger from piker.clearing._messages import ( Order, @@ -158,7 +158,7 @@ def load_and_check_pos( def test_ems_err_on_bad_broker( - open_test_pikerd: Services, + open_test_pikerd: ServiceMngr, loglevel: str, ): async def load_bad_fqme(): diff --git a/tests/test_services.py b/tests/test_services.py index 433e97f3..ca093929 100644 --- 
a/tests/test_services.py +++ b/tests/test_services.py @@ -15,7 +15,7 @@ import tractor from piker.service import ( find_service, - Services, + ServiceMngr, ) from piker.data import ( open_feed, @@ -44,7 +44,7 @@ def test_runtime_boot( async def main(): port = 6666 daemon_addr = ('127.0.0.1', port) - services: Services + services: ServiceMngr async with ( open_test_pikerd( -- 2.34.1 From 92d80cb0a3228d7a95d1b7d98595a31759461dd4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Aug 2024 12:16:17 -0400 Subject: [PATCH 4/7] Drop `.cancel_actor()` from `maybe_spawn_daemon()` Since `tractor`'s new and improved inter-actor cancellation semantics are much more pedantic, AND bc we use the `ServiceMngr` for spawning service actors on-demand, the caller of `maybe_spawn_daemon()` should NEVER conduct a so-called "out of band" `Actor`-runtime cancel request since this is precisely the job of our `ServiceMngr` XD Add a super in-depth note explaining the underlying issue and add a todo list of how we should prolly augment `tractor` to make such cases easier to grok and fix in the future! --- piker/service/_daemon.py | 60 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 1f385520..b1d136b7 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -135,7 +135,65 @@ async def maybe_spawn_daemon( async with tractor.wait_for_actor(service_name) as portal: lock.release() yield portal - await portal.cancel_actor() + # --- ---- --- + # XXX NOTE XXX + # --- ---- --- + # DO NOT PUT A `portal.cancel_actor()` here (as was prior)! + # + # Doing so will cause an "out-of-band" ctxc + # (`tractor.ContextCancelled`) to be raised inside the + # `ServiceMngr.open_context_in_task()`'s call to + # `ctx.wait_for_result()` AND the internal self-ctxc + # "graceful capture" WILL NOT CATCH IT!
+ # + # This can cause certain types of operations to raise + # that ctxc BEFORE THEY `return`, resulting in + # a "false-negative" ctxc being raised when really + # nothing actually failed, other then our semantic + # "failure" to suppress an expected, graceful, + # self-cancel scenario.. + # + # bUt wHy duZ It WorK lIKe dis.. + # ------------------------------ + # from the perspective of the `tractor.Context` this + # cancel request was conducted "out of band" since + # `Context.cancel()` was never called and thus the + # `._cancel_called: bool` was never set. Despite the + # remote `.canceller` being set to `pikerd` (i.e. the + # same `Actor.uid` of the raising service-mngr task) the + # service-task's ctx itself was never marked as having + # requested cancellation and thus still raises the ctxc + # bc it was unaware of any such request. + # + # How to make grokin these cases easier tho? + # ------------------------------------------ + # Because `Portal.cancel_actor()` was called it requests + # "full-`Actor`-runtime-cancellation" of it's peer + # process which IS NOT THE SAME as a single inter-actor + # RPC task cancelling its local context with a remote + # peer `Task` in that same peer process. + # + # ?TODO? It might be better if we do one (or all) of the + # following: + # + # -[ ] at least set a special message for the + # `ContextCancelled` when raised locally by the + # unaware ctx task such that we check for the + # `.canceller` being *our `Actor`* and in the case + # where `Context._cancel_called == False` we specially + # note that this is likely an "out-of-band" + # runtime-cancel request triggered by some call to + # `Portal.cancel_actor()`, possibly even reporting the + # exact LOC of that caller by tracking it inside our + # portal-type? 
+ # -[ ] possibly add another field `ContextCancelled` like + # maybe a, + # `.request_type: Literal['os', 'proc', 'actor', + # 'ctx']` type thing which would allow immediately + # being able to tell what kind of cancellation caused + # the unexpected ctxc? + # -[ ] REMOVE THIS COMMENT, once we've settled on how to + # better augment `tractor` to be more explicit on this! except BaseException as _err: err = _err -- 2.34.1 From 618639df450597e937272c5e761ee7bca861f091 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Aug 2024 13:26:05 -0400 Subject: [PATCH 5/7] More service-mngr clarity notes Nothing changing functionally here just adding more `tractor` operational notes, tips for debug tooling and typing fixes B) Of particular note is adding further details about the reason we do not need to call `Context.cancel()` inside the `finally:` block of `.open_context_in_task()` thanks to `tractor`'s new and improved inter-actor cancellation semantics Bo --- piker/service/_mngr.py | 63 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 9bc79aeb..4b37150e 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -144,6 +144,9 @@ async def open_service_mngr( # TODO: is this more clever/efficient? # if 'samplerd' in mngr.service_tasks: # await mngr.cancel_service('samplerd') + + # await tractor.pause(shield=True) + # ^XXX, if needed mk sure to shield it ;) tn.cancel_scope.cancel() @@ -241,7 +244,11 @@ class ServiceMngr: ] ] = trio.TASK_STATUS_IGNORED, - ) -> Any: + ) -> tuple[ + trio.CancelScope, + Context, + Any, # started value from ctx + ]: # TODO: use the ctx._scope directly here instead? 
# -[ ] actually what semantics do we expect for this @@ -251,6 +258,10 @@ class ServiceMngr: async with portal.open_context( target, allow_overruns=allow_overruns, + + # hide_tb=False, + # ^XXX^ HAWT TIPZ + **ctx_kwargs, ) as (ctx, started): @@ -269,7 +280,9 @@ class ServiceMngr: # wait on any context's return value # and any final portal result from the # sub-actor. - ctx_res: Any = await ctx.wait_for_result() + ctx_res: Any = await ctx.wait_for_result( + # hide_tb=False, + ) # NOTE: blocks indefinitely until cancelled # either by error from the target context @@ -304,25 +317,53 @@ class ServiceMngr: finally: # NOTE: the ctx MUST be cancelled first if we # don't want the above `ctx.wait_for_result()` to - # raise a self-ctxc. WHY, well since from the ctx's + # raise a self-ctxc. + # + # WHY, well since from the ctx's # perspective the cancel request will have # arrived out-out-of-band at the `Actor.cancel()` - # level, thus `Context.cancel_called == False`, + # level (since pikerd will have called + # `Portal.cancel_actor()`), and thus + # `Context.cancel_called == False`, # meaning `ctx._is_self_cancelled() == False`. - # with trio.CancelScope(shield=True): - # await ctx.cancel() + # + # HOWEVER, this should happen implicitly WITHOUT + # a manual `ctx.cancel()` call HERE since, + # + # - in the mngr shutdown case the surrounding + # `.service_n.cancel_scope` should be + # `.cancel_called == True` and the + # `Portal.open_context()` internals should take + # care of it. + # + # - in the specific-service cancellation case, + # `.cancel_service()` makes the manual + # `ctx.cancel()` call for us which SHOULD mean + # the ctxc is never raised above (since, again, + # it will be gracefully suppressed by + # `.open_context()` internals) and thus we only + # need to shut down the service actor. 
await portal.cancel_actor() - complete.set() self.service_tasks.pop(name) + complete.set() - cs, sub_ctx, complete, started = await self.service_n.start( + ( + cs, # internally allocated + sub_ctx, # RPC peer-actor ctx + complete, # termination syncing + started, # proxyed from internal `.open_context()` entry. + ) = await self.service_n.start( open_context_in_task ) # store the cancel scope and portal for later cancellation or # retstart if needed. self.service_tasks[name] = (cs, sub_ctx, portal, complete) - return cs, sub_ctx, started + return ( + cs, + sub_ctx, + started, + ) async def cancel_service( self, @@ -341,11 +382,11 @@ class ServiceMngr: await complete.wait() if name in self.service_tasks: - # TODO: custom err? - # raise ServiceError( raise RuntimeError( f'Serice task for {name} not terminated?' ) + # raise ServiceError( + # ^TODO? custom err type? # assert name not in self.service_tasks, \ # f'Serice task for {name} not terminated?' -- 2.34.1 From f71ec6504b7a90a65192b57f92c4a1a5b87f6816 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Feb 2025 10:34:34 -0500 Subject: [PATCH 6/7] Official service-mngr to `tractor.hilevel` move Such that we maintain that subsys in the actor-runtime repo (with hopefully an extensive test suite XD). Port deats, - rewrite `open_service_mngr()` as a thin wrapper that delegates into the new `tractor.hilevel.open_service_mngr()` but with maintenance of the `Services` class-singleton for now. - port `.service._daemon` usage to the new `ServiceMngr.start_service_ctx()` a rename from `.start_service_task()` which is now likely destined for the soon supported `tractor.trionics.TaskMngr` nursery extension. - ref the new `ServiceMngr.an: ActorNursery` instance var name. Other, - always enable the `tractor.pause_from_sync()` support via `greenback` whenever `debug_mode` is set at `pikerd` init. 
--- piker/service/_daemon.py | 18 +- piker/service/_mngr.py | 441 ++------------------------------------- 2 files changed, 31 insertions(+), 428 deletions(-) diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index b1d136b7..f81d8b13 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -105,6 +105,12 @@ async def maybe_spawn_daemon( # service task for that actor. started: bool if pikerd_portal is None: + + # await tractor.pause() + if tractor.is_debug(): + from tractor.devx._debug import maybe_init_greenback + await maybe_init_greenback() + started = await service_task_target( loglevel=loglevel, **spawn_args, @@ -223,7 +229,7 @@ async def spawn_emsd( log.info('Spawning emsd') smngr: ServiceMngr = get_service_mngr() - portal = await smngr.actor_n.start_actor( + portal = await smngr.an.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', @@ -237,12 +243,10 @@ async def spawn_emsd( # non-blocking setup of clearing service from ..clearing._ems import _setup_persistent_emsd - await smngr.start_service_task( - 'emsd', - portal, - - # signature of target root-task endpoint - _setup_persistent_emsd, + await smngr.start_service_ctx( + name='emsd', + portal=portal, + ctx_fn=_setup_persistent_emsd, loglevel=loglevel, ) return True diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 4b37150e..9557a828 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -18,425 +18,16 @@ daemon-service management API. 
""" -from __future__ import annotations from contextlib import ( asynccontextmanager as acm, - # contextmanager as cm, -) -from collections import defaultdict -from dataclasses import ( - dataclass, - field, -) -import functools -import inspect -from typing import ( - Callable, - Any, ) -# import msgspec import tractor -import trio -from trio import TaskStatus -from tractor import ( - ActorNursery, - current_actor, - ContextCancelled, - Context, - Portal, +from tractor.hilevel import ( + ServiceMngr, + # open_service_mngr as _open_service_mngr, + get_service_mngr as get_service_mngr, ) - -from ._util import ( - log, # sub-sys logger -) - - -# TODO: implement a singleton deco-API for wrapping the below -# factory's impl for general actor-singleton use? -# -# @singleton -# async def open_service_mngr( -# **init_kwargs, -# ) -> ServiceMngr: -# ''' -# Note this function body is invoke IFF no existing singleton instance already -# exists in this proc's memory. - -# ''' -# # setup -# yield ServiceMngr(**init_kwargs) -# # teardown - - - -# TODO: singleton factory API instead of a class API -@acm -async def open_service_mngr( - *, - debug_mode: bool = False, - - # impl deat which ensures a single global instance - _singleton: list[ServiceMngr|None] = [None], - **init_kwargs, - -) -> ServiceMngr: - ''' - Open a multi-subactor-as-service-daemon tree supervisor. - - The delivered `ServiceMngr` is a singleton instance for each - actor-process and is allocated on first open and never - de-allocated unless explicitly deleted by al call to - `del_service_mngr()`. - - ''' - # TODO: factor this an allocation into - # a `._mngr.open_service_mngr()` and put in the - # once-n-only-once setup/`.__aenter__()` part! - # -[ ] how to make this only happen on the `mngr == None` case? - # |_ use `.trionics.maybe_open_context()` (for generic - # async-with-style-only-once of the factory impl, though - # what do we do for the allocation case? 
- # / `.maybe_open_nursery()` (since for this specific case - # it's simpler?) to activate - async with ( - tractor.open_nursery() as an, - trio.open_nursery() as tn, - ): - # impl specific obvi.. - init_kwargs.update({ - 'actor_n': an, - 'service_n': tn, - }) - - mngr: ServiceMngr|None - if (mngr := _singleton[0]) is None: - - log.info('Allocating a new service mngr!') - mngr = _singleton[0] = ServiceMngr(**init_kwargs) - - # TODO: put into `.__aenter__()` section of - # eventual `@singleton_acm` API wrapper. - # - # assign globally for future daemon/task creation - mngr.actor_n = an - mngr.service_n = tn - - else: - assert ( - mngr.actor_n - and - mngr.service_tn - ) - log.info( - 'Using extant service mngr!\n\n' - f'{mngr!r}\n' # it has a nice `.__repr__()` of services state - ) - - try: - # NOTE: this is a singleton factory impl specific detail - # which should be supported in the condensed - # `@singleton_acm` API? - mngr.debug_mode = debug_mode - - yield mngr - finally: - # TODO: is this more clever/efficient? - # if 'samplerd' in mngr.service_tasks: - # await mngr.cancel_service('samplerd') - - # await tractor.pause(shield=True) - # ^XXX, if needed mk sure to shield it ;) - tn.cancel_scope.cancel() - - - -def get_service_mngr() -> ServiceMngr: - ''' - Try to get the singleton service-mngr for this actor presuming it - has already been allocated using, - - .. code:: python - - async with open_<@singleton_acm(func)>() as mngr` - ... this block kept open ... - - If not yet allocated raise a `ServiceError`. 
- - ''' - # https://stackoverflow.com/a/12627202 - # https://docs.python.org/3/library/inspect.html#inspect.Signature - maybe_mngr: ServiceMngr|None = inspect.signature( - open_service_mngr - ).parameters['_singleton'].default[0] - - if maybe_mngr is None: - raise RuntimeError( - 'Someone must allocate a `ServiceMngr` using\n\n' - '`async with open_service_mngr()` beforehand!!\n' - ) - - return maybe_mngr - - -# TODO: we need remote wrapping and a general soln: -# - factor this into a ``tractor.highlevel`` extension # pack for the -# library. -# - wrap a "remote api" wherein you can get a method proxy -# to the pikerd actor for starting services remotely! -# - prolly rename this to ActorServicesNursery since it spawns -# new actors and supervises them to completion? -@dataclass -class ServiceMngr: -# class ServiceMngr(msgspec.Struct): - ''' - A multi-subactor-as-service manager. - - Spawn, supervise and monitor service/daemon subactors in a SC - process tree. - - ''' - actor_n: ActorNursery - service_n: trio.Nursery - debug_mode: bool = False # tractor sub-actor debug mode flag - - service_tasks: dict[ - str, - tuple[ - trio.CancelScope, - Context, - Portal, - trio.Event, - ] - ] = field(default_factory=dict) - - # internal per-service task mutexs - _locks = defaultdict(trio.Lock) - - async def start_service_task( - self, - name: str, - portal: Portal, - - # TODO: typevar for the return type of the target and then - # use it below for `ctx_res`? - target: Callable, - - allow_overruns: bool = False, - **ctx_kwargs, - - ) -> (trio.CancelScope, Context, Any): - ''' - Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` teardown. - - This allows for allocating long-running sub-services in our main - daemon and explicitly controlling their lifetimes. 
- - ''' - async def open_context_in_task( - task_status: TaskStatus[ - tuple[ - trio.CancelScope, - Context, - trio.Event, - Any, - ] - ] = trio.TASK_STATUS_IGNORED, - - ) -> tuple[ - trio.CancelScope, - Context, - Any, # started value from ctx - ]: - - # TODO: use the ctx._scope directly here instead? - # -[ ] actually what semantics do we expect for this - # usage!? - with trio.CancelScope() as cs: - try: - async with portal.open_context( - target, - allow_overruns=allow_overruns, - - # hide_tb=False, - # ^XXX^ HAWT TIPZ - - **ctx_kwargs, - - ) as (ctx, started): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started(( - cs, - ctx, - complete, - started, - )) - log.info( - f'`pikerd` service {name} started with value {started}' - ) - # wait on any context's return value - # and any final portal result from the - # sub-actor. - ctx_res: Any = await ctx.wait_for_result( - # hide_tb=False, - ) - - # NOTE: blocks indefinitely until cancelled - # either by error from the target context - # function or by being cancelled here by the - # surrounding cancel scope. - return ( - await portal.wait_for_result(), - ctx_res, - ) - - except ContextCancelled as ctxe: - canceller: tuple[str, str] = ctxe.canceller - our_uid: tuple[str, str] = current_actor().uid - if ( - canceller != portal.chan.uid - and - canceller != our_uid - ): - log.cancel( - f'Actor-service `{name}` was remotely cancelled by a peer?\n' - - # TODO: this would be a good spot to use - # a respawn feature Bo - f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' - - f'cancellee: {portal.chan.uid}\n' - f'canceller: {canceller}\n' - ) - else: - raise - - finally: - # NOTE: the ctx MUST be cancelled first if we - # don't want the above `ctx.wait_for_result()` to - # raise a self-ctxc. 
- # - # WHY, well since from the ctx's - # perspective the cancel request will have - # arrived out-out-of-band at the `Actor.cancel()` - # level (since pikerd will have called - # `Portal.cancel_actor()`), and thus - # `Context.cancel_called == False`, - # meaning `ctx._is_self_cancelled() == False`. - # - # HOWEVER, this should happen implicitly WITHOUT - # a manual `ctx.cancel()` call HERE since, - # - # - in the mngr shutdown case the surrounding - # `.service_n.cancel_scope` should be - # `.cancel_called == True` and the - # `Portal.open_context()` internals should take - # care of it. - # - # - in the specific-service cancellation case, - # `.cancel_service()` makes the manual - # `ctx.cancel()` call for us which SHOULD mean - # the ctxc is never raised above (since, again, - # it will be gracefully suppressed by - # `.open_context()` internals) and thus we only - # need to shut down the service actor. - await portal.cancel_actor() - self.service_tasks.pop(name) - complete.set() - - ( - cs, # internally allocated - sub_ctx, # RPC peer-actor ctx - complete, # termination syncing - started, # proxyed from internal `.open_context()` entry. - ) = await self.service_n.start( - open_context_in_task - ) - - # store the cancel scope and portal for later cancellation or - # retstart if needed. - self.service_tasks[name] = (cs, sub_ctx, portal, complete) - return ( - cs, - sub_ctx, - started, - ) - - async def cancel_service( - self, - name: str, - - ) -> Any: - ''' - Cancel the service task and actor for the given ``name``. - - ''' - log.info(f'Cancelling `pikerd` service {name}') - cs, sub_ctx, portal, complete = self.service_tasks[name] - - # cs.cancel() - await sub_ctx.cancel() - await complete.wait() - - if name in self.service_tasks: - raise RuntimeError( - f'Serice task for {name} not terminated?' - ) - # raise ServiceError( - # ^TODO? custom err type? - - # assert name not in self.service_tasks, \ - # f'Serice task for {name} not terminated?' 
- - async def start_service( - self, - daemon_name: str, - ctx_ep: Callable, # kwargs must `partial`-ed in! - - debug_mode: bool = False, - **tractor_actor_kwargs, - - ) -> Context: - ''' - Start a "service" task in a new sub-actor (daemon) and manage it's lifetime - indefinitely. - - Services can be cancelled/shutdown using `.cancel_service()`. - - ''' - entry: tuple|None = self.service_tasks.get(daemon_name) - if entry: - (cs, sub_ctx, portal, complete) = entry - return sub_ctx - - if daemon_name not in self.service_tasks: - portal = await self.actor_n.start_actor( - daemon_name, - debug_mode=( # maybe set globally during allocate - debug_mode - or - self.debug_mode - ), - **tractor_actor_kwargs, - ) - ctx_kwargs: dict[str, Any] = {} - if isinstance(ctx_ep, functools.partial): - ctx_kwargs: dict[str, Any] = ctx_ep.keywords - ctx_ep: Callable = ctx_ep.func - - (cs, sub_ctx, started) = await self.start_service_task( - daemon_name, - portal, - ctx_ep, - **ctx_kwargs, - ) - - return sub_ctx - - # TODO: # -[ ] factor all the common shit from `.data._sampling` # and `.brokers._daemon` into here / `ServiceMngr` @@ -444,11 +35,19 @@ class ServiceMngr: # "service-in-subactor" starting! # -[ ] move to `tractor.hilevel._service`, import and use here! 
# NOTE: purposely leaks the ref to the mod-scope Bo -# import tractor -# from tractor.hilevel import ( -# open_service_mngr, -# ServiceMngr, -# ) -# mngr: ServiceMngr|None = None -# with tractor.hilevel.open_service_mngr() as mngr: -# Services = proxy(mngr) + +Services: ServiceMngr|None = None + +@acm +async def open_service_mngr( + **kwargs, +) -> ServiceMngr: + + global Services + async with tractor.hilevel.open_service_mngr( + **kwargs, + ) as mngr: + # Services = proxy(mngr) + Services = mngr + yield mngr + Services = None -- 2.34.1 From 6a367a3db825fcf17393570dd37e53bc5827842c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Feb 2025 11:01:52 -0500 Subject: [PATCH 7/7] Support `tractor.pause_from_sync()` in `brokerd`s By passing down the `tractor.hilevel.ServiceMngr.debug_mode: bool` (normally proxied in from the `--pdb` CLI flag) to `spawn_brokerd()` and adjusting the `_setup_persistent_brokerd()` endpoint to do the `tractor.devx._debug.maybe_init_greenback()` if needed. Also in the `broker_init()` factory merge all `tractor`-related `kwargs` (i.e. `start_actor_kwargs | datad_kwargs | spawn_kws`) into the 2nd returned element, which is to be passed to `ActorNursery.start_actor()`. Start re-naming some internal vars/fields as `datad` as well. --- piker/brokers/_daemon.py | 59 ++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index a683a501..ec7a85a3 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -61,12 +61,13 @@ async def _setup_persistent_brokerd( ctx: tractor.Context, brokername: str, loglevel: str | None = None, + debug_mode: bool = False, ) -> None: ''' - Allocate a actor-wide service nursery in ``brokerd`` - such that feeds can be run in the background persistently by - the broker backend as needed.
+ Allocate a actor-wide service nursery in `brokerd` such that + feeds can be run in the background persistently by the broker + backend as needed. ''' # NOTE: we only need to setup logging once (and only) here @@ -87,6 +88,18 @@ async def _setup_persistent_brokerd( from piker.data import feed assert not feed._bus + if ( + debug_mode + and + tractor.current_actor().is_infected_aio() + ): + # NOTE, whenever running `asyncio` in provider's actor + # runtime be sure we enabled `breakpoint()` support + # for non-`trio.Task` usage. + from tractor.devx._debug import maybe_init_greenback + await maybe_init_greenback() + # breakpoint() # XXX, SHOULD WORK from `trio.Task`! + # allocate a nursery to the bus for spawning background # tasks to service client IPC requests, normally # `tractor.Context` connections to explicitly required @@ -149,18 +162,21 @@ def broker_init( above. ''' - from ..brokers import get_brokermod - brokermod = get_brokermod(brokername) + brokermod: ModuleType = get_brokermod(brokername) modpath: str = brokermod.__name__ - - start_actor_kwargs['name'] = f'brokerd.{brokername}' - start_actor_kwargs.update( - getattr( - brokermod, - '_spawn_kwargs', - {}, - ) + spawn_kws: dict = getattr( + brokermod, + '_spawn_kwargs', + {}, ) + # ^^ NOTE, here we pull any runtime parameters specific + # to spawning the sub-actor for the backend. For ex. + # both `ib` and `deribit` rely on, + # `'infect_asyncio': True,` since they both + # use `tractor`'s "infected `asyncio` mode" + # for their libs but you could also do something like + # `'debug_mode: True` which would be like passing + # `--pdb` for just that provider backend. # XXX TODO: make this not so hacky/monkeypatched.. # -> we need a sane way to configure the logging level for all @@ -170,8 +186,7 @@ def broker_init( # lookup actor-enabled modules declared by the backend offering the # `brokerd` endpoint(s). 
- enabled: list[str] - enabled = start_actor_kwargs['enable_modules'] = [ + enabled: list[str] = [ __name__, # so that eps from THIS mod can be invoked modpath, ] @@ -183,9 +198,13 @@ def broker_init( subpath: str = f'{modpath}.{submodname}' enabled.append(subpath) + datad_kwargs: dict = { + 'name': f'brokerd.{brokername}', + 'enable_modules': enabled, + } return ( brokermod, - start_actor_kwargs, # to `ActorNursery.start_actor()` + start_actor_kwargs | datad_kwargs | spawn_kws, # to `ActorNursery.start_actor()` # XXX see impl above; contains all (actor global) # setup/teardown expected in all `brokerd` actor instances. @@ -218,10 +237,6 @@ async def spawn_brokerd( **tractor_kwargs, ) - brokermod = get_brokermod(brokername) - extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) - tractor_kwargs.update(extra_tractor_kwargs) - # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery from piker.service import ( @@ -239,8 +254,12 @@ async def spawn_brokerd( # passed to daemon_fixture_ep(**kwargs) brokername=brokername, loglevel=loglevel, + debug_mode=mngr.debug_mode, ), debug_mode=mngr.debug_mode, + # ^TODO, allow overriding this per-daemon from client side? + # |_ it's already supported in `tractor` so.. + loglevel=loglevel, enable_modules=( _data_mods -- 2.34.1