diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py
index 5414bfb9..ec7a85a3 100644
--- a/piker/brokers/_daemon.py
+++ b/piker/brokers/_daemon.py
@@ -23,6 +23,7 @@ from __future__ import annotations
 from contextlib import (
     asynccontextmanager as acm,
 )
+from functools import partial
 from types import ModuleType
 from typing import (
     TYPE_CHECKING,
@@ -60,12 +61,13 @@ async def _setup_persistent_brokerd(
     ctx: tractor.Context,
     brokername: str,
     loglevel: str | None = None,
+    debug_mode: bool = False,

 ) -> None:
     '''
-    Allocate a actor-wide service nursery in ``brokerd``
-    such that feeds can be run in the background persistently by
-    the broker backend as needed.
+    Allocate an actor-wide service nursery in `brokerd` such that
+    feeds can be run in the background persistently by the broker
+    backend as needed.

     '''
     # NOTE: we only need to setup logging once (and only) here
@@ -86,6 +88,18 @@ async def _setup_persistent_brokerd(
     from piker.data import feed
     assert not feed._bus

+    if (
+        debug_mode
+        and
+        tractor.current_actor().is_infected_aio()
+    ):
+        # NOTE, whenever running `asyncio` in a provider's actor
+        # runtime be sure we enable `breakpoint()` support
+        # for non-`trio.Task` usage.
+        from tractor.devx._debug import maybe_init_greenback
+        await maybe_init_greenback()
+        # breakpoint()  # XXX, SHOULD WORK from `trio.Task`!
+
     # allocate a nursery to the bus for spawning background
     # tasks to service client IPC requests, normally
     # `tractor.Context` connections to explicitly required
@@ -148,18 +162,21 @@ def broker_init(
     above.

     '''
-    from ..brokers import get_brokermod
-    brokermod = get_brokermod(brokername)
+    brokermod: ModuleType = get_brokermod(brokername)
     modpath: str = brokermod.__name__
-
-    start_actor_kwargs['name'] = f'brokerd.{brokername}'
-    start_actor_kwargs.update(
-        getattr(
-            brokermod,
-            '_spawn_kwargs',
-            {},
-        )
+    spawn_kws: dict = getattr(
+        brokermod,
+        '_spawn_kwargs',
+        {},
     )
+    # ^^ NOTE, here we pull any runtime parameters specific
+    # to spawning the sub-actor for the backend. For ex.
+    # both `ib` and `deribit` rely on
+    # `'infect_asyncio': True` since they both
+    # use `tractor`'s "infected `asyncio` mode"
+    # for their libs but you could also do something like
+    # `'debug_mode': True` which would be like passing
+    # `--pdb` for just that provider backend.

     # XXX TODO: make this not so hacky/monkeypatched..
     # -> we need a sane way to configure the logging level for all
@@ -169,8 +186,7 @@

     # lookup actor-enabled modules declared by the backend offering the
     # `brokerd` endpoint(s).
-    enabled: list[str]
-    enabled = start_actor_kwargs['enable_modules'] = [
+    enabled: list[str] = [
         __name__,  # so that eps from THIS mod can be invoked
         modpath,
     ]
@@ -182,9 +198,13 @@
         subpath: str = f'{modpath}.{submodname}'
         enabled.append(subpath)

+    datad_kwargs: dict = {
+        'name': f'brokerd.{brokername}',
+        'enable_modules': enabled,
+    }
     return (
         brokermod,
-        start_actor_kwargs,  # to `ActorNursery.start_actor()`
+        start_actor_kwargs | datad_kwargs | spawn_kws,  # to `ActorNursery.start_actor()`

         # XXX see impl above; contains all (actor global)
         # setup/teardown expected in all `brokerd` actor instances.
@@ -193,14 +213,17 @@


 async def spawn_brokerd(
-
     brokername: str,
     loglevel: str | None = None,

     **tractor_kwargs,

 ) -> bool:
+    '''
+    Spawn a `brokerd.{brokername}` subactor service daemon
+    using `pikerd`'s service mngr.
+    '''
     from piker.service._util import log  # use service mngr log
     log.info(f'Spawning {brokername} broker daemon')
@@ -214,33 +237,41 @@
         **tractor_kwargs,
     )

-    brokermod = get_brokermod(brokername)
-    extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
-    tractor_kwargs.update(extra_tractor_kwargs)
-
     # ask `pikerd` to spawn a new sub-actor and manage it under its
     # actor nursery
-    from piker.service import Services
-
+    from piker.service import (
+        get_service_mngr,
+        ServiceMngr,
+    )
     dname: str = tractor_kwargs.pop('name')  # f'brokerd.{brokername}'

-    portal = await Services.actor_n.start_actor(
-        dname,
-        enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
-        debug_mode=Services.debug_mode,
+    mngr: ServiceMngr = get_service_mngr()
+    ctx: tractor.Context = await mngr.start_service(
+        daemon_name=dname,
+        ctx_ep=partial(
+            # signature of target root-task endpoint
+            daemon_fixture_ep,
+
+            # passed to daemon_fixture_ep(**kwargs)
+            brokername=brokername,
+            loglevel=loglevel,
+            debug_mode=mngr.debug_mode,
+        ),
+        debug_mode=mngr.debug_mode,
+        # ^TODO, allow overriding this per-daemon from client side?
+        # |_ it's already supported in `tractor` so..
+
+        loglevel=loglevel,
+        enable_modules=(
+            _data_mods
+            +
+            tractor_kwargs.pop('enable_modules')
+        ),
         **tractor_kwargs
     )
-
-    # NOTE: the service mngr expects an already spawned actor + its
-    # portal ref in order to do non-blocking setup of brokerd
-    # service nursery.
-    await Services.start_service_task(
-        dname,
-        portal,
-
-        # signature of target root-task endpoint
-        daemon_fixture_ep,
-        brokername=brokername,
-        loglevel=loglevel,
+    assert (
+        not ctx.cancel_called
+        and ctx.portal  # parent side
+        and dname in ctx.chan.uid  # subactor is named as desired
     )
     return True
@@ -265,8 +296,7 @@ async def maybe_spawn_brokerd(
     from piker.service import maybe_spawn_daemon

     async with maybe_spawn_daemon(
-
-        f'brokerd.{brokername}',
+        service_name=f'brokerd.{brokername}',
         service_task_target=spawn_brokerd,
         spawn_args={
             'brokername': brokername,
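For context on the `_spawn_kwargs` lookup in `broker_init()` above: a backend just declares the dict at module scope and it gets merged into the actor-spawn kwargs. A minimal sketch of such a declaration (backend module name hypothetical):

    # piker/brokers/mybackend/__init__.py  (hypothetical backend)
    _spawn_kwargs: dict = {
        # run the provider's SDK under tractor's "infected asyncio"
        # mode, as `ib` and `deribit` do
        'infect_asyncio': True,

        # or enable the crash-REPL for just this backend, like
        # passing --pdb
        # 'debug_mode': True,
    }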
diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py
index cc32af91..2feb6ad4 100644
--- a/piker/data/_sampling.py
+++ b/piker/data/_sampling.py
@@ -25,6 +25,7 @@ from collections import (
     defaultdict,
 )
 from contextlib import asynccontextmanager as acm
+from functools import partial
 import time
 from typing import (
     Any,
@@ -42,7 +43,7 @@ from tractor.trionics import (
     maybe_open_nursery,
 )
 import trio
-from trio_typing import TaskStatus
+from trio import TaskStatus

 from .ticktools import (
     frame_ticks,
@@ -70,6 +71,7 @@ if TYPE_CHECKING:
 _default_delay_s: float = 1.0


+# TODO: use new `tractor.singleton_acm` API for this!
 class Sampler:
     '''
     Global sampling engine registry.
@@ -79,9 +81,9 @@
     This non-instantiated type is meant to be a singleton within
     a `samplerd` actor-service spawned once by the user wishing to
-    time-step-sample (real-time) quote feeds, see
-    ``.service.maybe_open_samplerd()`` and the below
-    ``register_with_sampler()``.
+    time-step-sample a (real-time) quote feed, see
+    `.service.maybe_open_samplerd()` and the below
+    `register_with_sampler()`.

     '''
     service_nursery: None | trio.Nursery = None
@@ -381,7 +383,10 @@ async def register_with_sampler(
         assert Sampler.ohlcv_shms

         # unblock caller
-        await ctx.started(set(Sampler.ohlcv_shms.keys()))
+        await ctx.started(
+            # XXX bc msgpack only allows one array type!
+            list(Sampler.ohlcv_shms.keys())
+        )

         if open_index_stream:
             try:
@@ -426,7 +431,6 @@


 async def spawn_samplerd(
-
     loglevel: str | None = None,
     **extra_tractor_kwargs

@@ -436,7 +440,10 @@
     update and increment count write and stream broadcasting.

     '''
-    from piker.service import Services
+    from piker.service import (
+        get_service_mngr,
+        ServiceMngr,
+    )
     dname = 'samplerd'
     log.info(f'Spawning `{dname}`')
@@ -444,26 +451,33 @@
     # singleton lock creation of ``samplerd`` since we only ever want
     # one daemon per ``pikerd`` proc tree.
     # TODO: make this built-into the service api?
-    async with Services.locks[dname + '_singleton']:
+    mngr: ServiceMngr = get_service_mngr()
+    already_started: bool = dname in mngr.service_tasks

-        if dname not in Services.service_tasks:
-
-            portal = await Services.actor_n.start_actor(
-                dname,
-                enable_modules=[
-                    'piker.data._sampling',
-                ],
-                loglevel=loglevel,
-                debug_mode=Services.debug_mode,  # set by pikerd flag
-                **extra_tractor_kwargs
-            )
-
-            await Services.start_service_task(
-                dname,
-                portal,
+    async with mngr._locks[dname + '_singleton']:
+        ctx: Context = await mngr.start_service(
+            daemon_name=dname,
+            ctx_ep=partial(
                 register_with_sampler,
                 period_s=1,
                 sub_for_broadcasts=False,
+            ),
+            debug_mode=mngr.debug_mode,  # set by pikerd flag
+
+            # proxy-through to tractor
+            enable_modules=[
+                'piker.data._sampling',
+            ],
+            loglevel=loglevel,
+            **extra_tractor_kwargs
+        )
+        if not already_started:
+            assert (
+                ctx
+                and
+                ctx.portal
+                and
+                not ctx.cancel_called
             )

     return True
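On the `ctx.started()` change above: msgpack defines a single array type, so a `set` can never round-trip as a `set`; passing `list(...)` makes the wire type explicit on the sender side. A tiny illustration, assuming `msgspec` as the msgpack codec (which is what `tractor` ships with):

    import msgspec

    shm_keys = {'btcusdt.binance', 'ethusdt.binance'}
    wire: bytes = msgspec.msgpack.encode(list(shm_keys))

    # always decodes as a list, since msgpack has no set type
    assert isinstance(msgspec.msgpack.decode(wire), list)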
diff --git a/piker/service/__init__.py b/piker/service/__init__.py
index 29360620..beb9c70b 100644
--- a/piker/service/__init__.py
+++ b/piker/service/__init__.py
@@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
 => TODO: maybe to (re)move elsewhere?
 '''
-from ._mngr import Services as Services
+from ._mngr import (
+    get_service_mngr as get_service_mngr,
+    open_service_mngr as open_service_mngr,
+    ServiceMngr as ServiceMngr,
+)
 from ._registry import (
     _tractor_kwargs as _tractor_kwargs,
     _default_reg_addr as _default_reg_addr,
diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py
index 33f23453..43a57f8c 100644
--- a/piker/service/_actor_runtime.py
+++ b/piker/service/_actor_runtime.py
@@ -21,7 +21,6 @@ from __future__ import annotations
 import os
 from typing import (
-    Optional,
     Any,
     ClassVar,
 )
@@ -30,13 +29,13 @@ from contextlib import (
 )

 import tractor
-import trio

 from ._util import (
     get_console_log,
 )
 from ._mngr import (
-    Services,
+    open_service_mngr,
+    ServiceMngr,
 )
 from ._registry import (  # noqa
     _tractor_kwargs,
@@ -59,7 +58,7 @@ async def open_piker_runtime(
     registry_addrs: list[tuple[str, int]] = [],
     enable_modules: list[str] = [],
-    loglevel: Optional[str] = None,
+    loglevel: str|None = None,

     # XXX NOTE XXX: you should pretty much never want debug mode
     # for data daemons when running in production.
@@ -69,7 +68,7 @@
     # and spawn the service tree distributed per that.
     start_method: str = 'trio',

-    tractor_runtime_overrides: dict | None = None,
+    tractor_runtime_overrides: dict|None = None,
     **tractor_kwargs,

 ) -> tuple[
@@ -124,6 +123,10 @@
             enable_modules=enable_modules,
             hide_tb=False,

+            # TODO: how to configure this?
+            # keep it on by default if debug mode is set?
+            # maybe_enable_greenback=debug_mode,
+
             **tractor_kwargs,
         ) as actor,

@@ -172,12 +175,13 @@ async def open_pikerd(

     **kwargs,

-) -> Services:
+) -> ServiceMngr:
     '''
-    Start a root piker daemon with an indefinite lifetime.
+    Start a root piker daemon actor (aka `pikerd`) with an indefinite
+    lifetime.

-    A root actor nursery is created which can be used to create and keep
-    alive underling services (see below).
+    A root actor-nursery is created which can be used to spawn and
+    supervise underlying service sub-actors (see below).

     '''
     # NOTE: for the root daemon we always enable the root
@@ -204,9 +208,6 @@
             root_actor,
             reg_addrs,
         ),
-        tractor.open_nursery() as actor_nursery,
-        tractor.trionics.collapse_eg(),
-        trio.open_nursery() as service_tn,
     ):
         for addr in reg_addrs:
             if addr not in root_actor.accept_addrs:
                 raise RuntimeError(
                     f'`pikerd` failed to bind on {addr}!\n'
                     'Maybe you have another daemon already running?'
                 )

-        # assign globally for future daemon/task creation
-        Services.actor_n = actor_nursery
-        Services.service_n = service_tn
-        Services.debug_mode = debug_mode
-
-        try:
-            yield Services
-
-        finally:
-            # TODO: is this more clever/efficient?
-            # if 'samplerd' in Services.service_tasks:
-            #     await Services.cancel_service('samplerd')
-            service_tn.cancel_scope.cancel()
+        mngr: ServiceMngr
+        async with open_service_mngr(
+            debug_mode=debug_mode,
+        ) as mngr:
+            yield mngr


 # TODO: do we even need this?
 # @acm
 # async def maybe_open_runtime(
-#     loglevel: Optional[str] = None,
+#     loglevel: str|None = None,
 #     **kwargs,

 # ) -> None:
@@ -264,7 +257,7 @@ async def maybe_open_pikerd(

 ) -> (
     tractor._portal.Portal
-    |ClassVar[Services]
+    |ClassVar[ServiceMngr]
 ):
     '''
     If no ``pikerd`` daemon-root-actor can be found start it and
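Net effect of the `open_pikerd()` rework above: callers now receive the service mngr directly instead of wiring the old class-var `Services` singleton. A minimal usage sketch, assuming the usual `piker.service` re-exports:

    import trio
    from piker.service import open_pikerd, ServiceMngr

    async def main():
        mngr: ServiceMngr
        async with open_pikerd(loglevel='info') as mngr:
            # spawn/supervise long-lived service sub-actors from here,
            # e.g. via `mngr.start_service()` as in `spawn_brokerd()`
            await trio.sleep_forever()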
diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py
index 4cccf855..0bdd1688 100644
--- a/piker/service/_ahab.py
+++ b/piker/service/_ahab.py
@@ -49,7 +49,7 @@ from requests.exceptions import (
     ReadTimeout,
 )

-from ._mngr import Services
+from ._mngr import ServiceMngr
 from ._util import (
     log,  # sub-sys logger
     get_console_log,
@@ -453,7 +453,7 @@ async def open_ahabd(

 @acm
 async def start_ahab_service(
-    services: Services,
+    services: ServiceMngr,
     service_name: str,

     # endpoint config passed as **kwargs
@@ -549,7 +549,8 @@
             log.warning('Failed to cancel root permsed container')

         except (
-            trio.MultiError,
+            # trio.MultiError,
+            ExceptionGroup,
         ) as err:
             for subexc in err.exceptions:
                 if isinstance(subexc, PermissionError):
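Re the `trio.MultiError` -> `ExceptionGroup` swap just above: `MultiError` is deprecated and recent `trio` raises the (py>=3.11 builtin) `ExceptionGroup` from nurseries instead, so the per-sub-exception filtering loop keeps working unchanged. A sketch of the pattern, with `failing_task` and `handle` hypothetical:

    import trio

    async def demo():
        try:
            async with trio.open_nursery() as n:
                n.start_soon(failing_task)  # hypothetical crasher
                n.start_soon(failing_task)
        except ExceptionGroup as eg:
            for subexc in eg.exceptions:
                # filter per-exception, as done above for
                # `PermissionError`
                handle(subexc)  # hypothetical handler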
diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py
index 89d7f28d..f81d8b13 100644
--- a/piker/service/_daemon.py
+++ b/piker/service/_daemon.py
@@ -26,15 +26,17 @@ from typing import (
 )
 from contextlib import (
     asynccontextmanager as acm,
 )
+from collections import defaultdict

 import tractor
-from trio.lowlevel import current_task
+import trio

 from ._util import (
     log,  # sub-sys logger
 )
 from ._mngr import (
-    Services,
+    get_service_mngr,
+    ServiceMngr,
 )
 from ._actor_runtime import maybe_open_pikerd
 from ._registry import find_service
@@ -42,15 +44,14 @@

 @acm
 async def maybe_spawn_daemon(
-
     service_name: str,
     service_task_target: Callable,
-
     spawn_args: dict[str, Any],

     loglevel: str | None = None,
     singleton: bool = False,

+    _locks = defaultdict(trio.Lock),
     **pikerd_kwargs,

 ) -> tractor.Portal:
@@ -68,7 +69,7 @@
     '''
     # serialize access to this section to avoid
     # 2 or more tasks racing to create a daemon
-    lock = Services.locks[service_name]
+    lock = _locks[service_name]
     await lock.acquire()

     try:
@@ -104,6 +105,12 @@
         # service task for that actor.
         started: bool
         if pikerd_portal is None:
+
+            # await tractor.pause()
+            if tractor.is_debug():
+                from tractor.devx._debug import maybe_init_greenback
+                await maybe_init_greenback()
+
             started = await service_task_target(
                 loglevel=loglevel,
                 **spawn_args,
@@ -134,14 +141,72 @@
         async with tractor.wait_for_actor(service_name) as portal:
             lock.release()
             yield portal
-            await portal.cancel_actor()
+            # --- ---- ---
+            # XXX NOTE XXX
+            # --- ---- ---
+            # DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
+            #
+            # Doing so will cause an "out-of-band" ctxc
+            # (`tractor.ContextCancelled`) to be raised inside the
+            # `ServiceMngr.open_context_in_task()`'s call to
+            # `ctx.wait_for_result()` AND the internal self-ctxc
+            # "graceful capture" WILL NOT CATCH IT!
+            #
+            # This can cause certain types of operations to raise
+            # that ctxc BEFORE THEY `return`, resulting in
+            # a "false-negative" ctxc being raised when really
+            # nothing actually failed, other than our semantic
+            # "failure" to suppress an expected, graceful,
+            # self-cancel scenario..
+            #
+            # bUt wHy duZ It WorK lIKe dis..
+            # ------------------------------
+            # from the perspective of the `tractor.Context` this
+            # cancel request was conducted "out of band" since
+            # `Context.cancel()` was never called and thus the
+            # `._cancel_called: bool` was never set. Despite the
+            # remote `.canceller` being set to `pikerd` (i.e. the
+            # same `Actor.uid` of the raising service-mngr task) the
+            # service-task's ctx itself was never marked as having
+            # requested cancellation and thus still raises the ctxc
+            # bc it was unaware of any such request.
+            #
+            # How to make grokin these cases easier tho?
+            # ------------------------------------------
+            # Because `Portal.cancel_actor()` was called it requests
+            # "full-`Actor`-runtime-cancellation" of its peer
+            # process which IS NOT THE SAME as a single inter-actor
+            # RPC task cancelling its local context with a remote
+            # peer `Task` in that same peer process.
+            #
+            # ?TODO? It might be better if we do one (or all) of the
+            # following:
+            #
+            # -[ ] at least set a special message for the
+            #    `ContextCancelled` when raised locally by the
+            #    unaware ctx task such that we check for the
+            #    `.canceller` being *our `Actor`* and in the case
+            #    where `Context._cancel_called == False` we specially
+            #    note that this is likely an "out-of-band"
+            #    runtime-cancel request triggered by some call to
+            #    `Portal.cancel_actor()`, possibly even reporting the
+            #    exact LOC of that caller by tracking it inside our
+            #    portal-type?
+            # -[ ] possibly add another field to `ContextCancelled` like
+            #    maybe a,
+            #    `.request_type: Literal['os', 'proc', 'actor',
+            #    'ctx']` type thing which would allow immediately
+            #    being able to tell what kind of cancellation caused
+            #    the unexpected ctxc?
+            # -[ ] REMOVE THIS COMMENT, once we've settled on how to
+            #    better augment `tractor` to be more explicit on this!

     except BaseException as _err:
         err = _err
         if (
             lock.locked()
             and
-            lock.statistics().owner is current_task()
+            lock.statistics().owner is trio.lowlevel.current_task()
         ):
             log.exception(
                 f'Releasing stale lock after crash..?'
@@ -163,26 +228,25 @@ async def spawn_emsd(

     """
     log.info('Spawning emsd')

-    portal = await Services.actor_n.start_actor(
+    smngr: ServiceMngr = get_service_mngr()
+    portal = await smngr.an.start_actor(
         'emsd',
         enable_modules=[
             'piker.clearing._ems',
             'piker.clearing._client',
         ],
         loglevel=loglevel,
-        debug_mode=Services.debug_mode,  # set by pikerd flag
+        debug_mode=smngr.debug_mode,  # set by pikerd flag
         **extra_tractor_kwargs
     )

     # non-blocking setup of clearing service
     from ..clearing._ems import _setup_persistent_emsd

-    await Services.start_service_task(
-        'emsd',
-        portal,
-
-        # signature of target root-task endpoint
-        _setup_persistent_emsd,
+    await smngr.start_service_ctx(
+        name='emsd',
+        portal=portal,
+        ctx_fn=_setup_persistent_emsd,
         loglevel=loglevel,
     )
     return True
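To make the in-band vs. out-of-band distinction from the big note above concrete, a condensed sketch of the two cancel paths (endpoint `ep` hypothetical, `tractor` API per this diff's own usage):

    # in-band: cancellation is requested *through the ctx* so
    # `Context._cancel_called` is set and the resulting ctxc is
    # absorbed as a graceful, expected self-cancel.
    async with portal.open_context(ep) as (ctx, first):
        await ctx.cancel()

    # out-of-band: cancel the peer's *whole actor runtime*; any
    # still-open ctx with that peer raises a ctxc it never requested,
    # the "false-negative" failure mode described above.
    await portal.cancel_actor()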
diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py
index 726a34c8..9557a828 100644
--- a/piker/service/_mngr.py
+++ b/piker/service/_mngr.py
@@ -18,148 +18,36 @@
 daemon-service management API.

 """
-from collections import defaultdict
-from typing import (
-    Callable,
-    Any,
+from contextlib import (
+    asynccontextmanager as acm,
 )

-import trio
-from trio_typing import TaskStatus
 import tractor
-from tractor import (
-    current_actor,
-    ContextCancelled,
-    Context,
-    Portal,
+from tractor.hilevel import (
+    ServiceMngr,
+    # open_service_mngr as _open_service_mngr,
+    get_service_mngr as get_service_mngr,
 )
+# TODO:
+# -[ ] factor all the common shit from `.data._sampling`
+#   and `.brokers._daemon` into here / `ServiceMngr`
+#   in terms of allocating the `Portal` as part of the
+#   "service-in-subactor" starting!
+# -[ ] move to `tractor.hilevel._service`, import and use here!
+# NOTE: purposely leaks the ref to the mod-scope Bo

-from ._util import (
-    log,  # sub-sys logger
-)
+Services: ServiceMngr|None = None

+@acm
+async def open_service_mngr(
+    **kwargs,
+) -> ServiceMngr:

-# TODO: we need remote wrapping and a general soln:
-# - factor this into a ``tractor.highlevel`` extension # pack for the
-#   library.
-# - wrap a "remote api" wherein you can get a method proxy
-#   to the pikerd actor for starting services remotely!
-# - prolly rename this to ActorServicesNursery since it spawns
-#   new actors and supervises them to completion?
-class Services:
-
-    actor_n: tractor._supervise.ActorNursery
-    service_n: trio.Nursery
-    debug_mode: bool  # tractor sub-actor debug mode flag
-    service_tasks: dict[
-        str,
-        tuple[
-            trio.CancelScope,
-            Portal,
-            trio.Event,
-        ]
-    ] = {}
-    locks = defaultdict(trio.Lock)
-
-    @classmethod
-    async def start_service_task(
-        self,
-        name: str,
-        portal: Portal,
-        target: Callable,
-        allow_overruns: bool = False,
-        **ctx_kwargs,
-
-    ) -> (trio.CancelScope, Context):
-        '''
-        Open a context in a service sub-actor, add to a stack
-        that gets unwound at ``pikerd`` teardown.
-
-        This allows for allocating long-running sub-services in our main
-        daemon and explicitly controlling their lifetimes.
-
-        '''
-        async def open_context_in_task(
-            task_status: TaskStatus[
-                tuple[
-                    trio.CancelScope,
-                    trio.Event,
-                    Any,
-                ]
-            ] = trio.TASK_STATUS_IGNORED,
-
-        ) -> Any:
-
-            with trio.CancelScope() as cs:
-
-                async with portal.open_context(
-                    target,
-                    allow_overruns=allow_overruns,
-                    **ctx_kwargs,
-
-                ) as (ctx, first):
-
-                    # unblock once the remote context has started
-                    complete = trio.Event()
-                    task_status.started((cs, complete, first))
-                    log.info(
-                        f'`pikerd` service {name} started with value {first}'
-                    )
-                    try:
-                        # wait on any context's return value
-                        # and any final portal result from the
-                        # sub-actor.
-                        ctx_res: Any = await ctx.wait_for_result()
-
-                        # NOTE: blocks indefinitely until cancelled
-                        # either by error from the target context
-                        # function or by being cancelled here by the
-                        # surrounding cancel scope.
-                        return (await portal.result(), ctx_res)
-                    except ContextCancelled as ctxe:
-                        canceller: tuple[str, str] = ctxe.canceller
-                        our_uid: tuple[str, str] = current_actor().uid
-                        if (
-                            canceller != portal.channel.uid
-                            and
-                            canceller != our_uid
-                        ):
-                            log.cancel(
-                                f'Actor-service {name} was remotely cancelled?\n'
-                                f'remote canceller: {canceller}\n'
-                                f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
-                            )
-                        else:
-                            raise
-
-                    finally:
-                        await portal.cancel_actor()
-                        complete.set()
-                        self.service_tasks.pop(name)
-
-        cs, complete, first = await self.service_n.start(open_context_in_task)
-
-        # store the cancel scope and portal for later cancellation or
-        # retstart if needed.
-        self.service_tasks[name] = (cs, portal, complete)
-
-        return cs, first
-
-    @classmethod
-    async def cancel_service(
-        self,
-        name: str,
-
-    ) -> Any:
-        '''
-        Cancel the service task and actor for the given ``name``.
-
-        '''
-        log.info(f'Cancelling `pikerd` service {name}')
-        cs, portal, complete = self.service_tasks[name]
-        cs.cancel()
-        await complete.wait()
-        assert name not in self.service_tasks, \
-            f'Serice task for {name} not terminated?'
+    global Services
+    async with tractor.hilevel.open_service_mngr(
+        **kwargs,
+    ) as mngr:
+        # Services = proxy(mngr)
+        Services = mngr
+        yield mngr
+        Services = None
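Usage sketch for the new shim mod above, assuming `tractor.hilevel` hands back the same mngr instance from both entry points:

    from piker.service._mngr import (
        open_service_mngr,
        get_service_mngr,
    )

    async def root_task():
        async with open_service_mngr(debug_mode=False) as mngr:
            # anywhere else inside the root actor's task tree:
            assert get_service_mngr() is mngr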
diff --git a/piker/service/elastic.py b/piker/service/elastic.py
index 902f4fde..b1a13722 100644
--- a/piker/service/elastic.py
+++ b/piker/service/elastic.py
@@ -21,11 +21,13 @@ from typing import (
     TYPE_CHECKING,
 )

+# TODO: oof, needs to be changed to `httpx`!
 import asks

 if TYPE_CHECKING:
     import docker
     from ._ahab import DockerContainer
+    from . import ServiceMngr

 from ._util import log  # sub-sys logger
 from ._util import (
@@ -127,7 +129,7 @@ def start_elasticsearch(

 @acm
 async def start_ahab_daemon(
-    service_mngr: Services,
+    service_mngr: ServiceMngr,
     user_config: dict | None = None,
     loglevel: str | None = None,
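Re the `httpx` TODO above (`asks` is effectively unmaintained), the eventual swap might look like this sketch (port and endpoint are assumptions based on a stock elasticsearch setup):

    import httpx

    async def es_alive(
        host: str = 'localhost',
        port: int = 19200,
    ) -> bool:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f'http://{host}:{port}/_cat/health')
            return resp.status_code == 200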
diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py
index c9f49420..852b967c 100644
--- a/piker/service/marketstore.py
+++ b/piker/service/marketstore.py
@@ -53,7 +53,7 @@ import pendulum
 # import purerpc

 from ..data.feed import maybe_open_feed
-from . import Services
+from . import ServiceMngr
 from ._util import (
     log,  # sub-sys logger
     get_console_log,
@@ -233,7 +233,7 @@ def start_marketstore(

 @acm
 async def start_ahab_daemon(
-    service_mngr: Services,
+    service_mngr: ServiceMngr,
     user_config: dict | None = None,
     loglevel: str | None = None,
diff --git a/piker/types.py b/piker/types.py
index cda3fb44..385f83b0 100644
--- a/piker/types.py
+++ b/piker/types.py
@@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib types.

 '''
-from __future__ import annotations
-from collections import UserList
-from pprint import (
-    saferepr,
-)
-from typing import Any
-
-from msgspec import (
-    msgpack,
-    Struct as _Struct,
-    structs,
-)
-
-
-class DiffDump(UserList):
-    '''
-    Very simple list delegator that repr() dumps (presumed) tuple
-    elements of the form `tuple[str, Any, Any]` in a nice
-    multi-line readable form for analyzing `Struct` diffs.
-
-    '''
-    def __repr__(self) -> str:
-        if not len(self):
-            return super().__repr__()
-
-        # format by displaying item pair's ``repr()`` on multiple,
-        # indented lines such that they are more easily visually
-        # comparable when printed to console when printed to
-        # console.
-        repstr: str = '[\n'
-        for k, left, right in self:
-            repstr += (
-                f'({k},\n'
-                f'\t{repr(left)},\n'
-                f'\t{repr(right)},\n'
-                ')\n'
-            )
-        repstr += ']\n'
-        return repstr
-
-
-class Struct(
-    _Struct,
-
-    # https://jcristharif.com/msgspec/structs.html#tagged-unions
-    # tag='pikerstruct',
-    # tag=True,
-):
-    '''
-    A "human friendlier" (aka repl buddy) struct subtype.
-
-    '''
-    def _sin_props(self) -> Iterator[
-        tuple[
-            structs.FieldIinfo,
-            str,
-            Any,
-        ]
-    ]:
-        '''
-        Iterate over all non-@property fields of this struct.
-
-        '''
-        fi: structs.FieldInfo
-        for fi in structs.fields(self):
-            key: str = fi.name
-            val: Any = getattr(self, key)
-            yield fi, key, val
-
-    def to_dict(
-        self,
-        include_non_members: bool = True,
-
-    ) -> dict:
-        '''
-        Like it sounds.. direct delegation to:
-        https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
-
-        BUT, by default we pop all non-member (aka not defined as
-        struct fields) fields by default.
-
-        '''
-        asdict: dict = structs.asdict(self)
-        if include_non_members:
-            return asdict
-
-        # only return a dict of the struct members
-        # which were provided as input, NOT anything
-        # added as type-defined `@property` methods!
-        sin_props: dict = {}
-        fi: structs.FieldInfo
-        for fi, k, v in self._sin_props():
-            sin_props[k] = asdict[k]
-
-        return sin_props
-
-    def pformat(
-        self,
-        field_indent: int = 2,
-        indent: int = 0,
-
-    ) -> str:
-        '''
-        Recursion-safe `pprint.pformat()` style formatting of
-        a `msgspec.Struct` for sane reading by a human using a REPL.
-
-        '''
-        # global whitespace indent
-        ws: str = ' '*indent
-
-        # field whitespace indent
-        field_ws: str = ' '*(field_indent + indent)
-
-        # qtn: str = ws + self.__class__.__qualname__
-        qtn: str = self.__class__.__qualname__
-
-        obj_str: str = ''  # accumulator
-        fi: structs.FieldInfo
-        k: str
-        v: Any
-        for fi, k, v in self._sin_props():
-
-            # TODO: how can we prefer `Literal['option1', 'option2,
-            # ..]` over .__name__ == `Literal` but still get only the
-            # latter for simple types like `str | int | None` etc..?
-            ft: type = fi.type
-            typ_name: str = getattr(ft, '__name__', str(ft))
-
-            # recurse to get sub-struct's `.pformat()` output Bo
-            if isinstance(v, Struct):
-                val_str: str = v.pformat(
-                    indent=field_indent + indent,
-                    field_indent=indent + field_indent,
-                )
-
-            else:  # the `pprint` recursion-safe format:
-                # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
-                val_str: str = saferepr(v)
-
-            obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
-
-        return (
-            f'{qtn}(\n'
-            f'{obj_str}'
-            f'{ws})'
-        )
-
-    # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
-    # inside a known tty?
-    # def __repr__(self) -> str:
-    #     ...
-
-    # __str__ = __repr__ = pformat
-    __repr__ = pformat
-
-    def copy(
-        self,
-        update: dict | None = None,
-
-    ) -> Struct:
-        '''
-        Validate-typecast all self defined fields, return a copy of
-        us with all such fields.
-
-        NOTE: This is kinda like the default behaviour in
-        `pydantic.BaseModel` except a copy of the object is
-        returned making it compat with `frozen=True`.
-
-        '''
-        if update:
-            for k, v in update.items():
-                setattr(self, k, v)
-
-        # NOTE: roundtrip serialize to validate
-        # - enode to msgpack binary format,
-        # - decode that back to a struct.
-        return msgpack.Decoder(type=type(self)).decode(
-            msgpack.Encoder().encode(self)
-        )
-
-    def typecast(
-        self,
-
-        # TODO: allow only casting a named subset?
-        # fields: set[str] | None = None,
-
-    ) -> None:
-        '''
-        Cast all fields using their declared type annotations
-        (kinda like what `pydantic` does by default).
-
-        NOTE: this of course won't work on frozen types, use
-        ``.copy()`` above in such cases.
-
-        '''
-        # https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
-        fi: structs.FieldInfo
-        for fi in structs.fields(self):
-            setattr(
-                self,
-                fi.name,
-                fi.type(getattr(self, fi.name)),
-            )
-
-    def __sub__(
-        self,
-        other: Struct,
-
-    ) -> DiffDump[tuple[str, Any, Any]]:
-        '''
-        Compare fields/items key-wise and return a ``DiffDump``
-        for easy visual REPL comparison B)
-
-        '''
-        diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
-        for fi in structs.fields(self):
-            attr_name: str = fi.name
-            ours: Any = getattr(self, attr_name)
-            theirs: Any = getattr(other, attr_name)
-            if ours != theirs:
-                diffs.append((
-                    attr_name,
-                    ours,
-                    theirs,
-                ))
-
-        return diffs
+from tractor.msg import Struct as Struct
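With `piker.types.Struct` now a bare re-export, downstream imports stay unchanged; a quick sketch, assuming the relocated `tractor.msg.Struct` preserves the repl-buddy API removed here (`.copy()`, `__sub__`, `.pformat()`, etc.):

    from piker.types import Struct

    class Point(Struct):
        x: int
        y: int

    p = Point(x=1, y=2)
    p2 = p.copy(update={'y': 3})  # roundtrip-validates via msgpack
    assert (p2 - p)  # truthy `DiffDump` containing ('y', 3, 2)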
diff --git a/tests/conftest.py b/tests/conftest.py
index 22d1af3c..db071054 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -10,7 +10,7 @@ from piker import (
     config,
 )
 from piker.service import (
-    Services,
+    get_service_mngr,
 )
 from piker.log import get_console_log
@@ -135,7 +135,7 @@ async def _open_test_pikerd(
         ) as service_manager,
     ):
         # this proc/actor is the pikerd
-        assert service_manager is Services
+        assert service_manager is get_service_mngr()

         async with tractor.wait_for_actor(
             'pikerd',
diff --git a/tests/test_ems.py b/tests/test_ems.py
index 07e28c33..e348fc8b 100644
--- a/tests/test_ems.py
+++ b/tests/test_ems.py
@@ -26,7 +26,7 @@ import pytest
 import tractor
 from uuid import uuid4

-from piker.service import Services
+from piker.service import ServiceMngr
 from piker.log import get_logger
 from piker.clearing._messages import (
     Order,
@@ -158,7 +158,7 @@ def load_and_check_pos(


 def test_ems_err_on_bad_broker(
-    open_test_pikerd: Services,
+    open_test_pikerd: ServiceMngr,
     loglevel: str,
 ):
     async def load_bad_fqme():
diff --git a/tests/test_services.py b/tests/test_services.py
index 433e97f3..ca093929 100644
--- a/tests/test_services.py
+++ b/tests/test_services.py
@@ -15,7 +15,7 @@ import tractor

 from piker.service import (
     find_service,
-    Services,
+    ServiceMngr,
 )
 from piker.data import (
     open_feed,
@@ -44,7 +44,7 @@ def test_runtime_boot(
     async def main():
         port = 6666
         daemon_addr = ('127.0.0.1', port)
-        services: Services
+        services: ServiceMngr

         async with (
             open_test_pikerd(