Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.
Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use an factory `@acm`
  to guarantee a single-instance-per-actor using a niche approach for a
  singleton object using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.
Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
			
			
			
		
							parent
							
								
									e987d7d7c4
								
							
						
					
					
						commit
						020919f2ea
					
				|  | @ -23,6 +23,7 @@ from __future__ import annotations | ||||||
| from contextlib import ( | from contextlib import ( | ||||||
|     asynccontextmanager as acm, |     asynccontextmanager as acm, | ||||||
| ) | ) | ||||||
|  | from functools import partial | ||||||
| from types import ModuleType | from types import ModuleType | ||||||
| from typing import ( | from typing import ( | ||||||
|     TYPE_CHECKING, |     TYPE_CHECKING, | ||||||
|  | @ -190,14 +191,17 @@ def broker_init( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def spawn_brokerd( | async def spawn_brokerd( | ||||||
| 
 |  | ||||||
|     brokername: str, |     brokername: str, | ||||||
|     loglevel: str | None = None, |     loglevel: str | None = None, | ||||||
| 
 | 
 | ||||||
|     **tractor_kwargs, |     **tractor_kwargs, | ||||||
| 
 | 
 | ||||||
| ) -> bool: | ) -> bool: | ||||||
|  |     ''' | ||||||
|  |     Spawn a `brokerd.<backendname>` subactor service daemon | ||||||
|  |     using `pikerd`'s service mngr. | ||||||
| 
 | 
 | ||||||
|  |     ''' | ||||||
|     from piker.service._util import log  # use service mngr log |     from piker.service._util import log  # use service mngr log | ||||||
|     log.info(f'Spawning {brokername} broker daemon') |     log.info(f'Spawning {brokername} broker daemon') | ||||||
| 
 | 
 | ||||||
|  | @ -217,27 +221,35 @@ async def spawn_brokerd( | ||||||
| 
 | 
 | ||||||
|     # ask `pikerd` to spawn a new sub-actor and manage it under its |     # ask `pikerd` to spawn a new sub-actor and manage it under its | ||||||
|     # actor nursery |     # actor nursery | ||||||
|     from piker.service import Services |     from piker.service import ( | ||||||
| 
 |         get_service_mngr, | ||||||
|  |         ServiceMngr, | ||||||
|  |     ) | ||||||
|     dname: str = tractor_kwargs.pop('name')  # f'brokerd.{brokername}' |     dname: str = tractor_kwargs.pop('name')  # f'brokerd.{brokername}' | ||||||
|     portal = await Services.actor_n.start_actor( |     mngr: ServiceMngr = get_service_mngr() | ||||||
|         dname, |     ctx: tractor.Context = await mngr.start_service( | ||||||
|         enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), |         daemon_name=dname, | ||||||
|         debug_mode=Services.debug_mode, |         ctx_ep=partial( | ||||||
|  |             # signature of target root-task endpoint | ||||||
|  |             daemon_fixture_ep, | ||||||
|  | 
 | ||||||
|  |             # passed to daemon_fixture_ep(**kwargs) | ||||||
|  |             brokername=brokername, | ||||||
|  |             loglevel=loglevel, | ||||||
|  |         ), | ||||||
|  |         debug_mode=mngr.debug_mode, | ||||||
|  |         loglevel=loglevel, | ||||||
|  |         enable_modules=( | ||||||
|  |             _data_mods | ||||||
|  |             + | ||||||
|  |             tractor_kwargs.pop('enable_modules') | ||||||
|  |         ), | ||||||
|         **tractor_kwargs |         **tractor_kwargs | ||||||
|     ) |     ) | ||||||
| 
 |     assert ( | ||||||
|     # NOTE: the service mngr expects an already spawned actor + its |         not ctx.cancel_called | ||||||
|     # portal ref in order to do non-blocking setup of brokerd |         and ctx.portal  # parent side | ||||||
|     # service nursery. |         and dname in ctx.chan.uid  # subactor is named as desired | ||||||
|     await Services.start_service_task( |  | ||||||
|         dname, |  | ||||||
|         portal, |  | ||||||
| 
 |  | ||||||
|         # signature of target root-task endpoint |  | ||||||
|         daemon_fixture_ep, |  | ||||||
|         brokername=brokername, |  | ||||||
|         loglevel=loglevel, |  | ||||||
|     ) |     ) | ||||||
|     return True |     return True | ||||||
| 
 | 
 | ||||||
|  | @ -262,8 +274,7 @@ async def maybe_spawn_brokerd( | ||||||
|     from piker.service import maybe_spawn_daemon |     from piker.service import maybe_spawn_daemon | ||||||
| 
 | 
 | ||||||
|     async with maybe_spawn_daemon( |     async with maybe_spawn_daemon( | ||||||
| 
 |         service_name=f'brokerd.{brokername}', | ||||||
|         f'brokerd.{brokername}', |  | ||||||
|         service_task_target=spawn_brokerd, |         service_task_target=spawn_brokerd, | ||||||
|         spawn_args={ |         spawn_args={ | ||||||
|             'brokername': brokername, |             'brokername': brokername, | ||||||
|  |  | ||||||
|  | @ -25,6 +25,7 @@ from collections import ( | ||||||
|     defaultdict, |     defaultdict, | ||||||
| ) | ) | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm | ||||||
|  | from functools import partial | ||||||
| import time | import time | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|  | @ -42,7 +43,7 @@ from tractor.trionics import ( | ||||||
|     maybe_open_nursery, |     maybe_open_nursery, | ||||||
| ) | ) | ||||||
| import trio | import trio | ||||||
| from trio_typing import TaskStatus | from trio import TaskStatus | ||||||
| 
 | 
 | ||||||
| from .ticktools import ( | from .ticktools import ( | ||||||
|     frame_ticks, |     frame_ticks, | ||||||
|  | @ -70,6 +71,7 @@ if TYPE_CHECKING: | ||||||
| _default_delay_s: float = 1.0 | _default_delay_s: float = 1.0 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # TODO: use new `tractor.singleton_acm` API for this! | ||||||
| class Sampler: | class Sampler: | ||||||
|     ''' |     ''' | ||||||
|     Global sampling engine registry. |     Global sampling engine registry. | ||||||
|  | @ -79,9 +81,9 @@ class Sampler: | ||||||
| 
 | 
 | ||||||
|     This non-instantiated type is meant to be a singleton within |     This non-instantiated type is meant to be a singleton within | ||||||
|     a `samplerd` actor-service spawned once by the user wishing to |     a `samplerd` actor-service spawned once by the user wishing to | ||||||
|     time-step-sample (real-time) quote feeds, see |     time-step-sample a (real-time) quote feeds, see | ||||||
|     ``.service.maybe_open_samplerd()`` and the below |     `.service.maybe_open_samplerd()` and the below | ||||||
|     ``register_with_sampler()``. |     `register_with_sampler()`. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     service_nursery: None | trio.Nursery = None |     service_nursery: None | trio.Nursery = None | ||||||
|  | @ -375,7 +377,10 @@ async def register_with_sampler( | ||||||
|                 assert Sampler.ohlcv_shms |                 assert Sampler.ohlcv_shms | ||||||
| 
 | 
 | ||||||
|             # unblock caller |             # unblock caller | ||||||
|             await ctx.started(set(Sampler.ohlcv_shms.keys())) |             await ctx.started( | ||||||
|  |                 # XXX bc msgpack only allows one array type! | ||||||
|  |                 list(Sampler.ohlcv_shms.keys()) | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|             if open_index_stream: |             if open_index_stream: | ||||||
|                 try: |                 try: | ||||||
|  | @ -419,7 +424,6 @@ async def register_with_sampler( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def spawn_samplerd( | async def spawn_samplerd( | ||||||
| 
 |  | ||||||
|     loglevel: str | None = None, |     loglevel: str | None = None, | ||||||
|     **extra_tractor_kwargs |     **extra_tractor_kwargs | ||||||
| 
 | 
 | ||||||
|  | @ -429,7 +433,10 @@ async def spawn_samplerd( | ||||||
|     update and increment count write and stream broadcasting. |     update and increment count write and stream broadcasting. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     from piker.service import Services |     from piker.service import ( | ||||||
|  |         get_service_mngr, | ||||||
|  |         ServiceMngr, | ||||||
|  |     ) | ||||||
| 
 | 
 | ||||||
|     dname = 'samplerd' |     dname = 'samplerd' | ||||||
|     log.info(f'Spawning `{dname}`') |     log.info(f'Spawning `{dname}`') | ||||||
|  | @ -437,26 +444,33 @@ async def spawn_samplerd( | ||||||
|     # singleton lock creation of ``samplerd`` since we only ever want |     # singleton lock creation of ``samplerd`` since we only ever want | ||||||
|     # one daemon per ``pikerd`` proc tree. |     # one daemon per ``pikerd`` proc tree. | ||||||
|     # TODO: make this built-into the service api? |     # TODO: make this built-into the service api? | ||||||
|     async with Services.locks[dname + '_singleton']: |     mngr: ServiceMngr = get_service_mngr() | ||||||
|  |     already_started: bool = dname in mngr.service_tasks | ||||||
| 
 | 
 | ||||||
|         if dname not in Services.service_tasks: |     async with mngr._locks[dname + '_singleton']: | ||||||
| 
 |         ctx: Context = await mngr.start_service( | ||||||
|             portal = await Services.actor_n.start_actor( |             daemon_name=dname, | ||||||
|                 dname, |             ctx_ep=partial( | ||||||
|                 enable_modules=[ |  | ||||||
|                     'piker.data._sampling', |  | ||||||
|                 ], |  | ||||||
|                 loglevel=loglevel, |  | ||||||
|                 debug_mode=Services.debug_mode,  # set by pikerd flag |  | ||||||
|                 **extra_tractor_kwargs |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|             await Services.start_service_task( |  | ||||||
|                 dname, |  | ||||||
|                 portal, |  | ||||||
|                 register_with_sampler, |                 register_with_sampler, | ||||||
|                 period_s=1, |                 period_s=1, | ||||||
|                 sub_for_broadcasts=False, |                 sub_for_broadcasts=False, | ||||||
|  |             ), | ||||||
|  |             debug_mode=mngr.debug_mode,  # set by pikerd flag | ||||||
|  | 
 | ||||||
|  |             # proxy-through to tractor | ||||||
|  |             enable_modules=[ | ||||||
|  |                 'piker.data._sampling', | ||||||
|  |             ], | ||||||
|  |             loglevel=loglevel, | ||||||
|  |             **extra_tractor_kwargs | ||||||
|  |         ) | ||||||
|  |         if not already_started: | ||||||
|  |             assert ( | ||||||
|  |                 ctx | ||||||
|  |                 and | ||||||
|  |                 ctx.portal | ||||||
|  |                 and | ||||||
|  |                 not ctx.cancel_called | ||||||
|             ) |             ) | ||||||
|             return True |             return True | ||||||
| 
 | 
 | ||||||
|  | @ -889,6 +903,7 @@ async def uniform_rate_send( | ||||||
|             # to consumers which crash or lose network connection. |             # to consumers which crash or lose network connection. | ||||||
|             # I.e. we **DO NOT** want to crash and propagate up to |             # I.e. we **DO NOT** want to crash and propagate up to | ||||||
|             # ``pikerd`` these kinds of errors! |             # ``pikerd`` these kinds of errors! | ||||||
|  |             trio.EndOfChannel, | ||||||
|             trio.ClosedResourceError, |             trio.ClosedResourceError, | ||||||
|             trio.BrokenResourceError, |             trio.BrokenResourceError, | ||||||
|             ConnectionResetError, |             ConnectionResetError, | ||||||
|  |  | ||||||
|  | @ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for, | ||||||
|   => TODO: maybe to (re)move elsewhere? |   => TODO: maybe to (re)move elsewhere? | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| from ._mngr import Services as Services | from ._mngr import ( | ||||||
|  |     get_service_mngr as get_service_mngr, | ||||||
|  |     open_service_mngr as open_service_mngr, | ||||||
|  |     ServiceMngr as ServiceMngr, | ||||||
|  | ) | ||||||
| from ._registry import ( | from ._registry import ( | ||||||
|     _tractor_kwargs as _tractor_kwargs, |     _tractor_kwargs as _tractor_kwargs, | ||||||
|     _default_reg_addr as _default_reg_addr, |     _default_reg_addr as _default_reg_addr, | ||||||
|  |  | ||||||
|  | @ -21,7 +21,6 @@ | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
| import os | import os | ||||||
| from typing import ( | from typing import ( | ||||||
|     Optional, |  | ||||||
|     Any, |     Any, | ||||||
|     ClassVar, |     ClassVar, | ||||||
| ) | ) | ||||||
|  | @ -30,13 +29,13 @@ from contextlib import ( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| import tractor | import tractor | ||||||
| import trio |  | ||||||
| 
 | 
 | ||||||
| from ._util import ( | from ._util import ( | ||||||
|     get_console_log, |     get_console_log, | ||||||
| ) | ) | ||||||
| from ._mngr import ( | from ._mngr import ( | ||||||
|     Services, |     open_service_mngr, | ||||||
|  |     ServiceMngr, | ||||||
| ) | ) | ||||||
| from ._registry import (  # noqa | from ._registry import (  # noqa | ||||||
|     _tractor_kwargs, |     _tractor_kwargs, | ||||||
|  | @ -59,7 +58,7 @@ async def open_piker_runtime( | ||||||
|     registry_addrs: list[tuple[str, int]] = [], |     registry_addrs: list[tuple[str, int]] = [], | ||||||
| 
 | 
 | ||||||
|     enable_modules: list[str] = [], |     enable_modules: list[str] = [], | ||||||
|     loglevel: Optional[str] = None, |     loglevel: str|None = None, | ||||||
| 
 | 
 | ||||||
|     # XXX NOTE XXX: you should pretty much never want debug mode |     # XXX NOTE XXX: you should pretty much never want debug mode | ||||||
|     # for data daemons when running in production. |     # for data daemons when running in production. | ||||||
|  | @ -69,7 +68,7 @@ async def open_piker_runtime( | ||||||
|     # and spawn the service tree distributed per that. |     # and spawn the service tree distributed per that. | ||||||
|     start_method: str = 'trio', |     start_method: str = 'trio', | ||||||
| 
 | 
 | ||||||
|     tractor_runtime_overrides: dict | None = None, |     tractor_runtime_overrides: dict|None = None, | ||||||
|     **tractor_kwargs, |     **tractor_kwargs, | ||||||
| 
 | 
 | ||||||
| ) -> tuple[ | ) -> tuple[ | ||||||
|  | @ -119,6 +118,10 @@ async def open_piker_runtime( | ||||||
|                 # spawn other specialized daemons I think? |                 # spawn other specialized daemons I think? | ||||||
|                 enable_modules=enable_modules, |                 enable_modules=enable_modules, | ||||||
| 
 | 
 | ||||||
|  |                 # TODO: how to configure this? | ||||||
|  |                 # keep it on by default if debug mode is set? | ||||||
|  |                 # maybe_enable_greenback=debug_mode, | ||||||
|  | 
 | ||||||
|                 **tractor_kwargs, |                 **tractor_kwargs, | ||||||
|             ) as actor, |             ) as actor, | ||||||
| 
 | 
 | ||||||
|  | @ -167,12 +170,13 @@ async def open_pikerd( | ||||||
| 
 | 
 | ||||||
|     **kwargs, |     **kwargs, | ||||||
| 
 | 
 | ||||||
| ) -> Services: | ) -> ServiceMngr: | ||||||
|     ''' |     ''' | ||||||
|     Start a root piker daemon with an indefinite lifetime. |     Start a root piker daemon actor (aka `pikerd`) with an indefinite | ||||||
|  |     lifetime. | ||||||
| 
 | 
 | ||||||
|     A root actor nursery is created which can be used to create and keep |     A root actor-nursery is created which can be used to spawn and | ||||||
|     alive underling services (see below). |     supervise underling service sub-actors (see below). | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     # NOTE: for the root daemon we always enable the root |     # NOTE: for the root daemon we always enable the root | ||||||
|  | @ -199,8 +203,6 @@ async def open_pikerd( | ||||||
|             root_actor, |             root_actor, | ||||||
|             reg_addrs, |             reg_addrs, | ||||||
|         ), |         ), | ||||||
|         tractor.open_nursery() as actor_nursery, |  | ||||||
|         trio.open_nursery() as service_nursery, |  | ||||||
|     ): |     ): | ||||||
|         for addr in reg_addrs: |         for addr in reg_addrs: | ||||||
|             if addr not in root_actor.accept_addrs: |             if addr not in root_actor.accept_addrs: | ||||||
|  | @ -209,25 +211,17 @@ async def open_pikerd( | ||||||
|                     'Maybe you have another daemon already running?' |                     'Maybe you have another daemon already running?' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|         # assign globally for future daemon/task creation |         mngr: ServiceMngr | ||||||
|         Services.actor_n = actor_nursery |         async with open_service_mngr( | ||||||
|         Services.service_n = service_nursery |             debug_mode=debug_mode, | ||||||
|         Services.debug_mode = debug_mode |         ) as mngr: | ||||||
| 
 |             yield mngr | ||||||
|         try: |  | ||||||
|             yield Services |  | ||||||
| 
 |  | ||||||
|         finally: |  | ||||||
|             # TODO: is this more clever/efficient? |  | ||||||
|             # if 'samplerd' in Services.service_tasks: |  | ||||||
|             #     await Services.cancel_service('samplerd') |  | ||||||
|             service_nursery.cancel_scope.cancel() |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: do we even need this? | # TODO: do we even need this? | ||||||
| # @acm | # @acm | ||||||
| # async def maybe_open_runtime( | # async def maybe_open_runtime( | ||||||
| #     loglevel: Optional[str] = None, | #     loglevel: str|None = None, | ||||||
| #     **kwargs, | #     **kwargs, | ||||||
| 
 | 
 | ||||||
| # ) -> None: | # ) -> None: | ||||||
|  | @ -256,7 +250,7 @@ async def maybe_open_pikerd( | ||||||
|     loglevel: str | None = None, |     loglevel: str | None = None, | ||||||
|     **kwargs, |     **kwargs, | ||||||
| 
 | 
 | ||||||
| ) -> tractor._portal.Portal | ClassVar[Services]: | ) -> tractor._portal.Portal | ClassVar[ServiceMngr]: | ||||||
|     ''' |     ''' | ||||||
|     If no ``pikerd`` daemon-root-actor can be found start it and |     If no ``pikerd`` daemon-root-actor can be found start it and | ||||||
|     yield up (we should probably figure out returning a portal to self |     yield up (we should probably figure out returning a portal to self | ||||||
|  |  | ||||||
|  | @ -49,7 +49,7 @@ from requests.exceptions import ( | ||||||
|     ReadTimeout, |     ReadTimeout, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| from ._mngr import Services | from ._mngr import ServiceMngr | ||||||
| from ._util import ( | from ._util import ( | ||||||
|     log,  # sub-sys logger |     log,  # sub-sys logger | ||||||
|     get_console_log, |     get_console_log, | ||||||
|  | @ -453,7 +453,7 @@ async def open_ahabd( | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def start_ahab_service( | async def start_ahab_service( | ||||||
|     services: Services, |     services: ServiceMngr, | ||||||
|     service_name: str, |     service_name: str, | ||||||
| 
 | 
 | ||||||
|     # endpoint config passed as **kwargs |     # endpoint config passed as **kwargs | ||||||
|  | @ -549,7 +549,8 @@ async def start_ahab_service( | ||||||
|         log.warning('Failed to cancel root permsed container') |         log.warning('Failed to cancel root permsed container') | ||||||
| 
 | 
 | ||||||
|     except ( |     except ( | ||||||
|         trio.MultiError, |         # trio.MultiError, | ||||||
|  |         ExceptionGroup, | ||||||
|     ) as err: |     ) as err: | ||||||
|         for subexc in err.exceptions: |         for subexc in err.exceptions: | ||||||
|             if isinstance(subexc, PermissionError): |             if isinstance(subexc, PermissionError): | ||||||
|  |  | ||||||
|  | @ -26,14 +26,17 @@ from typing import ( | ||||||
| from contextlib import ( | from contextlib import ( | ||||||
|     asynccontextmanager as acm, |     asynccontextmanager as acm, | ||||||
| ) | ) | ||||||
|  | from collections import defaultdict | ||||||
| 
 | 
 | ||||||
| import tractor | import tractor | ||||||
|  | import trio | ||||||
| 
 | 
 | ||||||
| from ._util import ( | from ._util import ( | ||||||
|     log,  # sub-sys logger |     log,  # sub-sys logger | ||||||
| ) | ) | ||||||
| from ._mngr import ( | from ._mngr import ( | ||||||
|     Services, |     get_service_mngr, | ||||||
|  |     ServiceMngr, | ||||||
| ) | ) | ||||||
| from ._actor_runtime import maybe_open_pikerd | from ._actor_runtime import maybe_open_pikerd | ||||||
| from ._registry import find_service | from ._registry import find_service | ||||||
|  | @ -41,15 +44,14 @@ from ._registry import find_service | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def maybe_spawn_daemon( | async def maybe_spawn_daemon( | ||||||
| 
 |  | ||||||
|     service_name: str, |     service_name: str, | ||||||
|     service_task_target: Callable, |     service_task_target: Callable, | ||||||
| 
 |  | ||||||
|     spawn_args: dict[str, Any], |     spawn_args: dict[str, Any], | ||||||
| 
 | 
 | ||||||
|     loglevel: str | None = None, |     loglevel: str | None = None, | ||||||
|     singleton: bool = False, |     singleton: bool = False, | ||||||
| 
 | 
 | ||||||
|  |     _locks = defaultdict(trio.Lock), | ||||||
|     **pikerd_kwargs, |     **pikerd_kwargs, | ||||||
| 
 | 
 | ||||||
| ) -> tractor.Portal: | ) -> tractor.Portal: | ||||||
|  | @ -67,7 +69,7 @@ async def maybe_spawn_daemon( | ||||||
|     ''' |     ''' | ||||||
|     # serialize access to this section to avoid |     # serialize access to this section to avoid | ||||||
|     # 2 or more tasks racing to create a daemon |     # 2 or more tasks racing to create a daemon | ||||||
|     lock = Services.locks[service_name] |     lock = _locks[service_name] | ||||||
|     await lock.acquire() |     await lock.acquire() | ||||||
| 
 | 
 | ||||||
|     async with find_service( |     async with find_service( | ||||||
|  | @ -147,21 +149,22 @@ async def spawn_emsd( | ||||||
|     """ |     """ | ||||||
|     log.info('Spawning emsd') |     log.info('Spawning emsd') | ||||||
| 
 | 
 | ||||||
|     portal = await Services.actor_n.start_actor( |     smngr: ServiceMngr = get_service_mngr() | ||||||
|  |     portal = await smngr.actor_n.start_actor( | ||||||
|         'emsd', |         'emsd', | ||||||
|         enable_modules=[ |         enable_modules=[ | ||||||
|             'piker.clearing._ems', |             'piker.clearing._ems', | ||||||
|             'piker.clearing._client', |             'piker.clearing._client', | ||||||
|         ], |         ], | ||||||
|         loglevel=loglevel, |         loglevel=loglevel, | ||||||
|         debug_mode=Services.debug_mode,  # set by pikerd flag |         debug_mode=smngr.debug_mode,  # set by pikerd flag | ||||||
|         **extra_tractor_kwargs |         **extra_tractor_kwargs | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     # non-blocking setup of clearing service |     # non-blocking setup of clearing service | ||||||
|     from ..clearing._ems import _setup_persistent_emsd |     from ..clearing._ems import _setup_persistent_emsd | ||||||
| 
 | 
 | ||||||
|     await Services.start_service_task( |     await smngr.start_service_task( | ||||||
|         'emsd', |         'emsd', | ||||||
|         portal, |         portal, | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -18,16 +18,29 @@ | ||||||
| daemon-service management API. | daemon-service management API. | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
|  | from __future__ import annotations | ||||||
|  | from contextlib import ( | ||||||
|  |     asynccontextmanager as acm, | ||||||
|  |     # contextmanager as cm, | ||||||
|  | ) | ||||||
| from collections import defaultdict | from collections import defaultdict | ||||||
|  | from dataclasses import ( | ||||||
|  |     dataclass, | ||||||
|  |     field, | ||||||
|  | ) | ||||||
|  | import functools | ||||||
|  | import inspect | ||||||
| from typing import ( | from typing import ( | ||||||
|     Callable, |     Callable, | ||||||
|     Any, |     Any, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| import trio | import msgspec | ||||||
| from trio_typing import TaskStatus |  | ||||||
| import tractor | import tractor | ||||||
|  | import trio | ||||||
|  | from trio import TaskStatus | ||||||
| from tractor import ( | from tractor import ( | ||||||
|  |     ActorNursery, | ||||||
|     current_actor, |     current_actor, | ||||||
|     ContextCancelled, |     ContextCancelled, | ||||||
|     Context, |     Context, | ||||||
|  | @ -39,6 +52,130 @@ from ._util import ( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # TODO: implement a singleton deco-API for wrapping the below | ||||||
|  | # factory's impl for general actor-singleton use? | ||||||
|  | # | ||||||
|  | # @singleton | ||||||
|  | # async def open_service_mngr( | ||||||
|  | #     **init_kwargs, | ||||||
|  | # ) -> ServiceMngr: | ||||||
|  | #     ''' | ||||||
|  | #     Note this function body is invoke IFF no existing singleton instance already | ||||||
|  | #     exists in this proc's memory. | ||||||
|  | 
 | ||||||
|  | #     ''' | ||||||
|  | #     # setup | ||||||
|  | #     yield ServiceMngr(**init_kwargs) | ||||||
|  | #     # teardown | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: singleton factory API instead of a class API | ||||||
|  | @acm | ||||||
|  | async def open_service_mngr( | ||||||
|  |     *, | ||||||
|  |     debug_mode: bool = False, | ||||||
|  | 
 | ||||||
|  |     # impl deat which ensures a single global instance | ||||||
|  |     _singleton: list[ServiceMngr|None] = [None], | ||||||
|  |     **init_kwargs, | ||||||
|  | 
 | ||||||
|  | ) -> ServiceMngr: | ||||||
|  |     ''' | ||||||
|  |     Open a multi-subactor-as-service-daemon tree supervisor. | ||||||
|  | 
 | ||||||
|  |     The delivered `ServiceMngr` is a singleton instance for each | ||||||
|  |     actor-process and is allocated on first open and never | ||||||
|  |     de-allocated unless explicitly deleted by al call to | ||||||
|  |     `del_service_mngr()`. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     # TODO: factor this an allocation into | ||||||
|  |     # a `._mngr.open_service_mngr()` and put in the | ||||||
|  |     # once-n-only-once setup/`.__aenter__()` part! | ||||||
|  |     # -[ ] how to make this only happen on the `mngr == None` case? | ||||||
|  |     #  |_ use `.trionics.maybe_open_context()` (for generic | ||||||
|  |     #     async-with-style-only-once of the factory impl, though | ||||||
|  |     #     what do we do for the allocation case? | ||||||
|  |     #    / `.maybe_open_nursery()` (since for this specific case | ||||||
|  |     #    it's simpler?) to activate | ||||||
|  |     async with ( | ||||||
|  |         tractor.open_nursery() as an, | ||||||
|  |         trio.open_nursery() as tn, | ||||||
|  |     ): | ||||||
|  |         # impl specific obvi.. | ||||||
|  |         init_kwargs.update({ | ||||||
|  |             'actor_n': an, | ||||||
|  |             'service_n': tn, | ||||||
|  |         }) | ||||||
|  | 
 | ||||||
|  |         mngr: ServiceMngr|None | ||||||
|  |         if (mngr := _singleton[0]) is None: | ||||||
|  | 
 | ||||||
|  |             log.info('Allocating a new service mngr!') | ||||||
|  |             mngr = _singleton[0] = ServiceMngr(**init_kwargs) | ||||||
|  | 
 | ||||||
|  |             # TODO: put into `.__aenter__()` section of | ||||||
|  |             # eventual `@singleton_acm` API wrapper. | ||||||
|  |             # | ||||||
|  |             # assign globally for future daemon/task creation | ||||||
|  |             mngr.actor_n = an | ||||||
|  |             mngr.service_n = tn | ||||||
|  | 
 | ||||||
|  |         else: | ||||||
|  |             assert ( | ||||||
|  |                 mngr.actor_n | ||||||
|  |                 and | ||||||
|  |                 mngr.service_tn | ||||||
|  |             ) | ||||||
|  |             log.info( | ||||||
|  |                 'Using extant service mngr!\n\n' | ||||||
|  |                 f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |         try: | ||||||
|  |             # NOTE: this is a singleton factory impl specific detail | ||||||
|  |             # which should be supported in the condensed | ||||||
|  |             # `@singleton_acm` API? | ||||||
|  |             mngr.debug_mode = debug_mode | ||||||
|  | 
 | ||||||
|  |             yield mngr | ||||||
|  |         finally: | ||||||
|  |             # TODO: is this more clever/efficient? | ||||||
|  |             # if 'samplerd' in mngr.service_tasks: | ||||||
|  |             #     await mngr.cancel_service('samplerd') | ||||||
|  |             tn.cancel_scope.cancel() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def get_service_mngr() -> ServiceMngr: | ||||||
|  |     ''' | ||||||
|  |     Try to get the singleton service-mngr for this actor presuming it | ||||||
|  |     has already been allocated using, | ||||||
|  | 
 | ||||||
|  |     .. code:: python | ||||||
|  | 
 | ||||||
|  |         async with open_<@singleton_acm(func)>() as mngr` | ||||||
|  |             ... this block kept open ... | ||||||
|  | 
 | ||||||
|  |     If not yet allocated raise a `ServiceError`. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     # https://stackoverflow.com/a/12627202 | ||||||
|  |     # https://docs.python.org/3/library/inspect.html#inspect.Signature | ||||||
|  |     maybe_mngr: ServiceMngr|None = inspect.signature( | ||||||
|  |         open_service_mngr | ||||||
|  |     ).parameters['_singleton'].default[0] | ||||||
|  | 
 | ||||||
|  |     if maybe_mngr is None: | ||||||
|  |         raise RuntimeError( | ||||||
|  |             'Someone must allocate a `ServiceMngr` using\n\n' | ||||||
|  |             '`async with open_service_mngr()` beforehand!!\n' | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     return maybe_mngr | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| # TODO: we need remote wrapping and a general soln: | # TODO: we need remote wrapping and a general soln: | ||||||
| # - factor this into a ``tractor.highlevel`` extension # pack for the | # - factor this into a ``tractor.highlevel`` extension # pack for the | ||||||
| #   library. | #   library. | ||||||
|  | @ -46,31 +183,46 @@ from ._util import ( | ||||||
| #   to the pikerd actor for starting services remotely! | #   to the pikerd actor for starting services remotely! | ||||||
| # - prolly rename this to ActorServicesNursery since it spawns | # - prolly rename this to ActorServicesNursery since it spawns | ||||||
| #   new actors and supervises them to completion? | #   new actors and supervises them to completion? | ||||||
| class Services: | @dataclass | ||||||
|  | class ServiceMngr: | ||||||
|  | # class ServiceMngr(msgspec.Struct): | ||||||
|  |     ''' | ||||||
|  |     A multi-subactor-as-service manager. | ||||||
| 
 | 
 | ||||||
|     actor_n: tractor._supervise.ActorNursery |     Spawn, supervise and monitor service/daemon subactors in a SC | ||||||
|  |     process tree. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     actor_n: ActorNursery | ||||||
|     service_n: trio.Nursery |     service_n: trio.Nursery | ||||||
|     debug_mode: bool  # tractor sub-actor debug mode flag |     debug_mode: bool = False # tractor sub-actor debug mode flag | ||||||
|  | 
 | ||||||
|     service_tasks: dict[ |     service_tasks: dict[ | ||||||
|         str, |         str, | ||||||
|         tuple[ |         tuple[ | ||||||
|             trio.CancelScope, |             trio.CancelScope, | ||||||
|  |             Context, | ||||||
|             Portal, |             Portal, | ||||||
|             trio.Event, |             trio.Event, | ||||||
|         ] |         ] | ||||||
|     ] = {} |     ] = field(default_factory=dict) | ||||||
|     locks = defaultdict(trio.Lock) | 
 | ||||||
|  |     # internal per-service task mutexs | ||||||
|  |     _locks = defaultdict(trio.Lock) | ||||||
| 
 | 
 | ||||||
|     @classmethod |  | ||||||
|     async def start_service_task( |     async def start_service_task( | ||||||
|         self, |         self, | ||||||
|         name: str, |         name: str, | ||||||
|         portal: Portal, |         portal: Portal, | ||||||
|  | 
 | ||||||
|  |         # TODO: typevar for the return type of the target and then | ||||||
|  |         # use it below for `ctx_res`? | ||||||
|         target: Callable, |         target: Callable, | ||||||
|  | 
 | ||||||
|         allow_overruns: bool = False, |         allow_overruns: bool = False, | ||||||
|         **ctx_kwargs, |         **ctx_kwargs, | ||||||
| 
 | 
 | ||||||
|     ) -> (trio.CancelScope, Context): |     ) -> (trio.CancelScope, Context, Any): | ||||||
|         ''' |         ''' | ||||||
|         Open a context in a service sub-actor, add to a stack |         Open a context in a service sub-actor, add to a stack | ||||||
|         that gets unwound at ``pikerd`` teardown. |         that gets unwound at ``pikerd`` teardown. | ||||||
|  | @ -83,6 +235,7 @@ class Services: | ||||||
|             task_status: TaskStatus[ |             task_status: TaskStatus[ | ||||||
|                 tuple[ |                 tuple[ | ||||||
|                     trio.CancelScope, |                     trio.CancelScope, | ||||||
|  |                     Context, | ||||||
|                     trio.Event, |                     trio.Event, | ||||||
|                     Any, |                     Any, | ||||||
|                 ] |                 ] | ||||||
|  | @ -90,64 +243,87 @@ class Services: | ||||||
| 
 | 
 | ||||||
|         ) -> Any: |         ) -> Any: | ||||||
| 
 | 
 | ||||||
|  |             # TODO: use the ctx._scope directly here instead? | ||||||
|  |             # -[ ] actually what semantics do we expect for this | ||||||
|  |             #   usage!? | ||||||
|             with trio.CancelScope() as cs: |             with trio.CancelScope() as cs: | ||||||
|  |                 try: | ||||||
|  |                     async with portal.open_context( | ||||||
|  |                         target, | ||||||
|  |                         allow_overruns=allow_overruns, | ||||||
|  |                         **ctx_kwargs, | ||||||
| 
 | 
 | ||||||
|                 async with portal.open_context( |                     ) as (ctx, started): | ||||||
|                     target, |  | ||||||
|                     allow_overruns=allow_overruns, |  | ||||||
|                     **ctx_kwargs, |  | ||||||
| 
 | 
 | ||||||
|                 ) as (ctx, first): |                         # unblock once the remote context has started | ||||||
| 
 |                         complete = trio.Event() | ||||||
|                     # unblock once the remote context has started |                         task_status.started(( | ||||||
|                     complete = trio.Event() |                             cs, | ||||||
|                     task_status.started((cs, complete, first)) |                             ctx, | ||||||
|                     log.info( |                             complete, | ||||||
|                         f'`pikerd` service {name} started with value {first}' |                             started, | ||||||
|                     ) |                         )) | ||||||
|                     try: |                         log.info( | ||||||
|  |                             f'`pikerd` service {name} started with value {started}' | ||||||
|  |                         ) | ||||||
|                         # wait on any context's return value |                         # wait on any context's return value | ||||||
|                         # and any final portal result from the |                         # and any final portal result from the | ||||||
|                         # sub-actor. |                         # sub-actor. | ||||||
|                         ctx_res: Any = await ctx.result() |                         ctx_res: Any = await ctx.wait_for_result() | ||||||
| 
 | 
 | ||||||
|                         # NOTE: blocks indefinitely until cancelled |                         # NOTE: blocks indefinitely until cancelled | ||||||
|                         # either by error from the target context |                         # either by error from the target context | ||||||
|                         # function or by being cancelled here by the |                         # function or by being cancelled here by the | ||||||
|                         # surrounding cancel scope. |                         # surrounding cancel scope. | ||||||
|                         return (await portal.result(), ctx_res) |                         return ( | ||||||
|                     except ContextCancelled as ctxe: |                             await portal.wait_for_result(), | ||||||
|                         canceller: tuple[str, str] = ctxe.canceller |                             ctx_res, | ||||||
|                         our_uid: tuple[str, str] = current_actor().uid |                         ) | ||||||
|                         if ( |  | ||||||
|                             canceller != portal.channel.uid |  | ||||||
|                             and |  | ||||||
|                             canceller != our_uid |  | ||||||
|                         ): |  | ||||||
|                             log.cancel( |  | ||||||
|                                 f'Actor-service {name} was remotely cancelled?\n' |  | ||||||
|                                 f'remote canceller: {canceller}\n' |  | ||||||
|                                 f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n' |  | ||||||
|                             ) |  | ||||||
|                         else: |  | ||||||
|                             raise |  | ||||||
| 
 | 
 | ||||||
|  |                 except ContextCancelled as ctxe: | ||||||
|  |                     canceller: tuple[str, str] = ctxe.canceller | ||||||
|  |                     our_uid: tuple[str, str] = current_actor().uid | ||||||
|  |                     if ( | ||||||
|  |                         canceller != portal.chan.uid | ||||||
|  |                         and | ||||||
|  |                         canceller != our_uid | ||||||
|  |                     ): | ||||||
|  |                         log.cancel( | ||||||
|  |                             f'Actor-service `{name}` was remotely cancelled by a peer?\n' | ||||||
| 
 | 
 | ||||||
|  |                             # TODO: this would be a good spot to use | ||||||
|  |                             # a respawn feature Bo | ||||||
|  |                             f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' | ||||||
| 
 | 
 | ||||||
|                     finally: |                             f'cancellee: {portal.chan.uid}\n' | ||||||
|                         await portal.cancel_actor() |                             f'canceller: {canceller}\n' | ||||||
|                         complete.set() |                         ) | ||||||
|                         self.service_tasks.pop(name) |                     else: | ||||||
|  |                         raise | ||||||
| 
 | 
 | ||||||
|         cs, complete, first = await self.service_n.start(open_context_in_task) |                 finally: | ||||||
|  |                     # NOTE: the ctx MUST be cancelled first if we | ||||||
|  |                     # don't want the above `ctx.wait_for_result()` to | ||||||
|  |                     # raise a self-ctxc. WHY, well since from the ctx's | ||||||
|  |                     # perspective the cancel request will have | ||||||
|  |                     # arrived out-out-of-band at the `Actor.cancel()` | ||||||
|  |                     # level, thus `Context.cancel_called == False`, | ||||||
|  |                     # meaning `ctx._is_self_cancelled() == False`. | ||||||
|  |                     # with trio.CancelScope(shield=True): | ||||||
|  |                     # await ctx.cancel() | ||||||
|  |                     await portal.cancel_actor() | ||||||
|  |                     complete.set() | ||||||
|  |                     self.service_tasks.pop(name) | ||||||
|  | 
 | ||||||
|  |         cs, sub_ctx, complete, started = await self.service_n.start( | ||||||
|  |             open_context_in_task | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         # store the cancel scope and portal for later cancellation or |         # store the cancel scope and portal for later cancellation or | ||||||
|         # retstart if needed. |         # retstart if needed. | ||||||
|         self.service_tasks[name] = (cs, portal, complete) |         self.service_tasks[name] = (cs, sub_ctx, portal, complete) | ||||||
|  |         return cs, sub_ctx, started | ||||||
| 
 | 
 | ||||||
|         return cs, first |  | ||||||
| 
 |  | ||||||
|     @classmethod |  | ||||||
|     async def cancel_service( |     async def cancel_service( | ||||||
|         self, |         self, | ||||||
|         name: str, |         name: str, | ||||||
|  | @ -158,8 +334,80 @@ class Services: | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         log.info(f'Cancelling `pikerd` service {name}') |         log.info(f'Cancelling `pikerd` service {name}') | ||||||
|         cs, portal, complete = self.service_tasks[name] |         cs, sub_ctx, portal, complete = self.service_tasks[name] | ||||||
|         cs.cancel() | 
 | ||||||
|  |         # cs.cancel() | ||||||
|  |         await sub_ctx.cancel() | ||||||
|         await complete.wait() |         await complete.wait() | ||||||
|         assert name not in self.service_tasks, \ | 
 | ||||||
|             f'Serice task for {name} not terminated?' |         if name in self.service_tasks: | ||||||
|  |             # TODO: custom err? | ||||||
|  |             # raise ServiceError( | ||||||
|  |             raise RuntimeError( | ||||||
|  |                 f'Serice task for {name} not terminated?' | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |         # assert name not in self.service_tasks, \ | ||||||
|  |         #     f'Serice task for {name} not terminated?' | ||||||
|  | 
 | ||||||
|  |     async def start_service( | ||||||
|  |         self, | ||||||
|  |         daemon_name: str, | ||||||
|  |         ctx_ep: Callable,  # kwargs must `partial`-ed in! | ||||||
|  | 
 | ||||||
|  |         debug_mode: bool = False, | ||||||
|  |         **tractor_actor_kwargs, | ||||||
|  | 
 | ||||||
|  |     ) -> Context: | ||||||
|  |         ''' | ||||||
|  |         Start a "service" task in a new sub-actor (daemon) and manage it's lifetime | ||||||
|  |         indefinitely. | ||||||
|  | 
 | ||||||
|  |         Services can be cancelled/shutdown using `.cancel_service()`. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         entry: tuple|None = self.service_tasks.get(daemon_name) | ||||||
|  |         if entry: | ||||||
|  |             (cs, sub_ctx, portal, complete) = entry | ||||||
|  |             return sub_ctx | ||||||
|  | 
 | ||||||
|  |         if daemon_name not in self.service_tasks: | ||||||
|  |             portal = await self.actor_n.start_actor( | ||||||
|  |                 daemon_name, | ||||||
|  |                 debug_mode=(  # maybe set globally during allocate | ||||||
|  |                     debug_mode | ||||||
|  |                     or | ||||||
|  |                     self.debug_mode | ||||||
|  |                 ), | ||||||
|  |                 **tractor_actor_kwargs, | ||||||
|  |             ) | ||||||
|  |             ctx_kwargs: dict[str, Any] = {} | ||||||
|  |             if isinstance(ctx_ep, functools.partial): | ||||||
|  |                 ctx_kwargs: dict[str, Any] = ctx_ep.keywords | ||||||
|  |                 ctx_ep: Callable = ctx_ep.func | ||||||
|  | 
 | ||||||
|  |             (cs, sub_ctx, started) = await self.start_service_task( | ||||||
|  |                 daemon_name, | ||||||
|  |                 portal, | ||||||
|  |                 ctx_ep, | ||||||
|  |                 **ctx_kwargs, | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             return sub_ctx | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: | ||||||
|  | # -[ ] factor all the common shit from `.data._sampling` | ||||||
|  | #   and `.brokers._daemon` into here / `ServiceMngr` | ||||||
|  | #   in terms of allocating the `Portal` as part of the | ||||||
|  | #   "service-in-subactor" starting! | ||||||
|  | # -[ ] move to `tractor.hilevel._service`, import and use here! | ||||||
|  | # NOTE: purposely leaks the ref to the mod-scope Bo | ||||||
|  | # import tractor | ||||||
|  | # from tractor.hilevel import ( | ||||||
|  | #     open_service_mngr, | ||||||
|  | #     ServiceMngr, | ||||||
|  | # ) | ||||||
|  | # mngr: ServiceMngr|None = None | ||||||
|  | # with tractor.hilevel.open_service_mngr() as mngr: | ||||||
|  | #     Services = proxy(mngr) | ||||||
|  |  | ||||||
|  | @ -21,11 +21,13 @@ from typing import ( | ||||||
|     TYPE_CHECKING, |     TYPE_CHECKING, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | # TODO: oof, needs to be changed to `httpx`! | ||||||
| import asks | import asks | ||||||
| 
 | 
 | ||||||
| if TYPE_CHECKING: | if TYPE_CHECKING: | ||||||
|     import docker |     import docker | ||||||
|     from ._ahab import DockerContainer |     from ._ahab import DockerContainer | ||||||
|  |     from . import ServiceMngr | ||||||
| 
 | 
 | ||||||
| from ._util import log  # sub-sys logger | from ._util import log  # sub-sys logger | ||||||
| from ._util import ( | from ._util import ( | ||||||
|  | @ -127,7 +129,7 @@ def start_elasticsearch( | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def start_ahab_daemon( | async def start_ahab_daemon( | ||||||
|     service_mngr: Services, |     service_mngr: ServiceMngr, | ||||||
|     user_config: dict | None = None, |     user_config: dict | None = None, | ||||||
|     loglevel: str | None = None, |     loglevel: str | None = None, | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -53,7 +53,7 @@ import pendulum | ||||||
| # import purerpc | # import purerpc | ||||||
| 
 | 
 | ||||||
| from ..data.feed import maybe_open_feed | from ..data.feed import maybe_open_feed | ||||||
| from . import Services | from . import ServiceMngr | ||||||
| from ._util import ( | from ._util import ( | ||||||
|     log,  # sub-sys logger |     log,  # sub-sys logger | ||||||
|     get_console_log, |     get_console_log, | ||||||
|  | @ -233,7 +233,7 @@ def start_marketstore( | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def start_ahab_daemon( | async def start_ahab_daemon( | ||||||
|     service_mngr: Services, |     service_mngr: ServiceMngr, | ||||||
|     user_config: dict | None = None, |     user_config: dict | None = None, | ||||||
|     loglevel: str | None = None, |     loglevel: str | None = None, | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -10,7 +10,7 @@ from piker import ( | ||||||
|     config, |     config, | ||||||
| ) | ) | ||||||
| from piker.service import ( | from piker.service import ( | ||||||
|     Services, |     get_service_mngr, | ||||||
| ) | ) | ||||||
| from piker.log import get_console_log | from piker.log import get_console_log | ||||||
| 
 | 
 | ||||||
|  | @ -129,7 +129,7 @@ async def _open_test_pikerd( | ||||||
|         ) as service_manager, |         ) as service_manager, | ||||||
|     ): |     ): | ||||||
|         # this proc/actor is the pikerd |         # this proc/actor is the pikerd | ||||||
|         assert service_manager is Services |         assert service_manager is get_service_mngr() | ||||||
| 
 | 
 | ||||||
|         async with tractor.wait_for_actor( |         async with tractor.wait_for_actor( | ||||||
|             'pikerd', |             'pikerd', | ||||||
|  |  | ||||||
|  | @ -26,7 +26,7 @@ import pytest | ||||||
| import tractor | import tractor | ||||||
| from uuid import uuid4 | from uuid import uuid4 | ||||||
| 
 | 
 | ||||||
| from piker.service import Services | from piker.service import ServiceMngr | ||||||
| from piker.log import get_logger | from piker.log import get_logger | ||||||
| from piker.clearing._messages import ( | from piker.clearing._messages import ( | ||||||
|     Order, |     Order, | ||||||
|  | @ -158,7 +158,7 @@ def load_and_check_pos( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def test_ems_err_on_bad_broker( | def test_ems_err_on_bad_broker( | ||||||
|     open_test_pikerd: Services, |     open_test_pikerd: ServiceMngr, | ||||||
|     loglevel: str, |     loglevel: str, | ||||||
| ): | ): | ||||||
|     async def load_bad_fqme(): |     async def load_bad_fqme(): | ||||||
|  |  | ||||||
|  | @ -15,7 +15,7 @@ import tractor | ||||||
| 
 | 
 | ||||||
| from piker.service import ( | from piker.service import ( | ||||||
|     find_service, |     find_service, | ||||||
|     Services, |     ServiceMngr, | ||||||
| ) | ) | ||||||
| from piker.data import ( | from piker.data import ( | ||||||
|     open_feed, |     open_feed, | ||||||
|  | @ -44,7 +44,7 @@ def test_runtime_boot( | ||||||
|     async def main(): |     async def main(): | ||||||
|         port = 6666 |         port = 6666 | ||||||
|         daemon_addr = ('127.0.0.1', port) |         daemon_addr = ('127.0.0.1', port) | ||||||
|         services: Services |         services: ServiceMngr | ||||||
| 
 | 
 | ||||||
|         async with ( |         async with ( | ||||||
|             open_test_pikerd( |             open_test_pikerd( | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue